Skip to content

Commit e36bc1e

Browse files
committed
DelayedMessageWrapper cannot be deserialized when using RedisMessageStore and JSON serialization
1 parent 1de81b6 commit e36bc1e

File tree

5 files changed

+115
-7
lines changed

5 files changed

+115
-7
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
* @author Artem Bilan
9898
* @author Gary Russell
9999
* @author Christian Tzolov
100+
* @author Youbin Wu
100101
*
101102
* @since 1.0.3
102103
*/
@@ -689,7 +690,7 @@ public static final class DelayedMessageWrapper implements Serializable {
689690
@SuppressWarnings("serial")
690691
private final Message<?> original;
691692

692-
DelayedMessageWrapper(Message<?> original, long requestDate) {
693+
public DelayedMessageWrapper(Message<?> original, long requestDate) {
693694
this.original = original;
694695
this.requestDate = requestDate;
695696
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2017-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.json;
18+
19+
import com.fasterxml.jackson.databind.DeserializationContext;
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer;
23+
24+
import org.springframework.integration.handler.DelayHandler;
25+
import org.springframework.messaging.Message;
26+
import org.springframework.util.Assert;
27+
28+
import java.io.IOException;
29+
30+
/**
31+
* A Jackson {@link StdNodeBasedDeserializer} extension for {@link Message} implementations.
32+
*
33+
* @author Youbin Wu
34+
* @since 6.4
35+
*/
36+
public class DelayedMessageWrapperJacksonDeserializer extends StdNodeBasedDeserializer<DelayHandler.DelayedMessageWrapper> {
37+
38+
private static final long serialVersionUID = 1L;
39+
40+
private ObjectMapper mapper = new ObjectMapper();
41+
42+
protected DelayedMessageWrapperJacksonDeserializer() {
43+
super(DelayHandler.DelayedMessageWrapper.class);
44+
}
45+
46+
public void setMapper(ObjectMapper mapper) {
47+
Assert.notNull(mapper, "'mapper' must not be null");
48+
this.mapper = mapper;
49+
}
50+
51+
public ObjectMapper getMapper() {
52+
return mapper;
53+
}
54+
55+
@Override
56+
public DelayHandler.DelayedMessageWrapper convert(JsonNode root, DeserializationContext ctxt)
57+
throws IOException {
58+
Long requestDate = this.mapper.readValue(root.get("requestDate").traverse(), Long.class);
59+
Message<?> message = this.mapper.readValue(root.get("original").traverse(), Message.class);
60+
return new DelayHandler.DelayedMessageWrapper(message, requestDate);
61+
}
62+
63+
}

Diff for: spring-integration-core/src/main/java/org/springframework/integration/support/json/JacksonJsonUtils.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
import com.fasterxml.jackson.databind.jsontype.TypeIdResolver;
3737
import com.fasterxml.jackson.databind.module.SimpleModule;
3838

39+
import org.springframework.integration.handler.DelayHandler;
3940
import org.springframework.integration.message.AdviceMessage;
4041
import org.springframework.integration.support.MutableMessage;
4142
import org.springframework.messaging.support.ErrorMessage;
@@ -46,6 +47,7 @@
4647
*
4748
* @author Artem Bilan
4849
* @author Gary Russell
50+
* @author Youbin Wu
4951
*
5052
* @since 3.0
5153
*
@@ -63,7 +65,8 @@ public final class JacksonJsonUtils {
6365
"org.springframework.integration.support",
6466
"org.springframework.integration.message",
6567
"org.springframework.integration.store",
66-
"org.springframework.integration.history"
68+
"org.springframework.integration.history",
69+
"org.springframework.integration.handler"
6770
);
6871

6972
private JacksonJsonUtils() {
@@ -96,13 +99,17 @@ public static ObjectMapper messagingAwareMapper(String... trustedPackages) {
9699
MutableMessageJacksonDeserializer mutableMessageDeserializer = new MutableMessageJacksonDeserializer();
97100
mutableMessageDeserializer.setMapper(mapper);
98101

102+
DelayedMessageWrapperJacksonDeserializer delayedMessageWrapperJacksonDeserializer = new DelayedMessageWrapperJacksonDeserializer();
103+
delayedMessageWrapperJacksonDeserializer.setMapper(mapper);
104+
99105
SimpleModule simpleModule = new SimpleModule()
100106
.addSerializer(new MessageHeadersJacksonSerializer())
101107
.addSerializer(new MimeTypeSerializer())
102108
.addDeserializer(GenericMessage.class, genericMessageDeserializer)
103109
.addDeserializer(ErrorMessage.class, errorMessageDeserializer)
104110
.addDeserializer(AdviceMessage.class, adviceMessageDeserializer)
105-
.addDeserializer(MutableMessage.class, mutableMessageDeserializer);
111+
.addDeserializer(MutableMessage.class, mutableMessageDeserializer)
112+
.addDeserializer(DelayHandler.DelayedMessageWrapper.class, delayedMessageWrapperJacksonDeserializer);
106113

107114
mapper.registerModule(simpleModule);
108115
return mapper;

Diff for: spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisChannelMessageStoreTests.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,9 +29,11 @@
2929
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
3030
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3131
import org.springframework.integration.channel.NullChannel;
32+
import org.springframework.integration.handler.DelayHandler;
3233
import org.springframework.integration.history.MessageHistory;
3334
import org.springframework.integration.redis.RedisContainerTest;
3435
import org.springframework.integration.store.MessageGroup;
36+
import org.springframework.integration.support.DefaultMessageBuilderFactory;
3537
import org.springframework.integration.support.MessageBuilder;
3638
import org.springframework.integration.support.MutableMessageBuilder;
3739
import org.springframework.integration.support.json.JacksonJsonUtils;
@@ -47,6 +49,7 @@
4749
* @author Gary Russell
4850
* @author Artem Bilan
4951
* @author Artem Vozhdayenko
52+
* @author Youbin Wu
5053
*
5154
* @since 4.0
5255
*
@@ -201,4 +204,30 @@ void testJsonSerialization() {
201204
assertThat(messages.get(0).getHeaders()).containsKeys(MessageHistory.HEADER_NAME);
202205
}
203206

207+
@Test
208+
void testDelayJsonSerialization() {
209+
RedisChannelMessageStore store = new RedisChannelMessageStore(RedisContainerTest.connectionFactory());
210+
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
211+
GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(mapper);
212+
store.setValueSerializer(serializer);
213+
214+
Message<?> originMessage = new GenericMessage<>(new Date());
215+
Message<?> genericMessage = new DefaultMessageBuilderFactory()
216+
.withPayload(new DelayHandler.DelayedMessageWrapper(originMessage, System.currentTimeMillis()))
217+
.copyHeaders(originMessage.getHeaders())
218+
.build();
219+
NullChannel testComponent = new NullChannel();
220+
testComponent.setBeanName("testChannel");
221+
genericMessage = MessageHistory.write(genericMessage, testComponent);
222+
223+
String groupId = "delayJsonMessagesStore";
224+
225+
store.addMessageToGroup(groupId, genericMessage);
226+
MessageGroup messageGroup = store.getMessageGroup(groupId);
227+
assertThat(messageGroup.size()).isEqualTo(1);
228+
List<Message<?>> messages = new ArrayList<>(messageGroup.getMessages());
229+
assertThat(messages.get(0)).isEqualTo(genericMessage);
230+
assertThat(messages.get(0).getHeaders()).containsKeys(MessageHistory.HEADER_NAME);
231+
}
232+
204233
}

Diff for: spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@
4242
import org.springframework.integration.channel.DirectChannel;
4343
import org.springframework.integration.channel.NullChannel;
4444
import org.springframework.integration.channel.QueueChannel;
45+
import org.springframework.integration.handler.DelayHandler;
4546
import org.springframework.integration.history.MessageHistory;
4647
import org.springframework.integration.message.AdviceMessage;
4748
import org.springframework.integration.redis.RedisContainerTest;
4849
import org.springframework.integration.store.MessageGroup;
4950
import org.springframework.integration.store.SimpleMessageGroup;
51+
import org.springframework.integration.support.DefaultMessageBuilderFactory;
5052
import org.springframework.integration.support.MessageBuilder;
5153
import org.springframework.integration.support.MutableMessage;
5254
import org.springframework.integration.support.json.JacksonJsonUtils;
@@ -64,6 +66,7 @@
6466
* @author Artem Bilan
6567
* @author Gary Russell
6668
* @author Artem Vozhdayenko
69+
* @author Youbin Wu
6770
*/
6871
class RedisMessageGroupStoreTests implements RedisContainerTest {
6972

@@ -400,16 +403,21 @@ void testJsonSerialization() {
400403
Message<?> mutableMessage = new MutableMessage<>(UUID.randomUUID());
401404
Message<?> adviceMessage = new AdviceMessage<>("foo", genericMessage);
402405
ErrorMessage errorMessage = new ErrorMessage(new RuntimeException("test exception"), mutableMessage);
406+
Message<?> delayMessage = new DefaultMessageBuilderFactory()
407+
.withPayload(new DelayHandler.DelayedMessageWrapper(genericMessage, System.currentTimeMillis()))
408+
.copyHeaders(genericMessage.getHeaders())
409+
.build();
403410

404-
store.addMessagesToGroup(this.groupId, genericMessage, mutableMessage, adviceMessage, errorMessage);
411+
store.addMessagesToGroup(this.groupId, genericMessage, mutableMessage, adviceMessage, errorMessage, delayMessage);
405412

406413
MessageGroup messageGroup = store.getMessageGroup(this.groupId);
407-
assertThat(messageGroup.size()).isEqualTo(4);
414+
assertThat(messageGroup.size()).isEqualTo(5);
408415
List<Message<?>> messages = new ArrayList<>(messageGroup.getMessages());
409416
assertThat(messages.get(0)).isEqualTo(genericMessage);
410417
assertThat(messages.get(0).getHeaders()).containsKeys(MessageHistory.HEADER_NAME);
411418
assertThat(messages.get(1)).isEqualTo(mutableMessage);
412419
assertThat(messages.get(2)).isEqualTo(adviceMessage);
420+
assertThat(messages.get(4)).isEqualTo(delayMessage);
413421
Message<?> errorMessageResult = messages.get(3);
414422
assertThat(errorMessageResult.getHeaders()).isEqualTo(errorMessage.getHeaders());
415423
assertThat(errorMessageResult).isInstanceOf(ErrorMessage.class);

0 commit comments

Comments
 (0)