Skip to content

Commit 9492527

Browse files
feat: Add configurable stream read constraints for JSON input codec
- Introduced a configurable `max_event_length` option for the JSON input codec (default: 20000000).
- Updated JsonDecoder to accept a `maxEventLength` constructor argument and apply it as the maximum string length in Jackson's StreamReadConstraints.
- Added validation to ensure the configured maximum event length is at least 1.

Fixes #5466

Signed-off-by: Pallempati Saketh <[email protected]>
1 parent 6e6ab6e commit 9492527

File tree

7 files changed

+84
-21
lines changed

7 files changed

+84
-21
lines changed

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.fasterxml.jackson.core.JsonFactory;
99
import com.fasterxml.jackson.core.JsonParser;
1010
import com.fasterxml.jackson.core.JsonToken;
11+
import com.fasterxml.jackson.core.StreamReadConstraints;
1112
import com.fasterxml.jackson.databind.ObjectMapper;
1213

1314
import org.opensearch.dataprepper.model.event.Event;
@@ -30,10 +31,13 @@ public class JsonDecoder implements ByteDecoder {
3031
private Collection<String> includeKeys;
3132
private Collection<String> includeKeysMetadata;
3233

33-
public JsonDecoder(String keyName, Collection<String> includeKeys, Collection<String> includeKeysMetadata) {
34+
public JsonDecoder(String keyName, Collection<String> includeKeys, Collection<String> includeKeysMetadata, int maxEventLength) {
3435
this.keyName = keyName;
3536
this.includeKeys = includeKeys;
3637
this.includeKeysMetadata = includeKeysMetadata;
38+
jsonFactory.setStreamReadConstraints(StreamReadConstraints.builder()
39+
.maxStringLength(maxEventLength)
40+
.build());
3741
}
3842

3943
public JsonDecoder() {

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java

+60-18
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@
2222
import static org.hamcrest.CoreMatchers.equalTo;
2323
import static org.hamcrest.MatcherAssert.assertThat;
2424
import static org.junit.jupiter.api.Assertions.assertEquals;
25-
import static org.junit.jupiter.api.Assertions.assertFalse;
2625
import static org.junit.jupiter.api.Assertions.assertNotEquals;
26+
import static org.junit.jupiter.api.Assertions.assertNotNull;
2727
import static org.junit.jupiter.api.Assertions.assertTrue;
28+
import static org.junit.jupiter.api.Assertions.assertFalse;
29+
import static org.junit.jupiter.api.Assertions.assertThrows;
30+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
31+
2832

2933
import org.junit.jupiter.api.BeforeEach;
3034

@@ -48,32 +52,68 @@ void test_basicJsonDecoder() {
4852
String stringValue = UUID.randomUUID().toString();
4953
Random r = new Random();
5054
int intValue = r.nextInt();
51-
String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]";
55+
String inputString = "[{\"key1\":\"" + stringValue + "\", \"key2\":" + intValue + "}]";
5256
try {
5357
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
5458
receivedRecord = record;
5559
});
56-
} catch (Exception e){}
57-
60+
} catch (Exception e) {
61+
}
62+
5863
assertNotEquals(receivedRecord, null);
5964
Map<String, Object> map = receivedRecord.getData().toMap();
6065
assertThat(map.get("key1"), equalTo(stringValue));
6166
assertThat(map.get("key2"), equalTo(intValue));
6267
}
6368

69+
@Test
70+
void test_basicJsonDecoder_exceedingMaxEventLength_throwsException() {
71+
String largeString = "x".repeat(200);
72+
String inputString = "[{\"key1\":\"" + largeString + "\"}]";
73+
74+
jsonDecoder = new JsonDecoder(null, null, null, 100);
75+
76+
Exception exception = assertThrows(Exception.class, () -> {
77+
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
78+
receivedRecord = record;
79+
});
80+
});
81+
82+
assertEquals("String value length (200) exceeds the maximum allowed (100, from `StreamReadConstraints.getMaxStringLength()`)", exception.getMessage());
83+
}
84+
85+
@Test
86+
void test_basicJsonDecoder_withMaxEventLength() {
87+
String validString = "Short string";
88+
String inputString = "[{\"key1\":\"" + validString + "\"}]";
89+
90+
jsonDecoder = new JsonDecoder(null, null, null, 100);
91+
92+
assertDoesNotThrow(() -> {
93+
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
94+
receivedRecord = record;
95+
});
96+
});
97+
98+
assertNotNull(receivedRecord);
99+
Map<String, Object> map = receivedRecord.getData().toMap();
100+
assertThat(map.get("key1"), equalTo(validString));
101+
}
102+
64103
@Test
65104
void test_basicJsonDecoder_withTimeReceived() {
66105
String stringValue = UUID.randomUUID().toString();
67106
Random r = new Random();
68107
int intValue = r.nextInt();
69-
String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]";
108+
String inputString = "[{\"key1\":\"" + stringValue + "\", \"key2\":" + intValue + "}]";
70109
final Instant now = Instant.now();
71110
try {
72111
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> {
73112
receivedRecord = record;
74113
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
75114
});
76-
} catch (Exception e){}
115+
} catch (Exception e) {
116+
}
77117

78118
assertNotEquals(receivedRecord, null);
79119
Map<String, Object> map = receivedRecord.getData().toMap();
@@ -91,21 +131,23 @@ class JsonDecoderWithInputConfig {
91131
private static final int numKeyPerRecord = 3;
92132
private Map<String, Object> jsonObject;
93133
private final String key_name = "logEvents";
134+
private final Integer maxEventLength = 20000000;
94135

95136
@BeforeEach
96137
void setup() {
97138
objectMapper = new ObjectMapper();
98-
for (int i=0; i<10; i++) {
139+
for (int i = 0; i < 10; i++) {
99140
includeKeys.add(UUID.randomUUID().toString());
100141
includeMetadataKeys.add(UUID.randomUUID().toString());
101142
}
102143
jsonObject = generateJsonWithSpecificKeys(includeKeys, includeMetadataKeys, key_name, numKeyRecords, numKeyPerRecord);
103144
}
145+
104146
@Test
105147
void test_basicJsonDecoder_withInputConfig() throws IOException {
106148
final Instant now = Instant.now();
107149
List<Record<Event>> records = new ArrayList<>();
108-
jsonDecoder = new JsonDecoder(key_name, includeKeys, includeMetadataKeys);
150+
jsonDecoder = new JsonDecoder(key_name, includeKeys, includeMetadataKeys, maxEventLength);
109151
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
110152
records.add(record);
111153
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -118,10 +160,10 @@ void test_basicJsonDecoder_withInputConfig() throws IOException {
118160
Map<String, Object> dataMap = record.getData().toMap();
119161
Map<String, Object> metadataMap = record.getData().getMetadata().getAttributes();
120162

121-
for (String includeKey: includeKeys) {
163+
for (String includeKey : includeKeys) {
122164
assertThat(dataMap.get(includeKey), equalTo(jsonObject.get(includeKey)));
123165
}
124-
for (String includeMetadataKey: includeMetadataKeys) {
166+
for (String includeMetadataKey : includeMetadataKeys) {
125167
assertThat(metadataMap.get(includeMetadataKey), equalTo(jsonObject.get(includeMetadataKey)));
126168
}
127169
});
@@ -133,7 +175,7 @@ void test_basicJsonDecoder_withInputConfig() throws IOException {
133175
void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_metadata_keys() throws IOException {
134176
final Instant now = Instant.now();
135177
List<Record<Event>> records = new ArrayList<>();
136-
jsonDecoder = new JsonDecoder("", includeKeys, Collections.emptyList());
178+
jsonDecoder = new JsonDecoder("", includeKeys, Collections.emptyList(), maxEventLength);
137179
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
138180
records.add(record);
139181
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -145,7 +187,7 @@ void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_metadata_keys() t
145187
void test_basicJsonDecoder_withInputConfig_withoutEvents_null_include_metadata_keys() throws IOException {
146188
final Instant now = Instant.now();
147189
List<Record<Event>> records = new ArrayList<>();
148-
jsonDecoder = new JsonDecoder("", includeKeys, null);
190+
jsonDecoder = new JsonDecoder("", includeKeys, null, maxEventLength);
149191
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
150192
records.add(record);
151193
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -158,7 +200,7 @@ void test_basicJsonDecoder_withInputConfig_withoutEvents_null_include_metadata_k
158200
void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_include_keys() throws IOException {
159201
final Instant now = Instant.now();
160202
List<Record<Event>> records = new ArrayList<>();
161-
jsonDecoder = new JsonDecoder("", Collections.emptyList(), includeMetadataKeys);
203+
jsonDecoder = new JsonDecoder("", Collections.emptyList(), includeMetadataKeys, maxEventLength);
162204
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
163205
records.add(record);
164206
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -170,7 +212,7 @@ void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_include_keys() th
170212
void test_basicJsonDecoder_withInputConfig_withoutEvents_null_include_keys() throws IOException {
171213
final Instant now = Instant.now();
172214
List<Record<Event>> records = new ArrayList<>();
173-
jsonDecoder = new JsonDecoder("", null, includeMetadataKeys);
215+
jsonDecoder = new JsonDecoder("", null, includeMetadataKeys, maxEventLength);
174216
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
175217
records.add(record);
176218
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -187,17 +229,17 @@ private Map<String, Object> generateJsonWithSpecificKeys(final List<String> incl
187229
final Map<String, Object> jsonObject = new LinkedHashMap<>();
188230
final List<Map<String, Object>> innerObjects = new ArrayList<>();
189231

190-
for (String includeKey: includeKeys) {
232+
for (String includeKey : includeKeys) {
191233
jsonObject.put(includeKey, UUID.randomUUID().toString());
192234
}
193235

194-
for (String includeMetadataKey: includeMetadataKeys) {
236+
for (String includeMetadataKey : includeMetadataKeys) {
195237
jsonObject.put(includeMetadataKey, UUID.randomUUID().toString());
196238
}
197239

198-
for (int i=0; i<numKeyRecords; i++) {
240+
for (int i = 0; i < numKeyRecords; i++) {
199241
final Map<String, Object> innerJsonMap = new LinkedHashMap<>();
200-
for (int j=0; j<numKeyPerRecord; j++) {
242+
for (int j = 0; j < numKeyPerRecord; j++) {
201243
innerJsonMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
202244
}
203245
innerObjects.add(innerJsonMap);

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ public class JsonInputCodec extends JsonDecoder implements InputCodec {
2525

2626
@DataPrepperPluginConstructor
2727
public JsonInputCodec(final JsonInputCodecConfig config) {
28-
super(Objects.requireNonNull(config).getKeyName(), config.getIncludeKeys(), config.getIncludeKeysMetadata());
28+
super(Objects.requireNonNull(config).getKeyName(), config.getIncludeKeys(), config.getIncludeKeysMetadata(), config.getMaxEventLength());
2929
}
3030

3131
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
3232
parse(inputStream, null, eventConsumer);
3333
}
34-
3534
}

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfig.java

+10
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
package org.opensearch.dataprepper.plugins.codec.json;
1212

1313
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import jakarta.validation.constraints.Min;
1415
import jakarta.validation.constraints.Size;
1516

1617
import java.util.List;
1718

1819
public class JsonInputCodecConfig {
20+
static final Integer DEFAULT_MAX_EVENT_LENGTH = 20000000;
1921

2022
@JsonProperty("key_name")
2123
@Size(min = 1, max = 2048)
@@ -38,4 +40,12 @@ public List<String> getIncludeKeys() {
3840
public List<String> getIncludeKeysMetadata() {
3941
return includeKeysMetadata;
4042
}
43+
44+
@JsonProperty("max_event_length")
45+
@Min(1)
46+
private Integer maxEventLength = DEFAULT_MAX_EVENT_LENGTH;
47+
48+
public Integer getMaxEventLength(){
49+
return maxEventLength;
50+
}
4151
}

data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import static org.mockito.Mockito.times;
3838
import static org.mockito.Mockito.verify;
3939
import static org.mockito.Mockito.when;
40+
import static org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig.DEFAULT_MAX_EVENT_LENGTH;
4041

4142
public class JsonCodecsIT {
4243

@@ -51,6 +52,7 @@ void setUp() {
5152
when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(Collections.emptyList());
5253
when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(Collections.emptyList());
5354
when(jsonInputCodecConfig.getKeyName()).thenReturn(null);
55+
when(jsonInputCodecConfig.getMaxEventLength()).thenReturn(DEFAULT_MAX_EVENT_LENGTH);
5456
eventConsumer = mock(Consumer.class);
5557
}
5658

data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfigTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77

88
import org.junit.jupiter.api.Test;
99

10+
import static org.hamcrest.CoreMatchers.equalTo;
11+
import static org.hamcrest.MatcherAssert.assertThat;
1012
import static org.junit.jupiter.api.Assertions.assertNull;
13+
import static org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig.DEFAULT_MAX_EVENT_LENGTH;
1114

1215
public class JsonInputCodecConfigTest {
1316

@@ -21,5 +24,6 @@ public void testJsonInputCodecConfig() {
2124
assertNull(jsonInputCodecConfig.getKeyName());
2225
assertNull(jsonInputCodecConfig.getIncludeKeys());
2326
assertNull(jsonInputCodecConfig.getIncludeKeysMetadata());
27+
assertThat(jsonInputCodecConfig.getMaxEventLength(), equalTo(DEFAULT_MAX_EVENT_LENGTH));
2428
}
2529
}

data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import static org.mockito.Mockito.verifyNoInteractions;
5555
import static org.mockito.Mockito.mock;
5656
import static org.mockito.Mockito.when;
57+
import static org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig.DEFAULT_MAX_EVENT_LENGTH;
5758

5859
class JsonInputCodecTest {
5960

@@ -68,6 +69,7 @@ void setUp() {
6869
when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(null);
6970
when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(null);
7071
when(jsonInputCodecConfig.getKeyName()).thenReturn(null);
72+
when(jsonInputCodecConfig.getMaxEventLength()).thenReturn(DEFAULT_MAX_EVENT_LENGTH);
7173
eventConsumer = mock(Consumer.class);
7274
}
7375

0 commit comments

Comments
 (0)