Skip to content

Commit 5347078

Browse files
Truncate Processor: Add support to truncate all fields in an event (#4317) (#4326)
Truncate Processor: Add support to truncate all fields in an event Signed-off-by: Krishna Kondaka <[email protected]> Co-authored-by: Krishna Kondaka <[email protected]> (cherry picked from commit 8a7132d) Co-authored-by: Krishna Kondaka <[email protected]>
1 parent bfef5e8 commit 5347078

File tree

3 files changed

+85
-26
lines changed

3 files changed

+85
-26
lines changed

data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java

+37-16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collection;
2020
import java.util.ArrayList;
2121
import java.util.List;
22+
import java.util.Map;
2223

2324
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
2425

@@ -41,14 +42,38 @@ public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProces
4142
}
4243

4344
private String getTruncatedValue(final String value, final int startIndex, final Integer length) {
44-
String truncatedValue =
45-
(length == null || startIndex+length >= value.length()) ?
46-
value.substring(startIndex) :
45+
String truncatedValue =
46+
(length == null || startIndex+length >= value.length()) ?
47+
value.substring(startIndex) :
4748
value.substring(startIndex, startIndex + length);
4849

4950
return truncatedValue;
5051
}
5152

53+
private void truncateKey(Event event, String key, Object value, TruncateProcessorConfig.Entry entryConfig) {
54+
final boolean recurse = entryConfig.getRecurse();
55+
final int startIndex = entryConfig.getStartAt() == null ? 0 : entryConfig.getStartAt();
56+
final Integer length = entryConfig.getLength();
57+
if (value instanceof String) {
58+
event.put(key, getTruncatedValue((String) value, startIndex, length));
59+
} else if (value instanceof List) {
60+
List<Object> result = new ArrayList<>();
61+
for (Object listItem : (List) value) {
62+
if (listItem instanceof String) {
63+
result.add(getTruncatedValue((String) listItem, startIndex, length));
64+
} else {
65+
result.add(listItem);
66+
}
67+
}
68+
event.put(key, result);
69+
} else if (recurse && (value instanceof Map)) {
70+
Map<String, Object> valueMap = (Map<String, Object>)value;
71+
for (Map.Entry<String, Object> mapEntry: valueMap.entrySet()) {
72+
truncateKey(event, key+"/"+mapEntry.getKey(), mapEntry.getValue(), entryConfig);
73+
}
74+
}
75+
}
76+
5277
@Override
5378
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
5479
for(final Record<Event> record : records) {
@@ -58,30 +83,26 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
5883
for (TruncateProcessorConfig.Entry entry : entries) {
5984
final List<String> sourceKeys = entry.getSourceKeys();
6085
final String truncateWhen = entry.getTruncateWhen();
86+
final boolean recurse = entry.getRecurse();
6187
final int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt();
6288
final Integer length = entry.getLength();
6389
if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) {
6490
continue;
6591
}
92+
if (sourceKeys == null) {
93+
for (Map.Entry<String, Object> mapEntry: recordEvent.toMap().entrySet()) {
94+
truncateKey(recordEvent, mapEntry.getKey(), mapEntry.getValue(), entry);
95+
}
96+
continue;
97+
}
98+
6699
for (String sourceKey : sourceKeys) {
67100
if (!recordEvent.containsKey(sourceKey)) {
68101
continue;
69102
}
70103

71104
final Object value = recordEvent.get(sourceKey, Object.class);
72-
if (value instanceof String) {
73-
recordEvent.put(sourceKey, getTruncatedValue((String) value, startIndex, length));
74-
} else if (value instanceof List) {
75-
List<Object> result = new ArrayList<>();
76-
for (Object listItem : (List) value) {
77-
if (listItem instanceof String) {
78-
result.add(getTruncatedValue((String) listItem, startIndex, length));
79-
} else {
80-
result.add(listItem);
81-
}
82-
}
83-
recordEvent.put(sourceKey, result);
84-
}
105+
truncateKey(recordEvent, sourceKey, value, entry);
85106
}
86107
}
87108
} catch (final Exception e) {

data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515

1616
public class TruncateProcessorConfig {
1717
public static class Entry {
18-
@NotEmpty
19-
@NotNull
2018
@JsonProperty("source_keys")
2119
private List<String> sourceKeys;
2220

@@ -26,14 +24,18 @@ public static class Entry {
2624
@JsonProperty("length")
2725
private Integer length;
2826

27+
@JsonProperty("recursive")
28+
private Boolean recurse = false;
29+
2930
@JsonProperty("truncate_when")
3031
private String truncateWhen;
3132

32-
public Entry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) {
33+
public Entry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen, final Boolean recurse) {
3334
this.sourceKeys = sourceKeys;
3435
this.startAt = startAt;
3536
this.length = length;
3637
this.truncateWhen = truncateWhen;
38+
this.recurse = recurse;
3739
}
3840

3941
public Entry() {}
@@ -46,6 +48,10 @@ public Integer getStartAt() {
4648
return startAt;
4749
}
4850

51+
public Boolean getRecurse() {
52+
return recurse;
53+
}
54+
4955
public Integer getLength() {
5056
return length;
5157
}
@@ -54,7 +60,7 @@ public String getTruncateWhen() {
5460
return truncateWhen;
5561
}
5662

57-
@AssertTrue(message = "source_keys must be specified and at least one of start_at or length or both must be specified and the values must be positive integers")
63+
@AssertTrue(message = "At least one of start_at or length or both must be specified and the values must be positive integers")
5864
public boolean isValidConfig() {
5965
if (length == null && startAt == null) {
6066
return false;

data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java

+38-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.params.provider.ArgumentsSource;
2121
import org.mockito.Mock;
2222
import org.mockito.junit.jupiter.MockitoExtension;
23+
import com.google.common.collect.ImmutableMap;
2324

2425
import java.util.Collections;
2526
import java.util.HashMap;
@@ -53,7 +54,7 @@ private TruncateProcessor createObjectUnderTest() {
5354
@ArgumentsSource(TruncateArgumentsProvider.class)
5455
void testTruncateProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) {
5556

56-
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), startAt, truncateLength, null)));
57+
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), startAt, truncateLength, null, false)));
5758
final TruncateProcessor truncateProcessor = createObjectUnderTest();
5859
final Record<Event> record = createEvent("message", messageValue);
5960
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateProcessor.doExecute(Collections.singletonList(record));
@@ -64,8 +65,8 @@ void testTruncateProcessor(final Object messageValue, final Integer startAt, fin
6465
@ParameterizedTest
6566
@ArgumentsSource(MultipleTruncateArgumentsProvider.class)
6667
void testTruncateProcessorMultipleEntries(final Object messageValue, final Integer startAt1, final Integer truncateLength1, final Integer startAt2, final Integer truncateLength2, final Object truncatedMessage1, final Object truncatedMessage2) {
67-
TruncateProcessorConfig.Entry entry1 = createEntry(List.of("message1"), startAt1, truncateLength1, null);
68-
TruncateProcessorConfig.Entry entry2 = createEntry(List.of("message2"), startAt2, truncateLength2, null);
68+
TruncateProcessorConfig.Entry entry1 = createEntry(List.of("message1"), startAt1, truncateLength1, null, false);
69+
TruncateProcessorConfig.Entry entry2 = createEntry(List.of("message2"), startAt2, truncateLength2, null, false);
6970
when(config.getEntries()).thenReturn(List.of(entry1, entry2));
7071
final Record<Event> record1 = createEvent("message1", messageValue);
7172
final Record<Event> record2 = createEvent("message2", messageValue);
@@ -82,7 +83,7 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() {
8283
final String truncateWhen = UUID.randomUUID().toString();
8384
final String message = UUID.randomUUID().toString();
8485

85-
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, 5, truncateWhen)));
86+
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, 5, truncateWhen, false)));
8687

8788
final TruncateProcessor truncateProcessor = createObjectUnderTest();
8889
final Record<Event> record = createEvent("message", message);
@@ -92,8 +93,32 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() {
9293
assertThat(truncatedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap()));
9394
}
9495

95-
private TruncateProcessorConfig.Entry createEntry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) {
96-
return new TruncateProcessorConfig.Entry(sourceKeys, startAt, length, truncateWhen);
96+
@Test
97+
void test_event_with_all_fields_truncated() {
98+
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(null, null, 5, null, false)));
99+
final TruncateProcessor truncateProcessor = createObjectUnderTest();
100+
final Record<Event> record = createEventWithMultipleKeys(Map.of("key1", "aaaaa12345", "key2", "bbbbb12345", "key3", "ccccccc12345"));
101+
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateProcessor.doExecute(Collections.singletonList(record));
102+
Event event = truncatedRecords.get(0).getData();
103+
assertThat(event.get("key1", String.class), equalTo("aaaaa"));
104+
assertThat(event.get("key2", String.class), equalTo("bbbbb"));
105+
assertThat(event.get("key3", String.class), equalTo("ccccc"));
106+
}
107+
108+
@Test
109+
void test_event_with_all_fields_truncated_recursively() {
110+
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(null, null, 5, null, true)));
111+
final TruncateProcessor truncateProcessor = createObjectUnderTest();
112+
final Record<Event> record = createEventWithMultipleKeys(ImmutableMap.of("key1", "aaaaa12345", "key2", ImmutableMap.of("key3", "bbbbb12345", "key4", ImmutableMap.of("key5", "ccccccc12345"))));
113+
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateProcessor.doExecute(Collections.singletonList(record));
114+
Event event = truncatedRecords.get(0).getData();
115+
assertThat(event.get("key1", String.class), equalTo("aaaaa"));
116+
assertThat(event.get("key2/key3", String.class), equalTo("bbbbb"));
117+
assertThat(event.get("key2/key4/key5", String.class), equalTo("ccccc"));
118+
}
119+
120+
private TruncateProcessorConfig.Entry createEntry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen, final boolean recurse) {
121+
return new TruncateProcessorConfig.Entry(sourceKeys, startAt, length, truncateWhen, recurse);
97122
}
98123

99124
private Record<Event> createEvent(final String key, final Object value) {
@@ -105,6 +130,13 @@ private Record<Event> createEvent(final String key, final Object value) {
105130
.build());
106131
}
107132

133+
private Record<Event> createEventWithMultipleKeys(final Map<String, Object> data) {
134+
return new Record<>(JacksonEvent.builder()
135+
.withEventType("event")
136+
.withData(data)
137+
.build());
138+
}
139+
108140
static class TruncateArgumentsProvider implements ArgumentsProvider {
109141

110142
@Override

0 commit comments

Comments
 (0)