Skip to content

Commit

Permalink
Updating README.md and removing service in proto and refactoring sink…
Browse files Browse the repository at this point in the history
… cmd
  • Loading branch information
ArnaudBger committed Feb 26, 2024
1 parent 876b797 commit d714f93
Show file tree
Hide file tree
Showing 15 changed files with 24 additions and 281 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ Before sinking any data to a PubSub, make sure to have the following prerequisit
Create a PubSub with a Google cloud projectID associated and a topic on which to publish the data.

- **Substreams creation**:
- Use the `pubsub_substream` provided in the [examples](./examples) directory or create your own substreams.
- Compile the `pubsub_substream` project (or your own substreams):
- Use the `simple` provided in the [examples](./examples) directory or create your own substreams.
- Compile the `simple` project (or your own substreams):

```bash
cd examples/pubsub_substream
cd examples/simple
cargo build --target wasm32-unknown-unknown --release
```
**Note:** *If you are creating your own substreams, make sure to create a `map` module with an output type of `sf.substreams.sink.pubsub.v1.Publish` message https://github.com/streamingfast/substreams-sink-pubsub/blob/develop/proto/sf/substreams/sink/pubsub/v1/pubsub.proto*
Expand All @@ -36,19 +36,19 @@ Run the sink providing the `substreams manifest` and the substreams `module name
using the following command:

```bash
substreams-sink-pubsub sink -e <endpoint> <substreams_module_name> <substreams_manifest> --project <projectId>,<topicName>
substreams-sink-pubsub sink -e <endpoint> --project <projectId> <substreams_manifest> <substreams_module_name> <topicName>
```

**Note:** *--help flag can be used to get more information on the flags used in the sink command.*

## Example

As an example, let's sink the ethereum blockchain data from the `pubsub_substream` module's named `map_clocks`, provided in the [examples](./examples) directory.
As an example, let's sink the ethereum blockchain data from the `simple` module's named `map_clocks`, provided in the [examples](./examples) directory.

Run the following command, to publish the data on the PubSub topic `myTopic` associated with the Google cloud project-id `myProjectId`:

```bash
substreams-sink-pubsub sink -e mainnet.eth.streamingfast.io:443 map_clocks ./examples/pubsub_substream/substreams.yaml --project "1","topic"
substreams-sink-pubsub sink -e mainnet.eth.streamingfast.io:443 --project "1" ./examples/simple/substreams.yaml map_clocks "topic"
```


Expand Down
35 changes: 16 additions & 19 deletions cmd/substreams-sink-pubsub/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,44 @@ import (
)

var sinkCmd = Command(sinkRunE,
"sink <moduleName> [<manifest-path>] [<block-range>]",
"sink <manifest-path> <module-name> <topic-name> [<block-range>]",
"Substreams Pubsub sinking",
Flags(func(flags *pflag.FlagSet) {
sink.AddFlagsToSet(flags)

flags.String("cursor_path", "/tmp/sink-state/", "Sink cursor's path")
flags.StringSlice("project", nil, "Project details: Google Cloud Project ID and PubSub topic name on which data will be published")
flags.String("project", "", "Google Cloud Project ID")
flags.StringP("endpoint", "e", "", "Substreams gRPC endpoint (e.g. 'mainnet.eth.streamingfast.io:443')")
}),
Description(`
Publishs block data on a google PubSub from a Substreams output.
The required arguments are:
- <manifest-path>: URL or local path to a '.yaml' file (e.g. './examples/pubsub_substream/substreams.yaml').
- <moduleName>: The module name returning publish instructions in the substreams.
- <topicName>: The PubSub topic name to publish the messages to.
The optional arguments are:
- <manifest>: URL or local path to a '.yaml' file (e.g. './examples/pubsub_substream/substreams.yaml').
- <start>:<stop>: The range of block to sync, if not provided, will sync from the module's initial block and then forever.
If the <manifest> is not provided, assume '.' contains a Substreams project to run. If
<start>:<stop> is not provided, assumes the whole chain.
If <start>:<stop> is not provided, assumes the whole chain.
`),
ExamplePrefixed("substreams-sink-pubsub sink", `
# Publish block data messages produced by map_clocks for the whole chain
-e mainnet.eth.streamingfast.io:443 map_clocks ./examples/pubsub_substream/substreams.yaml --project "1","topic"
-e mainnet.eth.streamingfast.io:443 ./examples/simple/substreams.yaml map_clocks "topic" --project "1"
# Publish block data messages produced by map_clocks for a specific range of blocks
-e mainnet.eth.streamingfast.io:443 map_clocks ./examples/pubsub_substream/substreams.yaml 0:100000 --project "1","topic"
-e mainnet.eth.streamingfast.io:443 ./examples/simple/substreams.yaml map_clocks "topic" 0:1000 --project "1"
`),
)

func sinkRunE(cmd *cobra.Command, args []string) error {
app := shutter.New()
ctx := cmd.Context()

module, manifestPath, blockRange := extractInjectArgs(cmd, args)
manifestPath, module, topicName, blockRange := extractInjectArgs(cmd, args)
endpoint := sflags.MustGetString(cmd, "endpoint")
cursorPath := sflags.MustGetString(cmd, "cursor_path")
project := sflags.MustGetStringSlice(cmd, "project")

projectID := project[0]
topicName := project[1]
projectID := sflags.MustGetString(cmd, "project")

client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
Expand Down Expand Up @@ -91,16 +88,16 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
return nil
}

func extractInjectArgs(_ *cobra.Command, args []string) (moduleName, manifestPath, blockRange string) {
fmt.Println(args)
moduleName = args[0]
func extractInjectArgs(_ *cobra.Command, args []string) (manifestPath, moduleName, topicName, blockRange string) {
manifestPath = args[0]
moduleName = args[1]

if len(args) >= 2 {
manifestPath = args[1]
if len(args) >= 3 {
topicName = args[2]
}

if len(args) == 3 {
blockRange = args[2]
if len(args) == 4 {
blockRange = args[3]
}
return
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package:
protobuf:
files:
- sf/substreams/sink/pubsub/v1/pubsub.proto
- sf/substreams/sink/pubsub/v1/service.proto
importPaths:
- ../../proto/

Expand Down
239 changes: 0 additions & 239 deletions pb/sf/substreams/sink/pubsub/v1/service.pb.go

This file was deleted.

4 changes: 2 additions & 2 deletions proto/buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ plugins:
opt: paths=source_relative

- plugin: buf.build/community/neoeinstein-prost:v0.3.1
out: ../examples/pubsub_substream/src/pb
out: ../examples/simple/src/pb

- plugin: buf.build/community/neoeinstein-prost-crate:v0.3.1
out: ../examples/pubsub_substream/src/pb
out: ../examples/simple/src/pb
opt:
- no_features

Expand Down
14 changes: 0 additions & 14 deletions proto/sf/substreams/sink/pubsub/v1/service.proto

This file was deleted.

0 comments on commit d714f93

Please sign in to comment.