Skip to content

Commit 4e8291d

Browse files
azixusCluEleSsUK
andauthored
feat: various new omnievent features (#33)
* feat(omnievent): properly cleanup grpc stream * feat(omnievent): historical event occurrences filtering * refactor(omnievent): move events filtering to new module * feat(omnievent): add time measurement to filter * feat(omnievent): add rusty type for BlockInfo * feat(sql): events database for sqlite implementation * docs(omnievent): various doc comments * tests(omnievent/sqlite): add various tests * docs(omnievent): various docs update * chore: update cargo.lock * chore(omnievent): add .sqlx queries * chore: mark in_memory module as public * feat(omnievent): impl From<&RegisterNewEventRequest> for uuid::Uuid * fix(omnievent/sql): use varchar(20) for u64 integers * fix(omnievent/grpc): rename abi to abi_bytes * Update omnievent/src/event_manager/db/sql/sqlite.rs Co-authored-by: PM <3749956+CluEleSsUK@users.noreply.github.com> * docs(omnievent): add readme --------- Co-authored-by: PM <3749956+CluEleSsUK@users.noreply.github.com>
1 parent 792fbd2 commit 4e8291d

17 files changed

Lines changed: 1642 additions & 57 deletions

Cargo.lock

Lines changed: 594 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,14 @@ prost-types = { version = "0.13" }
5959
serde = { version = "1.0", features = ["derive"], default-features = false }
6060
serde_json = "1.0"
6161

62+
# storage
63+
sqlx = { version = "0.8" }
64+
6265
# misc
6366
anyhow = "1.0"
6467
base64 = "0.22"
68+
bytes = "1.10"
69+
chrono = "0.4"
6570
clap = { version = "4.5", features = ["derive", "env"] }
6671
figment = { version = "0.10" }
6772
hex = "0.4"

omnievent/.sqlx/query-1ed67d9efd3904696b7a687bdb094f8d50f9dbb9a05769880c3f7b8ce66a0285.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

omnievent/.sqlx/query-7d637919d9e2e9efe2772c5e52cc53caf6e666671341eb0a67235836ac8b48c3.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

omnievent/Cargo.toml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@ name = "omnievent"
33
version.workspace = true
44
edition.workspace = true
55

6+
[features]
7+
sqlite = ["sql", "sqlx/sqlite", "sqlx/uuid", "sqlx/chrono"]
8+
sql = ["dep:sqlx"]
9+
timings = []
10+
611
[dependencies]
7-
alloy = { workspace = true, features = ["default", "provider-ws"] }
12+
alloy = { workspace = true, features = ["default", "provider-ws", "serde"] }
813
superalloy.workspace = true
914

1015
# async
@@ -24,8 +29,15 @@ tonic = { version = "0.13" }
2429
# serde
2530
prost.workspace = true
2631
prost-types.workspace = true
32+
serde.workspace = true
33+
serde_json.workspace = true
34+
35+
# storage
36+
sqlx = { workspace = true, features = ["runtime-tokio"], optional = true }
2737

2838
# misc
39+
bytes.workspace = true
40+
chrono.workspace = true
2941
itertools.workspace = true
3042
thiserror.workspace = true
3143
uuid = { version = "1.0", features = ["v5"] }

omnievent/README.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# omnievent - a multi chain event listener
2+
3+
## gRPC
4+
5+
We support various operations through gRPC: register events, obtain a stream of upcoming event occurrences, and fetch historical event occurrences.
6+
7+
One easy way to interact with the server with a cli is to rely on [`grpcurl`](https://github.com/fullstorydev/grpcurl).
8+
9+
### Register a new event
10+
To register an event, you need to provide a chain id, a contract address, a requested block safety level, the name of the event and its fields.
11+
12+
This can be done with the following command:
13+
```bash
14+
> grpcurl -import-path ./proto -proto events.proto -plaintext -d '{"chain_id": 1337, "address": "IO7wOMg7eg81fUq8ZLj2OUJ9evY=", "event_name": "StringEmitted", "fields": [{"sol_type": "string", "indexed": false}], "block_safety": "BLOCK_SAFETY_LATEST" }' 127.0.0.1:8089 events.OmniEventService/RegisterEvent
15+
{
16+
"uuid": "ijWGFy9LUq+s2fJASjY7VQ=="
17+
}
18+
```
19+
20+
It returns a deterministic `uuid` v5 which is obtained from the protobuf encoding of the registration request.
21+
22+
### Stream event occurrences
23+
Upcoming event occurrences can be streamed as followed by specifying the event identifier:
24+
```bash
25+
> grpcurl -import-path ./proto -proto events.proto -plaintext -d '{"event_uuids": ["ijWGFy9LUq+s2fJASjY7VQ=="]}' 127.0.0.1:8089 events.OmniEventService/StreamEvents
26+
{
27+
"eventUuid": "ijWGFy9LUq+s2fJASjY7VQ==",
28+
"blockInfo": {
29+
"blockNumber": "2",
30+
"blockHash": "Ecr/C4bYj5zC69VO1nEuptsgPEOlJpxQwY3St5nQ5p4=",
31+
"timestamp": "2025-07-07T16:50:51Z"
32+
},
33+
"rawLogData": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADEhlbGxvIFdvcmxkIQAAAAAAAAAAAAAAAAAAAAAAAAAA",
34+
"eventData": [
35+
{
36+
"solType": "string",
37+
"stringValue": "Hello World!"
38+
}
39+
]
40+
}
41+
```
42+
43+
### Obtain historical event occurrences
44+
To obtain past event occurrences with filtering, the following command may be used:
45+
```bash
46+
> grpcurl -import-path ./proto -proto events.proto -plaintext -d '{"event_uuids": ["ijWGFy9LUq+s2fJASjY7VQ=="], "filter": {"data_filters": [{"data_index": 0, "string": {"exact_values": ["Hello World!"]}}]}}' 127.0.0.1:8089 events.OmniEventService/GetHistoricalEvents
47+
{
48+
"occurrences": [
49+
{
50+
"eventUuid": "ijWGFy9LUq+s2fJASjY7VQ==",
51+
"blockInfo": {
52+
"blockNumber": "2",
53+
"blockHash": "Ecr/C4bYj5zC69VO1nEuptsgPEOlJpxQwY3St5nQ5p4=",
54+
"timestamp": "2025-07-07T16:50:51Z"
55+
},
56+
"rawLogData": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADEhlbGxvIFdvcmxkIQAAAAAAAAAAAAAAAAAAAAAAAAAA",
57+
"eventData": [
58+
{
59+
"solType": "string",
60+
"stringValue": "Hello World!"
61+
}
62+
]
63+
}
64+
]
65+
}
66+
```
67+
68+
## Compilation
69+
Run the server: TODO
70+
71+
Due to a compile-time verification of SQL queries, you will need to specify a database url as follows when compiling the code:
72+
`DATABASE_URL=sqlite:///tmp/temp.db cargo build --all-features --all-targets`

omnievent/proto/events.proto

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,8 @@ message GetHistoricalEventsRequest {
7272
// Uuid uniquely representing the event
7373
repeated bytes event_uuids = 1;
7474

75-
// Block range (required for historical queries)
76-
BlockRange block_range = 2;
77-
78-
// Add filters (chain id, address, custom tags, values, etc) in the future
75+
// Filter the event occurrences.
76+
EventOccurrenceFilter filter = 2;
7977
}
8078

8179
// Response with historical events
@@ -167,12 +165,61 @@ enum BlockSafety {
167165
BLOCK_SAFETY_SAFE = 2;
168166
}
169167

170-
// Block range for queries
171-
message BlockRange {
168+
// Filter for an event occurrence that requires all conditions to be met
169+
message EventOccurrenceFilter {
170+
optional BlockFilter block_filter = 1;
171+
172+
repeated OccurrenceDataFilter data_filters = 2;
173+
}
174+
175+
// A filter for blocks
176+
message BlockFilter {
172177
// Start block (inclusive)
173-
uint64 from_block = 1;
178+
optional uint64 from_block = 1;
174179

175-
// Optional end block (exclusive)
176-
// if not set, relies on the event's block safety or latest.
180+
// End block (exclusive)
177181
optional uint64 to_block = 2;
178182
}
183+
184+
// Filters the data from an event occurrence
185+
message OccurrenceDataFilter {
186+
uint32 data_index = 1;
187+
188+
// A filter depending on the data type.
189+
oneof filter {
190+
StringDataFilter string = 10;
191+
UintDataFilter uint = 11;
192+
IntDataFilter int = 12;
193+
BoolDataFilter bool = 13;
194+
BytesDataFilter bytes = 14;
195+
AddressDataFilter address = 15;
196+
// For other types
197+
BytesDataFilter abi_bytes = 19;
198+
}
199+
}
200+
201+
message StringDataFilter {
202+
repeated string exact_values = 1;
203+
}
204+
205+
message IntDataFilter {
206+
// Hex-encoded 256 bits signed integers
207+
repeated string exact_hex_values = 1;
208+
}
209+
210+
message UintDataFilter {
211+
// Hex-encoded 256 bits unsigned integers
212+
repeated string exact_hex_values = 1;
213+
}
214+
215+
message BoolDataFilter {
216+
bool exact_value = 1;
217+
}
218+
219+
message BytesDataFilter {
220+
repeated bytes exact_values = 1;
221+
}
222+
223+
message AddressDataFilter {
224+
repeated bytes exact_values = 1;
225+
}

omnievent/sql/schema.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- Registered events table
2+
CREATE TABLE IF NOT EXISTS registered_events (
3+
id UUID PRIMARY KEY NOT NULL,
4+
chain_id VARCHAR(20) NOT NULL, -- can't completely store u64 in INTEGER. 20 digits for int repr.
5+
address BLOB NOT NULL,
6+
block_safety INTEGER NOT NULL,
7+
event_name TEXT NOT NULL,
8+
fields_json TEXT NOT NULL -- json encoded fields
9+
);
10+
11+
-- Event occurrences table
12+
CREATE TABLE IF NOT EXISTS event_occurrences (
13+
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
14+
event_id UUID NOT NULL,
15+
block_number VARCHAR(20) NOT NULL, -- can't completely store u64 in INTEGER. 20 digits for int repr.
16+
block_hash BYTES NOT NULL,
17+
block_timestamp DATETIME NOT NULL,
18+
raw_log_json TEXT NOT NULL,
19+
fields_json TEXT NOT NULL, -- json encoded fields
20+
FOREIGN KEY (event_id) REFERENCES registered_events(id) ON DELETE CASCADE
21+
);
22+

omnievent/src/event_manager.rs

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
44
pub mod db;
55
mod events_occurrence;
6+
mod filtering;
67
pub(crate) mod listener;
78
mod register;
89

@@ -11,6 +12,7 @@ use crate::event_manager::events_occurrence::HandleEventsOccurrenceTask;
1112
use crate::event_manager::listener::{
1213
EventListener, EventListenerHandle, EventReceiverHandleError,
1314
};
15+
use crate::proto_types::EventOccurrenceFilter;
1416
use crate::types::{EventFieldData, EventId, EventOccurrence, ParsedRegisterNewEventRequest};
1517
use alloy::rpc::types::Log;
1618
use futures_util::stream::SelectAll;
@@ -70,9 +72,14 @@ pub(crate) enum EventManagerError {
7072

7173
#[error("database error")]
7274
Database(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
75+
76+
#[error("filter error")]
77+
Filter(#[from] FilterError),
7378
}
7479

75-
// export [`CreateStreamError`] since it's used in [`EventManagerError`]
80+
// export other event_manager's module errors
81+
use crate::event_manager::filtering::filter_occurrences;
82+
pub(crate) use filtering::FilterError;
7683
pub(crate) use register::CreateStreamError;
7784

7885
impl<MP, DB> EventManager<MP, DB>
@@ -191,26 +198,85 @@ where
191198

192199
pub(crate) async fn get_ethereum_multi_event_stream(
193200
&self,
194-
events_id: impl IntoIterator<Item = EventId>,
201+
event_ids: impl IntoIterator<Item = EventId>,
195202
) -> Result<SelectAll<BroadcastStream<EventOccurrence>>, EventManagerError> {
196203
// TODO: n locks, not great, improve
197204
let streams = futures::future::try_join_all(
198-
events_id
205+
event_ids
199206
.into_iter()
200207
.map(|id| self.get_ethereum_event_stream(id)),
201208
)
202209
.await?;
203210
Ok(futures::stream::select_all(streams))
204211
}
205212

213+
pub(crate) async fn unregister_ethereum_multi_event_stream(
214+
&self,
215+
event_ids: impl IntoIterator<Item = EventId>,
216+
stream: SelectAll<BroadcastStream<EventOccurrence>>, // take ownership to make sure that it's dropped properly
217+
) {
218+
// Drop the stream (i.e., the receiver side of the broadcast channel)
219+
drop(stream);
220+
221+
let mut active_events_map = self.active_events_map.write().await;
222+
for event_id in event_ids {
223+
let _span =
224+
tracing::info_span!("unregister_ethereum_multi_event_stream", event_id = ?event_id)
225+
.entered();
226+
227+
let Some(entry) = active_events_map.get_mut(&event_id) else {
228+
tracing::error!("Attempting to unregister a stream not in active_events_map");
229+
continue;
230+
};
231+
232+
let Some(outgoing_stream) = &entry.outgoing_stream else {
233+
tracing::debug!("Attempting to unregister a stream that was already removed");
234+
continue;
235+
};
236+
237+
// If there are no more receivers, remove the outgoing stream
238+
let rx_count = outgoing_stream.receiver_count();
239+
if rx_count == 0 {
240+
tracing::debug!("Removing broadcast channel");
241+
entry.outgoing_stream = None;
242+
} else {
243+
tracing::debug!(
244+
rx_count,
245+
"Leaving broadcast channel open due to remaining receivers"
246+
);
247+
}
248+
}
249+
}
250+
251+
/// Get a vector of historical event occurrences, optionally with filtering enabled.
252+
/// TODO: Currently, the function uses a very naive approach to filtering. It fetches every events
253+
/// from the database, and then applies filters on top. This needs to be reworked once we have
254+
/// more concrete usage, to know which filters are the most important so that they can be offloaded
255+
/// to the database implementation.
206256
pub(crate) async fn get_historical_event_occurrences(
207257
&self,
208258
event_ids: impl IntoIterator<Item = EventId> + Send,
259+
filter: Option<EventOccurrenceFilter>,
209260
) -> Result<Vec<EventOccurrence>, EventManagerError> {
210-
self.events_db
261+
let mut occurrences = self
262+
.events_db
211263
.get_event_occurrences(event_ids)
212264
.await
213-
.map_err(|e| EventManagerError::Database(Box::new(e)))
265+
.map_err(|e| EventManagerError::Database(Box::new(e)))?;
266+
267+
tracing::debug!(
268+
n_occurrences = occurrences.len(),
269+
"Obtained occurrences from database"
270+
);
271+
272+
if let Some(filter) = filter {
273+
occurrences = filter_occurrences(occurrences, filter).map_err(|e| {
274+
tracing::info!(error = ?e, "Failed to apply filters to occurrences");
275+
e
276+
})?;
277+
}
278+
279+
Ok(occurrences)
214280
}
215281
}
216282

omnievent/src/event_manager/db.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
33
use crate::types::{EventId, EventOccurrence, RegisteredEvent};
44

5-
pub(crate) mod in_memory;
5+
pub mod in_memory;
6+
7+
#[cfg(feature = "sql")]
8+
pub mod sql;
69

710
pub trait EventsDatabase {
811
type Error: std::error::Error + Send + Sync + 'static;

0 commit comments

Comments
 (0)