-
Notifications
You must be signed in to change notification settings - Fork 2k
Expand file tree
/
Copy pathsns_worker.rs
More file actions
94 lines (83 loc) · 3.11 KB
/
sns_worker.rs
File metadata and controls
94 lines (83 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use sns_worker::{Config, DBConfig, HealthCheckConfig, S3Config, S3RetryPolicy, SNSMetricsConfig};
use fhevm_engine_common::telemetry;
use tokio::signal::unix;
use tokio_util::sync::CancellationToken;
use tracing::error;
mod utils;
fn handle_sigint(token: CancellationToken) {
tokio::spawn(async move {
let mut signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
signal.recv().await;
token.cancel();
});
}
fn construct_config() -> Config {
let args: utils::daemon_cli::Args = utils::daemon_cli::parse_args();
let db_url = args.database_url.clone().unwrap_or_default();
Config {
tenant_api_key: args.tenant_api_key,
service_name: args.service_name,
metrics: SNSMetricsConfig {
addr: args.metrics_addr,
gauge_update_interval_secs: args.gauge_update_interval_secs,
},
db: DBConfig {
url: db_url,
listen_channels: args.pg_listen_channels,
notify_channel: args.pg_notify_channel,
batch_limit: args.work_items_batch_size,
gc_batch_limit: args.gc_batch_size,
polling_interval: args.pg_polling_interval,
max_connections: args.pg_pool_connections,
cleanup_interval: args.cleanup_interval,
timeout: args.pg_timeout,
lifo: args.lifo,
},
s3: S3Config {
bucket_ct128: args.bucket_name_ct128,
bucket_ct64: args.bucket_name_ct64,
max_concurrent_uploads: args.s3_max_concurrent_uploads,
retry_policy: S3RetryPolicy {
max_retries_per_upload: args.s3_max_retries_per_upload,
max_backoff: args.s3_max_backoff,
max_retries_timeout: args.s3_max_retries_timeout,
recheck_duration: args.s3_recheck_duration,
regular_recheck_duration: args.s3_regular_recheck_duration,
},
},
log_level: args.log_level,
health_checks: HealthCheckConfig {
liveness_threshold: args.liveness_threshold,
port: args.health_check_port,
},
enable_compression: args.enable_compression,
schedule_policy: args.schedule_policy,
pg_auto_explain_with_min_duration: args.pg_auto_explain_with_min_duration,
}
}
#[tokio::main]
async fn main() {
let config: Config = construct_config();
let parent = CancellationToken::new();
let mut otlp_setup_error: Option<String> = None;
let _otel_guard =
match telemetry::init_json_subscriber(config.log_level, &config.service_name, "otlp-layer")
{
Ok(guard) => guard,
Err(err) => {
otlp_setup_error = Some(err.to_string());
None
}
};
if let Some(err) = otlp_setup_error {
error!(error = %err, "Failed to setup OTLP");
}
// Handle SIGINIT signals
handle_sigint(parent.clone());
sns_worker::run_all(config, parent, None)
.await
.unwrap_or_else(|err| {
error!(error = %err, "Error running SNS worker");
std::process::exit(1);
});
}