Skip to content

Commit 48c5bb2

Browse files
RainYuYEarthChen
andauthored
Add maxMessageSize config (#15781)
Add maxMessageSize config --------- Co-authored-by: earthchen <[email protected]>
1 parent 763e8bb commit 48c5bb2

File tree

6 files changed

+90
-0
lines changed

6 files changed

+90
-0
lines changed

dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public class TripleConfig implements Serializable {
4141
public static final int DEFAULT_CONNECTION_INITIAL_WINDOW_SIZE_KEY = 65_536;
4242
public static final int DEFAULT_MAX_FRAME_SIZE = 8_388_608;
4343
public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 32_768;
44+
public static final int DEFAULT_MAX_MESSAGE_SIZE = 50 * 1024 * 1024;
45+
46+
public static final String H2_SETTINGS_MAX_MESSAGE_SIZE_KEY = "dubbo.protocol.triple.max-message-size";
4447

4548
/**
4649
* Whether enable verbose mode.
@@ -143,6 +146,11 @@ public class TripleConfig implements Serializable {
143146
*/
144147
private Integer maxHeaderListSize;
145148

149+
/**
150+
* Maximum message size.
151+
*/
152+
private Integer maxMessageSize;
153+
146154
@Nested
147155
private RestConfig rest;
148156

@@ -334,6 +342,19 @@ public void setMaxHeaderListSize(Integer maxHeaderListSize) {
334342
this.maxHeaderListSize = maxHeaderListSize;
335343
}
336344

345+
public Integer getMaxMessageSize() {
346+
return maxMessageSize;
347+
}
348+
349+
@Parameter(excluded = true, key = H2_SETTINGS_MAX_MESSAGE_SIZE_KEY)
350+
public int getMaxMessageSizeOrDefault() {
351+
return maxMessageSize == null ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize;
352+
}
353+
354+
public void setMaxMessageSize(Integer maxMessageSize) {
355+
this.maxMessageSize = maxMessageSize;
356+
}
357+
337358
public RestConfig getRest() {
338359
return rest;
339360
}

dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@
1616
*/
1717
package org.apache.dubbo.remoting.http12.message;
1818

19+
import org.apache.dubbo.common.config.Configuration;
20+
import org.apache.dubbo.common.config.ConfigurationUtils;
1921
import org.apache.dubbo.remoting.http12.CompositeInputStream;
2022
import org.apache.dubbo.remoting.http12.exception.DecodeException;
23+
import org.apache.dubbo.rpc.Constants;
24+
import org.apache.dubbo.rpc.RpcException;
25+
import org.apache.dubbo.rpc.model.ApplicationModel;
2126

2227
import java.io.ByteArrayInputStream;
2328
import java.io.IOException;
@@ -43,6 +48,8 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder {
4348

4449
private final int lengthFieldLength;
4550

51+
private final int maxMessageSize;
52+
4653
private int requiredLength;
4754

4855
public LengthFieldStreamingDecoder() {
@@ -57,6 +64,8 @@ public LengthFieldStreamingDecoder(int lengthFieldOffset, int lengthFieldLength)
5764
this.lengthFieldOffset = lengthFieldOffset;
5865
this.lengthFieldLength = lengthFieldLength;
5966
this.requiredLength = lengthFieldOffset + lengthFieldLength;
67+
Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
68+
this.maxMessageSize = conf.getInt(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
6069
}
6170

6271
@Override
@@ -150,6 +159,14 @@ private void processHeader() throws IOException {
150159
ignore = accumulate.read(lengthBytes);
151160
requiredLength = bytesToInt(lengthBytes);
152161

162+
// Validate bounds
163+
if (requiredLength < 0) {
164+
throw new RpcException("Invalid message length: " + requiredLength);
165+
}
166+
if (requiredLength > maxMessageSize) {
167+
throw new RpcException(String.format("Message size %d exceeds limit %d", requiredLength, maxMessageSize));
168+
}
169+
153170
// Continue reading the frame body.
154171
state = DecodeState.PAYLOAD;
155172
}

dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ public interface Constants {
109109
String H2_SETTINGS_BUILTIN_SERVICE_INIT = "dubbo.tri.builtin.service.init";
110110

111111
String H2_SETTINGS_JSON_FRAMEWORK_NAME = "dubbo.protocol.triple.rest.json-framework";
112+
113+
String H2_SETTINGS_MAX_MESSAGE_SIZE = "dubbo.protocol.triple.max-message-size";
114+
112115
String H2_SETTINGS_DISALLOWED_CONTENT_TYPES = "dubbo.protocol.triple.rest.disallowed-content-types";
113116
String H2_SETTINGS_OPENAPI_PREFIX = "dubbo.protocol.triple.rest.openapi";
114117

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Bzip2.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616
*/
1717
package org.apache.dubbo.rpc.protocol.tri.compressor;
1818

19+
import org.apache.dubbo.common.config.Configuration;
20+
import org.apache.dubbo.common.config.ConfigurationUtils;
21+
import org.apache.dubbo.rpc.Constants;
1922
import org.apache.dubbo.rpc.RpcException;
23+
import org.apache.dubbo.rpc.model.ApplicationModel;
2024

2125
import java.io.ByteArrayInputStream;
2226
import java.io.IOException;
@@ -35,11 +39,18 @@ public class Bzip2 implements Compressor, DeCompressor {
3539

3640
public static final String BZIP2 = "bzip2";
3741

42+
private final int maxMessageSize;
43+
3844
@Override
3945
public String getMessageEncoding() {
4046
return BZIP2;
4147
}
4248

49+
public Bzip2() {
50+
Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
51+
this.maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
52+
}
53+
4354
@Override
4455
public byte[] compress(byte[] payloadByteArr) throws RpcException {
4556
if (null == payloadByteArr || 0 == payloadByteArr.length) {
@@ -77,10 +88,16 @@ public byte[] decompress(byte[] payloadByteArr) {
7788
ByteArrayOutputStream out = new ByteArrayOutputStream();
7889
ByteArrayInputStream in = new ByteArrayInputStream(payloadByteArr);
7990
try {
91+
int totalBytesRead = 0;
8092
BZip2CompressorInputStream unZip = new BZip2CompressorInputStream(in);
8193
byte[] buffer = new byte[2048];
8294
int n;
8395
while ((n = unZip.read(buffer)) >= 0) {
96+
totalBytesRead += n;
97+
if (totalBytesRead > maxMessageSize) {
98+
throw new RpcException("Decompressed message size " + totalBytesRead
99+
+ " exceeds the maximum configured message size " + maxMessageSize);
100+
}
84101
out.write(buffer, 0, n);
85102
}
86103
} catch (Exception e) {

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Gzip.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616
*/
1717
package org.apache.dubbo.rpc.protocol.tri.compressor;
1818

19+
import org.apache.dubbo.common.config.Configuration;
20+
import org.apache.dubbo.common.config.ConfigurationUtils;
21+
import org.apache.dubbo.rpc.Constants;
1922
import org.apache.dubbo.rpc.RpcException;
23+
import org.apache.dubbo.rpc.model.ApplicationModel;
2024

2125
import java.io.ByteArrayInputStream;
2226
import java.io.ByteArrayOutputStream;
@@ -32,6 +36,13 @@ public class Gzip implements Compressor, DeCompressor {
3236

3337
public static final String GZIP = "gzip";
3438

39+
private final int maxMessageSize;
40+
41+
public Gzip() {
42+
Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
43+
this.maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
44+
}
45+
3546
@Override
3647
public String getMessageEncoding() {
3748
return GZIP;
@@ -70,10 +81,17 @@ public byte[] decompress(byte[] payloadByteArr) throws RpcException {
7081

7182
ByteArrayInputStream byteInStream = new ByteArrayInputStream(payloadByteArr);
7283
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
84+
7385
try (GZIPInputStream gzipInputStream = new GZIPInputStream(byteInStream)) {
7486
int readByteNum;
87+
int totalBytesRead = 0;
7588
byte[] bufferArr = new byte[256];
7689
while ((readByteNum = gzipInputStream.read(bufferArr)) >= 0) {
90+
totalBytesRead += readByteNum;
91+
if (totalBytesRead > maxMessageSize) {
92+
throw new RpcException("Decompressed message size " + totalBytesRead
93+
+ " exceeds the maximum configured message size " + maxMessageSize);
94+
}
7795
byteOutStream.write(bufferArr, 0, readByteNum);
7896
}
7997
} catch (Exception exception) {

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616
*/
1717
package org.apache.dubbo.rpc.protocol.tri.frame;
1818

19+
import org.apache.dubbo.common.config.Configuration;
20+
import org.apache.dubbo.common.config.ConfigurationUtils;
21+
import org.apache.dubbo.rpc.Constants;
1922
import org.apache.dubbo.rpc.RpcException;
23+
import org.apache.dubbo.rpc.model.ApplicationModel;
2024
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
2125

2226
import io.netty.buffer.ByteBuf;
@@ -31,6 +35,7 @@ public class TriDecoder implements Deframer {
3135
private final CompositeByteBuf accumulate = Unpooled.compositeBuffer();
3236
private final Listener listener;
3337
private final DeCompressor decompressor;
38+
private final Integer maxMessageSize;
3439
private boolean compressedFlag;
3540
private long pendingDeliveries;
3641
private boolean inDelivery = false;
@@ -42,6 +47,8 @@ public class TriDecoder implements Deframer {
4247
private GrpcDecodeState state = GrpcDecodeState.HEADER;
4348

4449
public TriDecoder(DeCompressor decompressor, Listener listener) {
50+
Configuration conf = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
51+
maxMessageSize = conf.getInteger(Constants.H2_SETTINGS_MAX_MESSAGE_SIZE, 50 * 1024 * 1024);
4552
this.decompressor = decompressor;
4653
this.listener = listener;
4754
}
@@ -123,6 +130,13 @@ private void processHeader() {
123130

124131
requiredLength = accumulate.readInt();
125132

133+
if (requiredLength < 0) {
134+
throw new RpcException("Invalid message length: " + requiredLength);
135+
}
136+
if (requiredLength > maxMessageSize) {
137+
throw new RpcException(String.format("Message size %d exceeds limit %d", requiredLength, maxMessageSize));
138+
}
139+
126140
// Continue reading the frame body.
127141
state = GrpcDecodeState.PAYLOAD;
128142
}

0 commit comments

Comments
 (0)