Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/tpu-client/src/yellowstone_grpc/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ pub async fn create_yellowstone_tpu_sender_with_clients<CB>(
config: YellowstoneTpuSenderConfig,
initial_identity: Keypair,
rpc_client: Arc<rpc_client::RpcClient>,
grpc_client: GeyserGrpcClient<impl yellowstone_grpc_client::Interceptor + Clone + 'static>,
grpc_client: GeyserGrpcClient<impl yellowstone_grpc_client::Interceptor + Clone + Send + 'static>,
callback: Option<CB>,
) -> Result<NewYellowstoneTpuSender, CreateTpuSenderError>
where
Expand Down
195 changes: 136 additions & 59 deletions crates/tpu-client/src/yellowstone_grpc/slot_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::slot::AtomicSlotTracker,
futures::Stream,
std::{collections::HashMap, panic, sync::Arc},
std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration},
tokio::task::JoinHandle,
tokio_stream::StreamExt,
yellowstone_grpc_client::{GeyserGrpcClientResult, Interceptor},
Expand All @@ -14,8 +14,13 @@ use {
},
};

/// Type-erased, heap-pinned, `Send` stream of Geyser subscribe updates (or gRPC `Status`
/// errors). Boxing lets the reconnect loop swap in a freshly subscribed stream without
/// being generic over the concrete stream type.
type SlotStream = Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>> + Send>>;

/// Filter name carried in the `filters` list of slot-tracker updates.
/// NOTE(review): presumably also the key used when building the subscribe request —
/// confirm against `get_yellowstone_slot_tracker_subscribe_request`.
pub(crate) const SLOT_TRACKER_DM_FILTER_NAME: &str = "jet-tpu-client";

/// Upper bound on the delay between two resubscribe attempts (the backoff is capped here).
const MAX_RETRY_DELAY: Duration = Duration::from_secs(15);
/// Delay before the first resubscribe attempt; the backoff is reset to this value after
/// every successful resubscribe.
const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1);

pub struct YellowstoneSlotTrackerOk {
pub atomic_slot_tracker: Arc<AtomicSlotTracker>,
pub join_handle: JoinHandle<()>,
Expand Down Expand Up @@ -46,49 +51,112 @@ impl Drop for AutoCloseSlotTracker {
}
}

///
/// Background task to update the AtomicSlotTracker from the Yellowstone Geyser slot stream
///
async fn atomic_slot_tracker_loop<S>(mut dm_slot_stream: S, to_drop: AutoCloseSlotTracker)
where
S: Stream<Item = Result<SubscribeUpdate, Status>> + Unpin + Send + 'static,
/// Consume a single stream until it errors or ends.
/// Returns `true` if it ended cleanly, `false` on error (caller should retry).
async fn drain_slot_stream(
dm_slot_stream: &mut (impl Stream<Item = Result<SubscribeUpdate, Status>> + Unpin),
shared: &Arc<AtomicSlotTracker>,
current_slot: &mut u64,
) -> bool
{
let shared = Arc::clone(&to_drop.slot_tracker);
let mut current_slot = shared.slot.load(std::sync::atomic::Ordering::Relaxed);
loop {
let result = dm_slot_stream.next().await;
if result.is_none() {
tracing::warn!("Yellowstone slot tracker stream ended");
break;
return true;
}

let response = match result.unwrap() {
Ok(response) => response,
Err(err) => {
tracing::error!("Yellowstone slot tracker stream error: {:?}", err);
drop(to_drop);
panic::panic_any(err);
return false;
}
};
match response.update_oneof.expect("update_oneof") {
UpdateOneof::Slot(subscribe_update_slot) => {
let slot = subscribe_update_slot.slot;
if slot <= current_slot {
// Ignore out-of-order or duplicate slot updates
if slot <= *current_slot {
continue;
}
current_slot = slot;
*current_slot = slot;
tracing::trace!("Yellowstone slot tracker received slot update: {}", slot);
shared
.slot
.store(current_slot, std::sync::atomic::Ordering::Relaxed);
.store(*current_slot, std::sync::atomic::Ordering::Relaxed);
}
_ => {
// Ignore other updates
_ => {}
}
}
}

///
/// Background task to update the AtomicSlotTracker from the Yellowstone Geyser slot stream.
/// On stream error, resubscribes with exponential backoff instead of panicking.
///
/// * `resubscribe` - source of replacement streams, asked for a new stream after each error.
/// * `stream` - the already-established stream to consume first.
/// * `to_drop` - guard owning the shared tracker; it is dropped exactly once, when the task
///   finishes. NOTE(review): its `Drop` impl is not fully visible here — presumably it
///   marks the tracker as closed.
///
/// The task only finishes when a stream ends cleanly (`drain_slot_stream` returns `true`).
/// While a stream keeps failing it never returns: it sleeps for the current delay, doubles
/// the delay (capped at `MAX_RETRY_DELAY`), and tries to resubscribe. A successful
/// resubscribe resets the delay to `INITIAL_RETRY_DELAY`.
///
async fn atomic_slot_tracker_loop_reconnect(
    mut resubscribe: Box<dyn ResubscribeFn>,
    mut stream: SlotStream,
    to_drop: AutoCloseSlotTracker,
) {
    let shared = Arc::clone(&to_drop.slot_tracker);
    // Carried across reconnects so that a new stream cannot move the tracker backwards.
    let mut current_slot = shared.slot.load(std::sync::atomic::Ordering::Relaxed);
    let mut retry_delay = INITIAL_RETRY_DELAY;

    // `false` from `drain_slot_stream` means the stream errored and must be replaced;
    // `true` means it ended cleanly and the task is done.
    while !drain_slot_stream(&mut stream, &shared, &mut current_slot).await {
        // Keep trying until a resubscribe succeeds; the failed stream stays alive
        // until it is replaced below.
        let fresh_stream = loop {
            tracing::warn!(
                "Yellowstone slot tracker reconnecting in {:?}...",
                retry_delay
            );
            tokio::time::sleep(retry_delay).await;
            // Grow the delay for the *next* attempt before making this one.
            retry_delay = (retry_delay * 2).min(MAX_RETRY_DELAY);

            match resubscribe.resubscribe().await {
                Ok(new_stream) => break new_stream,
                Err(err) => {
                    tracing::error!("Yellowstone slot tracker resubscribe failed: {:?}", err);
                }
            }
        };

        tracing::info!("Yellowstone slot tracker reconnected");
        retry_delay = INITIAL_RETRY_DELAY;
        stream = fresh_stream;
    }

    // Single teardown point: reached only after a clean end of stream.
    drop(to_drop);
}

/// Factory for replacement slot streams, used by the reconnect loop after a stream error.
///
/// Kept as an object-safe trait (via `async_trait`) so the loop can hold a
/// `Box<dyn ResubscribeFn>` instead of being generic over the client's interceptor type.
/// The `Send + 'static` supertraits allow the boxed value to be moved into a spawned task.
#[async_trait::async_trait]
trait ResubscribeFn: Send + 'static {
    /// Opens a new slot subscription.
    ///
    /// Returns the new stream on success, or the client error if subscribing failed.
    async fn resubscribe(
        &mut self,
    ) -> Result<SlotStream, yellowstone_grpc_client::GeyserGrpcClientError>;
}

/// `ResubscribeFn` implementation backed by a Yellowstone Geyser gRPC client.
struct GeyserResubscriber<I: Interceptor> {
    /// Client that owns the connection; reused for every resubscribe attempt.
    client: yellowstone_grpc_client::GeyserGrpcClient<I>,
}

#[async_trait::async_trait]
impl<I: Interceptor + Send + 'static> ResubscribeFn for GeyserResubscriber<I> {
    /// Issues a fresh slot-tracker subscribe request on the wrapped client and returns
    /// the resulting update stream, boxed and pinned as a `SlotStream`.
    ///
    /// Any error from `subscribe_once` is propagated unchanged to the caller.
    async fn resubscribe(
        &mut self,
    ) -> Result<SlotStream, yellowstone_grpc_client::GeyserGrpcClientError> {
        let updates = self
            .client
            .subscribe_once(get_yellowstone_slot_tracker_subscribe_request())
            .await?;
        // Erase the concrete stream type so callers only see `SlotStream`.
        let boxed: SlotStream = Box::pin(updates);
        Ok(boxed)
    }
}

///
Expand All @@ -98,7 +166,7 @@ pub async fn atomic_slot_tracker<I>(
mut geyser_client: yellowstone_grpc_client::GeyserGrpcClient<I>,
) -> GeyserGrpcClientResult<Option<YellowstoneSlotTrackerOk>>
where
I: Interceptor + 'static,
I: Interceptor + Clone + Send + 'static,
{
let subscribe_request = get_yellowstone_slot_tracker_subscribe_request();
let mut stream = geyser_client.subscribe_once(subscribe_request).await?;
Expand Down Expand Up @@ -126,7 +194,6 @@ where
break;
}
_ => {
// Ignore other updates
continue;
}
}
Expand All @@ -136,7 +203,14 @@ where
let to_drop = AutoCloseSlotTracker {
slot_tracker: Arc::clone(&shared),
};
let jh = tokio::spawn(atomic_slot_tracker_loop(stream, to_drop));
let resubscriber: Box<dyn ResubscribeFn> = Box::new(GeyserResubscriber {
client: geyser_client,
});
let jh = tokio::spawn(atomic_slot_tracker_loop_reconnect(
resubscriber,
Box::pin(stream),
to_drop,
));

Ok(Some(YellowstoneSlotTrackerOk {
atomic_slot_tracker: shared,
Expand All @@ -155,11 +229,9 @@ mod tests {
};

#[tokio::test]
async fn test_atomic_slot_tracker_loop() {
async fn test_drain_slot_stream() {
let slot_tracker = Arc::new(AtomicSlotTracker::new(0));
let to_drop = AutoCloseSlotTracker {
slot_tracker: Arc::clone(&slot_tracker),
};
let mut current_slot = 0u64;

let updates = vec![
Ok(SubscribeUpdate {
Expand Down Expand Up @@ -193,47 +265,52 @@ mod tests {
created_at: None,
}),
];
let expected_slot_views = [1, 2, 3];
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let stream = UnboundedReceiverStream::new(rx);
let handle = tokio::spawn(atomic_slot_tracker_loop(stream, to_drop));

for (i, update) in updates.into_iter().enumerate() {
tx.send(update).expect("send update");
tokio::time::sleep(Duration::from_millis(10)).await;
let expected_slot = expected_slot_views[i];
let current_slot = slot_tracker.load().expect("load");
assert_eq!(current_slot, expected_slot);
}

// Drop the handle to clean up
handle.abort();
let mut stream = tokio_stream::iter(updates);
let clean = drain_slot_stream(&mut stream, &slot_tracker, &mut current_slot).await;

// Sleep a bit to ensure the drop has taken effect
tokio::time::sleep(Duration::from_millis(10)).await;
assert!(
slot_tracker
.closed
.load(std::sync::atomic::Ordering::Relaxed)
assert!(clean);
assert_eq!(current_slot, 3);
assert_eq!(
slot_tracker.slot.load(std::sync::atomic::Ordering::Relaxed),
3
);
}

#[tokio::test]
async fn test_it_should_poison_when_stream_empty() {
async fn test_drain_slot_stream_returns_false_on_error() {
let slot_tracker = Arc::new(AtomicSlotTracker::new(0));
let to_drop = AutoCloseSlotTracker {
slot_tracker: Arc::clone(&slot_tracker),
};
let mut current_slot = 0u64;

let updates: Vec<Result<SubscribeUpdate, Status>> = vec![
Ok(SubscribeUpdate {
update_oneof: Some(UpdateOneof::Slot(SubscribeUpdateSlot {
slot: 1,
dead_error: None,
parent: None,
status: SlotStatus::SlotProcessed as i32,
})),
filters: vec![SLOT_TRACKER_DM_FILTER_NAME.to_string()],
created_at: None,
}),
Err(Status::cancelled("stream terminated by user")),
];

let stream = tokio_stream::iter(vec![]);
let handle = tokio::spawn(atomic_slot_tracker_loop(stream, to_drop));
let mut stream = tokio_stream::iter(updates);
let clean = drain_slot_stream(&mut stream, &slot_tracker, &mut current_slot).await;

let _ = handle.await;
assert!(!clean);
assert_eq!(current_slot, 1);
}

assert!(
slot_tracker
.closed
.load(std::sync::atomic::Ordering::Relaxed)
);
#[tokio::test]
async fn test_empty_stream_closes_tracker() {
let slot_tracker = Arc::new(AtomicSlotTracker::new(0));
let mut current_slot = 0u64;

let stream_items: Vec<Result<SubscribeUpdate, Status>> = vec![];
let mut stream = tokio_stream::iter(stream_items);
let clean = drain_slot_stream(&mut stream, &slot_tracker, &mut current_slot).await;

assert!(clean);
}
}