Skip to content

Commit b13d965

Browse files
committed
test: add delay-queue/hot-key tests
1 parent 7f9c5bd commit b13d965

21 files changed

Lines changed: 1621 additions & 0 deletions

camellia-tests/camellia-delay-queue-tests/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,22 @@
1414
</parent>
1515

1616
<dependencies>
17+
<dependency>
18+
<groupId>com.netease.nim</groupId>
19+
<artifactId>camellia-delay-queue-server</artifactId>
20+
<version>${revision}</version>
21+
</dependency>
22+
<dependency>
23+
<groupId>com.netease.nim</groupId>
24+
<artifactId>camellia-delay-queue-sdk</artifactId>
25+
<version>${revision}</version>
26+
</dependency>
27+
<dependency>
28+
<groupId>junit</groupId>
29+
<artifactId>junit</artifactId>
30+
<version>4.13.2</version>
31+
<scope>test</scope>
32+
</dependency>
1733
</dependencies>
1834
<build>
1935
<plugins>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
package com.netease.nim.camellia.delayqueue.tests;
2+
3+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsg;
4+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgAckRequest;
5+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgAckResponse;
6+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgDeleteRequest;
7+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgDeleteResponse;
8+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgGetRequest;
9+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgGetResponse;
10+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgPullRequest;
11+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgPullResponse;
12+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgSendRequest;
13+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgSendResponse;
14+
import com.netease.nim.camellia.delayqueue.common.domain.CamelliaDelayMsgStatus;
15+
import com.netease.nim.camellia.delayqueue.common.exception.CamelliaDelayMsgErrorCode;
16+
import com.netease.nim.camellia.delayqueue.common.exception.CamelliaDelayQueueException;
17+
import com.netease.nim.camellia.delayqueue.sdk.CamelliaDelayMsgListenerConfig;
18+
import com.netease.nim.camellia.delayqueue.sdk.CamelliaDelayQueueSdk;
19+
import com.netease.nim.camellia.delayqueue.sdk.CamelliaDelayQueueSdkConfig;
20+
import com.netease.nim.camellia.delayqueue.sdk.api.CamelliaDelayQueueApi;
21+
import org.junit.Assert;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
25+
import java.lang.reflect.Field;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
34+
public class CamelliaDelayQueueSdkTest {
35+
36+
private static final int SUCCESS = CamelliaDelayMsgErrorCode.SUCCESS.getValue();
37+
38+
private FakeApi api;
39+
private CamelliaDelayQueueSdk sdk;
40+
41+
@Before
42+
public void setUp() throws Exception {
43+
api = new FakeApi();
44+
CamelliaDelayQueueSdkConfig config = new CamelliaDelayQueueSdkConfig();
45+
config.setUrl("http://127.0.0.1:1");
46+
CamelliaDelayMsgListenerConfig listenerConfig = new CamelliaDelayMsgListenerConfig();
47+
listenerConfig.setLongPollingEnable(false);
48+
listenerConfig.setPullIntervalTimeMillis(5);
49+
listenerConfig.setAckTimeoutMillis(20);
50+
listenerConfig.setPullThreads(1);
51+
listenerConfig.setConsumeThreads(1);
52+
config.setListenerConfig(listenerConfig);
53+
sdk = new CamelliaDelayQueueSdk(config);
54+
setApi(sdk, api);
55+
}
56+
57+
@Test
58+
public void shouldWrapSendGetAndDeleteResponses() {
59+
CamelliaDelayMsg sent = sdk.sendMsg("topic", "msg-id", "payload", 10, 100, 2);
60+
Assert.assertEquals("topic", sent.getTopic());
61+
Assert.assertEquals("msg-id", sent.getMsgId());
62+
Assert.assertEquals("payload", sent.getMsg());
63+
Assert.assertEquals(10, api.lastSendRequest.getDelayMillis());
64+
65+
Assert.assertEquals(sent, sdk.getMsg("topic", "msg-id"));
66+
Assert.assertTrue(sdk.deleteMsg("topic", "msg-id"));
67+
Assert.assertFalse(sdk.deleteMsg("topic", "missing"));
68+
}
69+
70+
@Test
71+
public void shouldThrowWhenSendResponseIsNotSuccess() {
72+
api.failNextSend = true;
73+
try {
74+
sdk.sendMsg("topic", "payload", 0);
75+
Assert.fail("expected exception");
76+
} catch (CamelliaDelayQueueException e) {
77+
Assert.assertEquals(CamelliaDelayMsgErrorCode.PARAM_WRONG, e.getErrorCode());
78+
}
79+
}
80+
81+
@Test
82+
public void shouldAckTrueWhenListenerConsumesSuccessfully() throws Exception {
83+
CamelliaDelayMsg msg = msg("topic", "success-msg", "payload");
84+
api.enqueue(msg);
85+
CountDownLatch consumed = new CountDownLatch(1);
86+
87+
long listenerId = sdk.addMsgListener("topic", delayMsg -> {
88+
consumed.countDown();
89+
return true;
90+
});
91+
92+
Assert.assertTrue(consumed.await(2, TimeUnit.SECONDS));
93+
assertEventually(() -> api.ackRequests.size() == 1, 2000);
94+
Assert.assertTrue(api.ackRequests.get(0).isAck());
95+
Assert.assertEquals("success-msg", api.ackRequests.get(0).getMsgId());
96+
Assert.assertTrue(sdk.removeMsgListener(listenerId));
97+
}
98+
99+
@Test
100+
public void shouldAckFalseWhenListenerReturnsFalseOrThrows() throws Exception {
101+
api.enqueue(msg("topic", "false-msg", "payload"));
102+
api.enqueue(msg("topic", "throw-msg", "payload"));
103+
AtomicInteger calls = new AtomicInteger();
104+
CountDownLatch consumed = new CountDownLatch(2);
105+
106+
long listenerId = sdk.addMsgListener("topic", delayMsg -> {
107+
consumed.countDown();
108+
int call = calls.incrementAndGet();
109+
if (call == 1) {
110+
return false;
111+
}
112+
throw new IllegalStateException("fail");
113+
});
114+
115+
Assert.assertTrue(consumed.await(2, TimeUnit.SECONDS));
116+
assertEventually(() -> api.ackRequests.size() == 2, 2000);
117+
Assert.assertFalse(api.ackRequests.get(0).isAck());
118+
Assert.assertFalse(api.ackRequests.get(1).isAck());
119+
Assert.assertTrue(sdk.removeMsgListener(listenerId));
120+
}
121+
122+
@Test
123+
public void shouldStopConsumingAfterListenerRemoved() throws Exception {
124+
CountDownLatch consumed = new CountDownLatch(1);
125+
AtomicReference<String> consumedId = new AtomicReference<>();
126+
api.enqueue(msg("topic", "before-remove", "payload"));
127+
128+
long listenerId = sdk.addMsgListener("topic", delayMsg -> {
129+
consumedId.set(delayMsg.getMsgId());
130+
consumed.countDown();
131+
return true;
132+
});
133+
Assert.assertTrue(consumed.await(2, TimeUnit.SECONDS));
134+
Assert.assertEquals("before-remove", consumedId.get());
135+
136+
Assert.assertTrue(sdk.removeMsgListener(listenerId));
137+
int ackCount = api.ackRequests.size();
138+
api.enqueue(msg("topic", "after-remove", "payload"));
139+
Thread.sleep(100);
140+
Assert.assertEquals(ackCount, api.ackRequests.size());
141+
}
142+
143+
private static CamelliaDelayMsg msg(String topic, String msgId, String payload) {
144+
CamelliaDelayMsg msg = new CamelliaDelayMsg();
145+
msg.setTopic(topic);
146+
msg.setMsgId(msgId);
147+
msg.setMsg(payload);
148+
msg.setProduceTime(System.currentTimeMillis());
149+
msg.setTriggerTime(System.currentTimeMillis());
150+
msg.setExpireTime(System.currentTimeMillis() + 1000);
151+
msg.setStatus(CamelliaDelayMsgStatus.READY.getValue());
152+
msg.setMaxRetry(1);
153+
return msg;
154+
}
155+
156+
private static void setApi(CamelliaDelayQueueSdk sdk, CamelliaDelayQueueApi api) throws Exception {
157+
Field field = CamelliaDelayQueueSdk.class.getDeclaredField("api");
158+
field.setAccessible(true);
159+
field.set(sdk, api);
160+
}
161+
162+
private static void assertEventually(BooleanSupplier supplier, long timeoutMillis) throws Exception {
163+
long deadline = System.currentTimeMillis() + timeoutMillis;
164+
while (System.currentTimeMillis() < deadline) {
165+
if (supplier.getAsBoolean()) {
166+
return;
167+
}
168+
Thread.sleep(10);
169+
}
170+
Assert.assertTrue(supplier.getAsBoolean());
171+
}
172+
173+
private interface BooleanSupplier {
174+
boolean getAsBoolean();
175+
}
176+
177+
private static class FakeApi extends CamelliaDelayQueueApi {
178+
179+
private final List<CamelliaDelayMsg> queue = Collections.synchronizedList(new ArrayList<>());
180+
private final List<CamelliaDelayMsgAckRequest> ackRequests = Collections.synchronizedList(new ArrayList<>());
181+
private CamelliaDelayMsgSendRequest lastSendRequest;
182+
private CamelliaDelayMsg lastSentMsg;
183+
private volatile boolean failNextSend;
184+
185+
FakeApi() {
186+
super(config());
187+
}
188+
189+
@Override
190+
public CamelliaDelayMsgSendResponse sendMsg(CamelliaDelayMsgSendRequest request) {
191+
lastSendRequest = request;
192+
CamelliaDelayMsgSendResponse response = new CamelliaDelayMsgSendResponse();
193+
if (failNextSend) {
194+
failNextSend = false;
195+
response.setCode(CamelliaDelayMsgErrorCode.PARAM_WRONG.getValue());
196+
response.setMsg("param wrong");
197+
return response;
198+
}
199+
CamelliaDelayMsg msg = msg(request.getTopic(), request.getMsgId() == null ? "generated" : request.getMsgId(), request.getMsg());
200+
msg.setTriggerTime(System.currentTimeMillis() + request.getDelayMillis());
201+
msg.setExpireTime(msg.getTriggerTime() + request.getTtlMillis());
202+
msg.setMaxRetry(request.getMaxRetry());
203+
lastSentMsg = msg;
204+
response.setCode(SUCCESS);
205+
response.setMsg("success");
206+
response.setDelayMsg(msg);
207+
return response;
208+
}
209+
210+
@Override
211+
public CamelliaDelayMsgDeleteResponse deleteMsg(CamelliaDelayMsgDeleteRequest request) {
212+
CamelliaDelayMsgDeleteResponse response = new CamelliaDelayMsgDeleteResponse();
213+
response.setCode("missing".equals(request.getMsgId()) ? CamelliaDelayMsgErrorCode.NOT_EXISTS.getValue() : SUCCESS);
214+
response.setMsg("success");
215+
return response;
216+
}
217+
218+
@Override
219+
public CamelliaDelayMsgGetResponse getMsg(CamelliaDelayMsgGetRequest request) {
220+
CamelliaDelayMsgGetResponse response = new CamelliaDelayMsgGetResponse();
221+
response.setCode(lastSentMsg == null ? CamelliaDelayMsgErrorCode.NOT_EXISTS.getValue() : SUCCESS);
222+
response.setMsg("success");
223+
response.setDelayMsg(lastSentMsg);
224+
return response;
225+
}
226+
227+
@Override
228+
public CamelliaDelayMsgPullResponse pullMsg(CamelliaDelayMsgPullRequest request) {
229+
CamelliaDelayMsgPullResponse response = new CamelliaDelayMsgPullResponse();
230+
response.setCode(SUCCESS);
231+
List<CamelliaDelayMsg> messages = new ArrayList<>();
232+
synchronized (queue) {
233+
if (!queue.isEmpty()) {
234+
messages.add(queue.remove(0));
235+
}
236+
}
237+
response.setDelayMsgList(messages);
238+
return response;
239+
}
240+
241+
@Override
242+
public CamelliaDelayMsgAckResponse ackMsg(CamelliaDelayMsgAckRequest request) {
243+
ackRequests.add(request);
244+
CamelliaDelayMsgAckResponse response = new CamelliaDelayMsgAckResponse();
245+
response.setCode(SUCCESS);
246+
response.setMsg("success");
247+
return response;
248+
}
249+
250+
void enqueue(CamelliaDelayMsg msg) {
251+
queue.add(msg);
252+
}
253+
254+
private static CamelliaDelayQueueSdkConfig config() {
255+
CamelliaDelayQueueSdkConfig config = new CamelliaDelayQueueSdkConfig();
256+
config.setUrl("http://127.0.0.1:1");
257+
return config;
258+
}
259+
}
260+
}

0 commit comments

Comments
 (0)