Skip to content

Commit 32fc110

Browse files
authored
Merge pull request #571 from SorellaLabs/feat/generic-amm-quoter
feat(op-angstrom): amm-quoter generic mode
2 parents 1ab83d6 + 8b57530 commit 32fc110

File tree

12 files changed

+311
-161
lines changed

12 files changed

+311
-161
lines changed

Cargo.lock

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,3 +214,9 @@ uniswap-v4 = { path = "./crates/uniswap-v4/" }
214214
uniswap_v3_math = { git = "https://github.com/SorellaLabs/v3-math", branch = "main" }
215215
url = "2"
216216
validation = { path = "./crates/validation/" }
217+
218+
# OP-stack support
219+
reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", version = "1.6.0", tag = "v1.6.0" }
220+
reth-optimism-cli = { git = "https://github.com/paradigmxyz/reth", version = "1.6.0", tag = "v1.6.0" }
221+
reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", version = "1.6.0", tag = "v1.6.0" }
222+
reth-optimism-primitives = { git = "https://github.com/paradigmxyz/reth", version = "1.6.0", tag = "v1.6.0" }

bin/angstrom/src/components.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use alloy::{
1414
providers::{Provider, ProviderBuilder, network::Ethereum}
1515
};
1616
use alloy_chains::Chain;
17-
use angstrom_amm_quoter::{QuoterManager, Slot0Update};
17+
use angstrom_amm_quoter::{ConsensusQuoterManager, Slot0Update};
1818
use angstrom_eth::{
1919
handle::{Eth, EthCommand},
2020
manager::{EthDataCleanser, EthEvent}
@@ -423,7 +423,7 @@ where
423423
let matching_handle = MatchingManager::spawn(executor.clone(), validation_handle.clone());
424424

425425
// spin up amm quoter
426-
let amm = QuoterManager::new(
426+
let amm = ConsensusQuoterManager::new(
427427
global_block_sync.clone(),
428428
order_storage.clone(),
429429
handles.quoter_rx,

bin/op-angstrom/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ url.workspace = true
4646
validation.workspace = true
4747

4848
# OP-stack support
49-
reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", version = "1.6.0", tag = "v1.6.0" }
50-
reth-optimism-cli = { git = "https://github.com/paradigmxyz/reth", version = "1.6.0", tag = "v1.6.0" }
51-
reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", version = "1.6.0", tag = "v1.6.0" }
52-
reth-optimism-primitives = { git = "https://github.com/paradigmxyz/reth", version = "1.6.0", tag = "v1.6.0" }
49+
reth-optimism-chainspec.workspace = true
50+
reth-optimism-cli.workspace = true
51+
reth-optimism-node.workspace = true
52+
reth-optimism-primitives.workspace = true
5353

5454
[target.'cfg(unix)'.dependencies]
5555
tikv-jemallocator = { version = "0.6.0", optional = true }

bin/op-angstrom/src/components.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use alloy::{
1313
primitives::Address,
1414
providers::{Provider, ProviderBuilder, network::Ethereum}
1515
};
16-
use angstrom_amm_quoter::{QuoterManager, Slot0Update};
16+
use angstrom_amm_quoter::{RollupQuoterManager, Slot0Update};
1717
use angstrom_eth::{
1818
handle::{Eth, EthCommand},
1919
manager::{EthDataCleanser, EthEvent}
@@ -379,7 +379,7 @@ where
379379
let matching_handle = MatchingManager::spawn(executor.clone(), validation_handle.clone());
380380

381381
// spin up amm quoter
382-
let amm = QuoterManager::new(
382+
let amm = RollupQuoterManager::new(
383383
global_block_sync.clone(),
384384
order_storage.clone(),
385385
handles.quoter_rx,

crates/amm-quoter/src/consensus.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
//! This module implements consensus-based quoter mode.
2+
use std::{
3+
collections::{HashMap, HashSet},
4+
pin::Pin,
5+
sync::Arc,
6+
task::Poll,
7+
time::Duration
8+
};
9+
10+
use angstrom_types::{
11+
block_sync::BlockSyncConsumer,
12+
consensus::{ConsensusRoundEvent, ConsensusRoundOrderHashes},
13+
orders::OrderSet,
14+
primitive::PoolId,
15+
sol_bindings::{grouped_orders::AllOrders, rpc_orders::TopOfBlockOrder}
16+
};
17+
use futures::{Stream, StreamExt, stream::FuturesUnordered};
18+
use order_pool::order_storage::OrderStorage;
19+
use rayon::ThreadPool;
20+
use tokio::{sync::mpsc, time::interval};
21+
use uniswap_v4::uniswap::pool_manager::SyncedUniswapPools;
22+
23+
use crate::{QuoterManager, Slot0Update, book_snapshots_from_amms};
24+
25+
/// A type alias for the consensus quoter manager.
26+
pub type ConsensusQuoterManager<BlockSync> = QuoterManager<BlockSync, ConsensusMode>;
27+
28+
/// Mode for consensus-based order book building.
29+
pub struct ConsensusMode {
30+
consensus_stream: Pin<Box<dyn Stream<Item = ConsensusRoundOrderHashes> + Send>>,
31+
/// The unique order hashes of the current PreProposalAggregate consensus
32+
/// round. Used to build the book for the slot0 stream, so that all
33+
/// orders are valid, and the subscription can't be manipulated by orders
34+
/// submitted after this round and between the next block
35+
active_pre_proposal_aggr_order_hashes: Option<ConsensusRoundOrderHashes>
36+
}
37+
38+
impl<BlockSync: BlockSyncConsumer> QuoterManager<BlockSync, ConsensusMode> {
39+
/// ensure that we haven't registered on the BlockSync.
40+
/// We just want to ensure that we don't access during a update period
41+
pub fn new(
42+
block_sync: BlockSync,
43+
orders: Arc<OrderStorage>,
44+
recv: mpsc::Receiver<(HashSet<PoolId>, mpsc::Sender<Slot0Update>)>,
45+
amms: SyncedUniswapPools,
46+
threadpool: ThreadPool,
47+
update_interval: Duration,
48+
consensus_stream: Pin<Box<dyn Stream<Item = ConsensusRoundOrderHashes> + Send>>
49+
) -> Self {
50+
let cur_block = block_sync.current_block_number();
51+
let book_snapshots = book_snapshots_from_amms(&amms);
52+
53+
assert!(
54+
update_interval > Duration::from_millis(10),
55+
"cannot update quicker than every 10ms"
56+
);
57+
58+
let mode = ConsensusMode { consensus_stream, active_pre_proposal_aggr_order_hashes: None };
59+
60+
Self {
61+
seq_id: 0,
62+
block_sync,
63+
orders,
64+
amms,
65+
recv,
66+
cur_block,
67+
book_snapshots,
68+
threadpool,
69+
pending_tasks: FuturesUnordered::new(),
70+
pool_to_subscribers: HashMap::default(),
71+
execution_interval: interval(update_interval),
72+
mode
73+
}
74+
}
75+
76+
pub(crate) fn all_orders_with_consensus(&self) -> OrderSet<AllOrders, TopOfBlockOrder> {
77+
if let Some(hashes) = self.mode.active_pre_proposal_aggr_order_hashes.as_ref() {
78+
self.orders
79+
.get_all_orders_with_hashes(&hashes.limit, &hashes.searcher)
80+
} else {
81+
self.orders.get_all_orders()
82+
}
83+
}
84+
85+
fn update_consensus_state(&mut self, round: ConsensusRoundOrderHashes) {
86+
if matches!(round.round, ConsensusRoundEvent::PropagatePreProposalAgg) {
87+
self.mode.active_pre_proposal_aggr_order_hashes = Some(round)
88+
}
89+
}
90+
}
91+
92+
impl<BlockSync: BlockSyncConsumer> Future for QuoterManager<BlockSync, ConsensusMode> {
93+
type Output = ();
94+
95+
fn poll(
96+
mut self: Pin<&mut Self>,
97+
cx: &mut std::task::Context<'_>
98+
) -> std::task::Poll<Self::Output> {
99+
while let Poll::Ready(Some((pools, subscriber))) = self.recv.poll_recv(cx) {
100+
self.handle_new_subscription(pools, subscriber);
101+
}
102+
103+
while let Poll::Ready(Some(consensus_update)) =
104+
self.mode.consensus_stream.poll_next_unpin(cx)
105+
{
106+
self.update_consensus_state(consensus_update);
107+
}
108+
109+
while let Poll::Ready(Some(Ok(slot_update))) = self.pending_tasks.poll_next_unpin(cx) {
110+
self.send_out_result(slot_update);
111+
}
112+
113+
while self.execution_interval.poll_tick(cx).is_ready() {
114+
// cycle through if we can't do any processing
115+
if !self.block_sync.can_operate() {
116+
cx.waker().wake_by_ref();
117+
return Poll::Pending;
118+
}
119+
120+
// update block number, amm snapshot and reset seq id
121+
if self.cur_block != self.block_sync.current_block_number() {
122+
self.update_book_state();
123+
self.cur_block = self.block_sync.current_block_number();
124+
125+
self.mode.active_pre_proposal_aggr_order_hashes = None;
126+
127+
self.seq_id = 0;
128+
}
129+
130+
// inc seq_id
131+
let seq_id = self.seq_id;
132+
// given that we have a max update speed of 10ms, the max
133+
// this should reach is 1200 before a new block update
134+
// occurs. Becuase of this, there is no need to check for overflow
135+
// as 65535 is more than enough
136+
self.seq_id += 1;
137+
138+
let orders = self.all_orders_with_consensus();
139+
self.spawn_book_solvers(seq_id, orders);
140+
}
141+
142+
Poll::Pending
143+
}
144+
}

0 commit comments

Comments
 (0)