Skip to content

Commit 98eab74

Browse files
authored
Merge pull request #329 from propeller-heads/ah/ENG-4993/handle-bad-change-type-deltas
feat: Propagate db update errors
2 parents d9e801b + c4078a5 commit 98eab74

File tree

8 files changed

+77
-28
lines changed

8 files changed

+77
-28
lines changed

src/evm/decoder.rs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ where
214214

215215
/// Decodes a `FeedMessage` into a `BlockUpdate` containing the updated states of protocol
216216
/// components
217-
pub async fn decode(&self, msg: FeedMessage<H>) -> Result<Update, StreamDecodeError> {
217+
pub async fn decode(&self, msg: &FeedMessage<H>) -> Result<Update, StreamDecodeError> {
218218
// stores all states updated in this tick/msg
219219
let mut updated_states = HashMap::new();
220220
let mut new_pairs = HashMap::new();
@@ -377,7 +377,8 @@ where
377377
Some(storage_by_address),
378378
token_proxy_accounts,
379379
)
380-
.await;
380+
.await
381+
.map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
381382
info!("Engine updated");
382383
drop(state_guard);
383384
}
@@ -437,6 +438,8 @@ where
437438
continue 'outer;
438439
}
439440
};
441+
// TODO: Ok we deployed a proxy whenever we see a new token without
442+
// a implementation set.
440443
if !state_guard
441444
.proxy_token_addresses
442445
.contains_key(&token_address)
@@ -474,7 +477,8 @@ where
474477
None,
475478
new_tokens_accounts,
476479
)
477-
.await;
480+
.await
481+
.map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
478482
}
479483

480484
// collect contracts:ids mapping for states that should update on contract
@@ -562,6 +566,9 @@ where
562566
.map(|(key, value)| {
563567
let mut update: AccountUpdate = value.clone().into();
564568

569+
// TODO: Unify this different initialisation if we receive
570+
// state updates for the token with the usual case. Also
571+
// switch the if cases.
565572
if state_guard.tokens.contains_key(key) {
566573
// If the account is a token, we need to handle it with a proxy contract
567574

@@ -599,6 +606,21 @@ where
599606
.unwrap()
600607
};
601608

609+
// TEMP PATCH (ENG-4993)
610+
//
611+
// The indexer emits deltas without code marked as creations,
612+
// which crashes TychoDB. Until fixed, treat them as updates
613+
// (since EVM code cannot be deleted).
614+
if update.code.is_none() &&
615+
matches!(update.change, ChangeType::Creation)
616+
{
617+
error!(
618+
update = ?update,
619+
"FaultyCreationDelta"
620+
);
621+
update.change = ChangeType::Update;
622+
}
623+
602624
// assign original token contract to new address
603625
update.address = proxy_addr;
604626
};
@@ -617,7 +639,8 @@ where
617639
None,
618640
token_proxy_accounts,
619641
)
620-
.await;
642+
.await
643+
.map_err(|e| StreamDecodeError::Fatal(e.to_string()))?;
621644
info!("Engine updated");
622645

623646
// Collect all pools related to the updated accounts
@@ -771,7 +794,7 @@ where
771794
// Send the tick with all updated states
772795
Ok(Update::new(header.block_number_or_timestamp(), updated_states, new_pairs)
773796
.set_removed_pairs(removed_pairs)
774-
.set_sync_states(msg.sync_states))
797+
.set_sync_states(msg.sync_states.clone()))
775798
}
776799

777800
fn apply_update(
@@ -975,12 +998,12 @@ mod tests {
975998

976999
let msg = load_test_msg("uniswap_v2_snapshot");
9771000
let res1 = decoder
978-
.decode(msg)
1001+
.decode(&msg)
9791002
.await
9801003
.expect("decode failure");
9811004
let msg = load_test_msg("uniswap_v2_delta");
9821005
let res2 = decoder
983-
.decode(msg)
1006+
.decode(&msg)
9841007
.await
9851008
.expect("decode failure");
9861009

@@ -1007,7 +1030,7 @@ mod tests {
10071030

10081031
let msg = load_test_msg("uniswap_v2_snapshot");
10091032
let res1 = decoder
1010-
.decode(msg)
1033+
.decode(&msg)
10111034
.await
10121035
.expect("decode failure");
10131036

@@ -1023,7 +1046,7 @@ mod tests {
10231046
decoder.skip_state_decode_failures = skip_failures;
10241047

10251048
let msg = load_test_msg("uniswap_v2_snapshot_broken_id");
1026-
match decoder.decode(msg).await {
1049+
match decoder.decode(&msg).await {
10271050
Err(StreamDecodeError::Fatal(msg)) => {
10281051
if !skip_failures {
10291052
assert_eq!(
@@ -1053,7 +1076,7 @@ mod tests {
10531076
decoder.skip_state_decode_failures = skip_failures;
10541077

10551078
let msg = load_test_msg("uniswap_v2_snapshot_broken_state");
1056-
match decoder.decode(msg).await {
1079+
match decoder.decode(&msg).await {
10571080
Err(StreamDecodeError::Fatal(msg)) => {
10581081
if !skip_failures {
10591082
assert_eq!(msg, "Missing attributes reserve0");
@@ -1119,7 +1142,7 @@ mod tests {
11191142

11201143
// Decode the message
11211144
let _ = decoder
1122-
.decode(msg)
1145+
.decode(&msg)
11231146
.await
11241147
.expect("decode failure");
11251148

src/evm/engine_db/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use tycho_client::feed::BlockHeader;
77
use tycho_common::simulation::errors::SimulationError;
88

99
use crate::evm::{
10-
engine_db::{engine_db_interface::EngineDatabaseInterface, tycho_db::PreCachedDB},
10+
engine_db::{
11+
engine_db_interface::EngineDatabaseInterface,
12+
tycho_db::{PreCachedDB, PreCachedDBError},
13+
},
1114
simulation::SimulationEngine,
1215
tycho_models::{AccountUpdate, ChangeType, ResponseAccount},
1316
};
@@ -70,7 +73,7 @@ pub async fn update_engine(
7073
block: Option<BlockHeader>,
7174
vm_storage: Option<HashMap<Address, ResponseAccount>>,
7275
account_updates: HashMap<Address, AccountUpdate>,
73-
) -> Vec<AccountUpdate> {
76+
) -> Result<Vec<AccountUpdate>, PreCachedDBError> {
7477
if let Some(block) = block {
7578
let mut vm_updates: Vec<AccountUpdate> = Vec::new();
7679

@@ -93,11 +96,11 @@ pub async fn update_engine(
9396
}
9497

9598
if !vm_updates.is_empty() {
96-
db.update(vm_updates.clone(), Some(block));
99+
db.update(vm_updates.clone(), Some(block))?;
97100
}
98101

99-
vm_updates
102+
Ok(vm_updates)
100103
} else {
101-
vec![]
104+
Ok(vec![])
102105
}
103106
}

src/evm/engine_db/tycho_db.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub enum TychoClientError {
3636
pub enum PreCachedDBError {
3737
#[error("Account {0} not found")]
3838
MissingAccount(Address),
39+
#[error("Bad account update: {0} - {1:?}")]
40+
BadUpdate(String, Box<AccountUpdate>),
3941
#[error("Block needs to be set")]
4042
BlockNotSet(),
4143
#[error("Tycho Client error: {0}")]
@@ -76,10 +78,17 @@ impl PreCachedDB {
7678
}
7779

7880
#[instrument(skip_all)]
79-
pub fn update(&self, account_updates: Vec<AccountUpdate>, block: Option<BlockHeader>) {
81+
pub fn update(
82+
&self,
83+
account_updates: Vec<AccountUpdate>,
84+
block: Option<BlockHeader>,
85+
) -> Result<(), PreCachedDBError> {
8086
// Hold the write lock for the duration of the function so that no other thread can
8187
// write to the storage.
82-
let mut write_guard = self.inner.write().unwrap();
88+
let mut write_guard = self
89+
.inner
90+
.write()
91+
.expect("Fatal tycho state db lock poisoned");
8392

8493
write_guard.block = block;
8594

@@ -108,10 +117,13 @@ impl PreCachedDB {
108117

109118
// We expect the code to be present.
110119
let code = Bytecode::new_raw(AlloyBytes::from(
111-
update.code.clone().unwrap_or_else(|| {
120+
update.code.clone().ok_or_else(|| {
112121
error!(%update.address, "MissingCode");
113-
Vec::new()
114-
}),
122+
PreCachedDBError::BadUpdate(
123+
"MissingCode".into(),
124+
Box::new(update.clone()),
125+
)
126+
})?,
115127
));
116128
// If the balance is not present, we set it to zero.
117129
let balance = update.balance.unwrap_or(U256::ZERO);
@@ -130,6 +142,7 @@ impl PreCachedDB {
130142
}
131143
}
132144
}
145+
Ok(())
133146
}
134147

135148
/// Retrieves the storage value at the specified index for the given account, if it exists.
@@ -552,7 +565,9 @@ mod tests {
552565
..Default::default()
553566
};
554567

555-
mock_db.update(vec![account_update], Some(new_block));
568+
mock_db
569+
.update(vec![account_update], Some(new_block))
570+
.unwrap();
556571

557572
let account_info = mock_db
558573
.basic_ref(Address::from_str("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D").unwrap())

src/evm/protocol/vm/state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,8 @@ mod tests {
790790
false,
791791
);
792792
}
793-
db.update(accounts, Some(block));
793+
db.update(accounts, Some(block))
794+
.unwrap();
794795

795796
let tokens = vec![dai().address, bal().address];
796797
for token in &tokens {

src/evm/protocol/vm/tycho_decoder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,8 @@ mod tests {
311311
false,
312312
);
313313
}
314-
db.update(accounts, Some(block));
314+
db.update(accounts, Some(block))
315+
.unwrap();
315316
let account_balances = HashMap::from([(
316317
Bytes::from("0xBA12222222228d8Ba445958a75a0704d566BF2C8"),
317318
HashMap::from([

src/evm/stream.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};
22

33
use futures::{Stream, StreamExt};
44
use tokio_stream::wrappers::ReceiverStream;
5-
use tracing::warn;
5+
use tracing::{debug, warn};
66
use tycho_client::{
77
feed::{component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader},
88
stream::{StreamError, TychoStreamBuilder},
@@ -153,7 +153,12 @@ impl ProtocolStreamBuilder {
153153
let decoder = decoder.clone(); // Clone the decoder for the closure
154154
move |msg| {
155155
let decoder = decoder.clone(); // Clone again for the async block
156-
async move { decoder.decode(msg).await }
156+
async move {
157+
decoder.decode(&msg).await.map_err(|e| {
158+
debug!(msg=?msg, "Decode error: {}", e);
159+
e
160+
})
161+
}
157162
}
158163
})))
159164
}

src/rfq/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl RFQStreamBuilder {
6868
Ok((provider, msg)) => {
6969
let update = self
7070
.decoder
71-
.decode(FeedMessage {
71+
.decode(&FeedMessage {
7272
state_msgs: HashMap::from([(provider.clone(), msg)]),
7373
sync_states: HashMap::new(),
7474
})

tycho_simulation_py/src/structs_py.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,7 @@ impl TychoDB {
537537

538538
self_
539539
.inner
540-
.update(account_updates, block);
540+
.update(account_updates, block)
541+
.unwrap();
541542
}
542543
}

0 commit comments

Comments
 (0)