Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions .github/workflows/test-suite-orchestrate-e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
pull_request:
branches:
- main
- release/*

permissions: {}

Expand All @@ -13,7 +14,8 @@ concurrency:

jobs:
coprocessor-docker-build:
if: startsWith(github.head_ref, 'mergify/merge-queue/')
if: &build-trigger-condition |
startsWith(github.head_ref, 'mergify/merge-queue/') || startsWith(github.base_ref, 'release/')
uses: ./.github/workflows/coprocessor-docker-build.yml
permissions: &docker_permissions
actions: 'read' # Required to read workflow run information
Expand All @@ -30,22 +32,22 @@ jobs:
CGR_USERNAME: ${{ secrets.CGR_USERNAME }}
CGR_PASSWORD: ${{ secrets.CGR_PASSWORD }}
gateway-contracts-docker-build:
if: startsWith(github.head_ref, 'mergify/merge-queue/')
if: *build-trigger-condition
uses: ./.github/workflows/gateway-contracts-docker-build.yml
permissions: *docker_permissions
secrets: *docker_secrets
host-contracts-docker-build:
if: startsWith(github.head_ref, 'mergify/merge-queue/')
if: *build-trigger-condition
uses: ./.github/workflows/host-contracts-docker-build.yml
permissions: *docker_permissions
secrets: *docker_secrets
kms-connector-docker-build:
if: startsWith(github.head_ref, 'mergify/merge-queue/')
if: *build-trigger-condition
uses: ./.github/workflows/kms-connector-docker-build.yml
permissions: *docker_permissions
secrets: *docker_secrets
test-suite-docker-build:
if: startsWith(github.head_ref, 'mergify/merge-queue/')
if: *build-trigger-condition
uses: ./.github/workflows/test-suite-docker-build.yml
permissions: *docker_permissions
secrets: *docker_secrets
Expand Down Expand Up @@ -85,7 +87,7 @@ jobs:
persist-credentials: 'false'

- id: create-e2e-tests-input
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
with:
script: |
const previousCommitHash = process.env.PREVIOUS_COMMIT_HASH;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 31 additions & 11 deletions coprocessor/fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion coprocessor/fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ tfhe = { version = "=1.5.1", features = [
] }
tfhe-versionable = "=0.6.2"
tfhe-zk-pok = "=0.8.0"
time = "0.3.43"
time = "0.3.47"
tokio = { version = "1.45.0", features = ["full"] }
tokio-util = "0.7.15"
tonic = { version = "0.12.3", features = ["server"] }
tonic-build = "0.12.3"
tracing = "0.1.41"
tracing-opentelemetry = "0.30.0"
tracing-subscriber = { version = "0.3.20", features = ["fmt", "json"] }
tracing-test = "0.2.5"
union-find = "0.4.3"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Partial index on dependence_chain.last_updated_at.
-- Only rows with status = 'updated' and a NULL worker_id are indexed.
-- IF NOT EXISTS makes the statement safe to re-run when the index is already present.
CREATE INDEX IF NOT EXISTS idx_dependence_chain_last_updated_at
ON dependence_chain (last_updated_at)
WHERE status = 'updated'::text
AND worker_id IS NULL;

17 changes: 15 additions & 2 deletions coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::utils::to_hex;
use bigdecimal::num_traits::ToPrimitive;
use opentelemetry::{
global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
trace::{SpanBuilder, Status, TraceContextExt, Tracer},
trace::{SpanBuilder, Status, TraceContextExt, Tracer, TracerProvider},
Context, KeyValue,
};
use opentelemetry_sdk::{trace::SdkTracerProvider, Resource};
Expand Down Expand Up @@ -41,6 +41,14 @@ pub(crate) static ZKPROOF_TXN_LATENCY_HISTOGRAM: LazyLock<Histogram> = LazyLock:
/// Sets up OTLP tracing for `service_name` without exposing the tracer.
///
/// Delegates to `setup_otlp_tracer` using the fixed tracer name
/// `"otlp-layer"` and discards the tracer it returns.
///
/// # Errors
///
/// Returns whatever error `setup_otlp_tracer` reports.
pub fn setup_otlp(
    service_name: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    // The caller only needs success or failure, so the tracer is dropped here.
    setup_otlp_tracer(service_name, "otlp-layer").map(drop)
}

pub fn setup_otlp_tracer(
service_name: &str,
tracer_name: &'static str,
) -> Result<opentelemetry_sdk::trace::Tracer, Box<dyn std::error::Error + Send + Sync + 'static>> {
let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()?;
Expand All @@ -57,9 +65,10 @@ pub fn setup_otlp(
.with_batch_exporter(otlp_exporter)
.build();

let tracer = trace_provider.tracer(tracer_name);
opentelemetry::global::set_tracer_provider(trace_provider);

Ok(())
Ok(tracer)
}

#[derive(Clone)]
Expand All @@ -73,6 +82,10 @@ impl OtelTracer {
self.tracer.start_with_context(name, &self.ctx)
}

pub fn context(&self) -> &Context {
&self.ctx
}

/// Sets attribute to the root span
pub fn set_attribute(&self, key: &str, value: String) {
self.ctx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,8 +800,14 @@ impl Database {
SET status = 'updated',
last_updated_at = CASE
WHEN dependence_chain.status = 'processed' THEN EXCLUDED.last_updated_at
ELSE dependence_chain.last_updated_at
END
ELSE LEAST(dependence_chain.last_updated_at, EXCLUDED.last_updated_at)
END,
dependents = (
SELECT ARRAY(
SELECT DISTINCT d
FROM unnest(dependence_chain.dependents || EXCLUDED.dependents) AS d
)
)
"#,
chain.hash.to_vec(),
last_updated_at,
Expand Down
2 changes: 1 addition & 1 deletion coprocessor/fhevm-engine/sns-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tfhe = { workspace = true}
tokio = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
Expand Down Expand Up @@ -61,4 +62,3 @@ test-harness = { path = "../test-harness" }
[dev-dependencies.sns-worker]
path = "."
features = ["test_decrypt_128", "test_s3_use_handle_as_key"]

48 changes: 41 additions & 7 deletions coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use sns_worker::{Config, DBConfig, HealthCheckConfig, S3Config, S3RetryPolicy, SNSMetricsConfig};

use fhevm_engine_common::telemetry;
use tokio::signal::unix;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod utils;

fn handle_sigint(token: CancellationToken) {
Expand Down Expand Up @@ -65,14 +67,46 @@ async fn main() {
let config: Config = construct_config();
let parent = CancellationToken::new();

tracing_subscriber::fmt()
let mut otlp_setup_error: Option<String> = None;

let otel_tracer = if config.service_name.is_empty() {
None
} else {
match telemetry::setup_otlp_tracer(&config.service_name, "otlp-layer") {
Ok(tracer) => Some(tracer),
Err(err) => {
otlp_setup_error = Some(err.to_string());
None
}
}
};

let level_filter = tracing_subscriber::filter::LevelFilter::from_level(config.log_level);

let fmt_layer = tracing_subscriber::fmt::layer()
.json()
.with_target(false) // drop "target" field so the logs are not too verbose. Instead, span names are used.
.with_current_span(true) // keep "span"
.with_span_list(false) // drop "spans"
.with_level(true)
.with_max_level(config.log_level)
.init();
// drop "target" field so the logs are not too verbose. Instead, span names are used.
.with_target(false)
// keep "span"
.with_current_span(true)
// drop "spans"
.with_span_list(false)
.with_level(true);

let base = tracing_subscriber::registry()
.with(level_filter)
.with(fmt_layer);

if let Some(tracer) = otel_tracer {
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
base.with(otel_layer).init();
} else {
base.init();
}

if let Some(err) = otlp_setup_error {
error!(error = %err, "Failed to setup OTLP");
}

// Handle SIGINT signals
handle_sigint(parent.clone());
Expand Down
12 changes: 8 additions & 4 deletions coprocessor/fhevm-engine/sns-worker/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use tokio_util::sync::CancellationToken;
use tracing::error_span;
use tracing::warn;
use tracing::{debug, error, info};
use tracing_opentelemetry::OpenTelemetrySpanExt;

const S3_HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(5);

Expand Down Expand Up @@ -546,6 +547,7 @@ fn compute_task(
let started_at = SystemTime::now();
let thread_id = format!("{:?}", std::thread::current().id());
let span = error_span!("compute", thread_id = %thread_id);
span.set_parent(task.otel.context().clone());
let _enter = span.enter();

let handle = to_hex(&task.handle);
Expand All @@ -570,12 +572,15 @@ fn compute_task(
let ct_type = ct.type_name().to_owned();
info!( { handle, ct_type }, "Converting ciphertext");

let mut span = task.otel.child_span("squash_noise");
telemetry::attribute(&mut span, "ct_type", ct_type);
let squash_span = tracing::info_span!(
"squash_noise",
ct_type = %ct_type,
operation = "squash_noise"
);
let _squash_enter = squash_span.enter();

match ct.squash_noise_and_serialize(enable_compression) {
Ok(bytes) => {
telemetry::end_span(span);
info!(
handle = handle,
length = bytes.len(),
Expand Down Expand Up @@ -622,7 +627,6 @@ fn compute_task(
}
}
Err(err) => {
telemetry::end_span_with_err(span, err.to_string());
error!({ handle = handle, error = %err }, "Failed to convert ct");
}
};
Expand Down
8 changes: 1 addition & 7 deletions coprocessor/fhevm-engine/sns-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use fhevm_engine_common::{
healthz_server::{self},
metrics_server,
pg_pool::{PostgresPoolManager, ServiceError},
telemetry::{self, OtelTracer},
telemetry::OtelTracer,
types::FhevmError,
utils::{to_hex, DatabaseURL},
};
Expand Down Expand Up @@ -509,12 +509,6 @@ pub async fn run_all(
let gpu_enabled = fhevm_engine_common::utils::log_backend();
info!(gpu_enabled, rayon_threads, config = %config, "Starting SNS worker");

if !config.service_name.is_empty() {
if let Err(err) = telemetry::setup_otlp(&config.service_name) {
error!(error = %err, "Failed to setup OTLP");
}
}

let conf = config.clone();
let token = parent_token.child_token();
let tx = uploads_tx.clone();
Expand Down
Loading
Loading