Skip to content

Commit 4db070b

Browse files
committed
fix(coprocessor): fix top timestamp for tx
1 parent 8f31d38 commit 4db070b

File tree

8 files changed

+100
-72
lines changed

8 files changed

+100
-72
lines changed

coprocessor/fhevm-engine/.sqlx/query-31320058ef8b985ad5e5e38a4ccd972985bd431f767a6db44c1f6f34c4060297.json renamed to coprocessor/fhevm-engine/.sqlx/query-7ef2b942f30885f04b74039c7d587765d220922a2f85636d6dee6d2ae2268744.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/.sqlx/query-7c4a5df22f854623f0a5cf3e9a28f8e39d0ae9282a725f57a63d35db78114222.json renamed to coprocessor/fhevm-engine/.sqlx/query-f9bd698d1a4db12f5b074bf170cc58262d79da6566477b2962078483f0f2712a.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs

Lines changed: 75 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ struct Transaction {
2020
input_tx: HashSet<TransactionHash>,
2121
output_tx: HashSet<TransactionHash>,
2222
linear_chain: TransactionHash,
23-
size: usize,
24-
tx_topo_oder_in_block: usize,
23+
size: u64,
24+
depth_size: u64,
2525
}
2626

2727
impl Transaction {
@@ -35,13 +35,15 @@ impl Transaction {
3535
output_tx: HashSet::with_capacity(3),
3636
linear_chain: tx_hash, // before coallescing linear tx chains
3737
size: 0,
38-
tx_topo_oder_in_block: 0,
38+
depth_size: 0,
3939
}
4040
}
4141
}
4242

4343
const AVG_LOGS_PER_TX: usize = 8;
44-
fn scan_transactions(logs: &[LogTfhe]) -> Vec<Transaction> {
44+
fn scan_transactions(
45+
logs: &[LogTfhe],
46+
) -> (Vec<TransactionHash>, HashMap<TransactionHash, Transaction>) {
4547
// TODO: OPT no need for hashmap if contiguous tx
4648
let mut txs = HashMap::new();
4749
let mut ordered_txs_hash = Vec::with_capacity(logs.len() / AVG_LOGS_PER_TX);
@@ -71,27 +73,24 @@ fn scan_transactions(logs: &[LogTfhe]) -> Vec<Transaction> {
7173
}
7274
}
7375
}
74-
ordered_txs_hash
75-
.iter()
76-
.filter_map(|tx_hash| txs.remove(tx_hash))
77-
.collect()
76+
(ordered_txs_hash, txs)
7877
}
7978

8079
fn tx_of_handle(
81-
ordered_txs: &[Transaction],
80+
ordered_txs: &HashMap<TransactionHash, Transaction>,
8281
) -> (
8382
HashMap<Handle, TransactionHash>,
8483
HashMap<Handle, HashSet<TransactionHash>>,
8584
) {
8685
// handle to tx maps
8786
let mut handle_creator = HashMap::new(); // no intermediate value
8887
let mut handle_consumer = HashMap::new();
89-
for tx in ordered_txs {
88+
for tx in ordered_txs.values() {
9089
for handle in &tx.allowed_handle {
9190
handle_creator.insert(*handle, tx.tx_hash);
9291
}
9392
}
94-
for tx in ordered_txs {
93+
for tx in ordered_txs.values() {
9594
for handle in &tx.input_handle {
9695
if tx.output_handle.contains(handle) {
9796
// self dependency, ignore
@@ -117,13 +116,13 @@ fn tx_of_handle(
117116
}
118117

119118
async fn fill_tx_dependence_maps(
120-
txs: &mut [Transaction],
119+
txs: &mut HashMap<TransactionHash, Transaction>,
121120
past_chains: &ChainCache,
122121
) {
123122
// handle to tx maps
124123
let (handle_creator, handle_consumer) = tx_of_handle(txs);
125124
// txs relations
126-
for tx in txs {
125+
for tx in txs.values_mut() {
127126
// this tx depends on dep_tx
128127
for input_handle in &tx.input_handle {
129128
if tx.output_handle.contains(input_handle) {
@@ -156,42 +155,54 @@ async fn fill_tx_dependence_maps(
156155
}
157156
}
158157

159-
fn topological_order(ordered_txs: &mut Vec<Transaction>) {
158+
fn topological_order(
159+
ordered_hash: Vec<TransactionHash>,
160+
mut txs: HashMap<TransactionHash, Transaction>,
161+
) -> Vec<Transaction> {
160162
let mut seen_tx: HashSet<TransactionHash> =
161-
HashSet::with_capacity(ordered_txs.len());
162-
let txs_set: HashSet<TransactionHash> =
163-
ordered_txs.clone().iter().map(|tx| tx.tx_hash).collect();
163+
HashSet::with_capacity(txs.len());
164164
let mut is_already_sorted = true;
165-
for tx in ordered_txs.iter() {
165+
for &tx_hash in &ordered_hash {
166+
let Some(tx) = txs.get(&tx_hash) else {
167+
error!("Transaction {:?} missing in txs map", tx_hash);
168+
continue;
169+
};
170+
let mut depth_size = 0;
166171
for input_tx in &tx.input_tx {
167-
if !txs_set.contains(input_tx) {
168-
// previous block tx, already seen
169-
continue;
172+
match txs.get(input_tx) {
173+
None => {
174+
// previous block tx, already seen
175+
continue;
176+
}
177+
Some(dep_tx) => {
178+
depth_size = depth_size.max(dep_tx.depth_size + dep_tx.size);
179+
}
170180
}
171181
if !seen_tx.contains(input_tx) {
172182
is_already_sorted = false;
173-
error!("Out of order transaction detected: tx {:?} depends on tx {:?} which is later in the block", tx.tx_hash, input_tx);
183+
error!("Out of order transaction detected: tx {:?} depends on tx {:?} which is later in the block", tx_hash, input_tx);
174184
break;
175185
}
176186
}
177-
seen_tx.insert(tx.tx_hash);
187+
if let Some(tx) = txs.get_mut(&tx_hash) {
188+
tx.depth_size = depth_size;
189+
}
190+
seen_tx.insert(tx_hash);
178191
}
179192
if is_already_sorted {
180-
return;
193+
return ordered_hash
194+
.iter()
195+
.filter_map(|tx_hash| txs.remove(tx_hash))
196+
.collect();
181197
}
182-
let mut txs = ordered_txs
183-
.clone()
184-
.iter()
185-
.map(|tx| (tx.tx_hash, tx.clone()))
186-
.collect::<HashMap<_, _>>();
187-
let mut done_tx = HashSet::with_capacity(ordered_txs.len());
188-
let mut stacked_tx = HashSet::with_capacity(ordered_txs.len());
198+
let mut done_tx = HashSet::with_capacity(txs.len());
199+
let mut stacked_tx = HashSet::with_capacity(txs.len());
189200
let mut stack = Vec::new();
190-
let mut reordered = Vec::with_capacity(ordered_txs.len());
191-
for tx in ordered_txs.iter() {
201+
let mut reordered = Vec::with_capacity(txs.len());
202+
for tx_hash in ordered_hash {
192203
stacked_tx.clear();
193-
stack.push(tx.tx_hash);
194-
stacked_tx.insert(tx.tx_hash);
204+
stack.push(tx_hash);
205+
stacked_tx.insert(tx_hash);
195206
while let Some(tx_hash) = stack.pop() {
196207
if done_tx.contains(&tx_hash) {
197208
continue;
@@ -203,16 +214,26 @@ fn topological_order(ordered_txs: &mut Vec<Transaction>) {
203214
continue;
204215
};
205216
let mut unseen = vec![];
217+
let mut depth_size = 0;
206218
for input_tx in &tx.input_tx {
207-
let is_other_block = !txs.contains_key(input_tx);
208-
if is_other_block {
209-
continue;
219+
match txs.get(input_tx) {
220+
None => {
221+
// previous block tx, already seen
222+
continue;
223+
}
224+
Some(dep_tx) => {
225+
depth_size =
226+
depth_size.max(dep_tx.depth_size + dep_tx.size);
227+
}
210228
}
211229
if !done_tx.contains(input_tx) {
212230
unseen.push(*input_tx);
213231
}
214232
}
215233
if unseen.is_empty() {
234+
if let Some(tx) = txs.get_mut(&tx_hash) {
235+
tx.depth_size = depth_size;
236+
}
216237
reordered.push(tx_hash);
217238
done_tx.insert(tx_hash);
218239
} else {
@@ -235,17 +256,11 @@ fn topological_order(ordered_txs: &mut Vec<Transaction>) {
235256
}
236257
}
237258
}
238-
ordered_txs.clear();
239259
debug!("Reordered txs: {:?}", reordered);
240-
let mut topo_order_position = 0;
241-
for tx_hash in reordered.iter() {
242-
let Some(mut tx) = txs.remove(tx_hash) else {
243-
continue;
244-
};
245-
tx.tx_topo_oder_in_block = topo_order_position;
246-
topo_order_position += tx.size;
247-
ordered_txs.push(tx);
248-
}
260+
reordered
261+
.iter()
262+
.filter_map(|tx_hash| txs.remove(tx_hash))
263+
.collect()
249264
}
250265

251266
async fn grouping_to_chains_connex(
@@ -456,10 +471,10 @@ pub async fn dependence_chains(
456471
connex: bool,
457472
across_blocks: bool,
458473
) -> OrderedChains {
459-
let mut ordered_txs = scan_transactions(logs);
460-
fill_tx_dependence_maps(ordered_txs.as_mut_slice(), past_chains).await;
461-
debug!("Transactions: {:?}", ordered_txs);
462-
topological_order(&mut ordered_txs);
474+
let (ordered_hash, mut txs) = scan_transactions(logs);
475+
fill_tx_dependence_maps(&mut txs, past_chains).await;
476+
debug!("Transactions: {:?}", txs.values());
477+
let mut ordered_txs = topological_order(ordered_hash, txs);
463478
let chains = if connex {
464479
grouping_to_chains_connex(&mut ordered_txs).await
465480
} else {
@@ -474,9 +489,7 @@ pub async fn dependence_chains(
474489
let tx_hash = log.transaction_hash.unwrap_or_default();
475490
if let Some(tx) = txs.get(&tx_hash) {
476491
log.dependence_chain = tx.linear_chain;
477-
log.block_timestamp = log.block_timestamp.saturating_add(
478-
time::Duration::microseconds(tx.tx_topo_oder_in_block as i64),
479-
);
492+
log.tx_depth_size = tx.depth_size;
480493
} else {
481494
// past chain
482495
log.dependence_chain = tx_hash;
@@ -530,6 +543,7 @@ mod tests {
530543
block_timestamp: sqlx::types::time::PrimitiveDateTime::MIN,
531544
transaction_hash: Some(tx),
532545
dependence_chain: TransactionHash::ZERO,
546+
tx_depth_size: 0,
533547
})
534548
}
535549

@@ -775,6 +789,11 @@ mod tests {
775789
assert_eq!(logs[2].dependence_chain, tx2);
776790
assert_eq!(logs[3].dependence_chain, tx1);
777791
assert_eq!(logs[4].dependence_chain, tx3);
792+
assert_eq!(logs[0].tx_depth_size, 0);
793+
assert_eq!(logs[1].tx_depth_size, 0);
794+
assert_eq!(logs[2].tx_depth_size, 0);
795+
assert_eq!(logs[3].tx_depth_size, 0);
796+
assert_eq!(logs[4].tx_depth_size, 2);
778797
assert_eq!(cache.read().await.len(), 3);
779798
assert_eq!(chains[0].before_size, 0);
780799
assert_eq!(chains[1].before_size, 0);
@@ -949,6 +968,7 @@ mod tests {
949968
let chains = dependence_chains(&mut logs, &cache, false, true).await;
950969
assert_eq!(chains.len(), 6);
951970
assert!(chains.iter().all(|c| c.before_size == 0));
971+
assert!(logs.iter().all(|log| log.tx_depth_size == 0));
952972
}
953973

954974
#[tokio::test]

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ pub async fn ingest_block_logs(
110110
// updated in the next loop and dependence_chains
111111
is_allowed: false,
112112
dependence_chain: Default::default(),
113+
tx_depth_size: 0,
113114
};
114115
tfhe_event_log.push(log);
115116
continue;

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ pub struct Chain {
4545
pub dependencies: Vec<ChainHash>,
4646
pub dependents: Vec<ChainHash>,
4747
pub allowed_handle: Vec<Handle>,
48-
pub size: usize,
49-
pub before_size: usize,
48+
pub size: u64,
49+
pub before_size: u64,
5050
pub new_chain: bool,
5151
}
5252
pub type ChainCache = RwLock<lru::LruCache<Handle, ChainHash>>;
@@ -103,6 +103,7 @@ pub struct LogTfhe {
103103
pub is_allowed: bool,
104104
pub block_number: u64,
105105
pub block_timestamp: PrimitiveDateTime,
106+
pub tx_depth_size: u64,
106107
pub dependence_chain: TransactionHash,
107108
}
108109

@@ -294,7 +295,7 @@ impl Database {
294295
schedule_order,
295296
is_completed
296297
)
297-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::timestamp, $9::timestamp, $10)
298+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW(), $9::timestamp, $10)
298299
ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING
299300
"#,
300301
tenant_id as i32,
@@ -305,7 +306,10 @@ impl Database {
305306
log.dependence_chain.to_vec(),
306307
log.transaction_hash.map(|txh| txh.to_vec()),
307308
log.is_allowed,
308-
log.block_timestamp,
309+
log.block_timestamp
310+
.saturating_add(time::Duration::microseconds(
311+
log.tx_depth_size as i64
312+
)),
309313
!log.is_allowed,
310314
);
311315
query

coprocessor/fhevm-engine/stress-test-generator/src/utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ pub async fn generate_trivial_encrypt(
234234
block_number: 1,
235235
block_timestamp: PrimitiveDateTime::MAX,
236236
dependence_chain: transaction_hash,
237+
tx_depth_size: 0,
237238
};
238239
let mut tx = listener_event_to_db.new_transaction().await?;
239240
listener_event_to_db
@@ -417,6 +418,7 @@ pub async fn insert_tfhe_event(
417418
block_number: 1,
418419
block_timestamp: PrimitiveDateTime::MAX,
419420
dependence_chain: transaction_hash,
421+
tx_depth_size: 0,
420422
};
421423
listener_event_to_db
422424
.insert_tfhe_event(&mut tx, &log)

coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ async fn insert_tfhe_event(
4545
block_number: log.block_number.unwrap_or(0),
4646
block_timestamp: PrimitiveDateTime::MAX,
4747
dependence_chain: log.transaction_hash.unwrap_or_default(),
48+
tx_depth_size: 0,
4849
};
4950
db.insert_tfhe_event(tx, &event).await
5051
}

0 commit comments

Comments
 (0)