Skip to content

Commit 9aba654

Browse files
authored
[ISSUE #4820] Bug fix EventHandler not return json (#4821)
* bug fix * bug fix * bug fix
1 parent 748e22a commit 9aba654

File tree

4 files changed

+246
-1
lines changed

4 files changed

+246
-1
lines changed

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Ex
9999
.serialize(event);
100100
eventJsonList.add(new String(serializedEvent, StandardCharsets.UTF_8));
101101
}
102+
String result = JsonUtils.toJSONString(eventJsonList);
103+
writeJson(ctx, result);
102104
}
103105

104106
@Override

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public static <T> Map<String, Object> parseHttpRequestBody(final HttpRequest htt
5252
} else if (io.netty.handler.codec.http.HttpMethod.POST.equals(httpRequest.method())) {
5353
decodeHttpRequestBody(httpRequest, httpRequestBody);
5454
}
55-
if (Objects.isNull(t)) {
55+
if (!Objects.isNull(end)) {
5656
end.accept(t);
5757
}
5858
return httpRequestBody;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.runtime.admin.handler;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
22+
import static org.mockito.ArgumentMatchers.anyInt;
23+
import static org.mockito.ArgumentMatchers.anyString;
24+
import static org.mockito.Mockito.when;
25+
26+
import org.apache.eventmesh.runtime.admin.handler.v1.EventHandler;
27+
import org.apache.eventmesh.runtime.boot.EventMeshServer;
28+
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
29+
import org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper;
30+
import org.apache.eventmesh.runtime.mock.MockCloudEvent;
31+
32+
import java.io.ByteArrayOutputStream;
33+
import java.io.IOException;
34+
import java.lang.reflect.Field;
35+
import java.lang.reflect.Method;
36+
import java.net.URI;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
40+
import org.junit.jupiter.api.BeforeEach;
41+
import org.junit.jupiter.api.Test;
42+
import org.junit.jupiter.api.extension.ExtendWith;
43+
import org.mockito.Mock;
44+
import org.mockito.Spy;
45+
import org.mockito.junit.jupiter.MockitoExtension;
46+
47+
import io.cloudevents.CloudEvent;
48+
import io.netty.buffer.ByteBuf;
49+
import io.netty.buffer.Unpooled;
50+
import io.netty.channel.ChannelHandlerContext;
51+
import io.netty.channel.SimpleChannelInboundHandler;
52+
import io.netty.channel.embedded.EmbeddedChannel;
53+
import io.netty.handler.codec.http.DefaultFullHttpRequest;
54+
import io.netty.handler.codec.http.FullHttpRequest;
55+
import io.netty.handler.codec.http.HttpMethod;
56+
import io.netty.handler.codec.http.HttpObjectAggregator;
57+
import io.netty.handler.codec.http.HttpRequest;
58+
import io.netty.handler.codec.http.HttpRequestDecoder;
59+
import io.netty.handler.codec.http.HttpResponseEncoder;
60+
import io.netty.handler.codec.http.HttpVersion;
61+
62+
import com.fasterxml.jackson.databind.JsonNode;
63+
import com.fasterxml.jackson.databind.ObjectMapper;
64+
65+
@ExtendWith(MockitoExtension.class)
66+
public class EventHandlerTest {
67+
68+
public static String storagePlugin = "standalone";
69+
70+
EmbeddedChannel embeddedChannel;
71+
72+
AdminHandlerManager adminHandlerManager;
73+
74+
@Mock
75+
EventMeshServer eventMeshServer;
76+
77+
@Mock
78+
EventMeshTCPServer eventMeshTCPServer;
79+
80+
@Spy
81+
EventHandler eventHandler = new EventHandler(storagePlugin);
82+
83+
@Mock
84+
MQAdminWrapper mockAdmin;
85+
86+
List<CloudEvent> result = new ArrayList<>();
87+
88+
Method initHandler;
89+
90+
@BeforeEach
91+
public void init() throws Exception {
92+
result.add(new MockCloudEvent());
93+
when(eventMeshServer.getEventMeshTCPServer()).thenReturn(eventMeshTCPServer);
94+
adminHandlerManager = new AdminHandlerManager(eventMeshServer);
95+
Field admin = EventHandler.class.getDeclaredField("admin");
96+
admin.setAccessible(true);
97+
admin.set(eventHandler, mockAdmin);
98+
embeddedChannel = new EmbeddedChannel(
99+
new HttpRequestDecoder(),
100+
new HttpResponseEncoder(),
101+
new HttpObjectAggregator(Integer.MAX_VALUE),
102+
new SimpleChannelInboundHandler<HttpRequest>() {
103+
@Override
104+
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
105+
String uriStr = msg.uri();
106+
URI uri = URI.create(uriStr);
107+
adminHandlerManager.getHttpHandler(uri.getPath()).get().handle(msg, ctx);
108+
}
109+
});
110+
initHandler = AdminHandlerManager.class.getDeclaredMethod("initHandler", HttpHandler.class);
111+
initHandler.setAccessible(true);
112+
initHandler.invoke(adminHandlerManager, eventHandler);
113+
}
114+
115+
@Test
116+
public void testGet() throws Exception {
117+
when(mockAdmin.getEvent(anyString(), anyInt(), anyInt())).thenReturn(result);
118+
FullHttpRequest httpRequest = new DefaultFullHttpRequest(
119+
HttpVersion.HTTP_1_1, HttpMethod.GET, "/event?topicName=123&offset=0&length=1");
120+
embeddedChannel.writeInbound(httpRequest);
121+
boolean finish = embeddedChannel.finish();
122+
assertTrue(finish);
123+
ByteBuf byteBuf = null;
124+
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(1024);
125+
while ((byteBuf = embeddedChannel.readOutbound()) != null) {
126+
byte[] data = new byte[byteBuf.readableBytes()];
127+
byteBuf.readBytes(data);
128+
byteArrayOutputStream.write(data);
129+
}
130+
;
131+
String response = new String(byteArrayOutputStream.toByteArray(), "UTF-8");
132+
String responseBody = response.split("\\r?\\n\\r?\\n")[1];
133+
JsonNode jsonNode = new ObjectMapper().readTree(responseBody);
134+
assertTrue(jsonNode.get(0).asText().contains("mockData"));
135+
}
136+
137+
@Test
138+
public void testPost() throws IOException {
139+
FullHttpRequest httpRequest = new DefaultFullHttpRequest(
140+
HttpVersion.HTTP_1_1, HttpMethod.POST, "/event", Unpooled.copiedBuffer(
141+
("specversion=1.0&id=cd7c0d63-6c7c-4300-9f4e-ceb51f46b1b1&source"
142+
+ "=/&type=cloudevents&datacontenttype=application/cloudevents+json&subject=test&ttl=4000").getBytes()));
143+
embeddedChannel.writeInbound(httpRequest);
144+
ByteBuf byteBuf = embeddedChannel.readOutbound();
145+
byte[] data = new byte[byteBuf.readableBytes()];
146+
byteBuf.readBytes(data);
147+
String response = new String(data, "UTF-8");
148+
String[] requestMessage = response.split("\r\n");
149+
assertEquals("HTTP/1.1 200 OK", requestMessage[0].toString());
150+
}
151+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.runtime.mock;
19+
20+
import java.net.URI;
21+
import java.nio.charset.StandardCharsets;
22+
import java.time.OffsetDateTime;
23+
import java.util.Collections;
24+
import java.util.Set;
25+
26+
import io.cloudevents.CloudEvent;
27+
import io.cloudevents.CloudEventData;
28+
import io.cloudevents.SpecVersion;
29+
import io.cloudevents.core.data.BytesCloudEventData;
30+
31+
public class MockCloudEvent implements CloudEvent {
32+
33+
@Override
34+
public CloudEventData getData() {
35+
return BytesCloudEventData.wrap("mockData".getBytes(StandardCharsets.UTF_8));
36+
}
37+
38+
@Override
39+
public SpecVersion getSpecVersion() {
40+
return SpecVersion.V1;
41+
}
42+
43+
@Override
44+
public String getId() {
45+
return "mockId";
46+
}
47+
48+
@Override
49+
public String getType() {
50+
return "mockType";
51+
}
52+
53+
@Override
54+
public URI getSource() {
55+
return URI.create("mockSource");
56+
}
57+
58+
@Override
59+
public String getDataContentType() {
60+
return null;
61+
}
62+
63+
@Override
64+
public URI getDataSchema() {
65+
return URI.create("mockDataSchema");
66+
}
67+
68+
@Override
69+
public String getSubject() {
70+
return "mockSubject";
71+
}
72+
73+
@Override
74+
public OffsetDateTime getTime() {
75+
return null;
76+
}
77+
78+
@Override
79+
public Object getAttribute(String attributeName) throws IllegalArgumentException {
80+
return null;
81+
}
82+
83+
@Override
84+
public Object getExtension(String extensionName) {
85+
return null;
86+
}
87+
88+
@Override
89+
public Set<String> getExtensionNames() {
90+
return Collections.emptySet();
91+
}
92+
}

0 commit comments

Comments
 (0)