Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 47 additions & 40 deletions crates/solana-snapshot-source/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fs::File, path::PathBuf, sync::mpsc};
use std::{collections::HashMap, fs::File, path::PathBuf, sync::mpsc};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -95,23 +95,23 @@ impl SourceTrait for SolanaSnapshotSource {
let filters = self.filters.clone();
let config = self.config.clone();

let (owners, filter_keys): (Vec<_>, Vec<_>) = filters
.parsers_filters
.iter()
.filter_map(|(key, parser_filter)| {
parser_filter
.account
.as_ref()
.map(|accounts| (accounts.owners.clone(), key.clone()))
})
.fold(
(Vec::new(), Vec::new()),
|(mut owners, mut keys), (account_owners, key)| {
owners.extend(account_owners);
keys.push(key);
(owners, keys)
},
);
let mut owner_to_filters: HashMap<yellowstone_vixen_core::Pubkey, Vec<String>> =
HashMap::new();
let mut all_owners = Vec::new();

for (key, parser_filter) in filters.parsers_filters.iter() {
if let Some(account_filter) = parser_filter.account.as_ref() {
for owner in &account_filter.owners {
owner_to_filters
.entry(*owner)
.or_default()
.push(key.clone());
if !all_owners.contains(owner) {
all_owners.push(*owner);
}
}
}
}

let solana_snapshot = SolanaSnapshot::unpack_compressed(config.path.clone())?;

Expand All @@ -138,13 +138,10 @@ impl SourceTrait for SolanaSnapshotSource {
let (sync_tx, sync_rx) = mpsc::channel::<Event>();
let mut account_file_workers = JoinSet::new();

let filter_keys = filter_keys.clone();
let sender_handle = tokio::spawn(async move {
while let Ok(event) = sync_rx.recv() {
match event {
Event::AccountUpdate(account) => {
let filter_keys = filter_keys.clone();

Event::AccountUpdate(account, filter_keys) => {
if let Err(err) = tx
.send(Ok(SubscribeUpdate {
filters: filter_keys,
Expand All @@ -164,7 +161,8 @@ impl SourceTrait for SolanaSnapshotSource {
for AccountFile(path, current_len) in solana_snapshot.accounts {
let sync_tx = sync_tx.clone();
let slot = solana_snapshot.slot;
let owners = owners.clone();
let all_owners = all_owners.clone();
let owner_to_filters = owner_to_filters.clone();

account_file_workers.spawn(async move {
let (accounts, _usize) = AccountsFile::new_from_file(
Expand All @@ -179,21 +177,30 @@ impl SourceTrait for SolanaSnapshotSource {
yellowstone_vixen_core::Pubkey::try_from(account.owner.as_ref())
.expect("Owner address is Pubkey");

if owners.contains(&account_owner) {
let _ = sync_tx.send(Event::AccountUpdate(SubscribeUpdateAccount {
account: Some(SubscribeUpdateAccountInfo {
pubkey: account.pubkey().to_bytes().to_vec(),
lamports: account.lamports,
owner: account.owner.to_bytes().to_vec(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: account.data.to_vec(),
write_version: 0,
txn_signature: None,
}),
slot,
is_startup: true,
}));
if all_owners.contains(&account_owner) {
// Get the specific filter keys for this owner
let filter_keys = owner_to_filters
.get(&account_owner)
.cloned()
.unwrap_or_default();

let _ = sync_tx.send(Event::AccountUpdate(
SubscribeUpdateAccount {
account: Some(SubscribeUpdateAccountInfo {
pubkey: account.pubkey().to_bytes().to_vec(),
lamports: account.lamports,
owner: account.owner.to_bytes().to_vec(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: account.data.to_vec(),
write_version: 0,
txn_signature: None,
}),
slot,
is_startup: true,
},
filter_keys,
));
}
});
});
Expand All @@ -213,6 +220,6 @@ impl SourceTrait for SolanaSnapshotSource {
}

enum Event {
AccountUpdate(SubscribeUpdateAccount),
AccountUpdate(SubscribeUpdateAccount, Vec<String>),
SnapshotFinished,
}
}