Skip to content

Commit b116ffd

Browse files
committed
feat : add block subscriber to vixen and fix few bugs
1 parent 241a4f7 commit b116ffd

File tree

9 files changed

+238
-28
lines changed

9 files changed

+238
-28
lines changed

crates/core/src/lib.rs

Lines changed: 149 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ use std::{
2626

2727
use serde::Deserialize;
2828
use yellowstone_grpc_proto::geyser::{
29-
self, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta,
30-
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdateAccount,
31-
SubscribeUpdateBlockMeta, SubscribeUpdateSlot, SubscribeUpdateTransaction,
29+
self, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
30+
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
31+
SubscribeRequestFilterTransactions, SubscribeUpdateAccount, SubscribeUpdateAccountInfo,
32+
SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdateSlot,
33+
SubscribeUpdateTransaction,
3234
};
3335

3436
pub extern crate bs58;
@@ -62,10 +64,14 @@ pub type ParseResult<T> = Result<T, ParseError>;
6264

6365
/// An account update from Yellowstone.
6466
pub type AccountUpdate = SubscribeUpdateAccount;
67+
/// An account update from Yellowstone.
68+
pub type AccountUpdateInfo = SubscribeUpdateAccountInfo;
6569
/// A transaction update from Yellowstone.
6670
pub type TransactionUpdate = SubscribeUpdateTransaction;
6771
/// A block meta update from Yellowstone.
6872
pub type BlockMetaUpdate = SubscribeUpdateBlockMeta;
73+
/// A block update from Yellowstone.
74+
pub type BlockUpdate = SubscribeUpdateBlock;
6975
/// A slot update from Yellowstone.
7076
pub type SlotUpdate = SubscribeUpdateSlot;
7177

@@ -158,6 +164,8 @@ pub struct Prefilter {
158164
pub transaction: Option<TransactionPrefilter>,
159165
/// Filters for block meta updates.
160166
pub block_meta: Option<BlockMetaPrefilter>,
167+
/// Filters for block updates.
168+
pub block: Option<BlockPrefilter>,
161169
/// Filters for slot updates.
162170
pub slot: Option<SlotPrefilter>,
163171
}
@@ -182,11 +190,13 @@ impl Prefilter {
182190
account,
183191
transaction,
184192
block_meta,
193+
block,
185194
slot,
186195
} = self;
187196
merge_opt(account, other.account, AccountPrefilter::merge);
188197
merge_opt(transaction, other.transaction, TransactionPrefilter::merge);
189198
merge_opt(block_meta, other.block_meta, BlockMetaPrefilter::merge);
199+
merge_opt(block, other.block, BlockPrefilter::merge);
190200
merge_opt(slot, other.slot, SlotPrefilter::merge);
191201
}
192202
}
@@ -259,6 +269,36 @@ impl BlockMetaPrefilter {
259269
pub fn merge(_lhs: &mut Self, _rhs: Self) {}
260270
}
261271

272+
/// A prefilter for matching block updates.
273+
#[derive(Debug, Default, Clone, PartialEq)]
274+
pub struct BlockPrefilter {
275+
/// filter transactions and accounts that use any account from the list
276+
pub accounts_include: HashSet<Pubkey>,
277+
/// include all transactions
278+
pub include_transactions: bool,
279+
/// include all account updates
280+
pub include_accounts: bool,
281+
/// include all entries
282+
pub include_entries: bool,
283+
}
284+
285+
impl BlockPrefilter {
286+
/// Merge another block prefilter into this one.
287+
pub fn merge(&mut self, other: BlockPrefilter) {
288+
let Self {
289+
accounts_include,
290+
include_transactions,
291+
include_accounts,
292+
include_entries,
293+
} = self;
294+
295+
accounts_include.extend(other.accounts_include);
296+
*include_accounts = other.include_accounts;
297+
*include_transactions = other.include_transactions;
298+
*include_entries = other.include_entries;
299+
}
300+
}
301+
262302
/// A prefilter for matching slot updates updates.
263303
#[derive(Debug, Default, Clone, PartialEq, Copy)]
264304
pub struct SlotPrefilter {}
@@ -467,7 +507,19 @@ pub struct PrefilterBuilder {
467507
error: Option<PrefilterError>,
468508
slots: bool,
469509
block_metas: bool,
510+
/// Matching [`BlockPrefilter::accounts`]
511+
block_accounts_include: Option<HashSet<Pubkey>>,
512+
/// Matching [`BlockPrefilter::include_accounts`]
513+
block_include_accounts: bool,
514+
/// Matching [`BlockPrefilter::include_transactions`]
515+
block_include_transactions: bool,
516+
/// Matching [`BlockPrefilter::include_entries`]
517+
block_include_entries: bool,
518+
/// Including all accounts
519+
accounts_include_all: bool,
520+
/// Matching [`AccountPrefilter::accounts`]
470521
accounts: Option<HashSet<Pubkey>>,
522+
/// Matching [`AccountPrefilter::account_owners`]
471523
account_owners: Option<HashSet<Pubkey>>,
472524
/// Matching [`TransactionPrefilter::accounts_include`]
473525
transaction_accounts_include: Option<HashSet<Pubkey>>,
@@ -504,21 +556,26 @@ impl PrefilterBuilder {
504556
pub fn build(self) -> Result<Prefilter, PrefilterError> {
505557
let PrefilterBuilder {
506558
error,
559+
accounts_include_all,
507560
accounts,
508561
account_owners,
509562
slots,
510563
block_metas,
564+
block_accounts_include,
565+
block_include_accounts,
566+
block_include_entries,
567+
block_include_transactions,
511568
transaction_accounts_include,
512569
transaction_accounts_required,
513570
} = self;
514571
if let Some(err) = error {
515572
return Err(err);
516573
}
517574

518-
let account = account_owners.map(|owners| AccountPrefilter {
575+
let account = AccountPrefilter {
519576
accounts: accounts.unwrap_or_default(),
520-
owners,
521-
});
577+
owners: account_owners.unwrap_or_default(),
578+
};
522579

523580
let transaction = TransactionPrefilter {
524581
accounts_include: transaction_accounts_include.unwrap_or_default(),
@@ -527,12 +584,26 @@ impl PrefilterBuilder {
527584

528585
let block_meta = BlockMetaPrefilter {};
529586

587+
let block = BlockPrefilter {
588+
accounts_include: block_accounts_include.unwrap_or_default(),
589+
include_accounts: block_include_accounts,
590+
include_transactions: block_include_transactions,
591+
include_entries: block_include_entries,
592+
};
593+
530594
let slot = SlotPrefilter {};
531595

596+
let account = if accounts_include_all {
597+
Some(AccountPrefilter::default())
598+
} else {
599+
(account != AccountPrefilter::default()).then_some(account)
600+
};
601+
532602
Ok(Prefilter {
533603
account,
534604
transaction: (transaction != TransactionPrefilter::default()).then_some(transaction),
535605
block_meta: block_metas.then_some(block_meta),
606+
block: (block != BlockPrefilter::default()).then_some(block),
536607
slot: slots.then_some(slot),
537608
})
538609
}
@@ -553,6 +624,14 @@ impl PrefilterBuilder {
553624
})
554625
}
555626

627+
/// Set accounts_include_all filter
628+
pub fn accounts_include_all(self) -> Self {
629+
self.mutate(|this| {
630+
this.accounts_include_all = true;
631+
Ok(())
632+
})
633+
}
634+
556635
/// Set the accounts that this prefilter will match.
557636
pub fn accounts<I: IntoIterator>(self, it: I) -> Self
558637
where I::Item: AsRef<[u8]> {
@@ -601,6 +680,42 @@ impl PrefilterBuilder {
601680
)
602681
})
603682
}
683+
684+
/// Set the included accounts for this block prefilter.
685+
pub fn block_accounts_include<I: IntoIterator>(self, it: I) -> Self
686+
where I::Item: AsRef<[u8]> {
687+
self.mutate(|this| {
688+
set_opt(
689+
&mut this.block_accounts_include,
690+
"block_accounts_include",
691+
collect_pubkeys(it)?,
692+
)
693+
})
694+
}
695+
696+
/// Set the include_accounts flag for this block prefilter.
697+
pub fn block_include_accounts(self) -> Self {
698+
self.mutate(|this| {
699+
this.block_include_accounts = true;
700+
Ok(())
701+
})
702+
}
703+
704+
/// Set the include_transactions flag for this block prefilter.
705+
pub fn block_include_transactions(self) -> Self {
706+
self.mutate(|this| {
707+
this.block_include_transactions = true;
708+
Ok(())
709+
})
710+
}
711+
712+
/// Set the include_entries flag for this block prefilter.
713+
pub fn block_include_entries(self) -> Self {
714+
self.mutate(|this| {
715+
this.block_include_entries = true;
716+
Ok(())
717+
})
718+
}
604719
}
605720

606721
/// A collection of filters for a Vixen subscription.
@@ -665,12 +780,13 @@ impl From<Filters> for SubscribeRequest {
665780
.collect(),
666781
slots: value
667782
.parsers_filters
668-
.keys()
669-
.map(|k| {
670-
(k.clone(), SubscribeRequestFilterSlots {
783+
.iter()
784+
.filter_map(|(k, v)| {
785+
v.slot?;
786+
Some((k.clone(), SubscribeRequestFilterSlots {
671787
filter_by_commitment: Some(true),
672788
interslot_updates: None,
673-
})
789+
}))
674790
})
675791
.collect(),
676792
transactions: value
@@ -699,11 +815,31 @@ impl From<Filters> for SubscribeRequest {
699815
})
700816
.collect(),
701817
transactions_status: [].into_iter().collect(),
702-
blocks: [].into_iter().collect(),
818+
blocks: value
819+
.parsers_filters
820+
.iter()
821+
.filter_map(|(k, v)| {
822+
let v = v.block.as_ref()?;
823+
824+
Some((k.clone(), SubscribeRequestFilterBlocks {
825+
account_include: v
826+
.accounts_include
827+
.iter()
828+
.map(ToString::to_string)
829+
.collect(),
830+
include_transactions: Some(v.include_transactions),
831+
include_accounts: Some(v.include_accounts),
832+
include_entries: Some(v.include_entries),
833+
}))
834+
})
835+
.collect(),
703836
blocks_meta: value
704837
.parsers_filters
705-
.keys()
706-
.map(|k| (k.clone(), SubscribeRequestFilterBlocksMeta {}))
838+
.iter()
839+
.filter_map(|(k, v)| {
840+
v.block_meta?;
841+
Some((k.clone(), SubscribeRequestFilterBlocksMeta {}))
842+
})
707843
.collect(),
708844
entry: [].into_iter().collect(),
709845
commitment: None,

crates/parser/src/block_meta.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use std::borrow::Cow;
22

3-
use yellowstone_vixen_core::{
4-
BlockMetaUpdate, ParseResult, Parser, Prefilter, ProgramParser, Pubkey,
5-
};
3+
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlockMeta;
4+
use yellowstone_vixen_core::{ParseResult, Parser, Prefilter, ProgramParser, Pubkey};
65

7-
#[derive(Debug, Clone)]
6+
#[derive(Debug, Clone, PartialEq)]
87
pub struct Reward {
98
pub pubkey: String,
109
pub lamports: i64,
@@ -20,7 +19,7 @@ pub struct Rewards {
2019
}
2120

2221
#[derive(Debug, Clone)]
23-
pub struct BlockUpdate {
22+
pub struct BlockMetaUpdate {
2423
pub slot: u64,
2524
pub blockhash: String,
2625
pub rewards: Option<Rewards>,
@@ -36,14 +35,14 @@ pub struct BlockUpdate {
3635
pub struct BlockMetaParser;
3736

3837
impl Parser for BlockMetaParser {
39-
type Input = BlockMetaUpdate;
40-
type Output = BlockUpdate;
38+
type Input = SubscribeUpdateBlockMeta;
39+
type Output = BlockMetaUpdate;
4140

4241
fn id(&self) -> Cow<'static, str> { "yellowstone::BlockMetaParser".into() }
4342

4443
fn prefilter(&self) -> Prefilter { Prefilter::builder().build().unwrap() }
4544

46-
async fn parse(&self, block_meta: &BlockMetaUpdate) -> ParseResult<Self::Output> {
45+
async fn parse(&self, block_meta: &SubscribeUpdateBlockMeta) -> ParseResult<Self::Output> {
4746
let rewards = block_meta.rewards.as_ref().map(|reward| Rewards {
4847
rewards: reward
4948
.rewards
@@ -59,7 +58,7 @@ impl Parser for BlockMetaParser {
5958
num_partitions: reward.num_partitions.map(|num| num.num_partitions),
6059
});
6160

62-
Ok(BlockUpdate {
61+
Ok(BlockMetaUpdate {
6362
slot: block_meta.slot,
6463
blockhash: block_meta.blockhash.clone(),
6564
rewards,

crates/runtime/src/buffer.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,18 @@ impl<H: Send> topograph::AsyncHandler<Job, H> for Handler {
118118
)
119119
.await;
120120
},
121+
UpdateOneof::Block(b) => {
122+
pipelines
123+
.block
124+
.get_handlers(&filters)
125+
.run(
126+
span,
127+
&b,
128+
#[cfg(feature = "prometheus")]
129+
update_type,
130+
)
131+
.await;
132+
},
121133
UpdateOneof::Slot(s) => {
122134
pipelines
123135
.slot

0 commit comments

Comments
 (0)