Skip to content

Commit e91a046

Browse files
authored
feat(omnievent): module to listen for events from multiple chains (#26)
* omnievent: create new workspace crate * feat(omnievent): integrate proto build in omnievent * feat(superalloy): create a MultiProvider trait and structs * feat(omnievent): add event occurrence proto def * fix(omnievent): fix occurence type in proto def * feat(omnievent): add rpc to register event * feat(omnievent): add rpc to unregister event * feat(omnievent): add rpc to list registered events * fix(omnievent): replace string with bytes for uuid in unregister event rpc * feat(omnievent): add rpc to stream events * feat(omnievent): add rpc to get historical event occurences * feat(omnievent): rename EventOccurrence registered_uuid to event_uuid in proto def * chore(omnievent-rpc): remove safety_level from BlockInfo * feat(superalloy): add blanket implementation of MultiChainProvider for Arc<MP> where MP: MultiChainProvider * fix(omnievent/rpc): change abi_bytes type to bytes * feat(superalloy/multiprovider): make MultiProvider Send + Sync * feat(omnievent): create grpc service stub * feat(omnievent): create events.proto-like rusty types * feat(omnievent): implement multi-event listener * feat(omnievent): database trait to store events * feat(omnievent): non-persistent in-memory database * feat(omnievent): switch to uuid v5 * refactor(omnievent): rework various types * tests(omnievent): log decode & listener decoding test * chore(omnievent): clippy fix in memory db * feat(omnievent): implement event manager for registration and fordwarding events * tests(omnievent): omnievent registration & event stream test * tests(omnievent): multi chain event manager * feat(omnievent): add event log * feat(omnievent): register/stream events grpc implementation * chore(omnievent): rename EventStreamId to EventId * fix(omnievent/rpc): change event_uuids type to bytes * fix(omnievent/rpc): rename events -> occurrences * feat(omnievent): historical events grpc implementation * chore(omnievent): clippy recommendations * feat(omnievent): do nothing if event has already been registered * fix(omnievent/grpc): rename events_uuids -> event_uuids * docs(omnievent): add comment on event occurence stream deletion * docs(omnievent): expand on when decode_log panics
1 parent 32976e0 commit e91a046

17 files changed

Lines changed: 2120 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ members = [
44
"blocklock-agent",
55
"dcipher-agents",
66
"dsigner",
7+
"omnievent",
78
"onlyswaps-verifier",
89
"randomness-agent",
910
"superalloy"
@@ -36,6 +37,7 @@ sha3 = "0.10"
3637
futures = "0.3"
3738
futures-util = "0.3"
3839
tokio = "1.44"
40+
tokio-stream = "0.1"
3941
tokio-util = "0.7"
4042

4143
# logs / metrics
@@ -46,7 +48,14 @@ tracing-subscriber = { version = "0.3" }
4648
# network
4749
libp2p = { version = "0.55", features = ["serde"] }
4850

51+
# api / rpc
52+
tonic = { version = "0.13" }
53+
tonic-build = { version = "0.13" }
54+
4955
# serde
56+
prost = { version = "0.13" }
57+
prost-build = { version = "0.13" }
58+
prost-types = { version = "0.13" }
5059
serde = { version = "1.0", features = ["derive"], default-features = false }
5160
serde_json = "1.0"
5261

omnievent/Cargo.toml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
[package]
2+
name = "omnievent"
3+
version.workspace = true
4+
edition.workspace = true
5+
6+
[dependencies]
7+
alloy = { workspace = true, features = ["default", "provider-ws"] }
8+
superalloy.workspace = true
9+
10+
# async
11+
futures.workspace = true
12+
futures-util.workspace = true
13+
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] }
14+
tokio-stream.workspace = true
15+
tokio-util.workspace = true
16+
17+
# logs / metrics
18+
tracing.workspace = true
19+
prometheus.workspace = true
20+
21+
# api / rpc
22+
tonic = { version = "0.13" }
23+
24+
# serde
25+
prost.workspace = true
26+
prost-types.workspace = true
27+
28+
# misc
29+
itertools.workspace = true
30+
thiserror.workspace = true
31+
uuid = { version = "1.0", features = ["v5"] }
32+
33+
[dev-dependencies]
34+
alloy = { workspace = true, features = ["default", "provider-ws", "provider-anvil-node"] }
35+
36+
[build-dependencies]
37+
tonic-build.workspace = true

omnievent/build.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
fn main() -> std::io::Result<()> {
2+
tonic_build::configure()
3+
.bytes(["."])
4+
.compile_protos(&["proto/events.proto"], &["proto/"])?;
5+
Ok(())
6+
}

omnievent/proto/events.proto

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
syntax = "proto3";
2+
3+
package events;
4+
5+
import "google/protobuf/empty.proto";
6+
import "google/protobuf/timestamp.proto";
7+
8+
service OmniEventService {
9+
// Register a new event
10+
rpc RegisterEvent(RegisterNewEventRequest) returns (RegisterNewEventResponse);
11+
12+
// Unregister an event
13+
rpc UnregisterEvent(UnregisterEventRequest) returns (google.protobuf.Empty);
14+
15+
// List registered events
16+
rpc ListRegisteredEvents(ListRegisteredEventsRequest) returns (ListRegisteredEventsResponse);
17+
18+
// Stream events as they occur
19+
rpc StreamEvents(StreamEventsRequest) returns (stream EventOccurrence);
20+
21+
// Get historical events (batch request)
22+
rpc GetHistoricalEvents(GetHistoricalEventsRequest) returns (GetHistoricalEventsResponse);
23+
}
24+
25+
// Request message for registering a new event
26+
message RegisterNewEventRequest {
27+
// Chain ID
28+
uint64 chain_id = 1;
29+
30+
// Ethereum contract address (20 bytes) - what contract we're watching
31+
bytes address = 2;
32+
33+
// Event name - what event we're watching for
34+
string event_name = 3;
35+
36+
// Event fields - the structure of the event
37+
repeated EventField fields = 4;
38+
39+
// Block safety level - how we want to handle block finality
40+
BlockSafety block_safety = 5;
41+
}
42+
43+
// Response after registering a new event
44+
message RegisterNewEventResponse {
45+
// Uuid uniquely representing the event
46+
bytes uuid = 1;
47+
}
48+
49+
// Request to unregister an event
50+
message UnregisterEventRequest {
51+
// Uuid uniquely representing the event
52+
bytes uuid = 1;
53+
}
54+
55+
// Request to obtain a list of registered events
56+
message ListRegisteredEventsRequest {
57+
// Empty for now, add filters (chain id, address, custom tags, etc) in the future
58+
}
59+
60+
// Request to obtain a list of registered events
61+
message ListRegisteredEventsResponse {
62+
repeated RegisteredEvent events = 1;
63+
}
64+
65+
message StreamEventsRequest {
66+
// List of events to listen to
67+
repeated bytes event_uuids = 1;
68+
}
69+
70+
// Request for historical events
71+
message GetHistoricalEventsRequest {
72+
// Uuid uniquely representing the event
73+
repeated bytes event_uuids = 1;
74+
75+
// Block range (required for historical queries)
76+
BlockRange block_range = 2;
77+
78+
// Add filters (chain id, address, custom tags, values, etc) in the future
79+
}
80+
81+
// Response with historical events
82+
message GetHistoricalEventsResponse {
83+
// List of event occurrences
84+
repeated EventOccurrence occurrences = 1;
85+
}
86+
87+
// Represents a registered event
88+
message RegisteredEvent {
89+
// Uuid uniquely representing the event
90+
bytes event_uuid = 1;
91+
92+
// Chain ID
93+
uint64 chain_id = 2;
94+
95+
// Contract address
96+
bytes address = 3;
97+
98+
// Event name
99+
string event_name = 4;
100+
101+
// Event fields
102+
repeated EventField fields = 5;
103+
104+
// Block safety level
105+
BlockSafety block_safety = 6;
106+
}
107+
108+
message EventOccurrence {
109+
// Uuid uniquely representing the registered event
110+
bytes event_uuid = 1;
111+
112+
// Block information
113+
BlockInfo block_info = 2;
114+
115+
// Raw log data (for advanced use cases)
116+
optional bytes raw_log_data = 3;
117+
118+
// Event data (decoded parameters)
119+
repeated EventData event_data = 4;
120+
}
121+
122+
// Block information
123+
message BlockInfo {
124+
// Block number
125+
uint64 block_number = 1;
126+
127+
// Block hash
128+
bytes block_hash = 2;
129+
130+
// Block timestamp
131+
google.protobuf.Timestamp timestamp = 3;
132+
}
133+
134+
// Represents one of the field of a solidity event
135+
message EventField {
136+
// Solidity type as string (e.g., "address", "uint256", "bytes32", "uint256[]")
137+
string sol_type = 1;
138+
139+
// Whether the field is indexed
140+
bool indexed = 2;
141+
}
142+
143+
// Represents decoded event data
144+
message EventData {
145+
// Solidity type as string (e.g., "address", "uint256", "bytes32", "uint256[]")
146+
string sol_type = 1;
147+
148+
// Whether this parameter was indexed
149+
bool indexed = 2;
150+
151+
// The actual value (encoded as appropriate type)
152+
oneof value {
153+
string string_value = 10;
154+
string int_hex_value = 11;
155+
bool bool_value = 12;
156+
bytes bytes_value = 13;
157+
bytes address_value = 14;
158+
// For other types
159+
bytes abi_bytes = 19;
160+
}
161+
}
162+
163+
// Block safety levels
164+
enum BlockSafety {
165+
BLOCK_SAFETY_LATEST = 0;
166+
BLOCK_SAFETY_FINALIZED = 1;
167+
BLOCK_SAFETY_SAFE = 2;
168+
}
169+
170+
// Block range for queries
171+
message BlockRange {
172+
// Start block (inclusive)
173+
uint64 from_block = 1;
174+
175+
// Optional end block (exclusive)
176+
// if not set, relies on the event's block safety or latest.
177+
optional uint64 to_block = 2;
178+
}

0 commit comments

Comments
 (0)