Skip to content

Commit affa723

Browse files
authored
[ISSUE apache#4701]Fix use tcp protocol client send message, it throw a DecoderException (apache#4702)
* [ISSUE apache#4701]fix use tcp protocol client send message, it throw a DecoderException * optmize code * refactor with LengthFieldBasedFrameDecoder * fix code style * optimize code * fix log print * optimize code and add some comments
1 parent d3d0299 commit affa723

File tree

3 files changed

+59
-42
lines changed
  • eventmesh-common/src
    • main/java/org/apache/eventmesh/common/protocol/tcp/codec
    • test/java/org/apache/eventmesh/common/protocol/tcp/codec
  • eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common

3 files changed

+59
-42
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -31,40 +31,28 @@
3131
import org.apache.commons.lang3.StringUtils;
3232

3333
import java.util.Arrays;
34-
import java.util.List;
3534

3635
import io.netty.buffer.ByteBuf;
3736
import io.netty.channel.ChannelHandlerContext;
38-
import io.netty.handler.codec.ByteToMessageCodec;
37+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
3938
import io.netty.handler.codec.MessageToByteEncoder;
40-
import io.netty.handler.codec.ReplayingDecoder;
41-
4239

4340
import com.fasterxml.jackson.core.JsonProcessingException;
4441
import com.google.common.base.Preconditions;
4542

4643
import lombok.extern.slf4j.Slf4j;
4744

4845
@Slf4j
49-
public class Codec extends ByteToMessageCodec<Package> {
46+
public class Codec {
5047

5148
private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4;
5249

5350
private static final byte[] CONSTANT_MAGIC_FLAG = serializeBytes("EventMesh");
5451
private static final byte[] VERSION = serializeBytes("0000");
5552

56-
private Encoder encoder = new Encoder();
57-
private Decoder decoder = new Decoder();
53+
private static final int PREFIX_LENGTH = CONSTANT_MAGIC_FLAG.length + VERSION.length; //13
5854

59-
@Override
60-
protected void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception {
61-
encoder.encode(ctx, pkg, out);
62-
}
63-
64-
@Override
65-
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
66-
decoder.decode(ctx, in, out);
67-
}
55+
private static final int PACKAGE_BYTES_FIELD_LENGTH = 4;
6856

6957
public static class Encoder extends MessageToByteEncoder<Package> {
7058

@@ -89,7 +77,7 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E
8977
int headerLength = ArrayUtils.getLength(headerData);
9078
int bodyLength = ArrayUtils.getLength(bodyData);
9179

92-
final int length = CONSTANT_MAGIC_FLAG.length + VERSION.length + headerLength + bodyLength;
80+
final int length = PREFIX_LENGTH + headerLength + bodyLength;
9381

9482
if (length > FRAME_MAX_LENGTH) {
9583
throw new IllegalArgumentException("message size is exceed limit!");
@@ -116,31 +104,62 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E
116104
}
117105
}
118106

119-
public static class Decoder extends ReplayingDecoder<Package> {
107+
public static class Decoder extends LengthFieldBasedFrameDecoder {
108+
109+
public Decoder() {
110+
/**
111+
* lengthAdjustment value = -9 explain:
112+
* Header + Body, Format:
113+
* <pre>
114+
* ┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
115+
* │ MAGIC_FLAG │ VERSION │ package length │ Header length │ Header │ body │
116+
* │ (9bytes) │ (4bytes) │ (4bytes) │ (4bytes) │ (header bytes) │ (body bytes) │
117+
* └───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
118+
* </pre>
119+
* package length = MAGIC_FLAG + VERSION + Header length + Body length,Currently,
120+
* adding MAGIC_FLAG + VERSION + package length field (4 bytes) actually adds 17 bytes.
121+
* However, the value of the package length field is only reduced by the four bytes of
122+
* the package length field itself and the four bytes of the header length field.
123+
* Therefore, the compensation value to be added to the length field value is -9,
124+
* which means subtracting the extra 9 bytes.
125+
* Refer to the encoding in the {@link Encoder}
126+
*/
127+
super(FRAME_MAX_LENGTH, PREFIX_LENGTH, PACKAGE_BYTES_FIELD_LENGTH, -9, 0);
128+
}
120129

121130
@Override
122-
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
131+
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
132+
133+
ByteBuf target = null;
134+
123135
try {
124-
if (null == in) {
125-
return;
136+
target = (ByteBuf) super.decode(ctx, in);
137+
if (null == target) {
138+
return null;
126139
}
127-
128-
byte[] flagBytes = parseFlag(in);
129-
byte[] versionBytes = parseVersion(in);
140+
byte[] flagBytes = parseFlag(target);
141+
byte[] versionBytes = parseVersion(target);
130142
validateFlag(flagBytes, versionBytes, ctx);
131143

132-
final int length = in.readInt();
133-
final int headerLength = in.readInt();
134-
final int bodyLength = length - CONSTANT_MAGIC_FLAG.length - VERSION.length - headerLength;
135-
Header header = parseHeader(in, headerLength);
136-
Object body = parseBody(in, header, bodyLength);
144+
final int length = target.readInt();
145+
final int headerLength = target.readInt();
146+
final int bodyLength = length - PREFIX_LENGTH - headerLength;
147+
Header header = parseHeader(target, headerLength);
148+
Object body = parseBody(target, header, bodyLength);
137149

138150
Package pkg = new Package(header, body);
139-
out.add(pkg);
140-
} catch (Exception e) {
141-
log.error("decode error| received data: {}.", deserializeBytes(in.array()), e);
142-
throw e;
151+
return pkg;
152+
153+
} catch (Exception ex) {
154+
log.error("decode error", ex);
155+
ctx.channel().close();
156+
} finally {
157+
if (target != null) {
158+
target.release();
159+
}
143160
}
161+
162+
return null;
144163
}
145164

146165
private byte[] parseFlag(ByteBuf in) {

eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.eventmesh.common.protocol.tcp.Command;
2121
import org.apache.eventmesh.common.protocol.tcp.Header;
2222
import org.apache.eventmesh.common.protocol.tcp.Package;
23-
24-
import java.util.ArrayList;
23+
import org.apache.eventmesh.common.protocol.tcp.codec.Codec.Decoder;
24+
import org.apache.eventmesh.common.protocol.tcp.codec.Codec.Encoder;
2525

2626
import org.junit.jupiter.api.Assertions;
2727
import org.junit.jupiter.api.Test;
@@ -37,14 +37,12 @@ public void testCodec() throws Exception {
3737
header.setCmd(Command.HELLO_REQUEST);
3838
Package testP = new Package(header);
3939
testP.setBody(new Object());
40-
Codec.Encoder ce = new Codec.Encoder();
40+
Encoder ce = new Codec.Encoder();
4141
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
4242
ce.encode(null, testP, buf);
43-
Codec.Decoder cd = new Codec.Decoder();
44-
ArrayList<Object> result = new ArrayList<>();
45-
cd.decode(null, buf, result);
46-
Assertions.assertNotNull(result.get(0));
47-
Assertions.assertEquals(testP.getHeader(), ((Package) result.get(0)).getHeader());
43+
Decoder cd = new Codec.Decoder();
44+
final Package decode = (Package) cd.decode(null, buf);
45+
Assertions.assertEquals(testP.getHeader(), decode.getHeader());
4846
}
4947

5048
}

eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ protected synchronized void open(SimpleChannelInboundHandler<Package> handler) t
110110

111111
@Override
112112
public void initChannel(SocketChannel ch) {
113-
ch.pipeline().addLast(new Codec())
113+
ch.pipeline().addLast(new Codec.Encoder(), new Codec.Decoder())
114114
.addLast(handler, newExceptionHandler());
115115
}
116116
});

0 commit comments

Comments
 (0)