Skip to content

Commit 9c655d1

Browse files
committed
use url instead of listen_addr and add tests
1 parent 4544fe7 commit 9c655d1

4 files changed

Lines changed: 146 additions & 3 deletions

File tree

src/endpoints/http.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ impl HttpConsumer {
4141
.with_state(request_tx);
4242

4343
let listen_address = config
44-
.listen_address
44+
.url
4545
.as_deref()
46-
.ok_or_else(|| anyhow!("'listen_address' is required for http source connection"))?;
46+
.ok_or_else(|| anyhow!("'url' is required for http source connection"))?;
4747
let addr: SocketAddr = listen_address
4848
.parse()
4949
.with_context(|| format!("Invalid listen address: {}", listen_address))?;

src/middleware/deduplication.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,54 @@ impl MessageConsumer for DeduplicationConsumer {
137137
self
138138
}
139139
}
140+
141+
#[cfg(test)]
142+
mod tests {
143+
use super::*;
144+
use crate::endpoints::memory::MemoryConsumer;
145+
use crate::models::{DeduplicationMiddleware, MemoryConfig};
146+
use tempfile::tempdir;
147+
148+
#[tokio::test]
149+
async fn test_deduplication_logic() {
150+
let dir = tempdir().unwrap();
151+
let db_path = dir.path().join("dedup_test").to_str().unwrap().to_string();
152+
153+
let config = DeduplicationMiddleware {
154+
sled_path: db_path,
155+
ttl_seconds: 60,
156+
};
157+
158+
let mem_cfg = MemoryConfig {
159+
topic: "dedup_topic".to_string(),
160+
capacity: Some(10),
161+
};
162+
let mem_consumer = MemoryConsumer::new(&mem_cfg).unwrap();
163+
let channel = mem_consumer.channel();
164+
165+
// 1. Send a message
166+
let msg1 = CanonicalMessage::new(b"data1".to_vec(), Some(100));
167+
channel.send_message(msg1).await.unwrap();
168+
169+
// 2. Send a duplicate message
170+
let msg2 = CanonicalMessage::new(b"data1_dup".to_vec(), Some(100));
171+
channel.send_message(msg2).await.unwrap();
172+
173+
// 3. Send a new message
174+
let msg3 = CanonicalMessage::new(b"data2".to_vec(), Some(101));
175+
channel.send_message(msg3).await.unwrap();
176+
177+
let mut dedup_consumer =
178+
DeduplicationConsumer::new(Box::new(mem_consumer), &config, "test_route").unwrap();
179+
180+
// First receive: Should be msg1 (ID 100)
181+
let (rec1, commit1) = dedup_consumer.receive().await.unwrap();
182+
assert_eq!(rec1.message_id, Some(100));
183+
commit1(None).await;
184+
185+
// Second receive: Should be msg3 (ID 101). msg2 (ID 100) is skipped internally.
186+
let (rec2, commit2) = dedup_consumer.receive().await.unwrap();
187+
assert_eq!(rec2.message_id, Some(101));
188+
commit2(None).await;
189+
}
190+
}

src/middleware/retry.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,96 @@ impl MessagePublisher for RetryPublisher {
106106
self
107107
}
108108
}
109+
110+
#[cfg(test)]
111+
mod tests {
112+
use super::*;
113+
use crate::traits::MessagePublisher;
114+
use crate::CanonicalMessage;
115+
use anyhow::anyhow;
116+
use async_trait::async_trait;
117+
use std::any::Any;
118+
use std::sync::{Arc, Mutex};
119+
120+
#[derive(Clone)]
121+
struct MockPublisher {
122+
attempts: Arc<Mutex<usize>>,
123+
succeed_after: usize,
124+
}
125+
126+
#[async_trait]
127+
impl MessagePublisher for MockPublisher {
128+
async fn send(&self, _msg: CanonicalMessage) -> anyhow::Result<Option<CanonicalMessage>> {
129+
let mut attempts = self.attempts.lock().unwrap();
130+
*attempts += 1;
131+
if *attempts > self.succeed_after {
132+
Ok(None)
133+
} else {
134+
Err(anyhow!("Simulated error"))
135+
}
136+
}
137+
138+
async fn send_batch(
139+
&self,
140+
_messages: Vec<CanonicalMessage>,
141+
) -> anyhow::Result<(Option<Vec<CanonicalMessage>>, Vec<CanonicalMessage>)> {
142+
let mut attempts = self.attempts.lock().unwrap();
143+
*attempts += 1;
144+
if *attempts > self.succeed_after {
145+
Ok((None, Vec::new()))
146+
} else {
147+
Err(anyhow!("Simulated batch error"))
148+
}
149+
}
150+
151+
fn as_any(&self) -> &dyn Any {
152+
self
153+
}
154+
}
155+
156+
#[tokio::test]
157+
async fn test_retry_success() {
158+
let attempts = Arc::new(Mutex::new(0));
159+
let mock = MockPublisher {
160+
attempts: attempts.clone(),
161+
succeed_after: 2, // Fails 2 times, succeeds on 3rd
162+
};
163+
164+
let config = RetryMiddleware {
165+
max_attempts: 5,
166+
initial_interval_ms: 1,
167+
max_interval_ms: 10,
168+
multiplier: 1.0,
169+
};
170+
171+
let retry_publisher = RetryPublisher::new(Box::new(mock), config);
172+
let msg = CanonicalMessage::new(vec![], None);
173+
174+
let result = retry_publisher.send(msg).await;
175+
assert!(result.is_ok());
176+
assert_eq!(*attempts.lock().unwrap(), 3);
177+
}
178+
179+
#[tokio::test]
180+
async fn test_retry_exhaustion() {
181+
let attempts = Arc::new(Mutex::new(0));
182+
let mock = MockPublisher {
183+
attempts: attempts.clone(),
184+
succeed_after: 10,
185+
};
186+
187+
let config = RetryMiddleware {
188+
max_attempts: 3,
189+
initial_interval_ms: 1,
190+
max_interval_ms: 10,
191+
multiplier: 1.0,
192+
};
193+
194+
let retry_publisher = RetryPublisher::new(Box::new(mock), config);
195+
let msg = CanonicalMessage::new(vec![], None);
196+
197+
let result = retry_publisher.send(msg).await;
198+
assert!(result.is_err());
199+
assert_eq!(*attempts.lock().unwrap(), 3);
200+
}
201+
}

src/models.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,6 @@ pub struct HttpEndpoint {
451451
#[serde(deny_unknown_fields)]
452452
pub struct HttpConfig {
453453
pub url: Option<String>,
454-
pub listen_address: Option<String>,
455454
#[serde(default)]
456455
pub tls: TlsConfig,
457456
pub response_sink: Option<String>,

0 commit comments

Comments
 (0)