2 changes: 1 addition & 1 deletion .env
@@ -1 +1 @@
-CHVER=25.5.10
+CHVER=25.5.10
10 changes: 8 additions & 2 deletions Dockerfile
@@ -1,8 +1,14 @@
FROM golang:1.25 AS builder
WORKDIR /src
-COPY go.sum go.mod ./
+
+# Copy sibling dependencies that are referenced in go.mod replace directives
+COPY tysm/ /tysm/
+COPY hermes/ /hermes/
+
+# Copy xatu project files
+COPY xatu/go.sum xatu/go.mod ./
RUN go mod download
-COPY . .
+COPY xatu/ .
RUN go build -o /bin/app .

FROM ubuntu:latest
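The two sibling COPY lines above only have an effect if xatu's go.mod redirects those modules to the copied paths. A minimal sketch of what such replace directives could look like (the module paths here are illustrative assumptions, not taken from this PR):

// go.mod (sketch) — hypothetical module paths; the real directives may differ
replace (
	github.com/probe-lab/hermes => ../hermes // resolves to /hermes from WORKDIR /src
	github.com/ethpandaops/tysm => ../tysm   // resolves to /tysm from WORKDIR /src
)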
17 changes: 17 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
@@ -83,6 +83,7 @@ transforms:
libp2p_trace_gossipsub_aggregate_and_proof: .event.name == "LIBP2P_TRACE_GOSSIPSUB_AGGREGATE_AND_PROOF"
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
libp2p_trace_gossipsub_data_column_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_DATA_COLUMN_SIDECAR"
+libp2p_trace_rpc_data_column_custody_probe: .event.name == "LIBP2P_TRACE_RPC_DATA_COLUMN_CUSTODY_PROBE"
libp2p_trace_synthetic_heartbeat: .event.name == "LIBP2P_TRACE_SYNTHETIC_HEARTBEAT"
beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
@@ -1298,6 +1299,22 @@ sinks:
key_field: "event.id"
topic: node-record-execution
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_rpc_data_column_custody_probe_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_rpc_data_column_custody_probe
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-rpc-data-column-custody-probe
compression: snappy
healthcheck:
enabled: true
encoding:
138 changes: 137 additions & 1 deletion deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml
@@ -16,6 +16,7 @@ sources:
codec: json
topics:
- "^libp2p-trace.+"
- "^libp2p-rpc.+"
auto_offset_reset: earliest
librdkafka_options:
message.max.bytes: "10485760" # 10MB
@@ -226,6 +227,7 @@ transforms:
libp2p_trace_gossipsub_aggregate_and_proof: .event.name == "LIBP2P_TRACE_GOSSIPSUB_AGGREGATE_AND_PROOF"
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
libp2p_trace_gossipsub_data_column_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_DATA_COLUMN_SIDECAR"
+libp2p_trace_rpc_data_column_custody_probe: .event.name == "LIBP2P_TRACE_RPC_DATA_COLUMN_CUSTODY_PROBE"
libp2p_trace_synthetic_heartbeat: .event.name == "LIBP2P_TRACE_SYNTHETIC_HEARTBEAT"
libp2p_trace_rpc_meta_control_ihave: .event.name == "LIBP2P_TRACE_RPC_META_CONTROL_IHAVE"
libp2p_trace_rpc_meta_control_iwant: .event.name == "LIBP2P_TRACE_RPC_META_CONTROL_IWANT"
@@ -259,6 +261,7 @@ transforms:
- xatu_server_events_router.libp2p_trace_gossipsub_aggregate_and_proof
- xatu_server_events_router.libp2p_trace_gossipsub_blob_sidecar
- xatu_server_events_router.libp2p_trace_gossipsub_data_column_sidecar
+- xatu_server_events_router.libp2p_trace_rpc_data_column_custody_probe
- xatu_server_events_router.libp2p_trace_synthetic_heartbeat
- xatu_server_events_router.libp2p_trace_rpc_meta_control_ihave
- xatu_server_events_router.libp2p_trace_rpc_meta_control_iwant
@@ -987,7 +990,7 @@ transforms:
.error_description = "failed to split topic"
} else {
if length(topicParts) != 5 {
-errDebug = {
+.errDebug = {
"topic": .meta.client.additional_data.topic,
}
.error_description = "failed to split topic"
@@ -1416,6 +1419,119 @@ transforms:
del(.data)
del(.path)

libp2p_trace_rpc_data_column_custody_probe_formatted:
type: remap
inputs:
- xatu_server_events_router.libp2p_trace_rpc_data_column_custody_probe
source: |-
.updated_date_time = to_unix_timestamp(now())
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}

# Extract from .data
.slot = .data.slot
.epoch = .data.epoch

if .data.column_index != null {
.column_index = .data.column_index
}

if .data.column_rows_count != null {
.column_rows_count = .data.column_rows_count
}

if .data.beacon_block_root != null {
.beacon_block_root = .data.beacon_block_root
}

if .data.result != null {
.result = .data.result
}

if .data.response_time_ms != null {
.response_time_ms = .data.response_time_ms
}

if .data.error != null {
.error = .data.error
}

# Extract timestamp fields from metadata
if exists(.meta.client.additional_data.slot.start_date_time) {
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
}

if exists(.meta.client.additional_data.epoch.start_date_time) {
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
}

# Extract wallclock fields from metadata
if exists(.meta.client.additional_data.wallclock_slot.number) {
.wallclock_request_slot = .meta.client.additional_data.wallclock_slot.number
}

if exists(.meta.client.additional_data.wallclock_slot.start_date_time) {
wallclock_request_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
if err == null {
.wallclock_request_slot_start_date_time = to_unix_timestamp(wallclock_request_slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock request slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
}

if exists(.meta.client.additional_data.wallclock_epoch.number) {
.wallclock_request_epoch = .meta.client.additional_data.wallclock_epoch.number
}

if exists(.meta.client.additional_data.wallclock_epoch.start_date_time) {
wallclock_request_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
if err == null {
.wallclock_request_epoch_start_date_time = to_unix_timestamp(wallclock_request_epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock request epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
}

# Generate peer_id_unique_key from the metadata peer_id
if exists(.meta.client.additional_data.metadata.peer_id) {
peer_id_key, err = to_string(.meta.client.additional_data.metadata.peer_id) + to_string(.meta_network_name)
if err != null {
.error = err
.error_description = "failed to generate peer id unique key"
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
}

del(.event)
del(.meta)
del(.data)
del(.path)

libp2p_trace_handle_status_formatted:
type: remap
inputs:
@@ -2798,6 +2914,26 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
libp2p_trace_rpc_data_column_custody_probe_clickhouse:
type: clickhouse
inputs:
- libp2p_trace_rpc_data_column_custody_probe_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: libp2p_rpc_data_column_custody_probe
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 10485760
max_events: 10000
timeout_secs: 5
buffer:
max_events: 10000
healthcheck:
enabled: true
skip_unknown_fields: false
libp2p_trace_synthetic_heartbeat_clickhouse:
type: clickhouse
inputs:
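Once the custody probe sink above is deployed end to end, ingestion can be spot-checked against the new table. A sketch, assuming clickhouse-client access to the default database:

SELECT
    count() AS events,
    min(event_date_time) AS first_seen,
    max(event_date_time) AS last_seen
FROM libp2p_rpc_data_column_custody_probe
WHERE slot_start_date_time >= now() - INTERVAL 1 HOUR;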
2 changes: 1 addition & 1 deletion deploy/local/docker-compose/xatu-server.yaml
@@ -1,4 +1,4 @@
logging: "info" # panic,fatal,warn,info,debug,trace
logging: "trace" # panic,fatal,warn,info,debug,trace
addr: ":8080"
metricsAddr: ":9090"
# pprofAddr: ":6060" # optional. if supplied it enables pprof server
2 changes: 2 additions & 0 deletions deploy/migrations/clickhouse/079_custody_probe.down.sql
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS libp2p_rpc_data_column_custody_probe ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS libp2p_rpc_data_column_custody_probe_local ON CLUSTER '{cluster}';
85 changes: 85 additions & 0 deletions deploy/migrations/clickhouse/079_custody_probe.up.sql
@@ -0,0 +1,85 @@
-- Create local table (actual storage)
CREATE TABLE IF NOT EXISTS libp2p_rpc_data_column_custody_probe_local ON CLUSTER '{cluster}' (
-- Timestamps
updated_date_time DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
event_date_time DateTime64(3) COMMENT 'When the probe was executed' CODEC(DoubleDelta, ZSTD(1)),

-- Probe identifiers
slot UInt32 COMMENT 'Slot number being probed' CODEC(DoubleDelta, ZSTD(1)),
slot_start_date_time DateTime COMMENT 'The wall clock time when the slot started' CODEC(DoubleDelta, ZSTD(1)),
epoch UInt32 COMMENT 'Epoch number of the slot being probed' CODEC(DoubleDelta, ZSTD(1)),
epoch_start_date_time DateTime COMMENT 'The wall clock time when the epoch started' CODEC(DoubleDelta, ZSTD(1)),

wallclock_request_slot UInt32 COMMENT 'The wallclock slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
wallclock_request_slot_start_date_time DateTime COMMENT 'The start time for the slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
wallclock_request_epoch UInt32 COMMENT 'The wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
wallclock_request_epoch_start_date_time DateTime COMMENT 'The start time for the wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),

-- Column information
column_index UInt64 COMMENT 'Column index being probed' CODEC(ZSTD(1)),
column_rows_count UInt16 COMMENT 'Number of rows in the column' CODEC(ZSTD(1)),
beacon_block_root FixedString(66) COMMENT 'Root of the beacon block' CODEC(ZSTD(1)),

-- Peer information
peer_id_unique_key Int64 COMMENT 'Unique key associated with the identifier of the peer',

-- Probe results
result LowCardinality(String) COMMENT 'Result of the probe' CODEC(ZSTD(1)),
response_time_ms Int32 COMMENT 'Response time in milliseconds' CODEC(ZSTD(1)),
error Nullable(String) COMMENT 'Error message if probe failed' CODEC(ZSTD(1)),

-- Standard metadata fields
meta_client_name LowCardinality(String) COMMENT 'Name of the client that executed the probe',
meta_client_id String COMMENT 'Unique Session ID of the client' CODEC(ZSTD(1)),
meta_client_version LowCardinality(String) COMMENT 'Version of the client',
meta_client_implementation LowCardinality(String) COMMENT 'Implementation of the client',
meta_client_os LowCardinality(String) COMMENT 'Operating system of the client',
meta_client_ip Nullable(IPv6) COMMENT 'IP address of the client' CODEC(ZSTD(1)),
meta_client_geo_city LowCardinality(String) COMMENT 'City of the client' CODEC(ZSTD(1)),
meta_client_geo_country LowCardinality(String) COMMENT 'Country of the client' CODEC(ZSTD(1)),
meta_client_geo_country_code LowCardinality(String) COMMENT 'Country code of the client' CODEC(ZSTD(1)),
meta_client_geo_continent_code LowCardinality(String) COMMENT 'Continent code of the client' CODEC(ZSTD(1)),
meta_client_geo_longitude Nullable(Float64) COMMENT 'Longitude of the client' CODEC(ZSTD(1)),
meta_client_geo_latitude Nullable(Float64) COMMENT 'Latitude of the client' CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_number Nullable(UInt32) COMMENT 'Autonomous system number of the client' CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_organization Nullable(String) COMMENT 'Autonomous system organization of the client' CODEC(ZSTD(1)),
meta_network_id Int32 COMMENT 'Ethereum network ID' CODEC(DoubleDelta, ZSTD(1)),
meta_network_name LowCardinality(String) COMMENT 'Ethereum network name',
meta_consensus_version LowCardinality(String) COMMENT 'Ethereum consensus client version',
meta_consensus_version_major LowCardinality(String) COMMENT 'Ethereum consensus client major version',
meta_consensus_version_minor LowCardinality(String) COMMENT 'Ethereum consensus client minor version',
meta_consensus_version_patch LowCardinality(String) COMMENT 'Ethereum consensus client patch version',
meta_consensus_implementation LowCardinality(String) COMMENT 'Ethereum consensus client implementation',
meta_labels Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/{database}/tables/{table}/{shard}',
'{replica}',
updated_date_time
)
PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY (
slot_start_date_time,
meta_network_name,
meta_client_name,
peer_id_unique_key,
slot,
column_index
)
COMMENT 'Contains custody probe events for data column availability verification';

-- Create distributed table (query interface)
CREATE TABLE IF NOT EXISTS libp2p_rpc_data_column_custody_probe ON CLUSTER '{cluster}'
AS libp2p_rpc_data_column_custody_probe_local
ENGINE = Distributed(
'{cluster}',
default,
libp2p_rpc_data_column_custody_probe_local,
cityHash64(
slot_start_date_time,
meta_network_name,
meta_client_name,
peer_id_unique_key,
slot,
column_index
)
);
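Since the local table is a ReplicatedReplacingMergeTree deduplicated on updated_date_time, replacement happens asynchronously at merge time, so queries that need exact counts should read with FINAL. A hedged example of the kind of query this ORDER BY and monthly partitioning are laid out for (the network name is a placeholder):

SELECT
    result,
    count() AS probes,
    avg(response_time_ms) AS avg_response_ms
FROM libp2p_rpc_data_column_custody_probe FINAL
WHERE slot_start_date_time >= now() - INTERVAL 1 DAY
  AND meta_network_name = 'mainnet'
GROUP BY result
ORDER BY probes DESC;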
9 changes: 5 additions & 4 deletions docker-compose.yml
@@ -158,8 +158,8 @@ services:
container_name: xatu-server
hostname: xatu-server
build:
-context: .
-dockerfile: Dockerfile
+context: ..
+dockerfile: xatu/Dockerfile
ports:
- "${XATU_SERVER_ADDRESS:-0.0.0.0}:${XATU_SERVER_PORT:-8080}:8080"
# environment:
@@ -325,6 +325,7 @@ services:
"libp2p-trace-rpc-meta-control-ihave"
"libp2p-trace-rpc-meta-subscription"
"libp2p-trace-rpc-meta-message"
"libp2p-trace-rpc-data-column-custody-probe"
"beacon-api-eth-v1-beacon-validators"
"mev-relay-bid-trace-builder-block-submission"
"mev-relay-proposer-payload-delivered"
@@ -613,8 +614,8 @@ services:
container_name: xatu-cannon
hostname: xatu-cannon
build:
-context: .
-dockerfile: Dockerfile
+context: ..
+dockerfile: xatu/Dockerfile
environment:
# Default
CANNON_XATU_COORDINATOR_AUTHORIZATION: ${CANNON_XATU_COORDINATOR_AUTHORIZATION:-Bearer super_secret}
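The context/dockerfile changes in this file assume xatu and the sibling dependencies share a parent directory, so a build context of ".." can reach all three. Roughly (the sibling names come from the Dockerfile COPY lines; everything else about the layout is an assumption):

parent/
├── xatu/    # this repository; compose build context is ".." with dockerfile "xatu/Dockerfile"
├── tysm/
└── hermes/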