Skip to content

Commit a611a9e

Browse files
authored
feat(omnievent): repeat chain_id and contract address in event occurrence (#34)
1 parent 4e8291d commit a611a9e

7 files changed

Lines changed: 53 additions & 1 deletion

File tree

omnievent/proto/events.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ message EventOccurrence {
115115

116116
// Event data (decoded parameters)
117117
repeated EventData event_data = 4;
118+
119+
// Chain id
120+
uint64 chain_id = 5;
121+
122+
// Address of the contract
123+
bytes address = 6;
118124
}
119125

120126
// Block information

omnievent/sql/schema.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,18 @@ CREATE TABLE IF NOT EXISTS event_occurrences (
2020
FOREIGN KEY (event_id) REFERENCES registered_events(id) ON DELETE CASCADE
2121
);
2222

23+
-- View that combines event_occurrences with registered_events data
24+
CREATE VIEW event_occurrences_with_context AS
25+
SELECT
26+
occurrence.id,
27+
occurrence.event_id,
28+
occurrence.block_number,
29+
occurrence.block_hash,
30+
occurrence.block_timestamp,
31+
occurrence.raw_log_json,
32+
occurrence.fields_json,
33+
event.chain_id,
34+
event.address,
35+
event.event_name
36+
FROM event_occurrences occurrence
37+
INNER JOIN registered_events event ON occurrence.event_id = event.id;

omnievent/src/event_manager.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::event_manager::listener::{
1414
};
1515
use crate::proto_types::EventOccurrenceFilter;
1616
use crate::types::{EventFieldData, EventId, EventOccurrence, ParsedRegisterNewEventRequest};
17+
use alloy::primitives::Address;
1718
use alloy::rpc::types::Log;
1819
use futures_util::stream::SelectAll;
1920
use std::collections::HashMap;
@@ -29,6 +30,8 @@ const BROADCAST_STREAM_CAPACITY: usize = 64;
2930
#[derive(Clone, Debug)]
3031
pub(crate) struct DecodedEvent {
3132
event_id: EventId,
33+
chain_id: u64,
34+
address: Address,
3235
data: Vec<EventFieldData>,
3336
log: Log,
3437
}

omnievent/src/event_manager/db/sql/sqlite.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use crate::event_manager::db::EventsDatabase;
44
use crate::types::{BlockInfo, EventId, EventOccurrence, RegisteredEvent};
5+
use alloy::primitives::Address;
56
use chrono::{DateTime, Utc};
67
use sqlx::{FromRow, QueryBuilder, Row, Sqlite, SqlitePool};
78
use std::str::FromStr;
@@ -169,7 +170,7 @@ impl EventsDatabase for SqliteEventDatabase {
169170
};
170171

171172
let mut query_builder: QueryBuilder<Sqlite> =
172-
QueryBuilder::new("SELECT * FROM event_occurrences WHERE event_id IN (");
173+
QueryBuilder::new("SELECT * FROM event_occurrences_with_context WHERE event_id IN (");
173174
let mut separated = query_builder.separated(", ");
174175
for id in event_ids {
175176
separated.push_bind(Uuid::from(id));
@@ -195,12 +196,23 @@ impl From<(sqlx::Error, &'static str)> for SqliteEventDatabaseError {
195196
impl<'r> FromRow<'r, sqlx::sqlite::SqliteRow> for EventOccurrence {
196197
fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
197198
let event_id: Uuid = row.try_get("event_id")?;
199+
let address: Vec<u8> = row.try_get("address")?;
200+
let chain_id_str: String = row.try_get("chain_id")?;
198201
let block_number_str: String = row.try_get("block_number")?;
199202
let block_hash: Vec<u8> = row.try_get("block_hash")?;
200203
let block_timestamp: DateTime<Utc> = row.try_get("block_timestamp")?;
201204
let raw_log_json: String = row.try_get("raw_log_json")?;
202205
let fields_json: String = row.try_get("fields_json")?;
203206

207+
let address =
208+
Address::try_from(address.as_slice()).map_err(|e| sqlx::Error::ColumnDecode {
209+
index: "address".to_owned(),
210+
source: Box::new(e),
211+
})?;
212+
let chain_id = u64::from_str(&chain_id_str).map_err(|e| sqlx::Error::ColumnDecode {
213+
index: "chain_id".to_owned(),
214+
source: Box::new(e),
215+
})?;
204216
let block_number =
205217
u64::from_str(&block_number_str).map_err(|e| sqlx::Error::ColumnDecode {
206218
index: "block_number".to_owned(),
@@ -218,6 +230,8 @@ impl<'r> FromRow<'r, sqlx::sqlite::SqliteRow> for EventOccurrence {
218230

219231
Ok(Self {
220232
event_id: event_id.into(),
233+
address,
234+
chain_id,
221235
block_info: BlockInfo {
222236
number: block_number,
223237
hash: block_hash.into(),
@@ -285,6 +299,8 @@ mod tests {
285299
let res = db
286300
.store_event_occurrence(EventOccurrence {
287301
event_id: EventId::new(b"invalid event id"),
302+
address: Default::default(),
303+
chain_id: 0,
288304
data: vec![],
289305
raw_log: LogData::empty(),
290306
block_info: BlockInfo {
@@ -325,6 +341,8 @@ mod tests {
325341
let res = db
326342
.store_event_occurrence(EventOccurrence {
327343
event_id,
344+
address: Default::default(),
345+
chain_id: 0,
328346
data: vec![],
329347
raw_log: LogData::empty(),
330348
block_info: BlockInfo {
@@ -364,6 +382,8 @@ mod tests {
364382

365383
let occurrence = EventOccurrence {
366384
event_id,
385+
address: Default::default(),
386+
chain_id: 0,
367387
data: vec![],
368388
raw_log: LogData::empty(),
369389
block_info: BlockInfo {

omnievent/src/event_manager/events_occurrence.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ fn event_occurrence_from_decoded_event(decoded_event: DecodedEvent) -> EventOccu
121121

122122
EventOccurrence {
123123
event_id: decoded_event.event_id,
124+
chain_id: decoded_event.chain_id,
125+
address: decoded_event.address,
124126
raw_log: decoded_event.log.data().to_owned(),
125127
data: decoded_event.data,
126128
block_info: BlockInfo {

omnievent/src/event_manager/listener.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ impl EventListener {
194194
// Send decoded event through channel
195195
if sender.send(DecodedEvent {
196196
event_id,
197+
address: event.address,
198+
chain_id: event.chain_id,
197199
data: decoded_fields,
198200
log,
199201
}).await.is_err() {

omnievent/src/types.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@ impl From<BlockInfo> for proto_types::BlockInfo {
258258
#[derive(Clone, Debug, PartialEq)]
259259
pub struct EventOccurrence {
260260
pub event_id: EventId,
261+
pub chain_id: u64,
262+
pub address: Address,
261263
pub block_info: BlockInfo,
262264
pub raw_log: LogData,
263265
pub data: Vec<EventFieldData>,
@@ -277,6 +279,8 @@ impl From<EventOccurrence> for proto_types::EventOccurrence {
277279

278280
Self {
279281
event_uuid: event.event_id.into(),
282+
address: event.address.to_vec().into(),
283+
chain_id: event.chain_id,
280284
event_data: data,
281285
raw_log_data: Some(event.raw_log.data.into()),
282286
block_info: Some(event.block_info.into()),

0 commit comments

Comments
 (0)