Skip to content

Commit 88706d1

Browse files
committed
feat(coprocessor): host-listener, dependents for dependency_chain
1 parent 7ff5f7e commit 88706d1

File tree

5 files changed

+92
-53
lines changed

5 files changed

+92
-53
lines changed

coprocessor/fhevm-engine/.sqlx/query-a287f23e7450a0f1f52d25ad518cdd3ef227e24e2bb4e7212dd2f60347349de3.json

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/.sqlx/query-d1e5c42539bb901184fe1f4690e62c26850ba790fe0c8d6bdba3adfa18dce89b.json

Lines changed: 0 additions & 16 deletions
This file was deleted.

coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,9 @@ fn grouping_to_chains(ordered_txs: &mut [Transaction]) -> OrderedChains {
255255
size: 0,
256256
before_size: 0,
257257
dependencies: vec![],
258+
dependents: vec![],
258259
allowed_handle: tx.allowed_handle.clone(), // needed to publish in cache
260+
new_chain: false,
259261
};
260262
ordered_chains_hash.push(new_chain.hash);
261263
e.insert(new_chain);
@@ -277,7 +279,9 @@ fn grouping_to_chains(ordered_txs: &mut [Transaction]) -> OrderedChains {
277279
size: tx.size,
278280
before_size,
279281
dependencies,
282+
dependents: vec![],
280283
allowed_handle: tx.allowed_handle.clone(),
284+
new_chain: true,
281285
};
282286
ordered_chains_hash.push(new_chain.hash);
283287
chains.insert(new_chain.hash, new_chain);
@@ -286,6 +290,23 @@ fn grouping_to_chains(ordered_txs: &mut [Transaction]) -> OrderedChains {
286290
used_tx.insert(tx.tx_hash, tx);
287291
}
288292
}
293+
// compute dependents field
294+
for chain_hash in ordered_chains_hash.iter() {
295+
let Some(chain) = chains.get(chain_hash) else {
296+
continue;
297+
};
298+
if !chain.new_chain {
299+
continue;
300+
}
301+
for dep in chain.dependencies.clone() {
302+
if let Some(dep_chain) = chains.get_mut(&dep) {
303+
if !dep_chain.new_chain {
304+
continue;
305+
}
306+
dep_chain.dependents.push(*chain_hash);
307+
}
308+
}
309+
}
289310
ordered_chains_hash
290311
.iter()
291312
.filter_map(|hash| chains.remove(hash))
@@ -613,15 +634,20 @@ mod tests {
613634
assert_eq!(chains[0].dependencies.len(), 0);
614635
assert_eq!(chains[1].dependencies.len(), 0);
615636
assert_eq!(chains[2].dependencies.len(), 2);
637+
assert_eq!(chains[0].dependents, vec![tx3]);
638+
assert_eq!(chains[1].dependents, vec![tx3]);
639+
assert!(chains[2].dependents.is_empty());
616640
}
617641

618642
fn past_chain(last_byte: u8) -> Chain {
619643
Chain {
620644
hash: TransactionHash::with_last_byte(last_byte),
621645
dependencies: vec![],
646+
dependents: vec![],
622647
size: 1,
623648
before_size: 0,
624649
allowed_handle: vec![],
650+
new_chain: false,
625651
}
626652
}
627653

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,13 @@ pub async fn ingest_block_logs(
160160

161161
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
162162
if at_least_one_insertion {
163-
db.update_dependence_chain(&mut tx, chains, block_timestamp)
164-
.await?;
163+
db.update_dependence_chain(
164+
&mut tx,
165+
chains,
166+
block_timestamp,
167+
&block_logs.summary,
168+
)
169+
.await?;
165170
}
166171
tx.commit().await
167172
}

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ pub type ChainHash = TransactionHash;
4343
pub struct Chain {
4444
pub hash: ChainHash,
4545
pub dependencies: Vec<ChainHash>,
46+
pub dependents: Vec<ChainHash>,
4647
pub allowed_handle: Vec<Handle>,
4748
pub size: usize,
4849
pub before_size: usize,
50+
pub new_chain: bool,
4951
}
5052
pub type ChainCache = RwLock<lru::LruCache<Handle, ChainHash>>;
5153
pub type OrderedChains = Vec<Chain>;
@@ -764,44 +766,47 @@ impl Database {
764766
tx: &mut Transaction<'_>,
765767
chains: OrderedChains,
766768
block_timestamp: PrimitiveDateTime,
769+
block_summary: &BlockSummary,
767770
) -> Result<(), SqlxError> {
768-
if chains.is_empty() {
769-
return Ok(());
770-
}
771-
let chains_hash =
772-
chains.iter().map(|c| c.hash.to_vec()).collect::<Vec<_>>();
773-
let dependency_counts =
774-
chains.iter().map(|c| c.dependencies.len() as i64).collect::<Vec<_>>();
775-
let timestamps: Vec<PrimitiveDateTime> = chains
776-
.iter()
777-
.map(|c| {
778-
block_timestamp.saturating_add(TimeDuration::microseconds(
779-
c.before_size as i64,
780-
))
781-
})
782-
.collect();
783-
let query = sqlx::query!(
784-
r#"
785-
INSERT INTO dependence_chain(
786-
dependence_chain_id,
787-
status,
771+
for chain in chains {
772+
let last_updated_at = block_timestamp.saturating_add(
773+
TimeDuration::microseconds(chain.before_size as i64),
774+
);
775+
let dependents = chain
776+
.dependents
777+
.iter()
778+
.map(|h| h.to_vec())
779+
.collect::<Vec<_>>();
780+
sqlx::query!(
781+
r#"
782+
INSERT INTO dependence_chain(
783+
dependence_chain_id,
784+
status,
785+
last_updated_at,
786+
dependency_count,
787+
dependents,
788+
block_hash,
789+
block_height
790+
) VALUES (
791+
$1, 'updated', $2::timestamp, $3, $4, $5, $6
792+
)
793+
ON CONFLICT (dependence_chain_id) DO UPDATE
794+
SET status = 'updated',
795+
last_updated_at = GREATEST(
796+
dependence_chain.last_updated_at,
797+
EXCLUDED.last_updated_at
798+
)
799+
"#,
800+
chain.hash.to_vec(),
788801
last_updated_at,
789-
dependency_count
802+
chain.dependencies.len() as i64,
803+
&dependents,
804+
block_summary.hash.to_vec(),
805+
block_summary.number as i64,
790806
)
791-
SELECT dcid, 'updated' AS status, ts as last_updated_at, dependency_count
792-
FROM unnest($1::bytea[], $2::timestamp[], $3::bigint[]) AS t(dcid, ts, dependency_count)
793-
ON CONFLICT (dependence_chain_id) DO UPDATE
794-
SET status = 'updated',
795-
last_updated_at = GREATEST(
796-
dependence_chain.last_updated_at,
797-
EXCLUDED.last_updated_at
798-
)
799-
"#,
800-
&chains_hash,
801-
&timestamps,
802-
&dependency_counts
803-
);
804-
query.execute(tx.deref_mut()).await?;
807+
.execute(tx.deref_mut())
808+
.await?;
809+
}
805810
Ok(())
806811
}
807812
}

0 commit comments

Comments
 (0)