From 92504b15ebb24528c0d97d5396d371ae6e210630 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Wed, 6 Nov 2024 13:13:23 +1000 Subject: [PATCH] feat: add beacon_api_eth_v3_validator_block --- .../docker-compose/vector-http-kafka.yaml | 23 +++- .../vector-kafka-clickhouse.yaml | 122 +++++++++++++++++- .../clickhouse/053_validator_block.down.sql | 2 + .../clickhouse/053_validator_block.up.sql | 68 ++++++++++ docker-compose.yml | 3 +- example_sentry.yaml | 12 ++ pkg/proto/eth/block.go | 2 +- pkg/sentry/config.go | 8 +- .../beacon/eth/v2/proposed_validator_block.go | 61 +++++---- pkg/sentry/proposed_validator_block.go | 32 ++--- pkg/sentry/sentry.go | 2 +- .../beacon/eth/v2/proposed_validator_block.go | 38 +++--- .../service/event-ingester/event/event.go | 6 +- 13 files changed, 294 insertions(+), 85 deletions(-) create mode 100644 deploy/migrations/clickhouse/053_validator_block.down.sql create mode 100644 deploy/migrations/clickhouse/053_validator_block.up.sql diff --git a/deploy/local/docker-compose/vector-http-kafka.yaml b/deploy/local/docker-compose/vector-http-kafka.yaml index b0d3eebd..920168c6 100644 --- a/deploy/local/docker-compose/vector-http-kafka.yaml +++ b/deploy/local/docker-compose/vector-http-kafka.yaml @@ -68,6 +68,7 @@ transforms: beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS" mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION" mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED" + beacon_api_eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK" sinks: metrics: type: prometheus_exporter @@ -890,7 +891,7 @@ sinks: healthcheck: enabled: true encoding: - codec: json + codec: json mev_relay_proposer_payload_delivered_kafka: type: kafka buffer: @@ -906,4 +907,22 @@ sinks: healthcheck: enabled: true encoding: - codec: json \ No newline at end of file + codec: json + beacon_api_eth_v3_validator_block_kafka: + type: kafka + buffer: + max_events: 500000 + batch: + timeout_secs: 0.5 + inputs: + - xatu_server_events_router.beacon_api_eth_v3_validator_block + bootstrap_servers: "${KAFKA_BROKERS}" + key_field: "event.id" + topic: beacon-api-eth-v3-validator-block + compression: snappy + healthcheck: + enabled: true + encoding: + codec: json + librdkafka_options: + message.max.bytes: "10485760" # 10MB diff --git a/deploy/local/docker-compose/vector-kafka-clickhouse.yaml b/deploy/local/docker-compose/vector-kafka-clickhouse.yaml index 303bcd61..643b8da5 100644 --- a/deploy/local/docker-compose/vector-kafka-clickhouse.yaml +++ b/deploy/local/docker-compose/vector-kafka-clickhouse.yaml @@ -123,6 +123,18 @@ sources: - "^mev-relay-.+" librdkafka_options: message.max.bytes: "10485760" # 10MB + beacon_api_eth_v3_validator_block_kafka: + type: kafka + bootstrap_servers: "${KAFKA_BROKERS}" + auto_offset_reset: earliest + group_id: xatu-vector-kafka-clickhouse-beacon-api-eth-v3-validator-block-events + key_field: "event.id" + decoding: + codec: json + topics: + - "beacon-api-eth-v3-validator-block" + librdkafka_options: + message.max.bytes: "10485760" # 10MB transforms: xatu_server_events_meta: type: remap @@ -137,6 +149,7 @@ transforms: - beacon_api_eth_v1_proposer_kafka - beacon_api_eth_v1_beacon_validators_kafka - mev_relay_kafka + - beacon_api_eth_v3_validator_block_kafka source: |- .meta_client_name = .meta.client.name .meta_client_id = .meta.client.id @@ -355,6 +368,7 @@ transforms: mempool_transaction: .event.name == "MEMPOOL_TRANSACTION" mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION" mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED" + eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK" xatu_server_events_router_matched: type: log_to_metric inputs: @@ -396,6 +410,7 @@ transforms: - xatu_server_events_router.mempool_transaction_v2 - xatu_server_events_router.mev_relay_bid_trace_builder_block_submission - xatu_server_events_router.mev_relay_proposer_payload_delivered + - xatu_server_events_router.eth_v3_validator_block metrics: - type: counter field: event.name @@ -1645,14 +1660,14 @@ transforms: .error_description = "failed to parse block epoch start date time" log(., level: "error", rate_limit_secs: 60) } - + .position_in_block = .meta.client.additional_data.position_in_block .block_root = .meta.client.additional_data.block.root .validators = .data.validator_indexes .committee_index = .data.data.index .beacon_block_root = .data.data.beacon_block_root .slot = .data.data.slot - + slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+"); if err == null { .slot_start_date_time = to_unix_timestamp(slot_start_date_time) @@ -1777,7 +1792,7 @@ transforms: .slot = .data.slot .block_number = .data.block_number - + slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+"); if err == null { .slot_start_date_time = to_unix_timestamp(slot_start_date_time) @@ -1787,7 +1802,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .epoch = .meta.client.additional_data.epoch.number - + epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+"); if err == null { .epoch_start_date_time = to_unix_timestamp(epoch_start_date_time) @@ -1838,7 +1853,7 @@ transforms: del(.meta_consensus_implementation) del(.meta_network_id) - + del(.event) del(.meta) del(.data) @@ -1869,7 +1884,7 @@ transforms: log(., level: "error", rate_limit_secs: 60) } .epoch = .meta.client.additional_data.epoch.number - + epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+"); if err == null { .epoch_start_date_time = to_unix_timestamp(epoch_start_date_time) @@ -1918,11 +1933,83 @@ transforms: del(.meta_consensus_implementation) del(.meta_network_id) - + + del(.event) + del(.meta) + del(.data) + beacon_api_eth_v3_validator_block_formatted: + type: remap + inputs: + - xatu_server_events_router.eth_v3_validator_block + source: |- + # handle message version name pathing and map it back to .data.message + if !exists(.data.message) { + message, err = get(value: .data, path: [.data.version]) + if err == null { + .data.message = message + } else { + .error = err + .error_description = "failed to get data.message" + log(., level: "error", rate_limit_secs: 60) + } + + cleanedUpData, err = remove(value: .data, path: [.data.version]) + if err == null { + .data = cleanedUpData + } else { + .error = err + .error_description = "failed to remove data.message" + log(., level: "error", rate_limit_secs: 60) + } + } + + event_date_time, err = parse_timestamp(.event.date_time, format: "%+"); + if err == null { + .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds") + } else { + .error = err + .error_description = "failed to parse event date time" + log(., level: "error", rate_limit_secs: 60) + } + .slot = .data.message.slot + slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+"); + if err == null { + .slot_start_date_time = to_unix_timestamp(slot_start_date_time) + } else { + .error = err + .error_description = "failed to parse slot start date time" + log(., level: "error", rate_limit_secs: 60) + } + .epoch = .meta.client.additional_data.epoch.number + epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+"); + if err == null { + .epoch_start_date_time = to_unix_timestamp(epoch_start_date_time) + } else { + .error = err + .error_description = "failed to parse epoch start date time" + log(., level: "error", rate_limit_secs: 60) + } + .block_version = .meta.client.additional_data.version + .block_total_bytes = .meta.client.additional_data.total_bytes + .block_total_bytes_compressed = .meta.client.additional_data.total_bytes_compressed + + .execution_payload_block_number = .data.message.body.execution_payload.block_number + .execution_payload_base_fee_per_gas = .data.message.body.execution_payload.base_fee_per_gas + .execution_payload_blob_gas_used = .data.message.body.execution_payload.blob_gas_used + .execution_payload_excess_blob_gas = .data.message.body.execution_payload.excess_blob_gas + .execution_payload_gas_limit = .data.message.body.execution_payload.gas_limit + .execution_payload_gas_used = .data.message.body.execution_payload.gas_used + .execution_payload_transactions_count = .meta.client.additional_data.transactions_count + .execution_payload_transactions_total_bytes = .meta.client.additional_data.transactions_total_bytes + .execution_payload_transactions_total_bytes_compressed = .meta.client.additional_data.transactions_total_bytes_compressed + .consensus_payload_value = .meta.client.additional_data.consensus_value + .execution_payload_value = .meta.client.additional_data.execution_value + .updated_date_time = to_unix_timestamp(now()) del(.event) del(.meta) del(.data) + sinks: metrics: type: prometheus_exporter @@ -2551,3 +2638,24 @@ sinks: healthcheck: enabled: true skip_unknown_fields: true + beacon_api_eth_v3_validator_block_clickhouse: + type: clickhouse + inputs: + - beacon_api_eth_v3_validator_block_formatted + database: default + endpoint: "${CLICKHOUSE_ENDPOINT}" + table: beacon_api_eth_v3_validator_block + auth: + strategy: basic + user: "${CLICKHOUSE_USER}" + password: "${CLICKHOUSE_PASSWORD}" + batch: + max_bytes: 52428800 + max_events: 200000 + timeout_secs: 1 + buffer: + max_events: 200000 + healthcheck: + enabled: true + skip_unknown_fields: false + diff --git a/deploy/migrations/clickhouse/053_validator_block.down.sql b/deploy/migrations/clickhouse/053_validator_block.down.sql new file mode 100644 index 00000000..7971d5fb --- /dev/null +++ b/deploy/migrations/clickhouse/053_validator_block.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS beacon_api_eth_v3_validator_block ON CLUSTER '{cluster}' SYNC; +DROP TABLE IF EXISTS beacon_api_eth_v3_validator_block_local ON CLUSTER '{cluster}' SYNC; diff --git a/deploy/migrations/clickhouse/053_validator_block.up.sql b/deploy/migrations/clickhouse/053_validator_block.up.sql new file mode 100644 index 00000000..a73e89c8 --- /dev/null +++ b/deploy/migrations/clickhouse/053_validator_block.up.sql @@ -0,0 +1,68 @@ +-- Step 1: Create the new table. +CREATE TABLE default.beacon_api_eth_v3_validator_block_local ON CLUSTER '{cluster}' ( + `updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)), + `event_date_time` DateTime64(3) COMMENT 'When the sentry received the event from a beacon node' CODEC(DoubleDelta, ZSTD(1)), + `slot` UInt32 COMMENT 'Slot number within the payload' CODEC(DoubleDelta, ZSTD(1)), + `slot_start_date_time` DateTime COMMENT 'The wall clock time when the reorg slot started', + `epoch` UInt32 COMMENT 'The epoch number in the beacon API event stream payload' CODEC(DoubleDelta, ZSTD(1)), + `epoch_start_date_time` DateTime COMMENT 'The wall clock time when the epoch started' CODEC(DoubleDelta, ZSTD(1)), + `block_version` LowCardinality(String) COMMENT 'The version of the beacon block', + `block_total_bytes` Nullable(UInt32) COMMENT 'The total bytes of the beacon block payload' CODEC(ZSTD(1)), + `block_total_bytes_compressed` Nullable(UInt32) COMMENT 'The total bytes of the beacon block payload when compressed using snappy' CODEC(ZSTD(1)), + `consensus_payload_value` Nullable(UInt64) COMMENT 'Consensus rewards paid to the proposer for this block, in Wei. Use to determine relative value of consensus blocks.' CODEC(ZSTD(1)), + `execution_payload_value` Nullable(UInt64) COMMENT 'Execution payload value in Wei. Use to determine relative value of execution payload.' CODEC(ZSTD(1)), + `execution_payload_block_number` UInt32 COMMENT 'The block number of the execution payload', + `execution_payload_base_fee_per_gas` Nullable(UInt128) COMMENT 'Base fee per gas for execution payload' CODEC(ZSTD(1)), + `execution_payload_blob_gas_used` Nullable(UInt64) COMMENT 'Gas used for blobs in execution payload' CODEC(ZSTD(1)), + `execution_payload_excess_blob_gas` Nullable(UInt64) COMMENT 'Excess gas used for blobs in execution payload' CODEC(ZSTD(1)), + `execution_payload_gas_limit` Nullable(UInt64) COMMENT 'Gas limit for execution payload' CODEC(DoubleDelta, ZSTD(1)), + `execution_payload_gas_used` Nullable(UInt64) COMMENT 'Gas used for execution payload' CODEC(ZSTD(1)), + `execution_payload_transactions_count` Nullable(UInt32) COMMENT 'The transaction count of the execution payload' CODEC(ZSTD(1)), + `execution_payload_transactions_total_bytes` Nullable(UInt32) COMMENT 'The transaction total bytes of the execution payload' CODEC(ZSTD(1)), + `execution_payload_transactions_total_bytes_compressed` Nullable(UInt32) COMMENT 'The transaction total bytes of the execution payload when compressed using snappy' CODEC(ZSTD(1)), + `meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event', + `meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' CODEC(ZSTD(1)), + `meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event', + `meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event', + `meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event', + `meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)), + `meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event', + `meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event', + `meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event', + `meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event', + `meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event', + `meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)), + `meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)), + `meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)), + `meta_network_id` Int32 COMMENT 'Ethereum network ID' CODEC(DoubleDelta, ZSTD(1)), + `meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name', + `meta_consensus_version` LowCardinality(String) COMMENT 'Ethereum consensus client version that generated the event', + `meta_consensus_version_major` LowCardinality(String) COMMENT 'Ethereum consensus client major version that generated the event', + `meta_consensus_version_minor` LowCardinality(String) COMMENT 'Ethereum consensus client minor version that generated the event', + `meta_consensus_version_patch` LowCardinality(String) COMMENT 'Ethereum consensus client patch version that generated the event', + `meta_consensus_implementation` LowCardinality(String) COMMENT 'Ethereum consensus client implementation that generated the event', + `meta_labels` Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1)) +) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{installation}/{cluster}/default/tables/{table}/{shard}', + '{replica}', + updated_date_time +) PARTITION BY toStartOfMonth(slot_start_date_time) +ORDER BY +( + slot_start_date_time, + meta_network_name, + meta_client_name +) +COMMENT 'Contains beacon API /eth/v3/validator/blocks/{slot} data from each sentry client attached to a beacon node.'; + +-- Step 2: Create the distributed table. +CREATE TABLE default.beacon_api_eth_v3_validator_block ON CLUSTER '{cluster}' AS default.beacon_api_eth_v3_validator_block_local ENGINE = Distributed( + '{cluster}', + default, + beacon_api_eth_v3_validator_block_local, + cityHash64( + slot_start_date_time, + meta_network_name, + meta_client_name + ) +); diff --git a/docker-compose.yml b/docker-compose.yml index 0d7cb5f6..a72741e5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -293,6 +293,7 @@ services: "beacon-api-eth-v1-beacon-validators" "mev-relay-bid-trace-builder-block-submission" "mev-relay-proposer-payload-delivered" + "beacon-api-eth-v3-validator-block" ) for topic in "$${topics[@]}"; do echo "Creating topic: $$topic"; @@ -565,7 +566,7 @@ services: - ./deploy/local/docker-compose/xatu-cannon.yaml:/etc/cannon/config.yaml networks: - xatu-net - + networks: xatu-net: driver: bridge diff --git a/example_sentry.yaml b/example_sentry.yaml index 3be1ebfa..6fa481cd 100644 --- a/example_sentry.yaml +++ b/example_sentry.yaml @@ -52,6 +52,18 @@ attestationData: beaconCommittees: enabled: true +validatorBlock: + enabled: false + + interval: + enabled: false + every: 30s + + at: + enabled: false + slotTimes: + - 4s + outputs: - name: http-sink type: http diff --git a/pkg/proto/eth/block.go b/pkg/proto/eth/block.go index 2d14bb53..963f6d9d 100644 --- a/pkg/proto/eth/block.go +++ b/pkg/proto/eth/block.go @@ -16,7 +16,7 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) -func NewEventBlockV2FromVersionProposedBeaconBlock(proposal *api.VersionedProposal) (*v2.EventBlockV2, error) { +func NewEventBlockV2FromVersionedProposal(proposal *api.VersionedProposal) (*v2.EventBlockV2, error) { var data *v2.EventBlockV2 switch proposal.Version { diff --git a/pkg/sentry/config.go b/pkg/sentry/config.go index 2228f100..8f6a1600 100644 --- a/pkg/sentry/config.go +++ b/pkg/sentry/config.go @@ -48,8 +48,8 @@ type Config struct { // ProposerDuty configuration ProposerDuty *ProposerDutyConfig `yaml:"proposerDuty" default:"{'enabled': true}"` - // ProposedValidatorBlock configuration - ProposedValidatorBlock *ProposedValidatorBlockConfig `yaml:"proposedValidatorBlock" default:"{'enabled': false}"` + // ValidatorBlock configuration + ValidatorBlock *ValidatorBlockConfig `yaml:"validatorBlock" default:"{'enabled': false}"` // Tracing configuration Tracing observability.TracingConfig `yaml:"tracing"` @@ -169,7 +169,7 @@ func (f *AttestationDataConfig) Validate() error { return nil } -type ProposedValidatorBlockConfig struct { +type ValidatorBlockConfig struct { Enabled bool `yaml:"enabled" default:"false"` Interval struct { @@ -183,7 +183,7 @@ type ProposedValidatorBlockConfig struct { } `yaml:"at"` } -func (f *ProposedValidatorBlockConfig) Validate() error { +func (f *ValidatorBlockConfig) Validate() error { if f.At.Enabled { if len(f.At.SlotTimes) == 0 { return errors.New("at.slotTimes must be provided when at.enabled is true") diff --git a/pkg/sentry/event/beacon/eth/v2/proposed_validator_block.go b/pkg/sentry/event/beacon/eth/v2/proposed_validator_block.go index ddeab64e..28dc9da2 100644 --- a/pkg/sentry/event/beacon/eth/v2/proposed_validator_block.go +++ b/pkg/sentry/event/beacon/eth/v2/proposed_validator_block.go @@ -19,29 +19,29 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) -type ProposedValidatorBlock struct { +type ValidatorBlock struct { log logrus.FieldLogger - snapshot *ProposedValidatorBlockDataSnapshot + snapshot *ValidatorBlockDataSnapshot event *api.VersionedProposal beacon *ethereum.BeaconNode clientMeta *xatu.ClientMeta id uuid.UUID } -type ProposedValidatorBlockDataSnapshot struct { +type ValidatorBlockDataSnapshot struct { RequestAt time.Time RequestDuration time.Duration } -func NewProposedValidatorBlock( +func NewValidatorBlock( log logrus.FieldLogger, event *api.VersionedProposal, - snapshot *ProposedValidatorBlockDataSnapshot, + snapshot *ValidatorBlockDataSnapshot, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta, -) *ProposedValidatorBlock { - return &ProposedValidatorBlock{ - log: log.WithField("event", "BEACON_API_ETH_V3_PROPOSED_VALIDATOR_BLOCK"), +) *ValidatorBlock { + return &ValidatorBlock{ + log: log.WithField("event", "BEACON_API_ETH_V3_VALIDATOR_BLOCK"), event: event, snapshot: snapshot, beacon: beacon, @@ -50,46 +50,51 @@ func NewProposedValidatorBlock( } } -func (e *ProposedValidatorBlock) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error) { - data, err := eth.NewEventBlockV2FromVersionProposedBeaconBlock(e.event) +func (e *ValidatorBlock) Decorate(_ context.Context) (*xatu.DecoratedEvent, error) { + data, err := eth.NewEventBlockV2FromVersionedProposal(e.event) if err != nil { return nil, err } decoratedEvent := &xatu.DecoratedEvent{ Event: &xatu.Event{ - Name: xatu.Event_BEACON_API_ETH_V3_PROPOSED_VALIDATOR_BLOCK, + Name: xatu.Event_BEACON_API_ETH_V3_VALIDATOR_BLOCK, DateTime: timestamppb.New(e.snapshot.RequestAt), Id: e.id.String(), }, Meta: &xatu.Meta{ Client: e.clientMeta, }, - Data: &xatu.DecoratedEvent_EthV3ProposedValidatorBlock{ - EthV3ProposedValidatorBlock: data, + Data: &xatu.DecoratedEvent_EthV3ValidatorBlock{ + EthV3ValidatorBlock: data, }, } - additionalData, err := e.getAdditionalData(ctx) - if err != nil { + if err := e.addAdditionalData(decoratedEvent); err != nil { e.log.WithError(err).Error("Failed to get extra beacon block data") - } else { - decoratedEvent.Meta.Client.AdditionalData = &xatu.ClientMeta_EthV3ProposedValidatorBlock{ - EthV3ProposedValidatorBlock: additionalData, - } } return decoratedEvent, nil } -func (e *ProposedValidatorBlock) ShouldIgnore(_ context.Context) (bool, error) { +func (e *ValidatorBlock) ShouldIgnore(_ context.Context) (bool, error) { return e.event == nil, nil } -func (e *ProposedValidatorBlock) getAdditionalData(_ context.Context) ( - *xatu.ClientMeta_AdditionalEthV3ProposedValidatorBlockData, - error, -) { +func (e *ValidatorBlock) addAdditionalData(decoratedEvent *xatu.DecoratedEvent) error { + additionalData, err := e.getAdditionalData() + if err != nil { + return err + } + + decoratedEvent.Meta.Client.AdditionalData = &xatu.ClientMeta_EthV3ValidatorBlock{ + EthV3ValidatorBlock: additionalData, + } + + return nil +} + +func (e *ValidatorBlock) getAdditionalData() (*xatu.ClientMeta_AdditionalEthV3ValidatorBlockData, error) { proposalSlot, err := e.event.Slot() if err != nil { return nil, err @@ -98,18 +103,12 @@ func (e *ProposedValidatorBlock) getAdditionalData(_ context.Context) ( slot := e.beacon.Metadata().Wallclock().Slots().FromNumber(uint64(proposalSlot)) epoch := e.beacon.Metadata().Wallclock().Epochs().FromSlot(uint64(proposalSlot)) - root, err := e.event.Root() - if err != nil { - e.log.WithError(err).Warn("Failed to get block root") - } - - extra := &xatu.ClientMeta_AdditionalEthV3ProposedValidatorBlockData{ + extra := &xatu.ClientMeta_AdditionalEthV3ValidatorBlockData{ Version: e.event.Version.String(), RequestedAt: timestamppb.New(e.snapshot.RequestAt), RequestDurationMs: &wrapperspb.UInt64Value{ Value: safeUint64FromInt64(e.snapshot.RequestDuration.Milliseconds()), }, - BlockRoot: root.String(), Slot: &xatu.SlotV2{ StartDateTime: timestamppb.New(slot.TimeWindow().Start()), Number: &wrapperspb.UInt64Value{Value: uint64(proposalSlot)}, diff --git a/pkg/sentry/proposed_validator_block.go b/pkg/sentry/proposed_validator_block.go index 3e623e30..5599668c 100644 --- a/pkg/sentry/proposed_validator_block.go +++ b/pkg/sentry/proposed_validator_block.go @@ -12,18 +12,18 @@ import ( v2 "github.com/ethpandaops/xatu/pkg/sentry/event/beacon/eth/v2" ) -func (s *Sentry) startProposedValidatorBlockSchedule(ctx context.Context) error { - if !s.Config.ProposedValidatorBlock.Enabled { +func (s *Sentry) startValidatorBlockSchedule(ctx context.Context) error { + if !s.Config.ValidatorBlock.Enabled { return nil } - if s.Config.ProposedValidatorBlock.Interval.Enabled { - logCtx := s.log.WithField("proccer", "interval").WithField("interval", s.Config.ProposedValidatorBlock.Interval.Every.String()) + if s.Config.ValidatorBlock.Interval.Enabled { + logCtx := s.log.WithField("proccer", "interval").WithField("interval", s.Config.ValidatorBlock.Interval.Every.String()) - if _, err := s.scheduler.Every(s.Config.ProposedValidatorBlock.Interval.Every.Duration).Do(func() { + if _, err := s.scheduler.Every(s.Config.ValidatorBlock.Interval.Every.Duration).Do(func() { logCtx.Debug("Fetching validator beacon block") - if err := s.fetchDecoratedProposedValidatorBlock(ctx); err != nil { + if err := s.fetchDecoratedValidatorBlock(ctx); err != nil { logCtx.WithError(err).Error("Failed to fetch validator beacon block") } }); err != nil { @@ -31,16 +31,16 @@ func (s *Sentry) startProposedValidatorBlockSchedule(ctx context.Context) error } } - if s.Config.ProposedValidatorBlock.At.Enabled { - for _, slotTime := range s.Config.ProposedValidatorBlock.At.SlotTimes { - s.scheduleProposedValidatorBlockFetchingAtSlotTime(ctx, slotTime.Duration) + if s.Config.ValidatorBlock.At.Enabled { + for _, slotTime := range s.Config.ValidatorBlock.At.SlotTimes { + s.scheduleValidatorBlockFetchingAtSlotTime(ctx, slotTime.Duration) } } return nil } -func (s *Sentry) scheduleProposedValidatorBlockFetchingAtSlotTime(ctx context.Context, at time.Duration) { +func (s *Sentry) scheduleValidatorBlockFetchingAtSlotTime(ctx context.Context, at time.Duration) { offset := at logCtx := s.log. @@ -54,14 +54,14 @@ func (s *Sentry) scheduleProposedValidatorBlockFetchingAtSlotTime(ctx context.Co logCtx.WithField("slot", slot.Number()).Debug("Fetching validator beacon block") - if err := s.fetchDecoratedProposedValidatorBlock(ctx); err != nil { + if err := s.fetchDecoratedValidatorBlock(ctx); err != nil { logCtx.WithField("slot_time", offset.String()).WithError(err).Error("Failed to fetch validator beacon block") } }) } -func (s *Sentry) fetchProposedValidatorBlock(ctx context.Context) (*v2.ProposedValidatorBlock, error) { - snapshot := &v2.ProposedValidatorBlockDataSnapshot{RequestAt: time.Now()} +func (s *Sentry) fetchValidatorBlock(ctx context.Context) (*v2.ValidatorBlock, error) { + snapshot := &v2.ValidatorBlockDataSnapshot{RequestAt: time.Now()} slot, _, err := s.beacon.Metadata().Wallclock().Now() if err != nil { @@ -106,11 +106,11 @@ func (s *Sentry) fetchProposedValidatorBlock(ctx context.Context) (*v2.ProposedV snapshot.RequestDuration = time.Since(snapshot.RequestAt) - return v2.NewProposedValidatorBlock(s.log, proposedBlock, snapshot, s.beacon, meta), nil + return v2.NewValidatorBlock(s.log, proposedBlock, snapshot, s.beacon, meta), nil } -func (s *Sentry) fetchDecoratedProposedValidatorBlock(ctx context.Context) error { - fc, err := s.fetchProposedValidatorBlock(ctx) +func (s *Sentry) fetchDecoratedValidatorBlock(ctx context.Context) error { + fc, err := s.fetchValidatorBlock(ctx) if err != nil { return err } diff --git a/pkg/sentry/sentry.go b/pkg/sentry/sentry.go index afb03984..e60ba5a4 100644 --- a/pkg/sentry/sentry.go +++ b/pkg/sentry/sentry.go @@ -547,7 +547,7 @@ func (s *Sentry) Start(ctx context.Context) error { return err } - if err := s.startProposedValidatorBlockSchedule(ctx); err != nil { + if err := s.startValidatorBlockSchedule(ctx); err != nil { return err } diff --git a/pkg/server/service/event-ingester/event/beacon/eth/v2/proposed_validator_block.go b/pkg/server/service/event-ingester/event/beacon/eth/v2/proposed_validator_block.go index 3b185539..220adf38 100644 --- a/pkg/server/service/event-ingester/event/beacon/eth/v2/proposed_validator_block.go +++ b/pkg/server/service/event-ingester/event/beacon/eth/v2/proposed_validator_block.go @@ -11,49 +11,49 @@ import ( ) const ( - ProposedValidatorBlockType = "BEACON_API_ETH_V3_PROPOSED_VALIDATOR_BLOCK" + ValidatorBlockType = "BEACON_API_ETH_V3_VALIDATOR_BLOCK" ) -type ProposedValidatorBlock struct { +type ValidatorBlock struct { log logrus.FieldLogger event *xatu.DecoratedEvent } -func NewProposedValidatorBlock(log logrus.FieldLogger, event *xatu.DecoratedEvent) *ProposedValidatorBlock { - return &ProposedValidatorBlock{ - log: log.WithField("event", ProposedValidatorBlockType), +func NewValidatorBlock(log logrus.FieldLogger, event *xatu.DecoratedEvent) *ValidatorBlock { + return &ValidatorBlock{ + log: log.WithField("event", ValidatorBlockType), event: event, } } -func (b *ProposedValidatorBlock) Type() string { - return ProposedValidatorBlockType +func (b *ValidatorBlock) Type() string { + return ValidatorBlockType } -func (b *ProposedValidatorBlock) Validate(_ context.Context) error { - if _, ok := b.event.Data.(*xatu.DecoratedEvent_EthV3ProposedValidatorBlock); !ok { +func (b *ValidatorBlock) Validate(_ context.Context) error { + if _, ok := b.event.Data.(*xatu.DecoratedEvent_EthV3ValidatorBlock); !ok { return errors.New("failed to cast event data") } return nil } -func (b *ProposedValidatorBlock) Filter(_ context.Context) bool { - data, ok := b.event.Data.(*xatu.DecoratedEvent_EthV3ProposedValidatorBlock) +func (b *ValidatorBlock) Filter(_ context.Context) bool { + data, ok := b.event.Data.(*xatu.DecoratedEvent_EthV3ValidatorBlock) if !ok { b.log.Error("failed to cast event data") return true } - additionalData, ok := b.event.Meta.Client.AdditionalData.(*xatu.ClientMeta_EthV3ProposedValidatorBlock) + additionalData, ok := b.event.Meta.Client.AdditionalData.(*xatu.ClientMeta_EthV3ValidatorBlock) if !ok { b.log.Error("failed to cast client additional data") return true } - version := additionalData.EthV3ProposedValidatorBlock.GetVersion() + version := additionalData.EthV3ValidatorBlock.GetVersion() if version == "" { b.log.Error("failed to get version") @@ -64,15 +64,15 @@ func (b *ProposedValidatorBlock) Filter(_ context.Context) bool { switch version { case "phase0": - hash = data.EthV3ProposedValidatorBlock.Message.(*v2.EventBlockV2_Phase0Block).Phase0Block.StateRoot + hash = data.EthV3ValidatorBlock.Message.(*v2.EventBlockV2_Phase0Block).Phase0Block.StateRoot case "altair": - hash = data.EthV3ProposedValidatorBlock.Message.(*v2.EventBlockV2_AltairBlock).AltairBlock.StateRoot + hash = data.EthV3ValidatorBlock.Message.(*v2.EventBlockV2_AltairBlock).AltairBlock.StateRoot case "bellatrix": - hash = data.EthV3ProposedValidatorBlock.Message.(*v2.EventBlockV2_BellatrixBlock).BellatrixBlock.StateRoot + hash = data.EthV3ValidatorBlock.Message.(*v2.EventBlockV2_BellatrixBlock).BellatrixBlock.StateRoot case "capella": - hash = data.EthV3ProposedValidatorBlock.Message.(*v2.EventBlockV2_CapellaBlock).CapellaBlock.StateRoot + hash = data.EthV3ValidatorBlock.Message.(*v2.EventBlockV2_CapellaBlock).CapellaBlock.StateRoot case "deneb": - hash = data.EthV3ProposedValidatorBlock.Message.(*v2.EventBlockV2_DenebBlock).DenebBlock.StateRoot + hash = data.EthV3ValidatorBlock.Message.(*v2.EventBlockV2_DenebBlock).DenebBlock.StateRoot default: b.log.Error(fmt.Errorf("unknown version: %s", version)) @@ -88,6 +88,6 @@ func (b *ProposedValidatorBlock) Filter(_ context.Context) bool { return false } -func (b *ProposedValidatorBlock) AppendServerMeta(_ context.Context, meta *xatu.ServerMeta) *xatu.ServerMeta { +func (b *ValidatorBlock) AppendServerMeta(_ context.Context, meta *xatu.ServerMeta) *xatu.ServerMeta { return meta } diff --git a/pkg/server/service/event-ingester/event/event.go b/pkg/server/service/event-ingester/event/event.go index 1c17070c..eec98ed7 100644 --- a/pkg/server/service/event-ingester/event/event.go +++ b/pkg/server/service/event-ingester/event/event.go @@ -74,7 +74,7 @@ var ( TypeBeaconETHV1BeaconValidators Type = Type(v1.BeaconValidatorsType) TypeMEVRelayBidTraceBuilderBlockSubmission Type = Type(mevrelay.BidTraceBuilderBlockSubmissionType) TypeMEVRelayProposerPayloadDelivered Type = Type(mevrelay.ProposerPayloadDeliveredType) - TypeBeaconETHV3ProposedValidatorBlock Type = v2.ProposedValidatorBlockType + TypeBeaconETHV3ValidatorBlock Type = v2.ValidatorBlockType ) type Event interface { @@ -255,8 +255,8 @@ func NewEventRouter(log logrus.FieldLogger, cache store.Cache, geoipProvider geo router.RegisterHandler(TypeMEVRelayProposerPayloadDelivered, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { return mevrelay.NewProposerPayloadDelivered(router.log, event), nil }) - router.RegisterHandler(TypeBeaconETHV3ProposedValidatorBlock, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { - return v2.NewProposedValidatorBlock(router.log, event), nil + router.RegisterHandler(TypeBeaconETHV3ValidatorBlock, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewValidatorBlock(router.log, event), nil }) return router