Skip to content

Commit 831fd72

Browse files
jevinjiangJiangShuJu
and
JiangShuJu
authored
[ISSUE #5127] fix create topic error in Standalone mode (#5128)
* [ISSUE #5127] fix * [ISSUE #5127] fix * [ISSUE #5127] fix * [ISSUE #5127] fix * [ISSUE #5127] fix checkstyle test --------- Co-authored-by: JiangShuJu <[email protected]>
1 parent dd9698f commit 831fd72

File tree

5 files changed

+27
-22
lines changed

5 files changed

+27
-22
lines changed

Diff for: eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.lmax.disruptor.dsl.ProducerType;
3232

3333
import lombok.Getter;
34+
import lombok.Setter;
3435

3536

3637
public class Channel implements LifeCycle {
@@ -39,11 +40,16 @@ public class Channel implements LifeCycle {
3940
@Getter
4041
private DisruptorProvider provider;
4142
private final Integer size;
42-
private final EventHandler<MessageEntity> eventHandler;
43+
@Setter
44+
private EventHandler<MessageEntity> eventHandler;
4345
private volatile boolean started = false;
4446
private final TopicMetadata topic;
4547
private static final String THREAD_NAME_PREFIX = "standalone_disruptor_provider_";
4648

49+
public Channel(TopicMetadata topic) {
50+
this(DEFAULT_SIZE, topic, null);
51+
}
52+
4753
public Channel(TopicMetadata topic, EventHandler<MessageEntity> eventHandler) {
4854
this(DEFAULT_SIZE, topic, eventHandler);
4955
}

Diff for: eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -60,25 +60,20 @@ public static StandaloneBroker getInstance() {
6060
public MessageEntity putMessage(String topicName, CloudEvent message) {
6161
TopicMetadata topicMetadata = new TopicMetadata(topicName);
6262
if (!messageContainer.containsKey(topicMetadata)) {
63-
createTopic(topicName);
63+
throw new RuntimeException(String.format("The topic:%s is not created", topicName));
6464
}
6565
Channel channel = messageContainer.get(topicMetadata);
66+
if (channel.isClosed()) {
67+
throw new RuntimeException(String.format("The topic:%s is not subscribed", topicName));
68+
}
6669
MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message);
6770
channel.getProvider().onData(messageEntity);
6871
return messageEntity;
6972
}
7073

7174
public Channel createTopic(String topicName) {
7275
TopicMetadata topicMetadata = new TopicMetadata(topicName);
73-
return messageContainer.computeIfAbsent(topicMetadata, k -> {
74-
Subscribe subscribe = subscribeContainer.get(topicMetadata);
75-
if (subscribe == null) {
76-
throw new IllegalStateException("the topic not exist subscribe ");
77-
}
78-
Channel channel = new Channel(topicMetadata, subscribe);
79-
channel.start();
80-
return channel;
81-
});
76+
return messageContainer.computeIfAbsent(topicMetadata, k -> new Channel(topicMetadata));
8277
}
8378

8479
/**
@@ -139,10 +134,17 @@ public void deleteTopicIfExist(String topicName) {
139134

140135
public void subscribed(String topicName, Subscribe subscribe) {
141136
TopicMetadata topicMetadata = new TopicMetadata(topicName);
142-
if (getMessageContainer().containsKey(topicMetadata)) {
143-
log.warn("the topic already subscribed");
137+
if (subscribeContainer.containsKey(topicMetadata)) {
138+
log.warn("the topic:{} already subscribed", topicName);
139+
return;
140+
}
141+
Channel channel = getMessageContainer().get(topicMetadata);
142+
if (channel == null) {
143+
log.warn("the topic:{} is not created", topicName);
144144
return;
145145
}
146+
channel.setEventHandler(subscribe);
147+
channel.start();
146148
subscribeContainer.put(topicMetadata, subscribe);
147149
}
148150

Diff for: eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java

+2
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,13 @@ public static MessageEntity createMessageEntity(TopicMetadata topicMetadata, Clo
9393
}
9494

9595
public static Subscribe createSubscribe(StandaloneBroker standaloneBroker) {
96+
standaloneBroker.createTopic(TEST_TOPIC);
9697
return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> {
9798
});
9899
}
99100

100101
public static Subscribe createSubscribe(StandaloneBroker standaloneBroker, List<CloudEvent> cloudEvents) {
102+
standaloneBroker.createTopic(TEST_TOPIC);
101103
return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> {
102104
cloudEvents.add(cloudEvent);
103105
});

Diff for: eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java

-9
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,4 @@ public void testCheckTopicExist() throws InterruptedException {
6969
Assertions.assertTrue(exists);
7070
}
7171

72-
@Test
73-
public void testDeleteTopicIfExist() throws InterruptedException {
74-
StandaloneBroker instance = getStandaloneBroker();
75-
CloudEvent cloudEvent = createDefaultCloudEvent();
76-
instance.putMessage(TEST_TOPIC, cloudEvent);
77-
instance.deleteTopicIfExist(TEST_TOPIC);
78-
boolean exists = instance.checkTopicExist(TEST_TOPIC);
79-
Assertions.assertFalse(exists);
80-
}
8172
}

Diff for: eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
package org.apache.eventmesh.storage.standalone.producer;
1919

2020
import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
21+
import static org.apache.eventmesh.storage.standalone.TestUtils.createSubscribe;
2122

2223
import org.apache.eventmesh.api.SendResult;
2324
import org.apache.eventmesh.storage.standalone.TestUtils;
2425
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
26+
import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
2527

2628
import java.util.Properties;
2729

@@ -70,6 +72,8 @@ public void testPublish() {
7072
StandaloneBroker standaloneBroker = StandaloneBroker.getInstance();
7173
standaloneBroker.createTopicIfAbsent(TEST_TOPIC);
7274
CloudEvent cloudEvent = TestUtils.createDefaultCloudEvent();
75+
Subscribe subscribe = createSubscribe(standaloneBroker);
76+
subscribe.subscribe();
7377
SendResult sendResult = standaloneProducer.publish(cloudEvent);
7478
Assertions.assertNotNull(sendResult);
7579
}

0 commit comments

Comments
 (0)