Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ http = ["axum", "reqwest", "axum-server", "rustls", "tokio-rustls", "rustls-pemf
file = [] # No extra deps

[dev-dependencies]
uuid = { version = "1.19", features = ["v4"] }
serde_yaml_ng = "0.10.0"
tempfile = "3.10"
tracing-appender = "0.2"
Expand Down
3 changes: 2 additions & 1 deletion src/endpoints/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ async fn create_amqp_connection(config: &AmqpConfig) -> anyhow::Result<Connectio

let mut last_error = None;
for attempt in 1..=5 {
info!(url = %conn_uri, attempt = attempt, "Attempting to connect to AMQP broker");
// Avoid logging credentials embedded in URLs.
info!(attempt = attempt, "Attempting to connect to AMQP broker");
let conn_props = ConnectionProperties::default();
let result = if config.tls.required {
let tls_config = build_tls_config(config).await?;
Expand Down
7 changes: 4 additions & 3 deletions src/endpoints/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ impl NatsPublisher {
name: stream_name.to_string(),
// The stream must be configured to capture the specific subject,
// or a wildcard. A subject filter of `stream_name.>` will capture
// all subjects that start with the stream name, which is a common pattern.
subjects: vec![format!("{}.>", stream_name)], // e.g., "perf_stream_nats.>"
// all subjects that start with the stream name. We also add the specific
// subject to ensure it's captured even if it doesn't match the wildcard.
subjects: vec![format!("{}.>", stream_name), subject.to_string()], // e.g., "perf_stream_nats.>"
..Default::default()
})
.await?;
Expand Down Expand Up @@ -114,7 +115,7 @@ impl NatsConsumer {
jetstream
.get_or_create_stream(stream::Config {
name: stream_name.to_string(),
subjects: vec![format!("{}.>", stream_name)],
subjects: vec![format!("{}.>", stream_name), subject.to_string()],
..Default::default()
})
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// git clone https://github.com/marcomq/hot_queue

use serde::{Deserialize, Serialize};
use std::{collections::HashMap, ops::Deref};
use std::collections::HashMap;

use crate::endpoints::memory::{get_or_create_channel, MemoryChannel};

Expand Down
72 changes: 35 additions & 37 deletions src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,44 @@ impl Route {

let handle = tokio::spawn(async move {
loop {
// cheap pointer clones
let route_arc = Arc::clone(&route);
let name_arc = Arc::clone(&name);
let shutdown_rx_clone = shutdown_rx.clone();
// Create a new, per-iteration internal shutdown channel.
// This avoids a race where both this loop and the inner task
// try to consume the same external shutdown signal.
let (internal_shutdown_tx, internal_shutdown_rx) = bounded(1);

// The actual route logic is in `run_until_err`.
let mut run_task = tokio::spawn(async move {
route_arc
.run_until_err(&name_arc, Some(shutdown_rx_clone))
.run_until_err(&name_arc, Some(internal_shutdown_rx))
.await
});

select! {
res = shutdown_rx.recv() => {
if res.is_err() {
warn!("Shutdown channel for route '{}' closed unexpectedly.", name);
}
_ = shutdown_rx.recv() => {
info!("Shutdown signal received for route '{}'.", name);
// Notify the inner task to shut down.
let _ = internal_shutdown_tx.send(()).await;
// Wait for the inner task to finish gracefully.
let _ = run_task.await;
break;
}
res = &mut run_task => {
if let Ok(res) = res {
match res {
Ok(should_continue) if !should_continue => {
info!("Route '{}' completed gracefully. Shutting down.", name);
break;
}
Err(e) => {
error!("Route '{}' failed: {}. Reconnecting in 5 seconds...", name, e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
_ => {} // The route should continue running.
match res {
Ok(Ok(should_continue)) if !should_continue => {
info!("Route '{}' completed gracefully. Shutting down.", name);
break;
}
Ok(Err(e)) => {
error!("Route '{}' failed: {}. Reconnecting in 5 seconds...", name, e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
Err(e) => {
error!("Route '{}' task panicked: {}. Reconnecting in 5 seconds...", name, e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
_ => {} // The route should continue running.
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
Expand All @@ -76,6 +80,13 @@ impl Route {
name: &str,
shutdown_rx: Option<async_channel::Receiver<()>>,
) -> anyhow::Result<bool> {
let (internal_shutdown_tx, internal_shutdown_rx) = bounded(1);
let shutdown_rx = shutdown_rx.unwrap_or(internal_shutdown_rx);
let _shutdown_tx = if shutdown_rx.is_closed() {
Some(internal_shutdown_tx)
} else {
None
};
if self.concurrency == 1 {
self.run_sequentially(name, shutdown_rx).await
} else {
Expand All @@ -87,16 +98,8 @@ impl Route {
async fn run_sequentially(
&self,
name: &str,
shutdown_rx: Option<async_channel::Receiver<()>>,
shutdown_rx: async_channel::Receiver<()>,
) -> anyhow::Result<bool> {
let (internal_shutdown_tx, internal_shutdown_rx) = bounded(1);
let shutdown_rx = shutdown_rx.unwrap_or(internal_shutdown_rx);
let _shutdown_tx = if shutdown_rx.is_closed() {
Some(internal_shutdown_tx)
} else {
None
};

let publisher =
Arc::new(create_publisher_from_route(name, &self.output.endpoint_type).await?);
let mut consumer = create_consumer_from_route(name, &self.input.endpoint_type).await?;
Expand All @@ -119,8 +122,10 @@ impl Route {
debug!("Received a batch of {} messages sequentially", messages.len());
// Process the batch sequentially without spawning a new task
match publisher.send_bulk(messages).await {
Ok(response) => commit(response).await,
Err(e) => error!("Failed to send message in sequential route: {}", e),
Ok(response) => {
commit(response).await;
}
Err(e) => return Err(e.into()), // Propagate error to trigger reconnect
}
}
}
Expand All @@ -131,15 +136,8 @@ impl Route {
async fn run_concurrently(
&self,
name: &str,
shutdown_rx: Option<async_channel::Receiver<()>>,
shutdown_rx: async_channel::Receiver<()>,
) -> anyhow::Result<bool> {
let (internal_shutdown_tx, internal_shutdown_rx) = bounded(1);
let shutdown_rx = shutdown_rx.unwrap_or(internal_shutdown_rx);
let _shutdown_tx = if shutdown_rx.is_closed() {
Some(internal_shutdown_tx)
} else {
None
};
let publisher =
Arc::new(create_publisher_from_route(name, &self.output.endpoint_type).await?);
let mut consumer = create_consumer_from_route(name, &self.input.endpoint_type).await?;
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ routes:
in:
mqtt: { url: "mqtt://localhost:1883", topic: "test_topic_mqtt" }
out:
memory: { topic: "test-out-mqtt", capacity: {out_capacity} } }
memory: { topic: "test-out-mqtt", capacity: {out_capacity} }
"#;

pub async fn test_mqtt_pipeline() {
Expand Down