diff --git a/crates/solana-snapshot-source/src/lib.rs b/crates/solana-snapshot-source/src/lib.rs index de2f52b4..8db702de 100644 --- a/crates/solana-snapshot-source/src/lib.rs +++ b/crates/solana-snapshot-source/src/lib.rs @@ -1,4 +1,4 @@ -use std::{fs::File, path::PathBuf, sync::mpsc}; +use std::{collections::HashMap, fs::File, path::PathBuf, sync::mpsc}; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -95,23 +95,23 @@ impl SourceTrait for SolanaSnapshotSource { let filters = self.filters.clone(); let config = self.config.clone(); - let (owners, filter_keys): (Vec<_>, Vec<_>) = filters - .parsers_filters - .iter() - .filter_map(|(key, parser_filter)| { - parser_filter - .account - .as_ref() - .map(|accounts| (accounts.owners.clone(), key.clone())) - }) - .fold( - (Vec::new(), Vec::new()), - |(mut owners, mut keys), (account_owners, key)| { - owners.extend(account_owners); - keys.push(key); - (owners, keys) - }, - ); + let mut owner_to_filters: HashMap> = + HashMap::new(); + let mut all_owners = Vec::new(); + + for (key, parser_filter) in filters.parsers_filters.iter() { + if let Some(account_filter) = parser_filter.account.as_ref() { + for owner in &account_filter.owners { + owner_to_filters + .entry(*owner) + .or_default() + .push(key.clone()); + if !all_owners.contains(owner) { + all_owners.push(*owner); + } + } + } + } let solana_snapshot = SolanaSnapshot::unpack_compressed(config.path.clone())?; @@ -138,13 +138,10 @@ impl SourceTrait for SolanaSnapshotSource { let (sync_tx, sync_rx) = mpsc::channel::(); let mut account_file_workers = JoinSet::new(); - let filter_keys = filter_keys.clone(); let sender_handle = tokio::spawn(async move { while let Ok(event) = sync_rx.recv() { match event { - Event::AccountUpdate(account) => { - let filter_keys = filter_keys.clone(); - + Event::AccountUpdate(account, filter_keys) => { if let Err(err) = tx .send(Ok(SubscribeUpdate { filters: filter_keys, @@ -164,7 +161,8 @@ impl SourceTrait for SolanaSnapshotSource { for AccountFile(path, current_len) in solana_snapshot.accounts { let sync_tx = sync_tx.clone(); let slot = solana_snapshot.slot; - let owners = owners.clone(); + let all_owners = all_owners.clone(); + let owner_to_filters = owner_to_filters.clone(); account_file_workers.spawn(async move { let (accounts, _usize) = AccountsFile::new_from_file( @@ -179,21 +177,30 @@ impl SourceTrait for SolanaSnapshotSource { yellowstone_vixen_core::Pubkey::try_from(account.owner.as_ref()) .expect("Owner address is Pubkey"); - if owners.contains(&account_owner) { - let _ = sync_tx.send(Event::AccountUpdate(SubscribeUpdateAccount { - account: Some(SubscribeUpdateAccountInfo { - pubkey: account.pubkey().to_bytes().to_vec(), - lamports: account.lamports, - owner: account.owner.to_bytes().to_vec(), - executable: account.executable, - rent_epoch: account.rent_epoch, - data: account.data.to_vec(), - write_version: 0, - txn_signature: None, - }), - slot, - is_startup: true, - })); + if all_owners.contains(&account_owner) { + // Get the specific filter keys for this owner + let filter_keys = owner_to_filters + .get(&account_owner) + .cloned() + .unwrap_or_default(); + + let _ = sync_tx.send(Event::AccountUpdate( + SubscribeUpdateAccount { + account: Some(SubscribeUpdateAccountInfo { + pubkey: account.pubkey().to_bytes().to_vec(), + lamports: account.lamports, + owner: account.owner.to_bytes().to_vec(), + executable: account.executable, + rent_epoch: account.rent_epoch, + data: account.data.to_vec(), + write_version: 0, + txn_signature: None, + }), + slot, + is_startup: true, + }, + filter_keys, + )); } }); }); @@ -213,6 +220,6 @@ impl SourceTrait for SolanaSnapshotSource { } enum Event { - AccountUpdate(SubscribeUpdateAccount), + AccountUpdate(SubscribeUpdateAccount, Vec), SnapshotFinished, -} +} \ No newline at end of file