diff --git a/README.md b/README.md index 2a44c87..9245410 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,11 @@ Before sinking any data to a PubSub, make sure to have the following prerequisit Create a PubSub with a Google cloud projectID associated and a topic on which to publish the data. - **Substreams creation**: - - Use the `pubsub_substream` provided in the [examples](./examples) directory or create your own substreams. - - Compile the `pubsub_substream` project (or your own substreams): + - Use the `simple` provided in the [examples](./examples) directory or create your own substreams. + - Compile the `simple` project (or your own substreams): ```bash - cd examples/pubsub_substream + cd examples/simple cargo build --target wasm32-unknown-unknown --release ``` **Note:** *If you are creating your own substreams, make sure to create a `map` module with an output type of `sf.substreams.sink.pubsub.v1.Publish` message https://github.com/streamingfast/substreams-sink-pubsub/blob/develop/proto/sf/substreams/sink/pubsub/v1/pubsub.proto* @@ -36,19 +36,19 @@ Run the sink providing the `substreams manifest` and the substreams `module name using the following command: ```bash -substreams-sink-pubsub sink -e --project , +substreams-sink-pubsub sink -e --project ``` **Note:** *--help flag can be used to get more information on the flags used in the sink command.* ## Example -As an example, let's sink the ethereum blockchain data from the `pubsub_substream` module's named `map_clocks`, provided in the [examples](./examples) directory. +As an example, let's sink the ethereum blockchain data from the `simple` module's named `map_clocks`, provided in the [examples](./examples) directory. Run the following command, to publish the data on the PubSub topic `myTopic` associated with the Google cloud project-id `myProjectId`: ```bash -substreams-sink-pubsub sink -e mainnet.eth.streamingfast.io:443 map_clocks ./examples/pubsub_substream/substreams.yaml --project "1","topic" +substreams-sink-pubsub sink -e mainnet.eth.streamingfast.io:443 --project "1" ./examples/simple/substreams.yaml map_clocks "topic" ``` diff --git a/cmd/substreams-sink-pubsub/sink.go b/cmd/substreams-sink-pubsub/sink.go index ba3f9fe..5bcb019 100644 --- a/cmd/substreams-sink-pubsub/sink.go +++ b/cmd/substreams-sink-pubsub/sink.go @@ -13,33 +13,33 @@ import ( ) var sinkCmd = Command(sinkRunE, - "sink [] []", + "sink []", "Substreams Pubsub sinking", Flags(func(flags *pflag.FlagSet) { sink.AddFlagsToSet(flags) flags.String("cursor_path", "/tmp/sink-state/", "Sink cursor's path") - flags.StringSlice("project", nil, "Project details: Google Cloud Project ID and PubSub topic name on which data will be published") + flags.String("project", "", "Google Cloud Project ID") flags.StringP("endpoint", "e", "", "Substreams gRPC endpoint (e.g. 'mainnet.eth.streamingfast.io:443')") }), Description(` Publishs block data on a google PubSub from a Substreams output. The required arguments are: + - : URL or local path to a '.yaml' file (e.g. './examples/pubsub_substream/substreams.yaml'). - : The module name returning publish instructions in the substreams. + - : The PubSub topic name to publish the messages to. The optional arguments are: - - : URL or local path to a '.yaml' file (e.g. './examples/pubsub_substream/substreams.yaml'). - :: The range of block to sync, if not provided, will sync from the module's initial block and then forever. - If the is not provided, assume '.' contains a Substreams project to run. If - : is not provided, assumes the whole chain. + If : is not provided, assumes the whole chain. `), ExamplePrefixed("substreams-sink-pubsub sink", ` # Publish block data messages produced by map_clocks for the whole chain - -e mainnet.eth.streamingfast.io:443 map_clocks ./examples/pubsub_substream/substreams.yaml --project "1","topic" + -e mainnet.eth.streamingfast.io:443 ./examples/simple/substreams.yaml map_clocks "topic" --project "1" # Publish block data messages produced by map_clocks for a specific range of blocks - -e mainnet.eth.streamingfast.io:443 map_clocks ./examples/pubsub_substream/substreams.yaml 0:100000 --project "1","topic" + -e mainnet.eth.streamingfast.io:443 ./examples/simple/substreams.yaml map_clocks "topic" 0:1000 --project "1" `), ) @@ -47,13 +47,10 @@ func sinkRunE(cmd *cobra.Command, args []string) error { app := shutter.New() ctx := cmd.Context() - module, manifestPath, blockRange := extractInjectArgs(cmd, args) + manifestPath, module, topicName, blockRange := extractInjectArgs(cmd, args) endpoint := sflags.MustGetString(cmd, "endpoint") cursorPath := sflags.MustGetString(cmd, "cursor_path") - project := sflags.MustGetStringSlice(cmd, "project") - - projectID := project[0] - topicName := project[1] + projectID := sflags.MustGetString(cmd, "project") client, err := pubsub.NewClient(ctx, projectID) if err != nil { @@ -91,16 +88,16 @@ func sinkRunE(cmd *cobra.Command, args []string) error { return nil } -func extractInjectArgs(_ *cobra.Command, args []string) (moduleName, manifestPath, blockRange string) { - fmt.Println(args) - moduleName = args[0] +func extractInjectArgs(_ *cobra.Command, args []string) (manifestPath, moduleName, topicName, blockRange string) { + manifestPath = args[0] + moduleName = args[1] - if len(args) >= 2 { - manifestPath = args[1] + if len(args) >= 3 { + topicName = args[2] } - if len(args) == 3 { - blockRange = args[2] + if len(args) == 4 { + blockRange = args[3] } return } diff --git a/examples/pubsub_substream/.gitignore b/examples/simple/.gitignore similarity index 100% rename from examples/pubsub_substream/.gitignore rename to examples/simple/.gitignore diff --git a/examples/pubsub_substream/Cargo.lock b/examples/simple/Cargo.lock similarity index 100% rename from examples/pubsub_substream/Cargo.lock rename to examples/simple/Cargo.lock diff --git a/examples/pubsub_substream/Cargo.toml b/examples/simple/Cargo.toml similarity index 100% rename from examples/pubsub_substream/Cargo.toml rename to examples/simple/Cargo.toml diff --git a/examples/pubsub_substream/Makefile b/examples/simple/Makefile similarity index 100% rename from examples/pubsub_substream/Makefile rename to examples/simple/Makefile diff --git a/examples/pubsub_substream/rust-toolchain.toml b/examples/simple/rust-toolchain.toml similarity index 100% rename from examples/pubsub_substream/rust-toolchain.toml rename to examples/simple/rust-toolchain.toml diff --git a/examples/pubsub_substream/src/lib.rs b/examples/simple/src/lib.rs similarity index 100% rename from examples/pubsub_substream/src/lib.rs rename to examples/simple/src/lib.rs diff --git a/examples/pubsub_substream/src/pb/mod.rs b/examples/simple/src/pb/mod.rs similarity index 100% rename from examples/pubsub_substream/src/pb/mod.rs rename to examples/simple/src/pb/mod.rs diff --git a/examples/pubsub_substream/src/pb/pubsub.v1.rs b/examples/simple/src/pb/pubsub.v1.rs similarity index 100% rename from examples/pubsub_substream/src/pb/pubsub.v1.rs rename to examples/simple/src/pb/pubsub.v1.rs diff --git a/examples/pubsub_substream/src/pb/sf.substreams.sink.pubsub.v1.rs b/examples/simple/src/pb/sf.substreams.sink.pubsub.v1.rs similarity index 100% rename from examples/pubsub_substream/src/pb/sf.substreams.sink.pubsub.v1.rs rename to examples/simple/src/pb/sf.substreams.sink.pubsub.v1.rs diff --git a/examples/pubsub_substream/substreams.yaml b/examples/simple/substreams.yaml similarity index 90% rename from examples/pubsub_substream/substreams.yaml rename to examples/simple/substreams.yaml index 720abe1..dcc2c58 100644 --- a/examples/pubsub_substream/substreams.yaml +++ b/examples/simple/substreams.yaml @@ -6,7 +6,6 @@ package: protobuf: files: - sf/substreams/sink/pubsub/v1/pubsub.proto - - sf/substreams/sink/pubsub/v1/service.proto importPaths: - ../../proto/ diff --git a/pb/sf/substreams/sink/pubsub/v1/service.pb.go b/pb/sf/substreams/sink/pubsub/v1/service.pb.go deleted file mode 100644 index 22e5291..0000000 --- a/pb/sf/substreams/sink/pubsub/v1/service.pb.go +++ /dev/null @@ -1,239 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.1 -// protoc (unknown) -// source: sf/substreams/sink/pubsub/v1/service.proto - -package pubsubv1 - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Config struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - StartBlock int64 `protobuf:"varint,1,opt,name=start_block,json=startBlock,proto3" json:"start_block,omitempty"` - InputModule string `protobuf:"bytes,2,opt,name=input_module,json=inputModule,proto3" json:"input_module,omitempty"` -} - -func (x *Config) Reset() { - *x = Config{} - if protoimpl.UnsafeEnabled { - mi := &file_sf_substreams_sink_pubsub_v1_service_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Config) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Config) ProtoMessage() {} - -func (x *Config) ProtoReflect() protoreflect.Message { - mi := &file_sf_substreams_sink_pubsub_v1_service_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Config.ProtoReflect.Descriptor instead. -func (*Config) Descriptor() ([]byte, []int) { - return file_sf_substreams_sink_pubsub_v1_service_proto_rawDescGZIP(), []int{0} -} - -func (x *Config) GetStartBlock() int64 { - if x != nil { - return x.StartBlock - } - return 0 -} - -func (x *Config) GetInputModule() string { - if x != nil { - return x.InputModule - } - return "" -} - -type Service struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - SinkConfig *Config `protobuf:"bytes,1,opt,name=sink_config,json=sinkConfig,proto3" json:"sink_config,omitempty"` -} - -func (x *Service) Reset() { - *x = Service{} - if protoimpl.UnsafeEnabled { - mi := &file_sf_substreams_sink_pubsub_v1_service_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Service) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Service) ProtoMessage() {} - -func (x *Service) ProtoReflect() protoreflect.Message { - mi := &file_sf_substreams_sink_pubsub_v1_service_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Service.ProtoReflect.Descriptor instead. -func (*Service) Descriptor() ([]byte, []int) { - return file_sf_substreams_sink_pubsub_v1_service_proto_rawDescGZIP(), []int{1} -} - -func (x *Service) GetSinkConfig() *Config { - if x != nil { - return x.SinkConfig - } - return nil -} - -var File_sf_substreams_sink_pubsub_v1_service_proto protoreflect.FileDescriptor - -var file_sf_substreams_sink_pubsub_v1_service_proto_rawDesc = []byte{ - 0x0a, 0x2a, 0x73, 0x66, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2f, - 0x73, 0x69, 0x6e, 0x6b, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2f, 0x76, 0x31, 0x2f, 0x73, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1c, 0x73, 0x66, - 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, - 0x2e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x76, 0x31, 0x22, 0x4c, 0x0a, 0x06, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x62, 0x6c, - 0x6f, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, - 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x5f, 0x6d, - 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x69, 0x6e, 0x70, - 0x75, 0x74, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x22, 0x50, 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x12, 0x45, 0x0a, 0x0b, 0x73, 0x69, 0x6e, 0x6b, 0x5f, 0x63, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, - 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x70, 0x75, - 0x62, 0x73, 0x75, 0x62, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a, - 0x73, 0x69, 0x6e, 0x6b, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x9f, 0x02, 0x0a, 0x20, 0x63, - 0x6f, 0x6d, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, - 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x76, 0x31, 0x42, - 0x0c, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, - 0x58, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x66, 0x61, 0x73, 0x74, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x73, 0x2d, 0x73, 0x69, 0x6e, 0x6b, 0x2d, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, - 0x2f, 0x70, 0x62, 0x2f, 0x73, 0x66, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x73, 0x2f, 0x73, 0x69, 0x6e, 0x6b, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2f, 0x76, 0x31, - 0x3b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x76, 0x31, 0xa2, 0x02, 0x04, 0x53, 0x53, 0x53, 0x50, - 0xaa, 0x02, 0x1c, 0x53, 0x66, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, - 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x2e, 0x50, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x56, 0x31, 0xca, - 0x02, 0x1c, 0x53, 0x66, 0x5c, 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x5c, - 0x53, 0x69, 0x6e, 0x6b, 0x5c, 0x50, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5c, 0x56, 0x31, 0xe2, 0x02, - 0x28, 0x53, 0x66, 0x5c, 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x5c, 0x53, - 0x69, 0x6e, 0x6b, 0x5c, 0x50, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, - 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x20, 0x53, 0x66, 0x3a, 0x3a, - 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x3a, 0x3a, 0x53, 0x69, 0x6e, 0x6b, - 0x3a, 0x3a, 0x50, 0x75, 0x62, 0x73, 0x75, 0x62, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_sf_substreams_sink_pubsub_v1_service_proto_rawDescOnce sync.Once - file_sf_substreams_sink_pubsub_v1_service_proto_rawDescData = file_sf_substreams_sink_pubsub_v1_service_proto_rawDesc -) - -func file_sf_substreams_sink_pubsub_v1_service_proto_rawDescGZIP() []byte { - file_sf_substreams_sink_pubsub_v1_service_proto_rawDescOnce.Do(func() { - file_sf_substreams_sink_pubsub_v1_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_sf_substreams_sink_pubsub_v1_service_proto_rawDescData) - }) - return file_sf_substreams_sink_pubsub_v1_service_proto_rawDescData -} - -var file_sf_substreams_sink_pubsub_v1_service_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_sf_substreams_sink_pubsub_v1_service_proto_goTypes = []interface{}{ - (*Config)(nil), // 0: sf.substreams.sink.pubsub.v1.Config - (*Service)(nil), // 1: sf.substreams.sink.pubsub.v1.Service -} -var file_sf_substreams_sink_pubsub_v1_service_proto_depIdxs = []int32{ - 0, // 0: sf.substreams.sink.pubsub.v1.Service.sink_config:type_name -> sf.substreams.sink.pubsub.v1.Config - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_sf_substreams_sink_pubsub_v1_service_proto_init() } -func file_sf_substreams_sink_pubsub_v1_service_proto_init() { - if File_sf_substreams_sink_pubsub_v1_service_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_sf_substreams_sink_pubsub_v1_service_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Config); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_sf_substreams_sink_pubsub_v1_service_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Service); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_sf_substreams_sink_pubsub_v1_service_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_sf_substreams_sink_pubsub_v1_service_proto_goTypes, - DependencyIndexes: file_sf_substreams_sink_pubsub_v1_service_proto_depIdxs, - MessageInfos: file_sf_substreams_sink_pubsub_v1_service_proto_msgTypes, - }.Build() - File_sf_substreams_sink_pubsub_v1_service_proto = out.File - file_sf_substreams_sink_pubsub_v1_service_proto_rawDesc = nil - file_sf_substreams_sink_pubsub_v1_service_proto_goTypes = nil - file_sf_substreams_sink_pubsub_v1_service_proto_depIdxs = nil -} diff --git a/proto/buf.gen.yaml b/proto/buf.gen.yaml index 333f78c..3ad6b4a 100644 --- a/proto/buf.gen.yaml +++ b/proto/buf.gen.yaml @@ -9,10 +9,10 @@ plugins: opt: paths=source_relative - plugin: buf.build/community/neoeinstein-prost:v0.3.1 - out: ../examples/pubsub_substream/src/pb + out: ../examples/simple/src/pb - plugin: buf.build/community/neoeinstein-prost-crate:v0.3.1 - out: ../examples/pubsub_substream/src/pb + out: ../examples/simple/src/pb opt: - no_features diff --git a/proto/sf/substreams/sink/pubsub/v1/service.proto b/proto/sf/substreams/sink/pubsub/v1/service.proto deleted file mode 100644 index 23adc7f..0000000 --- a/proto/sf/substreams/sink/pubsub/v1/service.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; -package sf.substreams.sink.pubsub.v1; - -option go_package = "github.com/streamingfast/substreams-sink-pubsub/pb/sf/substreams/sink/pubsub/v1;pbpubsub"; - - -message Config { - int64 start_block = 1; - string input_module = 2; -} - -message Service { - Config sink_config = 1; -} \ No newline at end of file