Skip to content

Commit ac70625

Browse files
committed
fix(rocketmq): log warning when FLAG or DELAY header is not numeric
RocketMQMessageConverterSupport#getAndWrapMessage swallowed any Integer.parseInt failure on the MQ_FLAG and DELAY_TIME_LEVEL message headers with catch (Exception ignored) {}, silently falling back to flag=0 and delayLevel=0. A producer that set DELAY to a misconfigured non-numeric value (e.g. via a SpEL expression that produced a String instead of an int) would have its delayed delivery silently turn into an immediate send with no log line, no exception, and no way for the caller to know the header was malformed. Add an SLF4J logger to the support class and warn with both header names and their actual values when parsing fails. Behaviour is otherwise unchanged: flag and delayLevel still fall back to 0, the broad catch (Exception) is kept, and the DELAY_TIME_LEVEL header lookup remains inside the try block so any exception path that the original code happened to absorb is still absorbed. Only the variable declaration for delayLevelObj is hoisted out of the try block so the catch clause can include its raw value in the warning. Adds three regression tests that lock in the fallback behaviour: non-numeric DELAY_TIME_LEVEL stays at 0, non-numeric FLAG stays at 0, and conversion does not propagate a parse exception. These tests pass both before and after the fix; their purpose is to prevent a future refactor from regressing the silent-fallback semantics that downstream producers may rely on.
1 parent 4877f98 commit ac70625

2 files changed

Lines changed: 52 additions & 2 deletions

File tree

spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
2727
import org.apache.rocketmq.common.message.MessageConst;
2828
import org.apache.rocketmq.common.message.MessageExt;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2931

3032
import org.springframework.messaging.Message;
3133
import org.springframework.messaging.MessageHeaders;
@@ -40,6 +42,9 @@
4042
*/
4143
public final class RocketMQMessageConverterSupport {
4244

45+
private static final Logger log = LoggerFactory
46+
.getLogger(RocketMQMessageConverterSupport.class);
47+
4348
private RocketMQMessageConverterSupport() {
4449
}
4550

@@ -155,17 +160,23 @@ private static org.apache.rocketmq.common.message.Message getAndWrapMessage(
155160
headers.get(toRocketHeaderKey(Headers.FLAG)));
156161
int flag = 0;
157162
int delayLevel = 0;
163+
Object delayLevelObj = null;
158164
try {
159165
flagObj = flagObj == null ? 0 : flagObj;
160-
Object delayLevelObj = headers.getOrDefault(
166+
delayLevelObj = headers.getOrDefault(
161167
RocketMQConst.PROPERTY_DELAY_TIME_LEVEL,
162168
headers.get(toRocketHeaderKey(
163169
RocketMQConst.PROPERTY_DELAY_TIME_LEVEL)));
164170
delayLevelObj = delayLevelObj == null ? 0 : delayLevelObj;
165171
delayLevel = Integer.parseInt(String.valueOf(delayLevelObj));
166172
flag = Integer.parseInt(String.valueOf(flagObj));
167173
}
168-
catch (Exception ignored) {
174+
catch (Exception e) {
175+
log.warn(
176+
"Non-numeric '{}' or '{}' header; falling back to flag=0 and delayLevel=0. "
177+
+ "flagHeader={}, delayLevelHeader={}",
178+
Headers.FLAG, RocketMQConst.PROPERTY_DELAY_TIME_LEVEL,
179+
flagObj, delayLevelObj, e);
169180
}
170181
if (delayLevel > 0) {
171182
rocketMsg.setDelayTimeLevel(delayLevel);

spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.cloud.stream.binder.rocketmq;
1818

19+
import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
1920
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
2021
import org.apache.rocketmq.common.message.MessageConst;
2122
import org.junit.jupiter.api.Test;
@@ -24,6 +25,7 @@
2425
import org.springframework.messaging.support.MessageBuilder;
2526

2627
import static org.assertj.core.api.Assertions.assertThat;
28+
import static org.assertj.core.api.Assertions.assertThatCode;
2729

2830
/**
2931
* @author Sorie
@@ -44,4 +46,41 @@ public void convertMessage2MQBlankHeaderTest() {
4446
assertThat(testProp).isNull();
4547
assertThat(tagProp).isEqualTo("a");
4648
}
49+
50+
@Test
51+
public void nonNumericDelayTimeLevelHeaderFallsBackToZero() {
52+
Message<String> message = MessageBuilder.withPayload("msg")
53+
.setHeader(RocketMQConst.PROPERTY_DELAY_TIME_LEVEL, "not-a-number")
54+
.build();
55+
56+
org.apache.rocketmq.common.message.Message rkmqMsg =
57+
RocketMQMessageConverterSupport.convertMessage2MQ("topic", message);
58+
59+
assertThat(rkmqMsg.getDelayTimeLevel()).isEqualTo(0);
60+
assertThat(rkmqMsg.getFlag()).isEqualTo(0);
61+
}
62+
63+
@Test
64+
public void nonNumericFlagHeaderFallsBackToZero() {
65+
Message<String> message = MessageBuilder.withPayload("msg")
66+
.setHeader(RocketMQConst.Headers.FLAG, "not-a-number")
67+
.build();
68+
69+
org.apache.rocketmq.common.message.Message rkmqMsg =
70+
RocketMQMessageConverterSupport.convertMessage2MQ("topic", message);
71+
72+
assertThat(rkmqMsg.getFlag()).isEqualTo(0);
73+
}
74+
75+
@Test
76+
public void invalidNumericHeaderDoesNotPropagateException() {
77+
Message<String> message = MessageBuilder.withPayload("msg")
78+
.setHeader(RocketMQConst.PROPERTY_DELAY_TIME_LEVEL, "x")
79+
.setHeader(RocketMQConst.Headers.FLAG, "y")
80+
.build();
81+
82+
assertThatCode(() ->
83+
RocketMQMessageConverterSupport.convertMessage2MQ("topic", message))
84+
.doesNotThrowAnyException();
85+
}
4786
}

0 commit comments

Comments
 (0)