Skip to content

Commit cb1b35e

Browse files
author
rohan2794
committed
feat: replace DelayQueue with sleep , timeouts CLI-configurable, add rotation guards
Signed-off-by: rohan2794 <rohan2794@gmial.com>
1 parent 67f0095 commit cb1b35e

7 files changed

Lines changed: 172 additions & 346 deletions

File tree

Cargo.lock

Lines changed: 18 additions & 70 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ tokio = { version = "1.41.0", features = ["full"] }
2323
tokio-retry = "0.3"
2424
futures = "0.3"
2525
async-trait = "0.1.83"
26+
async-nats = "0.37.0"
2627

2728
# tracing
2829
tracing = "0.1.40"

events-aggregator/Cargo.toml

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,16 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7-
# Async runtime
87
tokio = { workspace = true }
9-
# NATS client
10-
async-nats = { version = "0.37.0" }
11-
# JSON serialization
8+
async-nats = { workspace = true }
129
serde = { workspace = true }
1310
serde_json = { workspace = true }
14-
# Logging and Observability
1511
tracing = { workspace = true }
16-
tracing-subscriber = { workspace = true, features = ["json"]}
17-
tracing-appender = "0.2"
18-
# Error handling
1912
anyhow = { workspace = true }
2013
parse-size = { workspace = true }
2114
futures = { workspace = true }
2215
rand = { workspace = true }
23-
tokio-util = { version = "0.7.13", features = ["time"] }
24-
tokio-stream = { version = "0.1.16", features = ["sync"] }
2516
clap = { workspace = true }
2617
url = { workspace = true }
18+
humantime = { workspace = true }
2719
utils = { path = "../dependencies/control-plane/utils/utils-lib" }

events-aggregator/src/client/nats_client.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::constant::{
2-
CONNECTION_TIMEOUT, JETSTREAM_CONSUMER_NAME, JETSTREAM_STREAM_NAME, MAX_ATTEMPTS,
3-
MAX_BACKOFF_ATTEMPTS, REQUEST_TIMEOUT,
2+
JETSTREAM_CONSUMER_NAME, JETSTREAM_STREAM_NAME, MAX_ATTEMPTS, MAX_BACKOFF_ATTEMPTS,
43
};
54
use anyhow::bail;
65
use async_nats::jetstream::consumer::pull;
@@ -47,13 +46,17 @@ impl NatsManager {
4746
}
4847

4948
/// Establish a connection to the NATS server with strict timeouts and a robust retry strategy.
50-
pub async fn new(url: &str) -> Result<Self, async_nats::Error> {
49+
pub async fn new(
50+
url: &str,
51+
connection_timeout: Duration,
52+
request_timeout: Duration,
53+
) -> Result<Self, async_nats::Error> {
5154
info!("Attempting to connect to NATS at {url}...");
5255

5356
// Configure native NATS backoff and retries
5457
let options = async_nats::ConnectOptions::new()
55-
.connection_timeout(CONNECTION_TIMEOUT)
56-
.request_timeout(Some(REQUEST_TIMEOUT))
58+
.connection_timeout(connection_timeout)
59+
.request_timeout(Some(request_timeout))
5760
// Enable retries on initial connection failure
5861
.retry_on_initial_connect()
5962
.reconnect_delay_callback(|attempts| {
@@ -78,6 +81,7 @@ impl NatsManager {
7881
subject: String,
7982
jetstream_enabled: bool,
8083
tx: mpsc::Sender<UnifiedMessage>,
84+
request_timeout: Duration,
8185
) -> anyhow::Result<()> {
8286
// Check the environment-derived flag (e.g., JETSTREAM_ENABLED=true)
8387
if !jetstream_enabled {
@@ -89,15 +93,16 @@ impl NatsManager {
8993
}
9094

9195
info!("JetStream is enabled. Booting JetStream consumer with retry support...");
92-
self.setup_jetstream(subject, tx).await?;
96+
self.setup_jetstream(subject, tx, request_timeout).await?;
9397
Ok(())
9498
}
9599

96-
// JetStream Consumer Setup with Robust Retry Logic and Backoff
100+
// JetStream Consumer Setup with Robust Retry Logic and Backoff.
97101
async fn setup_jetstream(
98102
&self,
99103
subject: String,
100104
tx: mpsc::Sender<UnifiedMessage>,
105+
request_timeout: Duration,
101106
) -> anyhow::Result<()> {
102107
let js = async_nats::jetstream::new(self.client.clone());
103108
let mut attempt = 0;
@@ -130,7 +135,7 @@ impl NatsManager {
130135
continue;
131136
}
132137

133-
match tokio::time::timeout(REQUEST_TIMEOUT, js.get_stream(JETSTREAM_STREAM_NAME)).await
138+
match tokio::time::timeout(request_timeout, js.get_stream(JETSTREAM_STREAM_NAME)).await
134139
{
135140
Ok(Ok(stream)) => match stream
136141
.get_or_create_consumer(

events-aggregator/src/constant.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
use std::time::Duration;
2-
31
/// Channel capacity for internal message passing and exporter buffering.
42
pub const CHANNEL_CAPACITY: usize = 512;
53

6-
/// Maximum payload bytes to log when a message payload is not valid JSON.
7-
pub const MAX_NON_JSON_PAYLOAD_LOG_BYTES: usize = 4096;
8-
94
/// Maximum number of events to batch in a single export flush.
105
pub const BATCH_MAX_SIZE: usize = 100;
116

@@ -24,12 +19,6 @@ pub const JETSTREAM_CONSUMER_NAME: &str = "events-aggregator-consumer";
2419
/// Default JetStream stream name used for publishing and consuming events.
2520
pub const JETSTREAM_STREAM_NAME: &str = "events-stream";
2621

27-
/// Timeout for establishing a NATS connection.
28-
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
29-
30-
/// Timeout for NATS request/response operations.
31-
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
32-
3322
/// Maximum number of attempts for retryable operations.
3423
pub const MAX_ATTEMPTS: u32 = 10;
3524

@@ -38,3 +27,6 @@ pub const MAX_BACKOFF_ATTEMPTS: u32 = 6;
3827

3928
/// Base filename for the local event export file.
4029
pub const EVENTS_JSON_FILE: &str = "events.json";
30+
31+
/// Filename for the rotated (previous) local event export file.
32+
pub const EVENTS_JSON_ROTATED_FILE: &str = "events.1.json";

0 commit comments

Comments
 (0)