Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/common-pull-request-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
fetch-depth: 0

- name: actionlint
uses: raven-actions/actionlint@3a24062651993d40fed1019b58ac6fbdfbf276cc # v2.0.1
uses: raven-actions/actionlint@e01d1ea33dd6a5ed517d95b4c0c357560ac6f518 # v2.1.1
with:
version: ${{ env.ACTIONLINT_VERSION }}

Expand Down
2 changes: 1 addition & 1 deletion charts/coprocessor/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: coprocessor
description: A helm chart to distribute and deploy Zama fhevm Co-Processor services
version: 0.7.11
version: 0.7.12
apiVersion: v2
keywords:
- fhevm
Expand Down
3 changes: 3 additions & 0 deletions charts/coprocessor/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ hostListener:
- --service-name="host-listener"
- --catchup-finalization-in-blocks=20 # Continuous catchup will wait for block "finalization"

### New in v0.11
# - --timeout-request-websocket=15 # OPTIONAL, Default timeout in seconds for websocket interactions

# Service ports configuration
ports:
metrics: 9100
Expand Down
2 changes: 1 addition & 1 deletion coprocessor/fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ aws-credential-types = "1.2.6"
aws-sdk-kms = { version = "1.68.0", default-features = false }
aws-sdk-s3 = { version = "1.103.0", features = ["test-util"] }
bigdecimal = "0.4.8"
clap = { version = "4.5.38", features = ["derive"] }
clap = { version = "4.5.38", features = ["derive", "env"] }
daggy = "0.8.1"
foundry-compilers = { version = "0.19.1", features = ["svm-solc"] }
futures-util = "0.3.31"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct Conf {
log_last_processed_every_number_of_updates: u64,

/// gw-listener service name in OTLP traces
#[arg(long, default_value = "gw-listener")]
#[arg(long, env = "OTEL_SERVICE_NAME", default_value = "gw-listener")]
pub service_name: String,

#[arg(long, default_value = None, help = "Can be negative from last processed block", allow_hyphen_values = true, alias = "catchup-kms-generation-from-block")]
Expand Down
109 changes: 83 additions & 26 deletions coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub const DEFAULT_DEPENDENCE_CACHE_SIZE: u16 = 10_000;
pub const DEFAULT_DEPENDENCE_BY_CONNEXITY: bool = false;
pub const DEFAULT_DEPENDENCE_CROSS_BLOCK: bool = true;

const TIMEOUT_REQUEST_ON_WEBSOCKET: u64 = 15;

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Args {
Expand Down Expand Up @@ -131,7 +133,7 @@ pub struct Args {
pub reorg_maximum_duration_in_blocks: u64,

/// service name in OTLP traces
#[arg(long, default_value = "host-listener")]
#[arg(long, env = "OTEL_SERVICE_NAME", default_value = "host-listener")]
pub service_name: String,

#[arg(
Expand All @@ -156,6 +158,13 @@ pub struct Args {
help = "Sleep duration in seconds between catchup loop iterations"
)]
pub catchup_loop_sleep_secs: u64,

#[arg(
long,
default_value_t = TIMEOUT_REQUEST_ON_WEBSOCKET,
help = "Timeout in seconds for RPC calls over websocket"
)]
pub timeout_request_websocket: u64,
}

// TODO: to merge with Levent works
Expand All @@ -180,6 +189,7 @@ struct InfiniteLogIter {
reorg_maximum_duration_in_blocks: u64, // in blocks
block_history: BlockHistory, // to detect reorgs
catchup_finalization_in_blocks: u64,
timeout_request_websocket: u64,
}

enum BlockOrTimeoutOrNone {
Expand Down Expand Up @@ -243,6 +253,7 @@ impl InfiniteLogIter {
args.reorg_maximum_duration_in_blocks as usize,
),
catchup_finalization_in_blocks: args.catchup_finalization_in_blocks,
timeout_request_websocket: args.timeout_request_websocket,
}
}

Expand Down Expand Up @@ -313,13 +324,27 @@ impl InfiniteLogIter {
Ok(provider) => provider,
Err(_) => anyhow::bail!("Cannot get a provider"),
};
provider.get_logs(&filter).await.map_err(|err| {
if eth_rpc_err::too_much_blocks_or_events(&err) {
anyhow::anyhow!("Too much blocks or events: {err}")
} else {
anyhow::anyhow!("Cannot get logs for {filter:?} due to {err}")
// Timeout to prevent hanging indefinitely on buggy node
match tokio::time::timeout(
Duration::from_secs(self.timeout_request_websocket),
provider.get_logs(&filter),
)
.await
{
Err(_) => {
anyhow::bail!("Timeout getting range logs for {filter:?}")
}
})
Ok(Err(err)) => {
if eth_rpc_err::too_much_blocks_or_events(&err) {
anyhow::bail!("Too much blocks or events: {err}")
} else {
anyhow::bail!(
"Cannot get range logs for {filter:?} due to {err}"
)
}
}
Ok(Ok(logs)) => Ok(logs),
}
}

async fn deduce_block_summary(
Expand Down Expand Up @@ -501,17 +526,24 @@ impl InfiniteLogIter {
error!("No provider, inconsistent state");
return Err(anyhow::anyhow!("No provider, inconsistent state"));
};
let block = provider.get_block(block_id).await;
match block {
Ok(Some(block)) => return Ok(block),
Ok(None) => error!(
let block = tokio::time::timeout(
Duration::from_secs(self.timeout_request_websocket),
provider.get_block(block_id),
);
match block.await {
Ok(Ok(Some(block))) => return Ok(block),
Ok(Ok(None)) => error!(
block_id = ?block_id,
"Cannot get current block {block_id}, retrying",
"Cannot get block {block_id}, retrying",
),
Err(err) => error!(
Ok(Err(err)) => error!(
block_id = ?block_id,
error = %err,
"Cannot get current block {block_id}, retrying",
"Cannot get block {block_id}, retrying",
),
Err(_) => error!(
block_id = ?block_id,
"Timeout getting block {block_id}, retrying",
),
}
if i != REORG_RETRY_GET_BLOCK {
Expand All @@ -530,17 +562,24 @@ impl InfiniteLogIter {
error!("No provider, inconsistent state");
return Err(anyhow::anyhow!("No provider, inconsistent state"));
};
let block = provider.get_block_by_hash(block_hash).await;
match block {
Ok(Some(block)) => return Ok(block),
Ok(None) => error!(
let block = tokio::time::timeout(
Duration::from_secs(self.timeout_request_websocket),
provider.get_block_by_hash(block_hash),
);
match block.await {
Ok(Ok(Some(block))) => return Ok(block),
Ok(Ok(None)) => error!(
block_hash = ?block_hash,
"Cannot get block, retrying",
"Cannot get block by hash, retrying",
),
Err(err) => error!(
Ok(Err(err)) => error!(
block_hash = ?block_hash,
error = %err,
"Cannot get block, retrying",
"Cannot get block by hash, retrying",
),
Err(_) => error!(
block_hash = ?block_hash,
"Timeout getting block by hash, retrying",
),
}
if i != REORG_RETRY_GET_BLOCK {
Expand All @@ -551,7 +590,7 @@ impl InfiniteLogIter {
}
}
Err(anyhow::anyhow!(
"Cannot get block {block_hash} after retries"
"Cannot get block by hash {block_hash} after retries"
))
}

Expand All @@ -568,10 +607,27 @@ impl InfiniteLogIter {
error!("No provider, inconsistent state");
return Err(anyhow::anyhow!("No provider, inconsistent state"));
};
let logs = provider.get_logs(&filter).await;
match logs {
Ok(logs) => return Ok(logs),
Err(err) => {
match tokio::time::timeout(
Duration::from_secs(self.timeout_request_websocket),
provider.get_logs(&filter),
)
.await
{
Err(_) => {
error!(
block_hash = ?block_hash,
"Timeout getting logs for block {block_hash}, retrying",
);
tokio::time::sleep(Duration::from_millis(
RETRY_GET_LOGS_DELAY_IN_MS,
))
.await;
continue;
}
Ok(Ok(logs)) => {
return Ok(logs);
}
Ok(Err(err)) => {
error!(
block_hash = ?block_hash,
error = %err,
Expand Down Expand Up @@ -853,6 +909,7 @@ impl InfiniteLogIter {
let Ok(block_logs) = self.find_last_block_and_logs().await
else {
error!("Cannot get last block and logs");
self.stream = None; // to restart
continue;
};
warn!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ async fn setup(node_chain_id: Option<u64>) -> Result<Setup, anyhow::Error> {
catchup_finalization_in_blocks: 2,
dependence_by_connexity: false,
dependence_cross_block: true,
timeout_request_websocket: 30,
};
let health_check_url = format!("http://127.0.0.1:{}", args.health_port);

Expand Down Expand Up @@ -288,6 +289,7 @@ async fn test_only_catchup_loop_requires_negative_start_at_block(
catchup_loop_sleep_secs: 60,
dependence_by_connexity: false,
dependence_cross_block: true,
timeout_request_websocket: 30,
};

let result = main(args).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct Args {
pub keys_file_path: Option<String>,

/// sns-executor service name in OTLP traces
#[arg(long, default_value = "sns-executor")]
#[arg(long, env = "OTEL_SERVICE_NAME", default_value = "sns-executor")]
pub service_name: String,

/// S3 bucket name for ct128 ciphertexts
Expand Down
2 changes: 1 addition & 1 deletion coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct Args {
pub coprocessor_private_key: String,

/// tfhe-worker service name in OTLP traces
#[arg(long, default_value = "tfhe-worker")]
#[arg(long, env = "OTEL_SERVICE_NAME", default_value = "tfhe-worker")]
pub service_name: String,

/// Worker/replica ID for this worker instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ struct Conf {
graceful_shutdown_timeout: Duration,

/// service name in OTLP traces
#[arg(long, default_value = "txn-sender")]
#[arg(long, env = "OTEL_SERVICE_NAME", default_value = "txn-sender")]
pub service_name: String,

/// Prometheus metrics: coprocessor_host_txn_latency_seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct Args {
pub worker_thread_count: u32,

/// Zkproof-worker service name in OTLP traces
#[arg(long, default_value = "zkproof-worker")]
#[arg(long, env = "OTEL_SERVICE_NAME", default_value = "zkproof-worker")]
pub service_name: String,

/// Log level for the worker
Expand Down
Loading