Skip to content

Commit

Permalink
feat: add beacon_api_eth_v3_validator_block
Browse files Browse the repository at this point in the history
  • Loading branch information
mattevans committed Nov 6, 2024
1 parent 4585afb commit 92504b1
Show file tree
Hide file tree
Showing 13 changed files with 294 additions and 85 deletions.
23 changes: 21 additions & 2 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ transforms:
beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
beacon_api_eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -890,7 +891,7 @@ sinks:
healthcheck:
enabled: true
encoding:
codec: json
codec: json
mev_relay_proposer_payload_delivered_kafka:
type: kafka
buffer:
Expand All @@ -906,4 +907,22 @@ sinks:
healthcheck:
enabled: true
encoding:
codec: json
codec: json
beacon_api_eth_v3_validator_block_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.beacon_api_eth_v3_validator_block
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: beacon-api-eth-v3-validator-block
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
librdkafka_options:
message.max.bytes: "10485760" # 10MB
122 changes: 115 additions & 7 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ sources:
- "^mev-relay-.+"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
beacon_api_eth_v3_validator_block_kafka:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
auto_offset_reset: earliest
group_id: xatu-vector-kafka-clickhouse-beacon-api-eth-v3-validator-block-events
key_field: "event.id"
decoding:
codec: json
topics:
- "beacon-api-eth-v3-validator-block"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
transforms:
xatu_server_events_meta:
type: remap
Expand All @@ -137,6 +149,7 @@ transforms:
- beacon_api_eth_v1_proposer_kafka
- beacon_api_eth_v1_beacon_validators_kafka
- mev_relay_kafka
- beacon_api_eth_v3_validator_block_kafka
source: |-
.meta_client_name = .meta.client.name
.meta_client_id = .meta.client.id
Expand Down Expand Up @@ -355,6 +368,7 @@ transforms:
mempool_transaction: .event.name == "MEMPOOL_TRANSACTION"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -396,6 +410,7 @@ transforms:
- xatu_server_events_router.mempool_transaction_v2
- xatu_server_events_router.mev_relay_bid_trace_builder_block_submission
- xatu_server_events_router.mev_relay_proposer_payload_delivered
- xatu_server_events_router.eth_v3_validator_block
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -1645,14 +1660,14 @@ transforms:
.error_description = "failed to parse block epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.position_in_block = .meta.client.additional_data.position_in_block
.block_root = .meta.client.additional_data.block.root
.validators = .data.validator_indexes
.committee_index = .data.data.index
.beacon_block_root = .data.data.beacon_block_root
.slot = .data.data.slot
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
Expand Down Expand Up @@ -1777,7 +1792,7 @@ transforms:
.slot = .data.slot
.block_number = .data.block_number
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
Expand All @@ -1787,7 +1802,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
Expand Down Expand Up @@ -1838,7 +1853,7 @@ transforms:
del(.meta_consensus_implementation)
del(.meta_network_id)
del(.event)
del(.meta)
del(.data)
Expand Down Expand Up @@ -1869,7 +1884,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
Expand Down Expand Up @@ -1918,11 +1933,83 @@ transforms:
del(.meta_consensus_implementation)
del(.meta_network_id)
del(.event)
del(.meta)
del(.data)
beacon_api_eth_v3_validator_block_formatted:
type: remap
inputs:
- xatu_server_events_router.eth_v3_validator_block
source: |-
# handle message version name pathing and map it back to .data.message
if !exists(.data.message) {
message, err = get(value: .data, path: [.data.version])
if err == null {
.data.message = message
} else {
.error = err
.error_description = "failed to get data.message"
log(., level: "error", rate_limit_secs: 60)
}
cleanedUpData, err = remove(value: .data, path: [.data.version])
if err == null {
.data = cleanedUpData
} else {
.error = err
.error_description = "failed to remove data.message"
log(., level: "error", rate_limit_secs: 60)
}
}
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
.slot = .data.message.slot
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.block_version = .meta.client.additional_data.version
.block_total_bytes = .meta.client.additional_data.total_bytes
.block_total_bytes_compressed = .meta.client.additional_data.total_bytes_compressed
.execution_payload_block_number = .data.message.body.execution_payload.block_number
.execution_payload_base_fee_per_gas = .data.message.body.execution_payload.base_fee_per_gas
.execution_payload_blob_gas_used = .data.message.body.execution_payload.blob_gas_used
.execution_payload_excess_blob_gas = .data.message.body.execution_payload.excess_blob_gas
.execution_payload_gas_limit = .data.message.body.execution_payload.gas_limit
.execution_payload_gas_used = .data.message.body.execution_payload.gas_used
.execution_payload_transactions_count = .meta.client.additional_data.transactions_count
.execution_payload_transactions_total_bytes = .meta.client.additional_data.transactions_total_bytes
.execution_payload_transactions_total_bytes_compressed = .meta.client.additional_data.transactions_total_bytes_compressed
.consensus_payload_value = .meta.client.additional_data.consensus_value
.execution_payload_value = .meta.client.additional_data.execution_value
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -2551,3 +2638,24 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: true
beacon_api_eth_v3_validator_block_clickhouse:
type: clickhouse
inputs:
- beacon_api_eth_v3_validator_block_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: beacon_api_eth_v3_validator_block
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false

2 changes: 2 additions & 0 deletions deploy/migrations/clickhouse/053_validator_block.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS beacon_api_eth_v3_validator_block ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS beacon_api_eth_v3_validator_block_local ON CLUSTER '{cluster}' SYNC;
68 changes: 68 additions & 0 deletions deploy/migrations/clickhouse/053_validator_block.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- Step 1: Create the new table.
CREATE TABLE default.beacon_api_eth_v3_validator_block_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'When the sentry received the event from a beacon node' CODEC(DoubleDelta, ZSTD(1)),
`slot` UInt32 COMMENT 'Slot number within the payload' CODEC(DoubleDelta, ZSTD(1)),
`slot_start_date_time` DateTime COMMENT 'The wall clock time when the reorg slot started',
`epoch` UInt32 COMMENT 'The epoch number in the beacon API event stream payload' CODEC(DoubleDelta, ZSTD(1)),
`epoch_start_date_time` DateTime COMMENT 'The wall clock time when the epoch started' CODEC(DoubleDelta, ZSTD(1)),
`block_version` LowCardinality(String) COMMENT 'The version of the beacon block',
`block_total_bytes` Nullable(UInt32) COMMENT 'The total bytes of the beacon block payload' CODEC(ZSTD(1)),
`block_total_bytes_compressed` Nullable(UInt32) COMMENT 'The total bytes of the beacon block payload when compressed using snappy' CODEC(ZSTD(1)),
`consensus_payload_value` Nullable(UInt64) COMMENT 'Consensus rewards paid to the proposer for this block, in Wei. Use to determine relative value of consensus blocks.' CODEC(ZSTD(1)),
`execution_payload_value` Nullable(UInt64) COMMENT 'Execution payload value in Wei. Use to determine relative value of execution payload.' CODEC(ZSTD(1)),
`execution_payload_block_number` UInt32 COMMENT 'The block number of the execution payload',
`execution_payload_base_fee_per_gas` Nullable(UInt128) COMMENT 'Base fee per gas for execution payload' CODEC(ZSTD(1)),
`execution_payload_blob_gas_used` Nullable(UInt64) COMMENT 'Gas used for blobs in execution payload' CODEC(ZSTD(1)),
`execution_payload_excess_blob_gas` Nullable(UInt64) COMMENT 'Excess gas used for blobs in execution payload' CODEC(ZSTD(1)),
`execution_payload_gas_limit` Nullable(UInt64) COMMENT 'Gas limit for execution payload' CODEC(DoubleDelta, ZSTD(1)),
`execution_payload_gas_used` Nullable(UInt64) COMMENT 'Gas used for execution payload' CODEC(ZSTD(1)),
`execution_payload_transactions_count` Nullable(UInt32) COMMENT 'The transaction count of the execution payload' CODEC(ZSTD(1)),
`execution_payload_transactions_total_bytes` Nullable(UInt32) COMMENT 'The transaction total bytes of the execution payload' CODEC(ZSTD(1)),
`execution_payload_transactions_total_bytes_compressed` Nullable(UInt32) COMMENT 'The transaction total bytes of the execution payload when compressed using snappy' CODEC(ZSTD(1)),
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' CODEC(ZSTD(1)),
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event',
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event',
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event',
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event',
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event',
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_id` Int32 COMMENT 'Ethereum network ID' CODEC(DoubleDelta, ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name',
`meta_consensus_version` LowCardinality(String) COMMENT 'Ethereum consensus client version that generated the event',
`meta_consensus_version_major` LowCardinality(String) COMMENT 'Ethereum consensus client major version that generated the event',
`meta_consensus_version_minor` LowCardinality(String) COMMENT 'Ethereum consensus client minor version that generated the event',
`meta_consensus_version_patch` LowCardinality(String) COMMENT 'Ethereum consensus client patch version that generated the event',
`meta_consensus_implementation` LowCardinality(String) COMMENT 'Ethereum consensus client implementation that generated the event',
`meta_labels` Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/default/tables/{table}/{shard}',
'{replica}',
updated_date_time
) PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY
(
slot_start_date_time,
meta_network_name,
meta_client_name
)
COMMENT 'Contains beacon API /eth/v3/validator/blocks/{slot} data from each sentry client attached to a beacon node.';

-- Step 2: Create the distributed table.
CREATE TABLE default.beacon_api_eth_v3_validator_block ON CLUSTER '{cluster}' AS default.beacon_api_eth_v3_validator_block_local ENGINE = Distributed(
'{cluster}',
default,
beacon_api_eth_v3_validator_block_local,
cityHash64(
slot_start_date_time,
meta_network_name,
meta_client_name
)
);
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ services:
"beacon-api-eth-v1-beacon-validators"
"mev-relay-bid-trace-builder-block-submission"
"mev-relay-proposer-payload-delivered"
"beacon-api-eth-v3-validator-block"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down Expand Up @@ -565,7 +566,7 @@ services:
- ./deploy/local/docker-compose/xatu-cannon.yaml:/etc/cannon/config.yaml
networks:
- xatu-net

networks:
xatu-net:
driver: bridge
Expand Down
12 changes: 12 additions & 0 deletions example_sentry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ attestationData:
beaconCommittees:
enabled: true

validatorBlock:
enabled: false

interval:
enabled: false
every: 30s

at:
enabled: false
slotTimes:
- 4s

outputs:
- name: http-sink
type: http
Expand Down
2 changes: 1 addition & 1 deletion pkg/proto/eth/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)

func NewEventBlockV2FromVersionProposedBeaconBlock(proposal *api.VersionedProposal) (*v2.EventBlockV2, error) {
func NewEventBlockV2FromVersionedProposal(proposal *api.VersionedProposal) (*v2.EventBlockV2, error) {
var data *v2.EventBlockV2

switch proposal.Version {
Expand Down
8 changes: 4 additions & 4 deletions pkg/sentry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type Config struct {
// ProposerDuty configuration
ProposerDuty *ProposerDutyConfig `yaml:"proposerDuty" default:"{'enabled': true}"`

// ProposedValidatorBlock configuration
ProposedValidatorBlock *ProposedValidatorBlockConfig `yaml:"proposedValidatorBlock" default:"{'enabled': false}"`
// ValidatorBlock configuration
ValidatorBlock *ValidatorBlockConfig `yaml:"validatorBlock" default:"{'enabled': false}"`

// Tracing configuration
Tracing observability.TracingConfig `yaml:"tracing"`
Expand Down Expand Up @@ -169,7 +169,7 @@ func (f *AttestationDataConfig) Validate() error {
return nil
}

type ProposedValidatorBlockConfig struct {
type ValidatorBlockConfig struct {
Enabled bool `yaml:"enabled" default:"false"`

Interval struct {
Expand All @@ -183,7 +183,7 @@ type ProposedValidatorBlockConfig struct {
} `yaml:"at"`
}

func (f *ProposedValidatorBlockConfig) Validate() error {
func (f *ValidatorBlockConfig) Validate() error {
if f.At.Enabled {
if len(f.At.SlotTimes) == 0 {
return errors.New("at.slotTimes must be provided when at.enabled is true")
Expand Down
Loading

0 comments on commit 92504b1

Please sign in to comment.