Skip to content

Commit 86b3d8f

Browse files
committed
feat : add block subscriber to vixen and fix few bugs
1 parent 241a4f7 commit 86b3d8f

File tree

10 files changed

+303
-35
lines changed

10 files changed

+303
-35
lines changed

crates/core/src/lib.rs

Lines changed: 157 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ use std::{
2626

2727
use serde::Deserialize;
2828
use yellowstone_grpc_proto::geyser::{
29-
self, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta,
30-
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdateAccount,
31-
SubscribeUpdateBlockMeta, SubscribeUpdateSlot, SubscribeUpdateTransaction,
29+
self, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
30+
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
31+
SubscribeRequestFilterTransactions, SubscribeUpdateAccount, SubscribeUpdateAccountInfo,
32+
SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdateSlot,
33+
SubscribeUpdateTransaction,
3234
};
3335

3436
pub extern crate bs58;
@@ -62,10 +64,14 @@ pub type ParseResult<T> = Result<T, ParseError>;
6264

6365
/// An account update from Yellowstone.
6466
pub type AccountUpdate = SubscribeUpdateAccount;
67+
/// An account update from Yellowstone.
68+
pub type AccountUpdateInfo = SubscribeUpdateAccountInfo;
6569
/// A transaction update from Yellowstone.
6670
pub type TransactionUpdate = SubscribeUpdateTransaction;
6771
/// A block meta update from Yellowstone.
6872
pub type BlockMetaUpdate = SubscribeUpdateBlockMeta;
73+
/// A block update from Yellowstone.
74+
pub type BlockUpdate = SubscribeUpdateBlock;
6975
/// A slot update from Yellowstone.
7076
pub type SlotUpdate = SubscribeUpdateSlot;
7177

@@ -158,6 +164,8 @@ pub struct Prefilter {
158164
pub transaction: Option<TransactionPrefilter>,
159165
/// Filters for block meta updates.
160166
pub block_meta: Option<BlockMetaPrefilter>,
167+
/// Filters for block updates.
168+
pub block: Option<BlockPrefilter>,
161169
/// Filters for slot updates.
162170
pub slot: Option<SlotPrefilter>,
163171
}
@@ -182,11 +190,13 @@ impl Prefilter {
182190
account,
183191
transaction,
184192
block_meta,
193+
block,
185194
slot,
186195
} = self;
187196
merge_opt(account, other.account, AccountPrefilter::merge);
188197
merge_opt(transaction, other.transaction, TransactionPrefilter::merge);
189198
merge_opt(block_meta, other.block_meta, BlockMetaPrefilter::merge);
199+
merge_opt(block, other.block, BlockPrefilter::merge);
190200
merge_opt(slot, other.slot, SlotPrefilter::merge);
191201
}
192202
}
@@ -259,6 +269,36 @@ impl BlockMetaPrefilter {
259269
pub fn merge(_lhs: &mut Self, _rhs: Self) {}
260270
}
261271

272+
/// A prefilter for matching block updates.
273+
#[derive(Debug, Default, Clone, PartialEq)]
274+
pub struct BlockPrefilter {
275+
/// filter transactions and accounts that use any account from the list
276+
pub accounts_include: HashSet<Pubkey>,
277+
/// include all transactions
278+
pub include_transactions: bool,
279+
/// include all account updates
280+
pub include_accounts: bool,
281+
/// include all entries
282+
pub include_entries: bool,
283+
}
284+
285+
impl BlockPrefilter {
286+
/// Merge another block prefilter into this one.
287+
pub fn merge(&mut self, other: BlockPrefilter) {
288+
let Self {
289+
accounts_include,
290+
include_transactions,
291+
include_accounts,
292+
include_entries,
293+
} = self;
294+
295+
accounts_include.extend(other.accounts_include);
296+
*include_accounts = other.include_accounts;
297+
*include_transactions = other.include_transactions;
298+
*include_entries = other.include_entries;
299+
}
300+
}
301+
262302
/// A prefilter for matching slot updates updates.
263303
#[derive(Debug, Default, Clone, PartialEq, Copy)]
264304
pub struct SlotPrefilter {}
@@ -467,7 +507,19 @@ pub struct PrefilterBuilder {
467507
error: Option<PrefilterError>,
468508
slots: bool,
469509
block_metas: bool,
510+
/// Matching [`BlockPrefilter::accounts`]
511+
block_accounts_include: Option<HashSet<Pubkey>>,
512+
/// Matching [`BlockPrefilter::include_accounts`]
513+
block_include_accounts: bool,
514+
/// Matching [`BlockPrefilter::include_transactions`]
515+
block_include_transactions: bool,
516+
/// Matching [`BlockPrefilter::include_entries`]
517+
block_include_entries: bool,
518+
/// Including all accounts
519+
accounts_include_all: bool,
520+
/// Matching [`AccountPrefilter::accounts`]
470521
accounts: Option<HashSet<Pubkey>>,
522+
/// Matching [`AccountPrefilter::account_owners`]
471523
account_owners: Option<HashSet<Pubkey>>,
472524
/// Matching [`TransactionPrefilter::accounts_include`]
473525
transaction_accounts_include: Option<HashSet<Pubkey>>,
@@ -504,21 +556,26 @@ impl PrefilterBuilder {
504556
pub fn build(self) -> Result<Prefilter, PrefilterError> {
505557
let PrefilterBuilder {
506558
error,
559+
accounts_include_all,
507560
accounts,
508561
account_owners,
509562
slots,
510563
block_metas,
564+
block_accounts_include,
565+
block_include_accounts,
566+
block_include_entries,
567+
block_include_transactions,
511568
transaction_accounts_include,
512569
transaction_accounts_required,
513570
} = self;
514571
if let Some(err) = error {
515572
return Err(err);
516573
}
517574

518-
let account = account_owners.map(|owners| AccountPrefilter {
575+
let account = AccountPrefilter {
519576
accounts: accounts.unwrap_or_default(),
520-
owners,
521-
});
577+
owners: account_owners.unwrap_or_default(),
578+
};
522579

523580
let transaction = TransactionPrefilter {
524581
accounts_include: transaction_accounts_include.unwrap_or_default(),
@@ -527,12 +584,26 @@ impl PrefilterBuilder {
527584

528585
let block_meta = BlockMetaPrefilter {};
529586

587+
let block = BlockPrefilter {
588+
accounts_include: block_accounts_include.unwrap_or_default(),
589+
include_accounts: block_include_accounts,
590+
include_transactions: block_include_transactions,
591+
include_entries: block_include_entries,
592+
};
593+
530594
let slot = SlotPrefilter {};
531595

596+
let account = if accounts_include_all {
597+
Some(AccountPrefilter::default())
598+
} else {
599+
(account != AccountPrefilter::default()).then_some(account)
600+
};
601+
532602
Ok(Prefilter {
533603
account,
534604
transaction: (transaction != TransactionPrefilter::default()).then_some(transaction),
535605
block_meta: block_metas.then_some(block_meta),
606+
block: (block != BlockPrefilter::default()).then_some(block),
536607
slot: slots.then_some(slot),
537608
})
538609
}
@@ -553,6 +624,22 @@ impl PrefilterBuilder {
553624
})
554625
}
555626

627+
/// Set prefilter will request block_metas updates.
628+
pub fn block_metas(self) -> Self {
629+
self.mutate(|this| {
630+
this.block_metas = true;
631+
Ok(())
632+
})
633+
}
634+
635+
/// Set accounts_include_all filter
636+
pub fn accounts_include_all(self) -> Self {
637+
self.mutate(|this| {
638+
this.accounts_include_all = true;
639+
Ok(())
640+
})
641+
}
642+
556643
/// Set the accounts that this prefilter will match.
557644
pub fn accounts<I: IntoIterator>(self, it: I) -> Self
558645
where I::Item: AsRef<[u8]> {
@@ -601,6 +688,42 @@ impl PrefilterBuilder {
601688
)
602689
})
603690
}
691+
692+
/// Set the included accounts for this block prefilter.
693+
pub fn block_accounts_include<I: IntoIterator>(self, it: I) -> Self
694+
where I::Item: AsRef<[u8]> {
695+
self.mutate(|this| {
696+
set_opt(
697+
&mut this.block_accounts_include,
698+
"block_accounts_include",
699+
collect_pubkeys(it)?,
700+
)
701+
})
702+
}
703+
704+
/// Set the include_accounts flag for this block prefilter.
705+
pub fn block_include_accounts(self) -> Self {
706+
self.mutate(|this| {
707+
this.block_include_accounts = true;
708+
Ok(())
709+
})
710+
}
711+
712+
/// Set the include_transactions flag for this block prefilter.
713+
pub fn block_include_transactions(self) -> Self {
714+
self.mutate(|this| {
715+
this.block_include_transactions = true;
716+
Ok(())
717+
})
718+
}
719+
720+
/// Set the include_entries flag for this block prefilter.
721+
pub fn block_include_entries(self) -> Self {
722+
self.mutate(|this| {
723+
this.block_include_entries = true;
724+
Ok(())
725+
})
726+
}
604727
}
605728

606729
/// A collection of filters for a Vixen subscription.
@@ -665,12 +788,13 @@ impl From<Filters> for SubscribeRequest {
665788
.collect(),
666789
slots: value
667790
.parsers_filters
668-
.keys()
669-
.map(|k| {
670-
(k.clone(), SubscribeRequestFilterSlots {
791+
.iter()
792+
.filter_map(|(k, v)| {
793+
v.slot?;
794+
Some((k.clone(), SubscribeRequestFilterSlots {
671795
filter_by_commitment: Some(true),
672796
interslot_updates: None,
673-
})
797+
}))
674798
})
675799
.collect(),
676800
transactions: value
@@ -699,11 +823,31 @@ impl From<Filters> for SubscribeRequest {
699823
})
700824
.collect(),
701825
transactions_status: [].into_iter().collect(),
702-
blocks: [].into_iter().collect(),
826+
blocks: value
827+
.parsers_filters
828+
.iter()
829+
.filter_map(|(k, v)| {
830+
let v = v.block.as_ref()?;
831+
832+
Some((k.clone(), SubscribeRequestFilterBlocks {
833+
account_include: v
834+
.accounts_include
835+
.iter()
836+
.map(ToString::to_string)
837+
.collect(),
838+
include_transactions: Some(v.include_transactions),
839+
include_accounts: Some(v.include_accounts),
840+
include_entries: Some(v.include_entries),
841+
}))
842+
})
843+
.collect(),
703844
blocks_meta: value
704845
.parsers_filters
705-
.keys()
706-
.map(|k| (k.clone(), SubscribeRequestFilterBlocksMeta {}))
846+
.iter()
847+
.filter_map(|(k, v)| {
848+
v.block_meta?;
849+
Some((k.clone(), SubscribeRequestFilterBlocksMeta {}))
850+
})
707851
.collect(),
708852
entry: [].into_iter().collect(),
709853
commitment: None,

crates/parser/src/block_meta.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use std::borrow::Cow;
22

3-
use yellowstone_vixen_core::{
4-
BlockMetaUpdate, ParseResult, Parser, Prefilter, ProgramParser, Pubkey,
5-
};
3+
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlockMeta;
4+
use yellowstone_vixen_core::{ParseResult, Parser, Prefilter, ProgramParser, Pubkey};
65

7-
#[derive(Debug, Clone)]
6+
#[derive(Debug, Clone, PartialEq)]
87
pub struct Reward {
98
pub pubkey: String,
109
pub lamports: i64,
@@ -20,7 +19,7 @@ pub struct Rewards {
2019
}
2120

2221
#[derive(Debug, Clone)]
23-
pub struct BlockUpdate {
22+
pub struct BlockMetaUpdate {
2423
pub slot: u64,
2524
pub blockhash: String,
2625
pub rewards: Option<Rewards>,
@@ -36,14 +35,14 @@ pub struct BlockUpdate {
3635
pub struct BlockMetaParser;
3736

3837
impl Parser for BlockMetaParser {
39-
type Input = BlockMetaUpdate;
40-
type Output = BlockUpdate;
38+
type Input = SubscribeUpdateBlockMeta;
39+
type Output = BlockMetaUpdate;
4140

4241
fn id(&self) -> Cow<'static, str> { "yellowstone::BlockMetaParser".into() }
4342

44-
fn prefilter(&self) -> Prefilter { Prefilter::builder().build().unwrap() }
43+
fn prefilter(&self) -> Prefilter { Prefilter::builder().block_metas().build().unwrap() }
4544

46-
async fn parse(&self, block_meta: &BlockMetaUpdate) -> ParseResult<Self::Output> {
45+
async fn parse(&self, block_meta: &SubscribeUpdateBlockMeta) -> ParseResult<Self::Output> {
4746
let rewards = block_meta.rewards.as_ref().map(|reward| Rewards {
4847
rewards: reward
4948
.rewards
@@ -59,7 +58,7 @@ impl Parser for BlockMetaParser {
5958
num_partitions: reward.num_partitions.map(|num| num.num_partitions),
6059
});
6160

62-
Ok(BlockUpdate {
61+
Ok(BlockMetaUpdate {
6362
slot: block_meta.slot,
6463
blockhash: block_meta.blockhash.clone(),
6564
rewards,

crates/runtime/src/buffer.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,18 @@ impl<H: Send> topograph::AsyncHandler<Job, H> for Handler {
118118
)
119119
.await;
120120
},
121+
UpdateOneof::Block(b) => {
122+
pipelines
123+
.block
124+
.get_handlers(&filters)
125+
.run(
126+
span,
127+
&b,
128+
#[cfg(feature = "prometheus")]
129+
update_type,
130+
)
131+
.await;
132+
},
121133
UpdateOneof::Slot(s) => {
122134
pipelines
123135
.slot

0 commit comments

Comments
 (0)