33use crate :: event_manager:: db:: EventsDatabase ;
44use crate :: event_manager:: listener:: InternalEventStreamRegistration ;
55use crate :: event_manager:: { EventManager , EventManagerError , RegisteredEventEntry } ;
6- use crate :: types:: {
7- EventId , NewRegisteredEventError , ParsedRegisterNewEventRequest , RegisteredEvent ,
8- } ;
6+ use crate :: types:: { EventId , NewRegisteredEventSpecError , RegisteredEventSpec } ;
97use alloy:: network:: { Ethereum , Network } ;
108use alloy:: providers:: Provider ;
119use alloy:: pubsub:: SubscriptionStream ;
2018{
2119 pub ( super ) async fn internal_register_ethereum_event (
2220 & self ,
23- req : ParsedRegisterNewEventRequest ,
21+ event_spec : RegisteredEventSpec ,
2422 ) -> Result < EventId , EventManagerError > {
2523 tracing:: debug!( "Registering new event" ) ;
2624
@@ -30,22 +28,21 @@ where
3028 } ;
3129
3230 // Do nothing if the event is already registered
33- let event_id = req . id ;
31+ let event_id = event_spec . id ;
3432 if self . active_events_map . read ( ) . await . contains_key ( & event_id) {
3533 tracing:: debug!( "Event already registered" ) ;
3634 return Ok ( event_id) ;
3735 }
3836
39- let ( event, stream) =
40- create_stream_and_spec :: < _ , Ethereum > ( req, & self . multi_provider ) . await ?;
37+ let stream = create_stream :: < _ , Ethereum > ( & event_spec, & self . multi_provider ) . await ?;
4138
4239 let reg = InternalEventStreamRegistration :: new (
43- event . clone ( ) ,
40+ event_spec . clone ( ) ,
4441 stream. map ( move |l| ( event_id, l) ) . boxed ( ) , // boxing :( but we need type erasure due to the closure
4542 ) ;
4643
4744 // Save the event in the database
48- if let Err ( e) = self . events_db . store_event ( event ) . await {
45+ if let Err ( e) = self . events_db . store_event ( event_spec ) . await {
4946 tracing:: error!( event = ?event_id, error = ?e, "Failed to store event in database" ) ;
5047 Err ( EventManagerError :: Database ( e. into ( ) ) ) ?
5148 }
7774#[ derive( thiserror:: Error , Debug ) ]
7875pub ( crate ) enum CreateStreamError {
7976 #[ error( "failed to create event" ) ]
80- FailedToCreateEvent ( #[ from] NewRegisteredEventError ) ,
77+ FailedToCreateEvent ( #[ from] NewRegisteredEventSpecError ) ,
8178
8279 #[ error( "unsupported chain" ) ]
8380 UnsupportedChain ,
@@ -88,25 +85,23 @@ pub(crate) enum CreateStreamError {
8885 ) ,
8986}
9087
91- pub ( crate ) async fn create_stream_and_spec < MP , N > (
92- req : ParsedRegisterNewEventRequest ,
88+ pub ( crate ) async fn create_stream < MP , N > (
89+ spec : & RegisteredEventSpec ,
9390 multi_provider : & MP ,
94- ) -> Result < ( RegisteredEvent , SubscriptionStream < Log > ) , CreateStreamError >
91+ ) -> Result < SubscriptionStream < Log > , CreateStreamError >
9592where
9693 MP : MultiChainProvider < u64 > ,
9794 N : Network ,
9895{
99- let registered_event = RegisteredEvent :: try_from_req ( req) ?;
100-
10196 // Obtain a provider for the specified chainid and network
102- let Some ( provider) = multi_provider. get_provider :: < N > ( & registered_event . chain_id ) else {
97+ let Some ( provider) = multi_provider. get_provider :: < N > ( & spec . chain_id ) else {
10398 Err ( CreateStreamError :: UnsupportedChain ) ?
10499 } ;
105100
106101 // Create a new subscription for the specified event
107102 let stream = provider
108- . subscribe_logs ( & Filter :: from ( & registered_event ) )
103+ . subscribe_logs ( & Filter :: from ( spec ) )
109104 . await ?
110105 . into_stream ( ) ;
111- Ok ( ( registered_event , stream) )
106+ Ok ( stream)
112107}
0 commit comments