@@ -2,7 +2,7 @@ use std::{collections::BTreeMap, future, str::FromStr};
2
2
3
3
use anyhow:: Context ;
4
4
use binance:: model:: BookTickerEvent ;
5
- use futures:: { StreamExt , TryStreamExt } ;
5
+ use futures:: { FutureExt as _ , StreamExt , TryStreamExt } ;
6
6
use penumbra_asset:: asset:: Metadata ;
7
7
use penumbra_custody:: { AuthorizeRequest , CustodyClient } ;
8
8
use penumbra_dex:: {
@@ -18,6 +18,8 @@ use penumbra_proto::core::component::dex::v1::query_service_client::QueryService
18
18
use penumbra_proto:: core:: component:: dex:: v1:: {
19
19
LiquidityPositionsByPriceRequest , LiquidityPositionsRequest ,
20
20
} ;
21
+ use penumbra_proto:: penumbra:: view:: v1:: broadcast_transaction_response:: Status as BroadcastStatus ;
22
+ use penumbra_transaction:: txhash:: TransactionId ;
21
23
use penumbra_view:: { Planner , ViewClient } ;
22
24
use rand:: rngs:: OsRng ;
23
25
use tokio:: sync:: watch;
@@ -37,32 +39,32 @@ lazy_static! {
37
39
(
38
40
// ETH priced in terms of BTC
39
41
"ETHBTC" . to_string( ) ,
40
- DirectedUnitPair :: from_str( "test_eth: test_btc" ) . unwrap( )
42
+ DirectedUnitPair :: from_str( "test_btc:test_eth " ) . unwrap( )
41
43
) ,
42
44
(
43
45
// ETH priced in terms of USDT
44
46
"ETHUSDT" . to_string( ) ,
45
- DirectedUnitPair :: from_str( "test_eth: test_usd" ) . unwrap( )
47
+ DirectedUnitPair :: from_str( "test_usd:test_eth " ) . unwrap( )
46
48
) ,
47
49
(
48
50
// BTC priced in terms of USD
49
51
"BTCUSDT" . to_string( ) ,
50
- DirectedUnitPair :: from_str( "test_btc: test_usd" ) . unwrap( )
52
+ DirectedUnitPair :: from_str( "test_usd:test_btc " ) . unwrap( )
51
53
) ,
52
54
(
53
55
// ATOM priced in terms of BTC
54
56
"ATOMBTC" . to_string( ) ,
55
- DirectedUnitPair :: from_str( "test_atom: test_btc" ) . unwrap( )
57
+ DirectedUnitPair :: from_str( "test_btc:test_atom " ) . unwrap( )
56
58
) ,
57
59
(
58
60
// ATOM priced in terms of USDT
59
61
"ATOMUSDT" . to_string( ) ,
60
- DirectedUnitPair :: from_str( "test_atom: test_usd" ) . unwrap( )
62
+ DirectedUnitPair :: from_str( "test_usd:test_atom " ) . unwrap( )
61
63
) ,
62
64
(
63
65
// OSMO priced in terms of USDT
64
66
"OSMOUSDT" . to_string( ) ,
65
- DirectedUnitPair :: from_str( "test_osmo: test_usd" ) . unwrap( )
67
+ DirectedUnitPair :: from_str( "test_usd:test_osmo " ) . unwrap( )
66
68
) ,
67
69
] ) ;
68
70
}
79
81
view : V ,
80
82
custody : C ,
81
83
fvk : FullViewingKey ,
82
- account : u32 ,
84
+ account : AddressIndex ,
83
85
pd_url : Url ,
84
86
}
85
87
90
92
{
91
93
/// Create a new trader.
92
94
pub fn new (
93
- account : u32 ,
95
+ account : AddressIndex ,
94
96
fvk : FullViewingKey ,
95
97
view : V ,
96
98
custody : C ,
@@ -129,15 +131,11 @@ where
129
131
pub async fn run ( mut self ) -> anyhow:: Result < ( ) > {
130
132
tracing:: info!( "starting trader" ) ;
131
133
let trader_span = tracing:: debug_span!( "trader" ) ;
132
- // TODO figure out why this span doesn't display in logs
133
134
let _ = trader_span. enter ( ) ;
134
135
tracing:: debug!( "running trader functionality" ) ;
135
- // Doing this loop without any shutdown signal doesn't exactly
136
+ // TODO: Doing this loop without any shutdown signal doesn't exactly
136
137
// provide a clean shutdown, but it works for now.
137
138
loop {
138
- // TODO: ensure we have some positions from `penumbra` to create a more interesting
139
- // trading environment :)
140
-
141
139
// Check each pair
142
140
let mut actions = self . actions . clone ( ) ;
143
141
for ( symbol, rx) in actions. iter_mut ( ) {
@@ -173,8 +171,6 @@ where
173
171
. expect ( "missing symbol -> DirectedUnitPair mapping" ) ;
174
172
175
173
// Create a plan that will contain all LP management operations based on this quote.
176
- // TODO: could move this outside the loop, but it's a little easier to debug
177
- // the plans like this for now
178
174
let plan = & mut Planner :: new ( OsRng ) ;
179
175
180
176
// Find the spendable balance for each asset in the market.
@@ -223,9 +219,6 @@ where
223
219
)
224
220
. await ?;
225
221
226
- // TODO: it's possible to immediately close this position within the same block
227
- // however what if we don't get updates every block?
228
-
229
222
// Finalize and submit the transaction plan.
230
223
match self . finalize_and_submit ( plan) . await {
231
224
Ok ( _) => { }
@@ -287,7 +280,50 @@ where
287
280
. await ?;
288
281
289
282
// 3. Broadcast the transaction and wait for confirmation.
290
- self . view . broadcast_transaction ( tx, true ) . await ?;
283
+ let mut rsp = self . view . broadcast_transaction ( tx, true ) . await ?;
284
+ let id: TransactionId = async move {
285
+ while let Some ( rsp) = rsp. try_next ( ) . await ? {
286
+ match rsp. status {
287
+ Some ( status) => match status {
288
+ BroadcastStatus :: BroadcastSuccess ( bs) => {
289
+ tracing:: debug!(
290
+ "transaction broadcast successfully: {}" ,
291
+ TransactionId :: try_from(
292
+ bs. id. expect( "detected transaction missing id" )
293
+ ) ?
294
+ ) ;
295
+ }
296
+ BroadcastStatus :: Confirmed ( c) => {
297
+ let id = c. id . expect ( "detected transaction missing id" ) . try_into ( ) ?;
298
+ if c. detection_height != 0 {
299
+ tracing:: debug!(
300
+ "transaction confirmed and detected: {} @ height {}" ,
301
+ id,
302
+ c. detection_height
303
+ ) ;
304
+ } else {
305
+ tracing:: debug!( "transaction confirmed and detected: {}" , id) ;
306
+ }
307
+ return Ok ( id) ;
308
+ }
309
+ } ,
310
+ None => {
311
+ // No status is unexpected behavior
312
+ return Err ( anyhow:: anyhow!(
313
+ "empty BroadcastTransactionResponse message"
314
+ ) ) ;
315
+ }
316
+ }
317
+ }
318
+
319
+ Err ( anyhow:: anyhow!(
320
+ "should have received BroadcastTransaction status or error"
321
+ ) )
322
+ }
323
+ . boxed ( )
324
+ . await
325
+ . context ( "error broadcasting transaction" ) ?;
326
+ tracing:: debug!( transaction_id = ?id, "broadcasted transaction" ) ;
291
327
292
328
Ok ( ( ) )
293
329
}
@@ -378,9 +414,9 @@ where
378
414
market. into_directed_trading_pair ( ) ,
379
415
spread as u32 ,
380
416
// p is always the scaling value
381
- ( scaling_factor as u128 * denom_scaler) . into ( ) ,
417
+ ( scaling_factor as u128 * denom_scaler / 1_000 ) . into ( ) ,
382
418
// price is expressed in units of asset 2
383
- ( mid_price as u128 * numer_scaler) . into ( ) ,
419
+ ( mid_price as u128 * numer_scaler / 1_000 ) . into ( ) ,
384
420
reserves,
385
421
) ;
386
422
0 commit comments