Skip to content

Commit ca16430

Browse files
committed
feat(coprocessor): topologic timestamp
1 parent 5003ea6 commit ca16430

File tree

8 files changed

+47
-23
lines changed

8 files changed

+47
-23
lines changed

coprocessor/fhevm-engine/.sqlx/query-04c918d1380072cbacde5fb7937e17c3b50874fe082a6761f25f4a9b1b082bf3.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/.sqlx/query-e83798919f9929634fec3ed1d1ed3ec8891fbb77ce3b9831054589567a0e084a.json

Lines changed: 0 additions & 14 deletions
This file was deleted.

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ tfhe = { version = "=1.4.0-alpha.4", features = [
8585
] }
8686
tfhe-versionable = "=0.6.2"
8787
tfhe-zk-pok = "=0.7.4"
88+
time = "0.3.43"
8889
tokio = { version = "1.45.0", features = ["full"] }
8990
tokio-util = "0.7.15"
9091
tonic = { version = "0.12.3", features = ["server"] }

coprocessor/fhevm-engine/host-listener/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ rustls = { workspace = true }
2929
serde = { workspace = true }
3030
serde_json = { workspace = true }
3131
sqlx = { workspace = true }
32+
time = { workspace = true }
3233
tokio = { workspace = true }
3334
tokio-util = { workspace = true }
3435
tracing = { workspace = true }

coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl Transaction {
3232
input_tx: HashSet::with_capacity(3),
3333
output_tx: HashSet::with_capacity(3),
3434
linear_chain: tx_hash, // before coallescing linear tx chains
35-
size: 1,
35+
size: 0,
3636
}
3737
}
3838
}
@@ -603,6 +603,9 @@ mod tests {
603603
assert_eq!(logs[3].dependence_chain, tx1);
604604
assert_eq!(logs[4].dependence_chain, tx3);
605605
assert_eq!(cache.read().await.len(), 3);
606+
assert_eq!(chains[0].before_size, 0);
607+
assert_eq!(chains[1].before_size, 0);
608+
assert_eq!(chains[2].before_size, 2);
606609
}
607610

608611
fn past_chain(last_byte: u8) -> Chain {
@@ -765,6 +768,6 @@ mod tests {
765768
eprintln!("Logs: {:?}", logs);
766769
let chains = dependence_chains(&mut logs, &cache).await;
767770
assert_eq!(chains.len(), 6);
768-
// assert_eq!(cache.read().await.len(), 66);
771+
assert!(chains.iter().all(|c| c.before_size == 0));
769772
}
770773
}

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ pub async fn ingest_block_logs(
160160

161161
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
162162
if at_least_one_insertion {
163-
db.update_dependence_chain(&mut tx, chains).await?;
163+
db.update_dependence_chain(&mut tx, chains, block_timestamp)
164+
.await?;
164165
}
165166
tx.commit().await
166167
}

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use sqlx::{PgPool, Postgres};
1717
use std::ops::DerefMut;
1818
use std::sync::Arc;
1919
use std::time::Duration;
20+
use time::{Duration as TimeDuration, PrimitiveDateTime};
2021
use tokio::sync::RwLock;
2122
use tracing::error;
2223
use tracing::info;
@@ -99,7 +100,7 @@ pub struct LogTfhe {
99100
pub transaction_hash: Option<TransactionHash>,
100101
pub is_allowed: bool,
101102
pub block_number: u64,
102-
pub block_timestamp: sqlx::types::time::PrimitiveDateTime,
103+
pub block_timestamp: PrimitiveDateTime,
103104
pub dependence_chain: TransactionHash,
104105
}
105106

@@ -762,24 +763,39 @@ impl Database {
762763
&self,
763764
tx: &mut Transaction<'_>,
764765
chains: OrderedChains,
766+
block_timestamp: PrimitiveDateTime,
765767
) -> Result<(), SqlxError> {
766768
if chains.is_empty() {
767769
return Ok(());
768770
}
769-
let chains = chains.iter().map(|d| d.hash.to_vec()).collect::<Vec<_>>();
771+
let chains_hash =
772+
chains.iter().map(|c| c.hash.to_vec()).collect::<Vec<_>>();
773+
let timestamps: Vec<PrimitiveDateTime> = chains
774+
.iter()
775+
.map(|c| {
776+
block_timestamp.saturating_add(TimeDuration::microseconds(
777+
c.before_size as i64,
778+
))
779+
})
780+
.collect();
770781
let query = sqlx::query!(
771782
r#"
772783
INSERT INTO dependence_chain(
773784
dependence_chain_id,
774785
status,
775786
last_updated_at
776787
)
777-
VALUES (unnest($1::bytea[]), 'updated', statement_timestamp())
788+
SELECT dcid, 'updated' AS status, ts as last_updated_at
789+
FROM unnest($1::bytea[], $2::timestamp[]) AS t(dcid, ts)
778790
ON CONFLICT (dependence_chain_id) DO UPDATE
779-
SET status = EXCLUDED.status,
780-
last_updated_at = EXCLUDED.last_updated_at
791+
SET status = 'updated',
792+
last_updated_at = GREATEST(
793+
dependence_chain.last_updated_at,
794+
EXCLUDED.last_updated_at
795+
)
781796
"#,
782-
&chains,
797+
&chains_hash,
798+
&timestamps,
783799
);
784800
query.execute(tx.deref_mut()).await?;
785801
Ok(())

0 commit comments

Comments
 (0)