1- use std:: { fs:: File , path:: PathBuf , sync:: mpsc} ;
1+ use std:: { collections :: HashMap , fs:: File , path:: PathBuf , sync:: mpsc} ;
22
33use async_trait:: async_trait;
44use serde:: { Deserialize , Serialize } ;
@@ -95,23 +95,23 @@ impl SourceTrait for SolanaSnapshotSource {
9595 let filters = self . filters . clone ( ) ;
9696 let config = self . config . clone ( ) ;
9797
98- let ( owners , filter_keys ) : ( Vec < _ > , Vec < _ > ) = filters
99- . parsers_filters
100- . iter ( )
101- . filter_map ( | ( key , parser_filter ) | {
102- parser_filter
103- . account
104- . as_ref ( )
105- . map ( |accounts| ( accounts . owners . clone ( ) , key . clone ( ) ) )
106- } )
107- . fold (
108- ( Vec :: new ( ) , Vec :: new ( ) ) ,
109- | ( mut owners , mut keys ) , ( account_owners , key ) | {
110- owners . extend ( account_owners ) ;
111- keys . push ( key ) ;
112- ( owners , keys )
113- } ,
114- ) ;
98+ let mut owner_to_filters : HashMap < yellowstone_vixen_core :: Pubkey , Vec < String > > =
99+ HashMap :: new ( ) ;
100+ let mut all_owners = Vec :: new ( ) ;
101+
102+ for ( key , parser_filter ) in filters . parsers_filters . iter ( ) {
103+ if let Some ( account_filter ) = parser_filter . account . as_ref ( ) {
104+ for owner in & account_filter . owners {
105+ owner_to_filters
106+ . entry ( * owner )
107+ . or_default ( )
108+ . push ( key . clone ( ) ) ;
109+ if !all_owners . contains ( owner ) {
110+ all_owners . push ( * owner ) ;
111+ }
112+ }
113+ }
114+ }
115115
116116 let solana_snapshot = SolanaSnapshot :: unpack_compressed ( config. path . clone ( ) ) ?;
117117
@@ -138,13 +138,10 @@ impl SourceTrait for SolanaSnapshotSource {
138138 let ( sync_tx, sync_rx) = mpsc:: channel :: < Event > ( ) ;
139139 let mut account_file_workers = JoinSet :: new ( ) ;
140140
141- let filter_keys = filter_keys. clone ( ) ;
142141 let sender_handle = tokio:: spawn ( async move {
143142 while let Ok ( event) = sync_rx. recv ( ) {
144143 match event {
145- Event :: AccountUpdate ( account) => {
146- let filter_keys = filter_keys. clone ( ) ;
147-
144+ Event :: AccountUpdate ( account, filter_keys) => {
148145 if let Err ( err) = tx
149146 . send ( Ok ( SubscribeUpdate {
150147 filters : filter_keys,
@@ -164,7 +161,8 @@ impl SourceTrait for SolanaSnapshotSource {
164161 for AccountFile ( path, current_len) in solana_snapshot. accounts {
165162 let sync_tx = sync_tx. clone ( ) ;
166163 let slot = solana_snapshot. slot ;
167- let owners = owners. clone ( ) ;
164+ let all_owners = all_owners. clone ( ) ;
165+ let owner_to_filters = owner_to_filters. clone ( ) ;
168166
169167 account_file_workers. spawn ( async move {
170168 let ( accounts, _usize) = AccountsFile :: new_from_file (
@@ -179,21 +177,30 @@ impl SourceTrait for SolanaSnapshotSource {
179177 yellowstone_vixen_core:: Pubkey :: try_from ( account. owner . as_ref ( ) )
180178 . expect ( "Owner address is Pubkey" ) ;
181179
182- if owners. contains ( & account_owner) {
183- let _ = sync_tx. send ( Event :: AccountUpdate ( SubscribeUpdateAccount {
184- account : Some ( SubscribeUpdateAccountInfo {
185- pubkey : account. pubkey ( ) . to_bytes ( ) . to_vec ( ) ,
186- lamports : account. lamports ,
187- owner : account. owner . to_bytes ( ) . to_vec ( ) ,
188- executable : account. executable ,
189- rent_epoch : account. rent_epoch ,
190- data : account. data . to_vec ( ) ,
191- write_version : 0 ,
192- txn_signature : None ,
193- } ) ,
194- slot,
195- is_startup : true ,
196- } ) ) ;
180+ if all_owners. contains ( & account_owner) {
181+ // Get the specific filter keys for this owner
182+ let filter_keys = owner_to_filters
183+ . get ( & account_owner)
184+ . cloned ( )
185+ . unwrap_or_default ( ) ;
186+
187+ let _ = sync_tx. send ( Event :: AccountUpdate (
188+ SubscribeUpdateAccount {
189+ account : Some ( SubscribeUpdateAccountInfo {
190+ pubkey : account. pubkey ( ) . to_bytes ( ) . to_vec ( ) ,
191+ lamports : account. lamports ,
192+ owner : account. owner . to_bytes ( ) . to_vec ( ) ,
193+ executable : account. executable ,
194+ rent_epoch : account. rent_epoch ,
195+ data : account. data . to_vec ( ) ,
196+ write_version : 0 ,
197+ txn_signature : None ,
198+ } ) ,
199+ slot,
200+ is_startup : true ,
201+ } ,
202+ filter_keys,
203+ ) ) ;
197204 }
198205 } ) ;
199206 } ) ;
@@ -213,6 +220,6 @@ impl SourceTrait for SolanaSnapshotSource {
213220}
214221
215222enum Event {
216- AccountUpdate ( SubscribeUpdateAccount ) ,
223+ AccountUpdate ( SubscribeUpdateAccount , Vec < String > ) ,
217224 SnapshotFinished ,
218- }
225+ }
0 commit comments