Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ serde_json = "1.0"
thiserror = "2.0.17"
futures = "0.3.31"
tracing = "0.1"
rand = "0.9.2"
bytes = { version = "1.11.0", features = ["serde"] }


# Common TLS dependencies
rustls = { version = "0.23", features = ["ring"], optional = true }
tokio-rustls = { version = "0.26", optional = true }
rustls-pemfile = { version = "2.2", optional = true }

# Metrics
metrics = { version = "0.24" }
metrics = { version = "0.24", optional = true }

# Storage for Deduplication
sled = { version = "0.34", optional = true }
Expand Down Expand Up @@ -57,17 +60,13 @@ url = { version = "2.5", optional = true }
async-channel = { version = "2.5.0" }
once_cell = { version = "1.21.3" }

# Environment Variable Support
dotenvy = { version = "0.15", optional = true }

[features]
default = ["full"]
core = ["dedup", "dotenv"]
full = ["core", "mongodb", "kafka", "amqp", "nats", "mqtt", "http", "file"]
full = ["mongodb", "kafka", "amqp", "nats", "mqtt", "http", "file", "metrics", "dedup"]

# Functionality features
dedup = ["sled"]
dotenv = ["dotenvy"]
metrics = ["dep:metrics"]


# Endpoint features
Expand All @@ -80,6 +79,7 @@ http = ["axum", "reqwest", "axum-server", "rustls", "tokio-rustls", "rustls-pemf
file = [] # No extra deps

[dev-dependencies]
uuid = { version = "1.19", features = ["v4"] }
serde_yaml_ng = "0.10.0"
tempfile = "3.10"
tracing-appender = "0.2"
Expand Down
5 changes: 3 additions & 2 deletions src/canonical_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
// Licensed under MIT License, see License file for more details
// git clone https://github.com/marcomq/hot_queue

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CanonicalMessage {
pub message_id: Option<u64>,
pub payload: Vec<u8>,
pub payload: Bytes,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, String>>,
}
Expand All @@ -18,7 +19,7 @@ impl CanonicalMessage {
pub fn new(payload: Vec<u8>) -> Self {
Self {
message_id: None,
payload,
payload: Bytes::from(payload),
metadata: None,
}
}
Expand Down
142 changes: 108 additions & 34 deletions src/endpoints/amqp.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use crate::models::AmqpConfig;
use crate::traits::{BoxFuture, CommitFunc, MessageConsumer, MessagePublisher};
use crate::traits::{BatchCommitFunc, BoxFuture, MessageConsumer, MessagePublisher};
use crate::CanonicalMessage;
use anyhow::anyhow;
use async_trait::async_trait;
use futures::TryStreamExt;
use lapin::tcp::{OwnedIdentity, OwnedTLSConfig};
use lapin::{
options::{
BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, BasicQosOptions,
QueueDeclareOptions,
},
types::FieldTable,
types::{FieldTable, ShortString},
BasicProperties, Channel, Connection, ConnectionProperties, Consumer,
};
use std::any::Any;
Expand All @@ -20,7 +21,8 @@ pub struct AmqpPublisher {
channel: Channel,
exchange: String,
routing_key: String,
await_ack: bool,
no_persistence: bool,
delayed_ack: bool,
}

impl AmqpPublisher {
Expand All @@ -37,7 +39,10 @@ impl AmqpPublisher {
channel
.queue_declare(
routing_key,
QueueDeclareOptions::default(),
QueueDeclareOptions {
durable: !config.no_persistence,
..Default::default()
},
FieldTable::default(),
)
.await?;
Expand All @@ -46,7 +51,8 @@ impl AmqpPublisher {
channel,
exchange: "".to_string(), // Default exchange
routing_key: routing_key.to_string(),
await_ack: config.await_ack,
no_persistence: config.no_persistence,
delayed_ack: config.delayed_ack,
})
}

Expand All @@ -55,26 +61,32 @@ impl AmqpPublisher {
channel: self.channel.clone(),
exchange: self.exchange.clone(),
routing_key: routing_key.to_string(),
await_ack: self.await_ack,
no_persistence: self.no_persistence,
delayed_ack: self.delayed_ack,
}
}
}

#[async_trait]
impl MessagePublisher for AmqpPublisher {
async fn send(&self, message: CanonicalMessage) -> anyhow::Result<Option<CanonicalMessage>> {
let mut properties = BasicProperties::default();
let mut properties = if self.no_persistence {
BasicProperties::default()
} else {
// Delivery mode 2 makes the message persistent
BasicProperties::default().with_delivery_mode(2)
};
if let Some(metadata) = message.metadata {
if !metadata.is_empty() {
let mut table = FieldTable::default();
for (key, value) in metadata {
table.insert(
key.into(),
ShortString::from(key),
lapin::types::AMQPValue::LongString(value.into()),
);
}
properties = properties.with_headers(table);
}
}
}

let confirmation = self
Expand All @@ -88,13 +100,24 @@ impl MessagePublisher for AmqpPublisher {
)
.await?;

if self.await_ack {
if !self.delayed_ack {
// Wait for the broker's publisher confirmation.
confirmation.await?;
}
Ok(None)
}

// This isn't a real bulk send, but the normal send is fast enough.
async fn send_batch(
&self,
messages: Vec<CanonicalMessage>,
) -> anyhow::Result<(Option<Vec<CanonicalMessage>>, Vec<CanonicalMessage>)> {
crate::traits::send_batch_helper(self, messages, |publisher, message| {
Box::pin(publisher.send(message))
})
.await
}

fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -111,7 +134,14 @@ impl AmqpConsumer {

info!(queue = %queue, "Declaring AMQP queue");
channel
.queue_declare(queue, QueueDeclareOptions::default(), FieldTable::default())
.queue_declare(
queue,
QueueDeclareOptions {
durable: !config.no_persistence,
..Default::default()
},
FieldTable::default(),
)
.await?;

// Set prefetch count. This acts as a buffer and is crucial for concurrent processing.
Expand Down Expand Up @@ -150,7 +180,8 @@ async fn create_amqp_connection(config: &AmqpConfig) -> anyhow::Result<Connectio

let mut last_error = None;
for attempt in 1..=5 {
info!(url = %conn_uri, attempt = attempt, "Attempting to connect to AMQP broker");
// Avoid logging credentials embedded in URLs.
info!(attempt = attempt, "Attempting to connect to AMQP broker");
let conn_props = ConnectionProperties::default();
let result = if config.tls.required {
let tls_config = build_tls_config(config).await?;
Expand Down Expand Up @@ -193,42 +224,85 @@ async fn build_tls_config(config: &AmqpConfig) -> anyhow::Result<OwnedTLSConfig>
})
}

fn delivery_to_canonical_message(delivery: &lapin::message::Delivery) -> CanonicalMessage {
let mut canonical_message = CanonicalMessage::new(delivery.data.clone());
if let Some(headers) = delivery.properties.headers().as_ref() {
if !headers.inner().is_empty() {
let mut metadata = std::collections::HashMap::new();
for (key, value) in headers.inner().iter() {
let value_str = match value {
lapin::types::AMQPValue::LongString(s) => s.to_string(),
lapin::types::AMQPValue::ShortString(s) => s.to_string(),
lapin::types::AMQPValue::Boolean(b) => b.to_string(),
lapin::types::AMQPValue::LongInt(i) => i.to_string(),
_ => continue,
};
metadata.insert(key.to_string(), value_str);
}
if !metadata.is_empty() {
canonical_message.metadata = Some(metadata);
}
}
}
canonical_message
}

#[async_trait]
impl MessageConsumer for AmqpConsumer {
async fn receive(&mut self) -> anyhow::Result<(CanonicalMessage, CommitFunc)> {
let delivery = futures::StreamExt::next(&mut self.consumer)
async fn receive_batch(
&mut self,
max_messages: usize,
) -> anyhow::Result<(Vec<CanonicalMessage>, BatchCommitFunc)> {
if max_messages == 0 {
return Ok((Vec::new(), Box::new(|_| Box::pin(async {}))));
}

// 1. Wait for the first message. This will block until a message is available.
let mut last_delivery = futures::StreamExt::next(&mut self.consumer)
.await
.ok_or_else(|| anyhow!("AMQP consumer stream ended"))??;

let mut message = CanonicalMessage::new(delivery.data.clone());
if let Some(headers) = delivery.properties.headers().as_ref() {
if !headers.inner().is_empty() {
let mut metadata = std::collections::HashMap::new();
for (key, value) in headers.inner().iter() {
if let lapin::types::AMQPValue::LongString(s) = value {
metadata.insert(key.to_string(), s.to_string());
}
let mut messages = Vec::with_capacity(max_messages);
messages.push(delivery_to_canonical_message(&last_delivery));

// 2. Greedily consume more messages if they are already buffered, up to max_messages.
while messages.len() < max_messages {
match self.consumer.try_next().await {
Ok(Some(delivery)) => {
messages.push(delivery_to_canonical_message(&delivery));
last_delivery = delivery;
}
if !metadata.is_empty() {
message.metadata = Some(metadata);
Ok(None) => break, // No more messages in the buffer
Err(e) => {
// An error occurred, but we have some messages. Process them and let the next call handle the error.
tracing::warn!("Error receiving subsequent AMQP message: {}", e);
break;
}
}
}

let commit = Box::new(move |_response| {
// 3. Create a commit function that acks all received messages.
let messages_len = messages.len();
let commit = Box::new(move |_response: Option<Vec<CanonicalMessage>>| {
Box::pin(async move {
delivery
.ack(BasicAckOptions::default())
.await
.expect("Failed to ack AMQP message");
debug!(
delivery_tag = delivery.delivery_tag,
"AMQP message acknowledged"
);
let ack_options = BasicAckOptions {
// Use multiple: true only if we've consumed more than one message.
multiple: messages_len > 1,
..Default::default()
};
if let Err(e) = last_delivery.ack(ack_options).await {
tracing::error!(last_delivery_tag = last_delivery.delivery_tag, error = %e, "Failed to bulk-ack AMQP messages");
} else {
debug!(
last_delivery_tag = last_delivery.delivery_tag,
count = messages_len,
"Bulk-acknowledged AMQP messages"
);
}
}) as BoxFuture<'static, ()>
});

Ok((message, commit))
Ok((messages, commit))
}

fn as_any(&self) -> &dyn Any {
Expand Down
Loading