Skip to content

Commit 99eb3d2

Browse files
committed
fix(coprocessor): split dependence chains after forks instead of before
1 parent f3b475e commit 99eb3d2

File tree

1 file changed

+128
-10
lines changed

1 file changed

+128
-10
lines changed

coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs

Lines changed: 128 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ fn tx_of_handle(
117117

118118
async fn fill_tx_dependence_maps(
119119
txs: &mut HashMap<TransactionHash, Transaction>,
120+
used_txs_chains: &mut HashMap<TransactionHash, HashSet<TransactionHash>>,
120121
past_chains: &ChainCache,
121122
) {
122123
// handle to tx maps
@@ -132,11 +133,31 @@ async fn fill_tx_dependence_maps(
132133
if let Some(dep_tx) = handle_creator.get(input_handle) {
133134
// intra block
134135
tx.input_tx.insert(*dep_tx);
136+
used_txs_chains
137+
.entry(*dep_tx)
138+
.and_modify(|v| {
139+
v.insert(tx.tx_hash);
140+
})
141+
.or_insert({
142+
let mut h = HashSet::new();
143+
h.insert(tx.tx_hash);
144+
h
145+
});
135146
} else if let Some(dep_tx_hash) =
136147
past_chains.write().await.get(input_handle)
137148
{
138149
// extra block, this is directly a chain hash
139150
tx.input_tx.insert(*dep_tx_hash);
151+
used_txs_chains
152+
.entry(*dep_tx_hash)
153+
.and_modify(|v| {
154+
v.insert(tx.tx_hash);
155+
})
156+
.or_insert({
157+
let mut h = HashSet::new();
158+
h.insert(tx.tx_hash);
159+
h
160+
});
140161
}
141162
}
142163
// this tx is used by consumer_tx
@@ -363,15 +384,21 @@ async fn grouping_to_chains_connex(
363384

364385
fn grouping_to_chains_no_fork(
365386
ordered_txs: &mut [Transaction],
387+
used_txs_chains: &mut HashMap<TransactionHash, HashSet<TransactionHash>>,
366388
across_blocks: bool,
367389
) -> OrderedChains {
368390
let mut used_tx: HashMap<TransactionHash, &Transaction> =
369391
HashMap::with_capacity(ordered_txs.len());
370392
let mut chains: HashMap<ChainHash, Chain> =
371393
HashMap::with_capacity(ordered_txs.len());
372394
let mut ordered_chains_hash = Vec::with_capacity(ordered_txs.len());
395+
let block_tx_hashes = ordered_txs
396+
.iter()
397+
.map(|tx| tx.tx_hash)
398+
.collect::<HashSet<_>>();
373399
for tx in ordered_txs.iter_mut() {
374-
let mut dependencies = Vec::with_capacity(tx.input_tx.len());
400+
let mut dependencies_block = Vec::with_capacity(tx.input_tx.len());
401+
let mut dependencies_outer = Vec::with_capacity(tx.input_tx.len());
375402
let mut dependencies_seen = HashSet::with_capacity(tx.input_tx.len());
376403
for dep_hash in &tx.input_tx {
377404
// Only record dependences within the block as we don't
@@ -380,20 +407,60 @@ fn grouping_to_chains_no_fork(
380407
used_tx.get(dep_hash).map(|tx| tx.linear_chain)
381408
{
382409
if !dependencies_seen.contains(&linear_chain) {
383-
dependencies.push(linear_chain);
410+
if block_tx_hashes.contains(&linear_chain) {
411+
dependencies_block.push(linear_chain);
412+
} else if across_blocks {
413+
dependencies_outer.push(linear_chain);
414+
}
384415
dependencies_seen.insert(linear_chain);
385416
}
386417
} else if across_blocks {
387418
// if not in used_tx, it is a past chain
388419
if !dependencies_seen.contains(dep_hash) {
389-
dependencies.push(*dep_hash);
420+
dependencies_outer.push(*dep_hash);
390421
dependencies_seen.insert(*dep_hash);
391422
}
392423
}
393424
}
394-
let is_linear = dependencies.len() == 1 && tx.output_tx.len() <= 1;
425+
// A chain is linear if there's no joins on the current
426+
// transaction and if the current transaction is not a
427+
// descendant of a fork
428+
// 1. Test for joins
429+
let mut is_linear =
430+
(dependencies_block.len() + dependencies_outer.len()) == 1;
431+
// 2. Test for forks
395432
if is_linear {
396-
tx.linear_chain = dependencies[0];
433+
let unique_parent = if dependencies_block.is_empty() {
434+
dependencies_outer[0]
435+
} else {
436+
dependencies_block[0]
437+
};
438+
if let Some(siblings) = used_txs_chains.get_mut(&unique_parent) {
439+
for s in siblings.clone().iter() {
440+
// If one sibling is already within a chain, this
441+
// chain could be the same as another in the
442+
// siblings set, so both dependences are then
443+
// covered by the same chain.
444+
if let Some(linear_chain) =
445+
used_tx.get(s).map(|tx| tx.linear_chain)
446+
{
447+
siblings.remove(s);
448+
siblings.insert(linear_chain);
449+
}
450+
}
451+
// If there is only one descendant for the unique
452+
// ancestor or all descendents are in a single
453+
// dependence chain as a totally ordered set, then the
454+
// linear chain continues
455+
is_linear = siblings.len() == 1;
456+
}
457+
}
458+
if is_linear {
459+
tx.linear_chain = if dependencies_block.is_empty() {
460+
dependencies_outer[0]
461+
} else {
462+
dependencies_block[0]
463+
};
397464
match chains.entry(tx.linear_chain) {
398465
// extend the existing chain from same block
399466
Entry::Occupied(mut e) => {
@@ -418,20 +485,21 @@ fn grouping_to_chains_no_fork(
418485
}
419486
} else {
420487
let mut before_size = 0;
421-
for dep in &dependencies {
488+
for dep in &dependencies_block {
422489
before_size = before_size.max(
423490
chains
424491
.get(dep)
425492
.map(|c| c.size + c.before_size)
426493
.unwrap_or(0),
427494
);
428495
}
429-
debug!("Creating new chain for tx {:?} with dependencies {:?}, before_size {}", tx, dependencies, before_size);
496+
debug!("Creating new chain for tx {:?} with block dependencies {:?}, outer dependencies {:?}, before_size {}",
497+
tx, dependencies_block, dependencies_outer, before_size);
430498
let new_chain = Chain {
431499
hash: tx.tx_hash,
432500
size: tx.size,
433501
before_size,
434-
dependencies,
502+
dependencies: dependencies_block,
435503
dependents: vec![],
436504
allowed_handle: tx.allowed_handle.clone(),
437505
new_chain: true,
@@ -473,13 +541,21 @@ pub async fn dependence_chains(
473541
across_blocks: bool,
474542
) -> OrderedChains {
475543
let (ordered_hash, mut txs) = scan_transactions(logs);
476-
fill_tx_dependence_maps(&mut txs, past_chains).await;
544+
let mut used_txs_chains: HashMap<
545+
TransactionHash,
546+
HashSet<TransactionHash>,
547+
> = HashMap::with_capacity(txs.len());
548+
fill_tx_dependence_maps(&mut txs, &mut used_txs_chains, past_chains).await;
477549
debug!("Transactions: {:?}", txs.values());
478550
let mut ordered_txs = topological_order(ordered_hash, txs);
479551
let chains = if connex {
480552
grouping_to_chains_connex(&mut ordered_txs).await
481553
} else {
482-
grouping_to_chains_no_fork(&mut ordered_txs, across_blocks)
554+
grouping_to_chains_no_fork(
555+
&mut ordered_txs,
556+
&mut used_txs_chains,
557+
across_blocks,
558+
)
483559
};
484560
// propagate to logs
485561
let txs = ordered_txs
@@ -1064,4 +1140,46 @@ mod tests {
10641140
assert!(logs[0..3].iter().all(|log| log.dependence_chain == tx3));
10651141
assert_eq!(cache.read().await.len(), 5);
10661142
}
1143+
1144+
#[tokio::test]
1145+
async fn test_past_chain_fork() {
1146+
let cache = ChainCache::new(lru::LruCache::new(
1147+
std::num::NonZeroUsize::new(100).unwrap(),
1148+
));
1149+
let past_chain1 = past_chain(100);
1150+
let past_chain_hash1 = past_chain1.hash;
1151+
let past_handle1 = new_handle();
1152+
cache.write().await.put(past_handle1, past_chain_hash1);
1153+
let mut logs = vec![];
1154+
let tx1 = TransactionHash::with_last_byte(2);
1155+
let tx2 = TransactionHash::with_last_byte(3);
1156+
let _h1 = op1(past_handle1, &mut logs, tx1);
1157+
let _h2 = op1(past_handle1, &mut logs, tx2);
1158+
let chains = dependence_chains(&mut logs, &cache, false, true).await;
1159+
assert_eq!(chains.len(), 2);
1160+
assert!(logs[0].dependence_chain == tx1);
1161+
assert!(logs[1].dependence_chain == tx2);
1162+
assert_eq!(cache.read().await.len(), 3);
1163+
}
1164+
1165+
#[tokio::test]
1166+
async fn test_current_block_fork() {
1167+
let cache = ChainCache::new(lru::LruCache::new(
1168+
std::num::NonZeroUsize::new(100).unwrap(),
1169+
));
1170+
let past_handle1 = new_handle();
1171+
let mut logs = vec![];
1172+
let tx1 = TransactionHash::with_last_byte(2);
1173+
let tx2 = TransactionHash::with_last_byte(3);
1174+
let tx3 = TransactionHash::with_last_byte(4);
1175+
let h1 = op1(past_handle1, &mut logs, tx1);
1176+
let _h2 = op1(h1, &mut logs, tx2);
1177+
let _h3 = op1(h1, &mut logs, tx3);
1178+
let chains = dependence_chains(&mut logs, &cache, false, true).await;
1179+
assert_eq!(chains.len(), 3);
1180+
assert!(logs[0].dependence_chain == tx1);
1181+
assert!(logs[1].dependence_chain == tx2);
1182+
assert!(logs[2].dependence_chain == tx3);
1183+
assert_eq!(cache.read().await.len(), 3);
1184+
}
10671185
}

0 commit comments

Comments
 (0)