Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE ciphertexts128 (
tenant_id INTEGER NOT NULL,
handle BYTEA NOT NULL,
ciphertext BYTEA NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),

PRIMARY KEY (tenant_id, handle)
);

CREATE INDEX idx_ciphertexts128_handle
ON ciphertexts128 (handle);
6 changes: 3 additions & 3 deletions coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ async fn run_uploader_loop(
// Spawn a new task to upload the ciphertexts
let h = tokio::spawn(async move {
let s = item.otel.child_span("upload_s3");
match upload_ciphertexts(trx, item, &client, &conf).instrument(error_span!("upload_s3")).await {
match upload_ciphertexts(trx, item.clone(), &client, &conf).instrument(error_span!("upload_s3")).await {
Ok(()) => telemetry::end_span(s),
Err(err) => {
if let ExecutionError::S3TransientError(_) = err {
Expand Down Expand Up @@ -437,15 +437,15 @@ async fn fetch_pending_uploads(
// Fetch missing ciphertext128
if ciphertext128_digest.is_none() {
if let Ok(row) = sqlx::query!(
"SELECT ciphertext128 FROM ciphertexts WHERE tenant_id = $1 AND handle = $2;",
"SELECT ciphertext FROM ciphertexts128 WHERE tenant_id = $1 AND handle = $2;",
row.tenant_id,
handle
)
.fetch_optional(db_pool)
.await
{
if let Some(record) = row {
match record.ciphertext128 {
match record.ciphertext {
Some(ct) if !ct.is_empty() => {
ct128 = ct;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ pub struct Args {
#[arg(long, default_value_t = 4)]
pub work_items_batch_size: u32,

/// Garbage collection batch size
#[arg(long, default_value_t = 80)]
pub gc_batch_size: u32,

/// NOTIFY/LISTEN channels for database that the worker listen to
#[arg(long, num_args(1..))]
pub pg_listen_channels: Vec<String>,
Expand Down Expand Up @@ -88,9 +84,15 @@ pub struct Args {
#[arg(long, default_value = "120s", value_parser = parse_duration)]
pub s3_regular_recheck_duration: Duration,

#[arg(long, default_value = "120s", value_parser = parse_duration)]
#[arg(long, default_value = "15min", value_parser = parse_duration)]
pub cleanup_interval: Duration,

/// Garbage collection batch size
/// Number of ciphertext128 to delete in one GC cycle
/// To disable GC set this value to 0
#[arg(long, default_value_t = 1000)]
pub gc_batch_size: u32,

#[arg(
long,
value_parser = clap::value_parser!(Level),
Expand Down
Loading
Loading