Skip to content

Commit cdcffd7

Browse files
committed
GH-9521: Make middle-flow endpoints started later
Fixes: #9521
1 parent 90f06b1 commit cdcffd7

File tree

4 files changed

+34
-20
lines changed

4 files changed

+34
-20
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java

+2-10
Original file line numberDiff line numberDiff line change
@@ -416,17 +416,9 @@ private void smartLifecycle() {
416416
if (this.autoStartup != null) {
417417
this.endpoint.setAutoStartup(this.autoStartup);
418418
}
419-
int phaseToSet = this.phase;
420-
if (!this.isPhaseSet) {
421-
if (this.endpoint instanceof PollingConsumer) {
422-
phaseToSet = Integer.MAX_VALUE / 2;
423-
}
424-
else {
425-
phaseToSet = Integer.MIN_VALUE;
426-
}
419+
if (this.isPhaseSet) {
420+
this.endpoint.setPhase(this.phase);
427421
}
428-
429-
this.endpoint.setPhase(phaseToSet);
430422
}
431423

432424

Diff for: spring-integration-core/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ public EventDrivenConsumer(SubscribableChannel inputChannel, MessageHandler hand
4545
Assert.notNull(handler, "handler must not be null");
4646
this.inputChannel = inputChannel;
4747
this.handler = handler;
48-
setPhase(Integer.MIN_VALUE);
48+
if (this.handler instanceof MessageProducer) {
49+
setPhase(Integer.MIN_VALUE + 1000);
50+
}
51+
else {
52+
setPhase(Integer.MIN_VALUE);
53+
}
4954
}
5055

5156
@Override

Diff for: spring-integration-core/src/test/java/org/springframework/integration/config/xml/ServiceActivatorParserTests.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
package org.springframework.integration.config.xml;
1818

19-
import org.junit.Test;
20-
import org.junit.runner.RunWith;
19+
import org.junit.jupiter.api.Test;
2120

2221
import org.springframework.beans.factory.annotation.Autowired;
2322
import org.springframework.beans.factory.annotation.Qualifier;
@@ -29,8 +28,8 @@
2928
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
3029
import org.springframework.messaging.Message;
3130
import org.springframework.messaging.MessageChannel;
32-
import org.springframework.test.context.ContextConfiguration;
33-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
31+
import org.springframework.test.annotation.DirtiesContext;
32+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3433

3534
import static org.assertj.core.api.Assertions.assertThat;
3635
import static org.assertj.core.api.Assertions.fail;
@@ -39,11 +38,10 @@
3938
* @author Mark Fisher
4039
* @author Gary Russell
4140
* @author Artem Bilan
42-
*
4341
* @since 2.0
4442
*/
45-
@ContextConfiguration
46-
@RunWith(SpringJUnit4ClassRunner.class)
43+
@SpringJUnitConfig
44+
@DirtiesContext
4745
public class ServiceActivatorParserTests {
4846

4947
@Autowired
@@ -211,7 +209,7 @@ public void failMethodAndExpressionElement() {
211209

212210
@Test
213211
public void testConsumerEndpointFactoryBeanDefaultPhase() {
214-
assertThat(this.testAliasEndpoint.getPhase()).isEqualTo(Integer.MIN_VALUE);
212+
assertThat(this.testAliasEndpoint.getPhase()).isEqualTo(Integer.MIN_VALUE + 1000);
215213
}
216214

217215
private Object sendAndReceive(MessageChannel channel, Object payload) {

Diff for: spring-integration-core/src/test/java/org/springframework/integration/dsl/correlation/CorrelationHandlerTests.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3939
import org.springframework.integration.aggregator.FluxAggregatorMessageHandler;
4040
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
41+
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
4142
import org.springframework.integration.channel.QueueChannel;
4243
import org.springframework.integration.config.EnableIntegration;
4344
import org.springframework.integration.dsl.IntegrationFlow;
@@ -47,6 +48,8 @@
4748
import org.springframework.integration.dsl.context.IntegrationFlowContext;
4849
import org.springframework.integration.handler.MessageTriggerAction;
4950
import org.springframework.integration.json.ObjectToJsonTransformer;
51+
import org.springframework.integration.store.MessageGroupStore;
52+
import org.springframework.integration.store.SimpleMessageStore;
5053
import org.springframework.integration.support.MessageBuilder;
5154
import org.springframework.messaging.Message;
5255
import org.springframework.messaging.MessageChannel;
@@ -60,7 +63,6 @@
6063
/**
6164
* @author Artem Bilan
6265
* @author Gary Russell
63-
*
6466
* @since 5.0
6567
*/
6668
@SpringJUnitConfig
@@ -320,6 +322,23 @@ public IntegrationFlow releaseBarrierFlow(MessageTriggerAction barrierTriggerAct
320322
.get();
321323
}
322324

325+
@Bean
326+
IntegrationFlow purgeOrphanedGroupsDoesNotFailStartupFlow() {
327+
MessageGroupStore messageStore = new SimpleMessageStore();
328+
// Add two messages that 'complete' the aggregation immediately on startup
329+
messageStore.addMessagesToGroup("test", new GenericMessage<>("1"));
330+
messageStore.addMessagesToGroup("test", new GenericMessage<>("2"));
331+
332+
return f -> f.aggregate((a) -> a
333+
.messageStore(messageStore)
334+
.id("purgeOrphanedGroups")
335+
.expireTimeout(1) // Expire immediately
336+
// The group above is actually 'complete' on startup
337+
.releaseStrategy(new MessageCountReleaseStrategy(2)))
338+
.channel("purgeOrphanedGroupsChannel")
339+
.handle(message -> { }, e -> e.id("endOfFlowEndpoint"));
340+
}
341+
323342
}
324343

325344
record TestSplitterPojo(List<String> first, List<String> second) {

0 commit comments

Comments
 (0)