x-delayed-message may deliver immediately when x-delay is large (possible overflow/type conversion to 0) #1781
Description
Summary
Messages published to an x-delayed-message exchange with a large x-delay can be delivered almost immediately instead of being delayed.
I can reproduce this on LavinMQ 2.6.8 with a Spring AMQP producer.
Environment
- Broker: LavinMQ 2.6.8
- Producer: Java 17 + Spring Boot 3.5.9 + spring-rabbit 3.2.8
- Client: RabbitMQ Java client 5.25.0
- OS: macOS (arm64)
Reproduction
- Declare an exchange of type x-delayed-message with x-delayed-type=direct.
- Bind a queue.
- Publish a message with header x-delay=31536000000 (365 days in ms).
- Poll the queue for 60s.
Reproduction Test Code (JUnit)
package com.lavinmq.springbootlavinmq;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class LavinmqLongDelayMessageTest {
    private static final String HOST = System.getProperty("lavinmq.host", "127.0.0.1");
    private static final int PORT = Integer.getInteger("lavinmq.port", 5672);
    private static final String USERNAME = System.getProperty("lavinmq.username", "guest");
    private static final String PASSWORD = System.getProperty("lavinmq.password", "guest");
    private static final String EXCHANGE_NAME = "test-long-delay-exchange";
    private static final String QUEUE_NAME = "test-long-delay-queue";
    private static final String ROUTING_KEY = "test-long-delay-key";

    private CachingConnectionFactory connectionFactory;
    private RabbitAdmin rabbitAdmin;
    private RabbitTemplate rabbitTemplate;

    @BeforeEach
    void setUp() {
        connectionFactory = new CachingConnectionFactory(HOST, PORT);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);
        rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        declareTopology();
    }

    @AfterEach
    void tearDown() {
        if (rabbitAdmin != null) {
            rabbitAdmin.deleteQueue(QUEUE_NAME);
            rabbitAdmin.deleteExchange(EXCHANGE_NAME);
        }
        if (connectionFactory != null) {
            connectionFactory.destroy();
        }
    }

    @Test
    void shouldNotConsumeImmediatelyWhenDelayExceedsInt32() throws InterruptedException {
        long delay = Duration.ofDays(365).toMillis();
        long observeWindowMs = 60_000L;

        // Publish one message carrying the large x-delay header.
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "long-delay-message", message -> {
            message.getMessageProperties().setHeader("x-delay", delay);
            return message;
        });

        long start = System.currentTimeMillis();
        long deadline = start + observeWindowMs;
        Object firstReceived = null;
        Long firstReceivedAtMs = null;

        // Poll the queue for the whole observation window and record the first arrival.
        while (System.currentTimeMillis() < deadline) {
            Object message = rabbitTemplate.receiveAndConvert(QUEUE_NAME);
            if (message != null && firstReceived == null) {
                firstReceived = message;
                firstReceivedAtMs = System.currentTimeMillis() - start;
                System.out.printf(
                    "Consumed delayed message: delay=%dms, arrivedAt=%dms, payload=%s%n",
                    delay, firstReceivedAtMs, firstReceived
                );
            }
            Thread.sleep(100L);
        }

        assertThat(firstReceived)
            .as("Message with x-delay=%d should not be delivered within %d ms, but first arrived at %s ms",
                delay, observeWindowMs, firstReceivedAtMs)
            .isNull();
    }

    private void declareTopology() {
        Map<String, Object> exchangeArgs = new HashMap<>();
        exchangeArgs.put("x-delayed-type", "direct");
        CustomExchange exchange = new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, exchangeArgs);
        Queue queue = QueueBuilder.durable(QUEUE_NAME).build();
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(binding);
        rabbitAdmin.purgeQueue(QUEUE_NAME, true);
    }
}
Actual Result
Message is consumed almost immediately. Example output:
Consumed delayed message: delay=31536000000ms, arrivedAt=3ms, payload=long-delay-message
Expected Result
The message should remain delayed (certainly not delivered within 60 seconds when delay is 365 days).
Source Analysis (possible root cause)
I looked at current main (commit 6cad6c2b499e3269489152b494a2cdbc83317645) and found:
- Delay is stored as UInt32 in SegmentPosition:
  getter delay : UInt32 # used by delayed exchange queue
- x-delay parsing in SegmentPosition.make converts with as?(Int).try(&.to_u32) and rescues to 0u32 (lavinmq/src/lavinmq/segment_position.cr, line 24 in 6cad6c2):
  when "x-delay" then delay = value.as?(Int).try(&.to_u32) || 0u32 rescue 0u32
- The expiration check uses timestamp + delay, so if delay becomes 0, the message expires immediately (lavinmq/src/lavinmq/amqp/queue/delayed_exchange_queue.cr, lines 106 to 109 in 6cad6c2):
  delay = env.segment_position.delay
  timestamp = env.message.timestamp
  expire_at = timestamp + delay
  expire_at <= RoughTime.unix_ms
I suspect either:
- a type mismatch for AMQP header numeric types (e.g. Int64/LongLong not matching as?(Int)), or
- overflow-to-zero behavior when converting to UInt32.
In both cases, silently coercing invalid/overflow delay to zero can cause immediate delivery.
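For scale: a UInt32 holds at most 4294967295, which as milliseconds is roughly 49.7 days, so the 31536000000 ms used in the reproduction cannot fit. The following Java sketch models the suspected failure path. It is not LavinMQ code: toU32OrZero and isExpired are hypothetical names, and the fallback to zero is an assumption based on the rescue 0u32 in the quoted line.

```java
import java.time.Duration;

public class DelayOverflowSketch {
    // Largest value an unsigned 32-bit integer can hold (4294967295).
    public static final long UINT32_MAX = 0xFFFF_FFFFL;

    // Hypothetical model of the suspected behavior: a checked narrowing
    // to the UInt32 range that silently falls back to 0 when it fails.
    public static long toU32OrZero(long value) {
        if (value < 0 || value > UINT32_MAX) {
            return 0L;
        }
        return value;
    }

    // Mirrors the quoted expiration check: expire_at = timestamp + delay.
    public static boolean isExpired(long timestampMs, long delayMs, long nowMs) {
        return timestampMs + delayMs <= nowMs;
    }

    public static void main(String[] args) {
        long requested = Duration.ofDays(365).toMillis();
        long stored = toU32OrZero(requested);
        long now = System.currentTimeMillis();

        System.out.println("requested delay (ms): " + requested);
        System.out.println("fits in UInt32:       " + (requested <= UINT32_MAX));
        System.out.println("stored delay (ms):    " + stored);
        System.out.println("expired at publish:   " + isExpired(now, stored, now));
    }
}
```

Under these assumptions, any delay above about 49.7 days would be stored as 0 and treated as already expired, which would match the arrivedAt=3ms observation.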
Suggestion
- Handle wider numeric types for x-delay explicitly (at least Int64).
- Validate bounds and reject unsupported values with a precondition error, instead of silently defaulting to 0.
- Consider storing delay as a wider type (Int64/UInt64) if large delays are intended to be supported.
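The first two suggestions could look roughly like the sketch below, written in Java only to match the test code above. Everything here is illustrative: parseDelayMs and MAX_DELAY_MS are hypothetical names, the upper bound assumes the field stays UInt32, rejecting negative values is my assumption, and IllegalArgumentException stands in for whatever precondition error the broker would return.

```java
import java.util.Map;

public class DelayHeaderValidationSketch {
    // Assumed upper bound: the largest value a UInt32 field can store.
    public static final long MAX_DELAY_MS = 0xFFFF_FFFFL;

    // Accepts every integer width a header value might arrive as,
    // then rejects out-of-range values instead of defaulting to 0.
    public static long parseDelayMs(Map<String, ?> headers) {
        Object raw = headers.get("x-delay");
        if (raw == null) {
            return 0L; // no header present: no delay requested
        }
        boolean isInteger = raw instanceof Byte || raw instanceof Short
                || raw instanceof Integer || raw instanceof Long;
        if (!isInteger) {
            throw new IllegalArgumentException(
                    "x-delay must be an integer, got " + raw.getClass().getSimpleName());
        }
        long delay = ((Number) raw).longValue();
        if (delay < 0 || delay > MAX_DELAY_MS) {
            throw new IllegalArgumentException(
                    "x-delay out of supported range [0, " + MAX_DELAY_MS + "]: " + delay);
        }
        return delay;
    }

    public static void main(String[] args) {
        System.out.println(parseDelayMs(Map.of("x-delay", 5000)));
        try {
            parseDelayMs(Map.of("x-delay", 31_536_000_000L));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With the third suggestion (a wider stored type), only MAX_DELAY_MS would need to change. The point of the sketch is that an unsupported value produces a visible error at publish time instead of an immediate delivery.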