Skip to content

Commit 5b3dafb

Browse files
committed
refactor httphandler
1 parent 648f3e9 commit 5b3dafb

36 files changed

+1156
-2296
lines changed

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java

+11
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.eventmesh.common.protocol.http.header.Header;
2828
import org.apache.eventmesh.common.utils.JsonUtils;
2929

30+
import java.net.URI;
3031
import java.util.Objects;
3132
import java.util.Optional;
3233
import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +67,16 @@ public class HttpCommand implements ProtocolTransportObject {
6667
// Command response time
6768
public long resTime;
6869

70+
public URI requestURI;
71+
72+
public URI getRequestURI() {
73+
return requestURI;
74+
}
75+
76+
public void setRequestURI(URI requestURI) {
77+
this.requestURI = requestURI;
78+
}
79+
6980
public CmdType cmdType = CmdType.REQ;
7081

7182
public HttpCommand() {

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java

+10
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.io.IOException;
2727
import java.lang.reflect.Type;
28+
import java.util.Map;
2829
import java.util.Objects;
2930

3031
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -52,6 +53,15 @@ public class JsonUtils {
5253
OBJECT_MAPPER.registerModule(new JavaTimeModule());
5354
}
5455

56+
public static <T> T mapToObject(Map<String, Object> map, Class<T> beanClass) {
57+
if (map == null) {
58+
return null;
59+
}
60+
Object obj = OBJECT_MAPPER.convertValue(map, beanClass);
61+
return (T) obj;
62+
}
63+
64+
5565
/**
5666
* Serialize object to json string.
5767
*

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/NetUtils.java

-29
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,10 @@
1717

1818
package org.apache.eventmesh.common.utils;
1919

20-
import static org.apache.eventmesh.common.Constants.SUCCESS_CODE;
21-
2220
import org.apache.eventmesh.common.Constants;
23-
import org.apache.eventmesh.common.enums.HttpMethod;
2421

2522
import org.apache.commons.lang3.StringUtils;
2623

27-
import java.io.IOException;
28-
import java.io.InputStreamReader;
2924
import java.io.UnsupportedEncodingException;
3025
import java.net.InetSocketAddress;
3126
import java.net.URLDecoder;
@@ -34,8 +29,6 @@
3429
import java.util.HashMap;
3530
import java.util.Map;
3631

37-
import com.sun.net.httpserver.HttpExchange;
38-
3932
import lombok.extern.slf4j.Slf4j;
4033

4134
/**
@@ -81,26 +74,4 @@ public static String addressToString(Collection<InetSocketAddress> clients) {
8174
}
8275
return sb.toString();
8376
}
84-
85-
public static String parsePostBody(HttpExchange exchange)
86-
throws IOException {
87-
88-
if (!HttpMethod.POST.name().equalsIgnoreCase(exchange.getRequestMethod())
89-
&& !HttpMethod.PUT.name().equalsIgnoreCase(exchange.getRequestMethod())) {
90-
return "";
91-
}
92-
StringBuilder body = new StringBuilder(1024);
93-
try (InputStreamReader reader = new InputStreamReader(exchange.getRequestBody(), Constants.DEFAULT_CHARSET.name())) {
94-
char[] buffer = new char[256];
95-
int readIndex;
96-
while ((readIndex = reader.read(buffer)) != -1) {
97-
body.append(buffer, 0, readIndex);
98-
}
99-
}
100-
return body.toString();
101-
}
102-
103-
public static void sendSuccessResponseHeaders(HttpExchange httpExchange) throws IOException {
104-
httpExchange.sendResponseHeaders(SUCCESS_CODE, 0);
105-
}
10677
}

Diff for: eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/NetUtilsTest.java

-29
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,13 @@
1717

1818
package org.apache.eventmesh.common.utils;
1919

20-
import org.apache.eventmesh.common.enums.HttpMethod;
21-
22-
import java.io.ByteArrayInputStream;
23-
import java.io.IOException;
2420
import java.net.InetSocketAddress;
25-
import java.nio.charset.StandardCharsets;
2621
import java.util.ArrayList;
2722
import java.util.List;
2823
import java.util.Map;
2924

3025
import org.junit.jupiter.api.Assertions;
3126
import org.junit.jupiter.api.Test;
32-
import org.mockito.Mockito;
33-
34-
import com.sun.net.httpserver.HttpExchange;
3527

3628
public class NetUtilsTest {
3729

@@ -58,25 +50,4 @@ public void testAddressToString() {
5850
Assertions.assertEquals(localAddress + "|", result);
5951
}
6052

61-
@Test
62-
public void testParsePostBody() throws Exception {
63-
64-
HttpExchange exchange = Mockito.mock(HttpExchange.class);
65-
String expected = "mxsm";
66-
ByteArrayInputStream inputStream = new ByteArrayInputStream(expected.getBytes(StandardCharsets.UTF_8));
67-
Mockito.when(exchange.getRequestMethod()).thenReturn(HttpMethod.POST.name());
68-
Mockito.when(exchange.getRequestBody()).thenReturn(inputStream);
69-
70-
String actual = NetUtils.parsePostBody(exchange);
71-
Assertions.assertEquals(expected, actual);
72-
73-
}
74-
75-
@Test
76-
public void testSendSuccessResponseHeaders() throws IOException {
77-
HttpExchange exchange = Mockito.mock(HttpExchange.class);
78-
NetUtils.sendSuccessResponseHeaders(exchange);
79-
Mockito.verify(exchange, Mockito.times(1))
80-
.sendResponseHeaders(Mockito.anyInt(), Mockito.anyLong());
81-
}
8253
}

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java

+119-9
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,128 @@
1717

1818
package org.apache.eventmesh.runtime.admin.handler;
1919

20-
import com.sun.net.httpserver.HttpHandler;
20+
import org.apache.eventmesh.common.enums.HttpMethod;
21+
import org.apache.eventmesh.common.protocol.http.HttpCommand;
22+
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
23+
import org.apache.eventmesh.runtime.util.HttpResponseUtils;
2124

22-
import lombok.Data;
25+
import java.util.HashMap;
26+
import java.util.Map;
2327

24-
/**
25-
* An abstract class that implements the {@link HttpHandler} interface
26-
* and provides basic functionality for HTTP request handling.
27-
* <p>
28-
* Subclasses should extend this class to implement specific HTTP request handling logic.
29-
*/
28+
import io.netty.buffer.Unpooled;
29+
import io.netty.channel.ChannelFutureListener;
30+
import io.netty.channel.ChannelHandlerContext;
31+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
32+
import io.netty.handler.codec.http.DefaultHttpHeaders;
33+
import io.netty.handler.codec.http.HttpHeaders;
34+
import io.netty.handler.codec.http.HttpResponse;
35+
import io.netty.handler.codec.http.HttpResponseStatus;
36+
import io.netty.handler.codec.http.HttpVersion;
37+
import io.netty.util.AsciiString;
38+
39+
import lombok.Data;
3040

3141
@Data
32-
public abstract class AbstractHttpHandler implements HttpHandler {
42+
public abstract class AbstractHttpHandler implements org.apache.eventmesh.runtime.admin.handler.HttpHandler {
43+
44+
protected void write(ChannelHandlerContext ctx, byte[] result, AsciiString headerValue) {
45+
ctx.writeAndFlush(HttpResponseUtils.getHttpResponse(result, ctx, headerValue)).addListener(ChannelFutureListener.CLOSE);
46+
}
47+
48+
protected void write(ChannelHandlerContext ctx, HttpResponse response) {
49+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
50+
}
51+
52+
protected void write(ChannelHandlerContext ctx, byte[] result, AsciiString headerValue, HttpResponseStatus httpResponseStatus) {
53+
ctx.writeAndFlush(HttpResponseUtils.getHttpResponse(result, ctx, headerValue, httpResponseStatus)).addListener(ChannelFutureListener.CLOSE);
54+
}
55+
56+
protected void write401(ChannelHandlerContext ctx) {
57+
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED);
58+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
59+
}
60+
61+
protected void writeSuccess(ChannelHandlerContext ctx) {
62+
HttpHeaders responseHeaders = new DefaultHttpHeaders();
63+
responseHeaders.add(EventMeshConstants.HANDLER_ORIGIN, "*");
64+
DefaultFullHttpResponse response =
65+
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER, responseHeaders, responseHeaders);
66+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
67+
}
68+
69+
protected void writeSuccess(ChannelHandlerContext ctx, DefaultHttpHeaders responseHeaders) {
70+
DefaultFullHttpResponse response =
71+
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER, responseHeaders, responseHeaders);
72+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
73+
}
74+
75+
protected void preflight(ChannelHandlerContext ctx) {
76+
HttpHeaders responseHeaders = new DefaultHttpHeaders();
77+
responseHeaders.add(EventMeshConstants.HANDLER_ORIGIN, "*");
78+
responseHeaders.add(EventMeshConstants.HANDLER_METHODS, "*");
79+
responseHeaders.add(EventMeshConstants.HANDLER_HEADERS, "*");
80+
responseHeaders.add(EventMeshConstants.HANDLER_AGE, EventMeshConstants.MAX_AGE);
81+
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER,
82+
responseHeaders, responseHeaders);
83+
write(ctx, response);
84+
}
85+
86+
/**
87+
* Converts a query string to a map of key-value pairs.
88+
* <p>
89+
* This method takes a query string and parses it to create a map of key-value pairs, where each key and value are extracted from the query string
90+
* separated by '='.
91+
* <p>
92+
* If the query string is null, an empty map is returned.
93+
*
94+
* @param query the query string to convert to a map
95+
* @return a map containing the key-value pairs from the query string
96+
*/
97+
protected Map<String, String> queryToMap(String query) {
98+
if (query == null) {
99+
return new HashMap<>();
100+
}
101+
Map<String, String> result = new HashMap<>();
102+
for (String param : query.split("&")) {
103+
String[] entry = param.split("=");
104+
if (entry.length > 1) {
105+
result.put(entry[0], entry[1]);
106+
} else {
107+
result.put(entry[0], "");
108+
}
109+
}
110+
return result;
111+
}
112+
113+
@Override
114+
public void handle(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
115+
switch (HttpMethod.valueOf(httpCommand.getHttpMethod())) {
116+
case OPTIONS:
117+
preflight(ctx);
118+
break;
119+
case GET:
120+
get(httpCommand, ctx);
121+
break;
122+
case POST:
123+
post(httpCommand, ctx);
124+
break;
125+
case DELETE:
126+
delete(httpCommand, ctx);
127+
break;
128+
default: // do nothing
129+
break;
130+
}
131+
}
132+
133+
protected void post(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception{
134+
}
135+
136+
protected void delete(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception{
137+
}
138+
139+
protected void get(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception{
140+
}
141+
33142

34143
}
144+

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java

-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.Optional;
3232
import java.util.concurrent.ConcurrentHashMap;
3333

34-
import com.sun.net.httpserver.HttpHandler;
3534

3635
public class AdminHandlerManager {
3736

0 commit comments

Comments
 (0)