Conversation

@senzenn senzenn commented Oct 28, 2025

Summary

Adds Jetstreamer as a new data source for replaying historical Solana ledger data from the Old Faithful archive.

Closes #149

Changes

  • ✅ New jetstreamer-source crate with SourceTrait implementation
  • ✅ Epoch and slot-range based queries
  • ✅ Multi-threaded processing with out-of-order slot handling
  • ✅ Example: jetstream-replay for epoch 800
  • ✅ Updated Vixen.example.toml with Jetstream config
  • ✅ Unit tests and documentation
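
For readers unfamiliar with the term, out-of-order slot handling can be pictured as a bounded reorder buffer. The following is only a minimal sketch under stated assumptions, not the code in this PR: `ReorderBuffer`, its fields, and the overflow policy (skip ahead to the lowest buffered slot once more than `capacity` items are waiting) are all hypothetical.

```rust
use std::collections::BTreeMap;

/// Hypothetical bounded reorder buffer (illustration only, not the PR's type).
/// Items arrive tagged with a slot number and are released in ascending order.
struct ReorderBuffer<T> {
    /// Next slot we are waiting to emit.
    next_slot: u64,
    /// Maximum number of items allowed to wait before we stop waiting for a gap.
    capacity: usize,
    /// Items that arrived ahead of `next_slot`, keyed by slot.
    pending: BTreeMap<u64, T>,
}

impl<T> ReorderBuffer<T> {
    fn new(start_slot: u64, capacity: usize) -> Self {
        Self { next_slot: start_slot, capacity, pending: BTreeMap::new() }
    }

    /// Buffer one item and return everything that can now be emitted in order.
    fn push(&mut self, slot: u64, item: T) -> Vec<(u64, T)> {
        let mut ready = Vec::new();
        if slot < self.next_slot {
            // Assumed policy: an item for a slot we already passed is dropped.
            return ready;
        }
        self.pending.insert(slot, item);

        // Assumed policy: if too many items are waiting, treat the missing
        // slot as skipped and jump to the lowest slot we actually hold.
        if self.pending.len() > self.capacity {
            if let Some(&lowest) = self.pending.keys().next() {
                self.next_slot = lowest;
            }
        }

        // Drain every consecutive slot starting at `next_slot`.
        while let Some(item) = self.pending.remove(&self.next_slot) {
            ready.push((self.next_slot, item));
            self.next_slot += 1;
        }
        ready
    }
}
```

With a buffer created as `ReorderBuffer::new(10, 2)`, pushing slot 11 and then slot 10 releases both in order on the second call. A real source would probably also need a timeout for gaps that never fill, which this sketch leaves out.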

Usage

[source]
archive_url = "https://api.old-faithful.net"
epoch = 800
threads = 4

cargo run --example jetstream-replay

Requirements

⚠️ Requires Rust 1.88+ (due to jetstreamer-utils dependency)

Acceptance Criteria

  • Jetstream source integrated with Vixen
  • Supports slot range and epoch input
  • Emits blocks/transactions into pipeline
  • Handles multi-threaded replay and out-of-order slots
  • Includes epoch 800 replay example

Testing

cargo test --package jetstreamer-source
cargo run --example jetstream-replay

Neocryptoquant commented Oct 29, 2025 via email

Collaborator

@kespinola kespinola left a comment

  • Run cargo +nightly fmt --all to remove all the formatting-only changes.
  • I think we should drop slot ordering to simplify the source.
  • The example should use the Vixen runtime.
  • Make use of the filters in the transaction callback.

Looking really good though!

Author

senzenn commented Nov 2, 2025

I have addressed the TODOs and the feedback from your previous review. Could you please review the PR again, @kespinola?

@senzenn senzenn requested a review from kespinola November 3, 2025 11:32
Comment on lines 16 to 37
pub(crate) static JETSTREAM_BLOCKS_RECEIVED: LazyLock<IntCounter> = LazyLock::new(|| {
    IntCounter::with_opts(Opts::new(
        "jetstream_blocks_received",
        "Total blocks received from Jetstream",
    ))
    .unwrap()
});

/// Total number of transactions received from Jetstream
pub(crate) static JETSTREAM_TRANSACTIONS_RECEIVED: LazyLock<IntCounter> = LazyLock::new(|| {
    IntCounter::with_opts(Opts::new(
        "jetstream_transactions_received",
        "Total transactions received from Jetstream",
    ))
    .unwrap()
});

/// Register all Jetstream metrics with the provided Prometheus registry
pub fn register_metrics(registry: &Registry) {
    let _ = registry.register(Box::new(JETSTREAM_BLOCKS_RECEIVED.clone()));
    let _ = registry.register(Box::new(JETSTREAM_TRANSACTIONS_RECEIVED.clone()));
}
Collaborator

https://github.com/rpcpool/yellowstone-vixen/blob/main/crates/runtime/src/metrics.rs#L250-L279

The runtime already has metrics for blocks and transactions received; please remove these from the source.

Comment on lines 2 to 6
archive_url = "https://api.old-faithful.net"
epoch = 800
threads = 4
reorder_buffer_size = 1000
slot_timeout_secs = 30
Collaborator

Can you please adjust the serde attributes so the config keys use dash case (kebab-case)?

Suggested change
archive_url = "https://api.old-faithful.net"
epoch = 800
threads = 4
reorder_buffer_size = 1000
slot_timeout_secs = 30
archive-url = "https://api.old-faithful.net"
epoch = 800
threads = 4
reorder-buffer-size = 1000
slot-timeout-secs = 30

Comment on lines +219 to +221
/// Control transaction filtering: true = permissive (all), false = strict (limited).
#[arg(long, env, default_value = "true")]
pub permissive_transaction_filtering: bool,
Collaborator

We should not have this option; it makes for a confusing API. You can make a parser that matches everything instead.

Author

Removed the confusing APIs in the latest PR update.

- Remove duplicate metrics, use runtime metrics
- Add serde kebab-case for configs
- Update examples to use dash-case fields
- Enable prometheus via runtime
@senzenn senzenn requested a review from kespinola November 4, 2025 14:12
@senzenn senzenn marked this pull request as draft November 5, 2025 16:52
@senzenn senzenn marked this pull request as ready for review November 5, 2025 18:05
Author

senzenn commented Nov 5, 2025

@kespinola I’ve resolved the majority of your feedback. Could you take another look at this PR?

@senzenn senzenn closed this Nov 14, 2025
@kespinola kespinola reopened this Nov 14, 2025
Collaborator

@kespinola kespinola left a comment

Last round of CR, related to setting the filters key on SubscribeUpdate so that each event can be matched to a parser.

Comment on lines +36 to +39
[filters]
# Add filters for specific programs/accounts if needed
# programs = ["11111111111111111111111111111112"]

Collaborator

The filters should come from the parser's attached filters, which mimic the gRPC subscribe filter notation for selecting an account owner or, for transactions, an address in the accounts list. We shouldn't need to specify filters on the source.

entry_count,
} => {
let update = SubscribeUpdate {
filters: vec![],
Collaborator

@senzenn Vixen uses this filter list to match a subscribe update with a parser, so you need to include the filter key for Vixen to match on when the update goes through the runtime.

}
// TODO: Populate transaction field with protobuf-encoded transaction data
let update = SubscribeUpdate {
filters: vec![],
Collaborator

Same here: we want to use the filters to let Vixen know which parser and handler to route the event through.

Ok(())
}

fn should_process_transaction(&self, tx: &TransactionData) -> bool {
Collaborator

Instead of returning a boolean, this should return a Vec to set on the filters key of the subscribe update. If it's empty, you can discard the event and not send it to the channel.
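
One way to picture this request is a function that collects the keys of every filter a transaction satisfies. This is a rough sketch under assumptions, not Vixen's actual types: `TxFilter`, `Tx`, and `matching_filters` are hypothetical stand-ins, filter keys are assumed to be strings, and treating an empty `account_include` list as "match everything" is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a parser's transaction filter (not a Vixen type).
struct TxFilter {
    /// Match if any of these addresses appears in the transaction's account list.
    /// Assumption of this sketch: an empty list matches every transaction.
    account_include: Vec<String>,
}

/// Hypothetical stand-in for the transaction data handed to the callback.
struct Tx {
    account_keys: Vec<String>,
}

/// Return the key of every filter the transaction satisfies.
/// An empty result means no parser is interested and the event can be dropped.
fn matching_filters(filters: &HashMap<String, TxFilter>, tx: &Tx) -> Vec<String> {
    let mut keys: Vec<String> = filters
        .iter()
        .filter(|(_, f)| {
            f.account_include.is_empty()
                || f.account_include.iter().any(|a| tx.account_keys.contains(a))
        })
        .map(|(key, _)| key.clone())
        .collect();
    // HashMap iteration order is unspecified, so sort for a stable result.
    keys.sort();
    keys
}
```

The caller would then place the returned keys in the update's filters field and skip the channel send when the list is empty, which replaces the separate boolean check.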

Comment on lines +208 to +210
/// Control transaction filtering: true = permissive (all), false = strict (limited).
#[arg(long, env, default_value = "true")]
pub permissive_transaction_filtering: bool,
Collaborator

Suggested change
/// Control transaction filtering: true = permissive (all), false = strict (limited).
#[arg(long, env, default_value = "true")]
pub permissive_transaction_filtering: bool,

I'd like to drop this, since filters are governed by the registered parsers: as long as at least one parser is registered, it will match what is desired. I imagine this was helpful for testing, though, so I understand why it was initially included.

Comment on lines +329 to +339
let handler_clone = handler.clone();
let on_block = Some(move |_thread_id: usize, block: BlockData| {
    let handler = handler_clone.clone();
    async move {
        handler
            .process_block(block)
            .await
            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send>)
    }
    .boxed()
});
Collaborator

Suggested change
let handler_clone = handler.clone();
let on_block = Some(move |_thread_id: usize, block: BlockData| {
    let handler = handler_clone.clone();
    async move {
        handler
            .process_block(block)
            .await
            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send>)
    }
    .boxed()
});

let handler_on_block = handler.clone();
let on_block = Some(move |_thread_id: usize, block: BlockData| {
    let handler_callback = handler_on_block.clone();
    async move {
        handler_callback
            .process_block(block)
            .await
            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send>)
    }
    .boxed()
});

Comment on lines +1 to +8
//! Prometheus metrics for Jetstream source
use prometheus::Registry;

/// Register all Jetstream metrics with the provided Prometheus registry
pub fn register_metrics(_registry: &Registry) {
    // No metrics to register currently
}
Collaborator

Can delete this file.

Comment on lines +26 to +27
jetstreamer-firehose = "0.1.6"
jetstreamer-utils = "0.1.6"
Collaborator

Can you include these in the root Cargo.toml and then reference them through the workspace, so all versions are set in one place? We can also specify 0.1 to allow matching any patch releases that come out.
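
As a sketch of what this could look like with Cargo's workspace dependency inheritance (the exact file layout is an assumption based on the crate path quoted elsewhere in this review):

```toml
# Root Cargo.toml: versions are declared once for the whole workspace.
# "0.1" is a caret requirement, so any 0.1.x patch release is accepted.
[workspace.dependencies]
jetstreamer-firehose = "0.1"
jetstreamer-utils = "0.1"

# crates/jetstreamer-source/Cargo.toml: inherit the workspace versions.
[dependencies]
jetstreamer-firehose = { workspace = true }
jetstreamer-utils = { workspace = true }
```

Member crates can still add their own features on top of an inherited entry, for example `{ workspace = true, features = ["..."] }`.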

Comment on lines +16 to +17
yellowstone-vixen-jetstream-source = { path = "../../crates/jetstreamer-source", features = ["prometheus"] }
yellowstone-vixen-parser = { path = "../../crates/parser", features = ["token-program"] }
Collaborator

Can these be referenced by workspace once set there?

yellowstone-vixen-jetstream-source = { path = "../../crates/jetstreamer-source", features = ["prometheus"] }
yellowstone-vixen-parser = { path = "../../crates/parser", features = ["token-program"] }
prometheus = { workspace = true }
warp = "0.3"
Collaborator

Include this in the root Cargo.toml and reference it through the workspace.

Development

Successfully merging this pull request may close these issues.

📡 Feature: Add Jetstreamer as a Data Source to Vixen

3 participants