Skip to content

add perp/spot market update notifcations #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions crates/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct GrpcSubscribeOpts {
/// callback for slot updates
pub on_slot: Option<Box<OnSlotFn>>,
/// custom callback for account updates
pub on_account: Option<(AccountFilter, Box<OnAccountFn>)>,
pub on_account: Option<Vec<(AccountFilter, Box<OnAccountFn>)>>,
/// Network level connection config
pub connection_opts: GrpcConnectionOpts,
/// Enable inter-slot update notifications
Expand Down Expand Up @@ -117,9 +117,16 @@ impl GrpcSubscribeOpts {
pub fn on_account(
mut self,
filter: AccountFilter,
on_account: impl Fn(&AccountUpdate) + Send + Sync + 'static,
callback: impl Fn(&AccountUpdate) + Send + Sync + 'static,
) -> Self {
self.on_account = Some((filter, Box::new(on_account)));
match &mut self.on_account {
Some(on_account) => {
on_account.push((filter, Box::new(callback)));
}
None => {
self.on_account = Some(vec![(filter, Box::new(callback))]);
}
}
self
}
/// Set network level connection opts
Expand Down
46 changes: 33 additions & 13 deletions crates/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use solana_sdk::{
signature::Signature,
};
pub use solana_sdk::{address_lookup_table::AddressLookupTableAccount, pubkey::Pubkey};
use websocket_account_subscriber::OnAccountFn;

use crate::{
account_map::AccountMap,
Expand Down Expand Up @@ -156,32 +157,45 @@ impl DriftClient {
/// * `markets` - list of markets to subscribe
///
/// This is a no-op if already subscribed
pub async fn subscribe_markets(&self, markets: &[MarketId]) -> SdkResult<()> {
self.backend.subscribe_markets(markets).await
pub async fn subscribe_markets(
&self,
markets: &[MarketId],
on_account: Option<Arc<Box<OnAccountFn>>>,
) -> SdkResult<()> {
self.backend.subscribe_markets(markets, on_account).await
Comment on lines +160 to +165
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub async fn subscribe_markets(
&self,
markets: &[MarketId],
on_account: Option<Arc<Box<OnAccountFn>>>,
) -> SdkResult<()> {
self.backend.subscribe_markets(markets, on_account).await
pub async fn subscribe_markets_with_callback(
&self,
markets: &[MarketId],
on_account: Option<Arc<Box<OnAccountFn>>>,
) -> SdkResult<()> {
self.backend.subscribe_markets(markets, on_account).await

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to keep the existing api unchanged, use new method only when user wants the callback

}

/// Subscribe to all spot and perp markets
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_markets(&self) -> SdkResult<()> {
pub async fn subscribe_all_markets(
&self,
on_account: Option<Arc<Box<OnAccountFn>>>,
) -> SdkResult<()> {
let markets = self.get_all_market_ids();
self.backend.subscribe_markets(&markets).await
self.backend.subscribe_markets(&markets, on_account).await
}

/// Subscribe to all spot markets
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_spot_markets(&self) -> SdkResult<()> {
pub async fn subscribe_all_spot_markets(
&self,
on_account: Option<Arc<Box<OnAccountFn>>>,
) -> SdkResult<()> {
let markets = self.get_all_spot_market_ids();
self.backend.subscribe_markets(&markets).await
self.backend.subscribe_markets(&markets, on_account).await
}

/// Subscribe to all perp markets
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_perp_markets(&self) -> SdkResult<()> {
pub async fn subscribe_all_perp_markets(
&self,
on_account: Option<Arc<Box<OnAccountFn>>>,
) -> SdkResult<()> {
let markets = self.get_all_perp_market_ids();
self.backend.subscribe_markets(&markets).await
self.backend.subscribe_markets(&markets, on_account).await
}

/// Starts background subscriptions for live oracle account updates by market
Expand Down Expand Up @@ -1013,7 +1027,11 @@ impl DriftClientBackend {
}

/// Start subscriptions for market accounts
async fn subscribe_markets(&self, markets: &[MarketId]) -> SdkResult<()> {
async fn subscribe_markets(
&self,
markets: &[MarketId],
on_account: Option<Arc<Box<OnAccountFn>>>,
) -> SdkResult<()> {
if self.is_grpc_subscribed() {
log::info!("already subscribed markets via gRPC");
return Err(SdkError::AlreadySubscribed);
Expand All @@ -1023,8 +1041,8 @@ impl DriftClientBackend {
.iter()
.partition::<Vec<MarketId>, _>(|x| x.is_perp());
let _ = tokio::try_join!(
self.perp_market_map.subscribe(&perps),
self.spot_market_map.subscribe(&spot),
self.perp_market_map.subscribe(&perps, on_account.clone()),
self.spot_market_map.subscribe(&spot, on_account.clone()),
)?;

Ok(())
Expand Down Expand Up @@ -1110,8 +1128,10 @@ impl DriftClientBackend {
}

// set custom callbacks
if let Some((filter, on_account)) = opts.on_account {
grpc.on_account(filter, on_account);
if let Some(callbacks) = opts.on_account {
for (filter, on_account) in callbacks {
grpc.on_account(filter, on_account)
}
}
if let Some(f) = opts.on_slot {
grpc.on_slot(f);
Expand Down
17 changes: 14 additions & 3 deletions crates/src/marketmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
grpc::AccountUpdate,
memcmp::get_market_filter,
types::MapOf,
websocket_account_subscriber::WebsocketAccountSubscriber,
websocket_account_subscriber::{OnAccountFn as WsOnAccountFn, WebsocketAccountSubscriber},
DataAndSlot, MarketId, MarketType, PerpMarket, SdkResult, SpotMarket, UnsubHandle,
};

Expand Down Expand Up @@ -124,7 +124,11 @@ where
}

/// Subscribe to market account updates
pub async fn subscribe(&self, markets: &[MarketId]) -> SdkResult<()> {
pub async fn subscribe(
&self,
markets: &[MarketId],
on_account: Option<Arc<Box<WsOnAccountFn>>>,
) -> SdkResult<()> {
log::debug!(target: LOG_TARGET, "subscribing: {:?}", T::MARKET_TYPE);

let markets = HashSet::<MarketId>::from_iter(markets.iter().copied());
Expand All @@ -151,10 +155,14 @@ where
let futs_iter = pending_subscriptions.into_iter().map(|(idx, fut)| {
let marketmap = Arc::clone(&self.marketmap);
let latest_slot = self.latest_slot.clone();
let on_account = on_account.clone();
async move {
let unsub = fut
.subscribe(Self::SUBSCRIPTION_ID, false, {
move |update| {
if let Some(on_account) = &on_account {
on_account(&update);
}
if update.slot > latest_slot.load(Ordering::Relaxed) {
latest_slot.store(update.slot, Ordering::Relaxed);
}
Expand Down Expand Up @@ -402,7 +410,10 @@ mod tests {
);

assert!(map
.subscribe(&[MarketId::perp(0), MarketId::perp(1), MarketId::perp(1)])
.subscribe(
&[MarketId::perp(0), MarketId::perp(1), MarketId::perp(1)],
None
)
.await
.is_ok());
assert!(map.is_subscribed(0));
Expand Down
2 changes: 2 additions & 0 deletions crates/src/websocket_account_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct AccountUpdate {
pub slot: u64,
}

pub type OnAccountFn = dyn Fn(&AccountUpdate) + Send + Sync + 'static;

#[derive(Clone)]
pub struct WebsocketAccountSubscriber {
pubsub: Arc<PubsubClient>,
Expand Down
Loading