Skip to content

Commit 30d4f60

Browse files
committed
fix(omnievent): replace protobuf-based uuidv5 with cbor for stable ids
1 parent 23f5ed1 commit 30d4f60

12 files changed

Lines changed: 169 additions & 85 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ prost-build = { version = "0.13" }
6060
prost-types = { version = "0.13" }
6161
serde = { version = "1.0", features = ["derive"], default-features = false }
6262
serde_json = "1.0"
63+
serde_cbor = "0.11"
6364

6465
# storage
6566
sqlx = { version = "0.8" }

omnievent/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ prost.workspace = true
3131
prost-types.workspace = true
3232
serde.workspace = true
3333
serde_json.workspace = true
34+
serde_cbor.workspace = true
3435

3536
# storage
3637
sqlx = { workspace = true, features = ["runtime-tokio"], optional = true }
@@ -44,6 +45,7 @@ uuid = { version = "1.0", features = ["v5"] }
4445

4546
[dev-dependencies]
4647
alloy = { workspace = true, features = ["default", "provider-ws", "provider-anvil-node"] }
48+
hex.workspace = true
4749

4850
[build-dependencies]
4951
tonic-build.workspace = true

omnievent/src/event_manager.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use crate::event_manager::listener::{
1313
EventListener, EventListenerHandle, EventReceiverHandleError,
1414
};
1515
use crate::proto_types::EventOccurrenceFilter;
16-
use crate::types::{EventFieldData, EventId, EventOccurrence, ParsedRegisterNewEventRequest};
16+
use crate::types::{
17+
EventFieldData, EventId, EventOccurrence, NewRegisteredEventSpecError,
18+
ParsedRegisterNewEventRequest, RegisteredEventSpec,
19+
};
1720
use alloy::primitives::Address;
1821
use alloy::rpc::types::Log;
1922
use futures_util::stream::SelectAll;
@@ -78,6 +81,9 @@ pub(crate) enum EventManagerError {
7881

7982
#[error("filter error")]
8083
Filter(#[from] FilterError),
84+
85+
#[error("failed to convert event registration into spec")]
86+
EventRegistrationIntoSpec(#[from] NewRegisteredEventSpecError),
8187
}
8288

8389
// export other event_manager's module errors
@@ -146,11 +152,12 @@ where
146152
&self,
147153
req: ParsedRegisterNewEventRequest,
148154
) -> Result<EventId, EventManagerError> {
149-
let event_id = req.id;
150-
let chain_id = req.chain_id;
151-
let address = req.address;
152-
let event_name = req.event_name.clone();
153-
self.internal_register_ethereum_event(req)
155+
let event_spec = RegisteredEventSpec::try_from(req)?;
156+
let event_id = event_spec.id;
157+
let chain_id = event_spec.chain_id;
158+
let address = event_spec.address;
159+
let event_name = event_spec.event_name.clone();
160+
self.internal_register_ethereum_event(event_spec)
154161
.instrument(tracing::info_span!("register_ethereum_event", %event_id, %chain_id, %address, %event_name))
155162
.await
156163
}
@@ -300,7 +307,7 @@ pub(crate) mod tests {
300307
pub(crate) mod test_contracts {
301308
use crate::event_manager::tests::test_contracts::EventEmitter::EventEmitterInstance;
302309
use crate::proto_types::{BlockSafety, EventField, RegisterNewEventRequest};
303-
use crate::types::{ParsedRegisterNewEventRequest, RegisteredEvent};
310+
use crate::types::{ParsedRegisterNewEventRequest, RegisteredEventSpec};
304311
use alloy::network::Network;
305312
use alloy::providers::Provider;
306313

@@ -330,12 +337,12 @@ pub(crate) mod tests {
330337

331338
pub(crate) async fn get_string_registered_event<P, N>(
332339
instance: &EventEmitterInstance<P, N>,
333-
) -> RegisteredEvent
340+
) -> RegisteredEventSpec
334341
where
335342
P: Provider<N>,
336343
N: Network,
337344
{
338-
RegisteredEvent::try_from_req(get_string_register_req(instance).await).unwrap()
345+
RegisteredEventSpec::try_from(get_string_register_req(instance).await).unwrap()
339346
}
340347

341348
pub(crate) async fn get_string_register_req<P, N>(

omnievent/src/event_manager/db.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Database used to store events and their occurrences.
22
3-
use crate::types::{EventId, EventOccurrence, RegisteredEvent};
3+
use crate::types::{EventId, EventOccurrence, RegisteredEventSpec};
44

55
pub mod in_memory;
66

@@ -13,7 +13,7 @@ pub trait EventsDatabase {
1313
/// Store an event in the database.
1414
fn store_event(
1515
&self,
16-
event: RegisteredEvent,
16+
event: RegisteredEventSpec,
1717
) -> impl Future<Output = Result<(), Self::Error>> + Send;
1818

1919
/// Store an event occurrence in the database.

omnievent/src/event_manager/db/in_memory.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
//! Non-persistent in-memory database.
22
33
use crate::event_manager::db::EventsDatabase;
4-
use crate::types::{EventId, EventOccurrence, RegisteredEvent};
4+
use crate::types::{EventId, EventOccurrence, RegisteredEventSpec};
55
use itertools::Itertools;
66
use std::collections::HashMap;
77
use std::sync::Arc;
88

99
pub struct InMemoryDatabaseEntry {
1010
#[allow(unused)]
11-
registered_event: RegisteredEvent,
11+
registered_event: RegisteredEventSpec,
1212
occurrences: Vec<EventOccurrence>,
1313
}
1414

@@ -27,7 +27,7 @@ pub(crate) enum InMemoryDatabaseError {
2727
impl EventsDatabase for InMemoryDatabase {
2828
type Error = InMemoryDatabaseError;
2929

30-
async fn store_event(&self, registered_event: RegisteredEvent) -> Result<(), Self::Error> {
30+
async fn store_event(&self, registered_event: RegisteredEventSpec) -> Result<(), Self::Error> {
3131
let mut db = self.0.write().await;
3232
db.0.insert(
3333
registered_event.id,

omnievent/src/event_manager/db/sql/sqlite.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! A sqlite-based [`EventsDatabase`]
22
33
use crate::event_manager::db::EventsDatabase;
4-
use crate::types::{BlockInfo, EventId, EventOccurrence, RegisteredEvent};
4+
use crate::types::{BlockInfo, EventId, EventOccurrence, RegisteredEventSpec};
55
use alloy::primitives::Address;
66
use chrono::{DateTime, Utc};
77
use sqlx::{FromRow, QueryBuilder, Row, Sqlite, SqlitePool};
@@ -84,7 +84,7 @@ impl SqliteEventDatabase {
8484
impl EventsDatabase for SqliteEventDatabase {
8585
type Error = SqliteEventDatabaseError;
8686

87-
async fn store_event(&self, event: RegisteredEvent) -> Result<(), Self::Error> {
87+
async fn store_event(&self, event: RegisteredEventSpec) -> Result<(), Self::Error> {
8888
let event_id = Uuid::from(event.id);
8989
let event_chain_id = event.chain_id.to_string();
9090
let event_address = event.address.to_vec();
@@ -248,7 +248,7 @@ mod tests {
248248
use crate::event_manager::db::EventsDatabase;
249249
use crate::event_manager::db::sql::sqlite::SqliteEventDatabase;
250250
use crate::proto_types::BlockSafety;
251-
use crate::types::{BlockInfo, EventId, EventOccurrence, RegisteredEvent};
251+
use crate::types::{BlockInfo, EventId, EventOccurrence, RegisteredEventSpec};
252252
use alloy::primitives::{Address, LogData};
253253

254254
#[tokio::test]
@@ -272,7 +272,7 @@ mod tests {
272272

273273
let res = db
274274
.store_event(
275-
RegisteredEvent::try_new(
275+
RegisteredEventSpec::try_new(
276276
EventId::new(b"test_event"),
277277
0u64,
278278
Address::default(),
@@ -325,7 +325,7 @@ mod tests {
325325

326326
let event_id = EventId::new(b"test_event");
327327
db.store_event(
328-
RegisteredEvent::try_new(
328+
RegisteredEventSpec::try_new(
329329
event_id,
330330
0u64,
331331
Address::default(),
@@ -367,7 +367,7 @@ mod tests {
367367

368368
let event_id = EventId::new(b"test_event");
369369
db.store_event(
370-
RegisteredEvent::try_new(
370+
RegisteredEventSpec::try_new(
371371
event_id,
372372
0u64,
373373
Address::default(),

omnievent/src/event_manager/listener.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Manages event streams and sends back decoded event occurrences.
22
33
use crate::event_manager::DecodedEvent;
4-
use crate::types::{EventFieldData, EventId, RegisteredEvent};
4+
use crate::types::{EventFieldData, EventId, RegisteredEventSpec};
55
use alloy::rpc::types::Log;
66
use futures_util::StreamExt;
77
use futures_util::stream::{BoxStream, SelectAll};
@@ -83,13 +83,13 @@ impl EventListener {
8383
pub(crate) type LogStreamWithId = BoxStream<'static, (EventId, Log)>;
8484

8585
pub(crate) struct InternalEventStreamRegistration {
86-
event: RegisteredEvent,
86+
event: RegisteredEventSpec,
8787
stream: LogStreamWithId,
8888
}
8989

9090
impl InternalEventStreamRegistration {
9191
/// Register a new event stream.
92-
pub fn new(event: RegisteredEvent, stream: LogStreamWithId) -> Self {
92+
pub fn new(event: RegisteredEventSpec, stream: LogStreamWithId) -> Self {
9393
Self { event, stream }
9494
}
9595
}
@@ -217,11 +217,11 @@ pub enum EventLogDecodeError {
217217
/// Try to decode a log with a given event specification and return the decoded fields.
218218
///
219219
/// # Panics
220-
/// Panics if the provided [`RegisteredEvent`] is inconsistent, e.g., the sol_event is inconsistent
220+
/// Panics if the provided [`RegisteredEventSpec`] is inconsistent, e.g., the sol_event is inconsistent
221221
/// with the vector of fields.
222222
fn decode_log(
223223
log: &Log,
224-
event: &RegisteredEvent,
224+
event: &RegisteredEventSpec,
225225
) -> Result<Vec<EventFieldData>, EventLogDecodeError> {
226226
let decoded = event.sol_event.decode_log_data(log.data()).map_err(|e| {
227227
tracing::error!(error = ?e, ?log, ?event, "Failed to decode log with given spec");
@@ -309,7 +309,7 @@ mod tests {
309309
..Default::default()
310310
};
311311

312-
let event = RegisteredEvent::try_new(
312+
let event = RegisteredEventSpec::try_new(
313313
EventId::new(b"RandomnessRequested -- test"),
314314
137,
315315
address,

omnievent/src/event_manager/register.rs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
use crate::event_manager::db::EventsDatabase;
44
use crate::event_manager::listener::InternalEventStreamRegistration;
55
use crate::event_manager::{EventManager, EventManagerError, RegisteredEventEntry};
6-
use crate::types::{
7-
EventId, NewRegisteredEventError, ParsedRegisterNewEventRequest, RegisteredEvent,
8-
};
6+
use crate::types::{EventId, NewRegisteredEventSpecError, RegisteredEventSpec};
97
use alloy::network::{Ethereum, Network};
108
use alloy::providers::Provider;
119
use alloy::pubsub::SubscriptionStream;
@@ -20,7 +18,7 @@ where
2018
{
2119
pub(super) async fn internal_register_ethereum_event(
2220
&self,
23-
req: ParsedRegisterNewEventRequest,
21+
event_spec: RegisteredEventSpec,
2422
) -> Result<EventId, EventManagerError> {
2523
tracing::debug!("Registering new event");
2624

@@ -30,22 +28,21 @@ where
3028
};
3129

3230
// Do nothing if the event is already registered
33-
let event_id = req.id;
31+
let event_id = event_spec.id;
3432
if self.active_events_map.read().await.contains_key(&event_id) {
3533
tracing::debug!("Event already registered");
3634
return Ok(event_id);
3735
}
3836

39-
let (event, stream) =
40-
create_stream_and_spec::<_, Ethereum>(req, &self.multi_provider).await?;
37+
let stream = create_stream::<_, Ethereum>(&event_spec, &self.multi_provider).await?;
4138

4239
let reg = InternalEventStreamRegistration::new(
43-
event.clone(),
40+
event_spec.clone(),
4441
stream.map(move |l| (event_id, l)).boxed(), // boxing :( but we need type erasure due to the closure
4542
);
4643

4744
// Save the event in the database
48-
if let Err(e) = self.events_db.store_event(event).await {
45+
if let Err(e) = self.events_db.store_event(event_spec).await {
4946
tracing::error!(event = ?event_id, error = ?e, "Failed to store event in database");
5047
Err(EventManagerError::Database(e.into()))?
5148
}
@@ -77,7 +74,7 @@ where
7774
#[derive(thiserror::Error, Debug)]
7875
pub(crate) enum CreateStreamError {
7976
#[error("failed to create event")]
80-
FailedToCreateEvent(#[from] NewRegisteredEventError),
77+
FailedToCreateEvent(#[from] NewRegisteredEventSpecError),
8178

8279
#[error("unsupported chain")]
8380
UnsupportedChain,
@@ -88,25 +85,23 @@ pub(crate) enum CreateStreamError {
8885
),
8986
}
9087

91-
pub(crate) async fn create_stream_and_spec<MP, N>(
92-
req: ParsedRegisterNewEventRequest,
88+
pub(crate) async fn create_stream<MP, N>(
89+
spec: &RegisteredEventSpec,
9390
multi_provider: &MP,
94-
) -> Result<(RegisteredEvent, SubscriptionStream<Log>), CreateStreamError>
91+
) -> Result<SubscriptionStream<Log>, CreateStreamError>
9592
where
9693
MP: MultiChainProvider<u64>,
9794
N: Network,
9895
{
99-
let registered_event = RegisteredEvent::try_from_req(req)?;
100-
10196
// Obtain a provider for the specified chainid and network
102-
let Some(provider) = multi_provider.get_provider::<N>(&registered_event.chain_id) else {
97+
let Some(provider) = multi_provider.get_provider::<N>(&spec.chain_id) else {
10398
Err(CreateStreamError::UnsupportedChain)?
10499
};
105100

106101
// Create a new subscription for the specified event
107102
let stream = provider
108-
.subscribe_logs(&Filter::from(&registered_event))
103+
.subscribe_logs(&Filter::from(spec))
109104
.await?
110105
.into_stream();
111-
Ok((registered_event, stream))
106+
Ok(stream)
112107
}

omnievent/src/proto_types.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,25 @@ mod events {
1616
}
1717
}
1818

19+
impl serde::Serialize for BlockSafety {
20+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
21+
where
22+
S: serde::Serializer,
23+
{
24+
serializer.serialize_str(self.as_str_name())
25+
}
26+
}
27+
28+
impl<'de> serde::Deserialize<'de> for BlockSafety {
29+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
30+
where
31+
D: serde::Deserializer<'de>,
32+
{
33+
let s = String::deserialize(deserializer)?;
34+
Self::from_str_name(&s).ok_or(serde::de::Error::unknown_variant(&s, &[]))
35+
}
36+
}
37+
1938
impl From<alloy::dyn_abi::DynSolValue> for event_data::Value {
2039
fn from(value: alloy::dyn_abi::DynSolValue) -> Self {
2140
match value {

0 commit comments

Comments
 (0)