Skip to content

Commit 976e936

Browse files
committed
GH-1727 - Polishing.
1 parent 2349a93 commit 976e936

2 files changed

Lines changed: 62 additions & 50 deletions

File tree

spring-modulith-events/spring-modulith-events-amqp/src/main/java/org/springframework/modulith/events/amqp/RabbitEventExternalizerConfiguration.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
import io.namastack.outbox.handler.OutboxHandler;
1919

20+
import java.util.Collections;
2021
import java.util.HashMap;
22+
import java.util.Map;
2123
import java.util.concurrent.CompletableFuture;
2224

2325
import org.jobrunr.scheduling.JobScheduler;
@@ -35,9 +37,11 @@
3537
import org.springframework.context.annotation.Bean;
3638
import org.springframework.context.expression.BeanFactoryResolver;
3739
import org.springframework.core.env.Environment;
40+
import org.springframework.expression.EvaluationContext;
3841
import org.springframework.expression.spel.support.StandardEvaluationContext;
3942
import org.springframework.modulith.events.EventExternalizationConfiguration;
4043
import org.springframework.modulith.events.ExternalizationMode;
44+
import org.springframework.modulith.events.RoutingTarget;
4145
import org.springframework.modulith.events.config.EventExternalizationAutoConfiguration;
4246
import org.springframework.modulith.events.jobrunr.JobRunrExternalizationTransport;
4347
import org.springframework.modulith.events.support.BrokerRouting;
@@ -122,15 +126,9 @@ private static EventExternalizationTransport createRabbitTransport(
122126
EventExternalizationConfiguration configuration, RabbitMessageOperations operations,
123127
BeanFactory factory) {
124128

125-
var context = new StandardEvaluationContext();
126-
context.setBeanResolver(new BeanFactoryResolver(factory));
127-
128129
return (payload, target) -> {
129130

130-
var routing = BrokerRouting.of(target, context);
131-
var headers = configuration.getHeadersFor(payload);
132-
133-
operations.convertAndSend(routing.getTarget(payload), routing.getKey(payload), payload, headers);
131+
send(payload, target, configuration, Collections.emptyMap(), operations, factory);
134132

135133
return CompletableFuture.completedFuture(null);
136134
};
@@ -140,19 +138,12 @@ private static EventExternalizationTransport createConfirmingRabbitTransport(
140138
EventExternalizationConfiguration configuration, RabbitMessageOperations operations,
141139
BeanFactory factory) {
142140

143-
var context = new StandardEvaluationContext();
144-
context.setBeanResolver(new BeanFactoryResolver(factory));
145-
146141
return (payload, target) -> {
147142

148-
var routing = BrokerRouting.of(target, context);
149-
150143
var correlation = new CorrelationData();
151-
var headers = new HashMap<>(configuration.getHeadersFor(payload));
144+
var headers = Map.of(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, (Object) correlation);
152145

153-
headers.put(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, correlation);
154-
155-
operations.convertAndSend(routing.getTarget(payload), routing.getKey(payload), payload, headers);
146+
send(payload, target, configuration, headers, operations, factory);
156147

157148
return correlation.getFuture().thenAccept(confirm -> {
158149

@@ -163,4 +154,23 @@ private static EventExternalizationTransport createConfirmingRabbitTransport(
163154
});
164155
};
165156
}
157+
158+
private static void send(Object payload, RoutingTarget target, EventExternalizationConfiguration configuration,
159+
Map<String, Object> additionalHeaders, RabbitMessageOperations operations, BeanFactory factory) {
160+
161+
var routing = BrokerRouting.of(target, createContext(factory));
162+
163+
var headers = new HashMap<>(configuration.getHeadersFor(payload));
164+
headers.putAll(additionalHeaders);
165+
166+
operations.convertAndSend(routing.getTarget(payload), routing.getKey(payload), payload, additionalHeaders);
167+
}
168+
169+
private static EvaluationContext createContext(BeanFactory factory) {
170+
171+
var context = new StandardEvaluationContext();
172+
context.setBeanResolver(new BeanFactoryResolver(factory));
173+
174+
return context;
175+
}
166176
}

spring-modulith-events/spring-modulith-events-amqp/src/test/java/org/springframework/modulith/events/amqp/RabbitEventExternalizerConfigurationIntegrationTests.java

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.modulith.events.amqp;
1717

1818
import static org.assertj.core.api.Assertions.*;
19+
import static org.mockito.ArgumentMatchers.*;
1920
import static org.mockito.Mockito.*;
2021

2122
import io.namastack.outbox.handler.OutboxHandler;
@@ -58,29 +59,6 @@ class RabbitEventExternalizerConfigurationIntegrationTests {
5859
static final EventExternalizationConfiguration EXTERNALIZATION_ENABLED = EventExternalizationConfiguration
5960
.defaults("org").build();
6061

61-
@Test
62-
void requiresCorrelatedPublisherConfirmsForOutboxMode() {
63-
64-
basicSetupWithoutPublisherConfirms(EXTERNALIZATION_ENABLED, confirmingOperations())
65-
.withPropertyValues(ExternalizationMode.PROPERTY + "=" + ExternalizationMode.OUTBOX)
66-
.run(ctxt -> {
67-
assertThat(ctxt).hasFailed();
68-
assertThat(ctxt.getStartupFailure())
69-
.hasRootCauseMessage("RabbitMQ outbox event externalization requires "
70-
+ "spring.rabbitmq.publisher-confirm-type=correlated!");
71-
});
72-
}
73-
74-
@Test
75-
void doesNotRequireCorrelatedPublisherConfirmsForModuleListenerMode() {
76-
77-
basicSetupWithoutPublisherConfirms(EXTERNALIZATION_ENABLED, mock(RabbitMessageOperations.class))
78-
.run(ctxt -> {
79-
assertThat(ctxt).hasNotFailed();
80-
assertThat(ctxt).hasSingleBean(EventExternalizerModuleListener.class);
81-
});
82-
}
83-
8462
@Test // GH-342
8563
void registersExternalizerByDefault() {
8664

@@ -152,7 +130,31 @@ void publishesEventExternalizedAfterNamastackExternalization() {
152130
assertEventExternalizedPublished(OutboxHandler.class, (transport, event) -> transport.handle(event, null));
153131
}
154132

155-
@Test
133+
@Test // GH-1727
134+
void requiresCorrelatedPublisherConfirmsForOutboxMode() {
135+
136+
basicSetupWithoutPublisherConfirms(EXTERNALIZATION_ENABLED, confirmingOperations())
137+
.withPropertyValues(ExternalizationMode.PROPERTY + "=" + ExternalizationMode.OUTBOX)
138+
.run(ctxt -> {
139+
140+
assertThat(ctxt).hasFailed();
141+
assertThat(ctxt.getStartupFailure())
142+
.hasRootCauseMessage("RabbitMQ outbox event externalization requires "
143+
+ "spring.rabbitmq.publisher-confirm-type=correlated!");
144+
});
145+
}
146+
147+
@Test // GH-1727
148+
void doesNotRequireCorrelatedPublisherConfirmsForModuleListenerMode() {
149+
150+
basicSetupWithoutPublisherConfirms(EXTERNALIZATION_ENABLED, mock(RabbitMessageOperations.class))
151+
.run(ctxt -> {
152+
assertThat(ctxt).hasNotFailed();
153+
assertThat(ctxt).hasSingleBean(EventExternalizerModuleListener.class);
154+
});
155+
}
156+
157+
@Test // GH-1727
156158
void propagatesSynchronousFailureAsFailedFuture() {
157159

158160
var operations = mock(RabbitMessageOperations.class);
@@ -170,7 +172,7 @@ void propagatesSynchronousFailureAsFailedFuture() {
170172
});
171173
}
172174

173-
@Test
175+
@Test // GH-1727
174176
void completesTransportFutureAfterPositivePublisherConfirm() {
175177

176178
var correlations = new ArrayList<CorrelationData>();
@@ -194,13 +196,13 @@ void completesTransportFutureAfterPositivePublisherConfirm() {
194196
});
195197
}
196198

197-
@Test
199+
@Test // GH-1727
198200
void completesTransportFutureExceptionallyAfterPublisherNack() {
199201

200202
assertPublisherNack(JobRunrExternalizationTransport.class, JobRunrExternalizationTransport::externalize);
201203
}
202204

203-
@Test
205+
@Test // GH-1727
204206
void moduleListenerDoesNotUsePublisherConfirms() {
205207

206208
var operations = mock(RabbitMessageOperations.class);
@@ -212,18 +214,18 @@ void moduleListenerDoesNotUsePublisherConfirms() {
212214
assertThat(headers).doesNotContainKey(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION);
213215

214216
return null;
217+
215218
}).when(operations).convertAndSend(any(), any(), any(), anyMap());
216219

217-
basicSetupWithoutPublisherConfirms(EXTERNALIZATION_ENABLED, operations)
218-
.run(ctxt -> {
220+
basicSetupWithoutPublisherConfirms(EXTERNALIZATION_ENABLED, operations).run(ctxt -> {
219221

220-
var result = ctxt.getBean(EventExternalizerModuleListener.class).externalize(new SampleEvent());
222+
var result = ctxt.getBean(EventExternalizerModuleListener.class).externalize(new SampleEvent());
221223

222-
assertThat(result).isCompleted();
223-
});
224+
assertThat(result).isCompleted();
225+
});
224226
}
225227

226-
@Test
228+
@Test // GH-1727
227229
void addsUniquePublisherConfirmCorrelationForEachPublish() {
228230

229231
var correlations = new ArrayList<CorrelationData>();
@@ -244,7 +246,7 @@ void addsUniquePublisherConfirmCorrelationForEachPublish() {
244246
});
245247
}
246248

247-
@Test
249+
@Test // GH-1727
248250
void propagatesPublisherNackToNamastackOutboxHandler() {
249251

250252
assertPublisherNack(OutboxHandler.class, (transport, event) -> transport.handle(event, null));

0 commit comments

Comments
 (0)