Skip to content

Commit fd6c14e

Browse files
dlvenablechenqi0805
authored andcommitted
Improves the AWS Lambda processor's memory usage by creating fewer byte arrays when creating the request for the Lambda function. This uses the "unsafe" methods on SdkBytes. However, these are fine here because the byte[] is not accessible anywhere else for modification. This additionally adds some new unit tests to verify the actual behavior of the InMemoryBuffer as well. (opensearch-project#5547)
Signed-off-by: David Venable <[email protected]>
1 parent d8c0a29 commit fd6c14e

File tree

3 files changed

+301
-226
lines changed

3 files changed

+301
-226
lines changed

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,30 @@
77

88
import java.time.Duration;
99
import java.util.List;
10+
1011
import org.opensearch.dataprepper.model.event.Event;
1112
import org.opensearch.dataprepper.model.record.Record;
12-
import software.amazon.awssdk.core.SdkBytes;
1313
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
1414

1515
/**
1616
* A buffer can hold data before flushing it.
1717
*/
1818
public interface Buffer {
19+
long getSize();
1920

20-
long getSize();
21-
22-
int getEventCount();
23-
24-
Duration getDuration();
21+
int getEventCount();
2522

26-
InvokeRequest getRequestPayload(String functionName, String invocationType);
23+
Duration getDuration();
2724

28-
SdkBytes getPayload();
25+
InvokeRequest getRequestPayload(String functionName, String invocationType);
2926

30-
void addRecord(Record<Event> record);
27+
void addRecord(Record<Event> record);
3128

32-
List<Record<Event>> getRecords();
29+
List<Record<Event>> getRecords();
3330

34-
Duration getFlushLambdaLatencyMetric();
31+
Duration getFlushLambdaLatencyMetric();
3532

36-
Long getPayloadRequestSize();
33+
Long getPayloadRequestSize();
3734

38-
Duration stopLatencyWatch();
39-
35+
Duration stopLatencyWatch();
4036
}

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java

Lines changed: 107 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.ArrayList;
1212
import java.util.List;
1313
import java.util.concurrent.TimeUnit;
14+
1415
import org.apache.commons.lang3.time.StopWatch;
1516
import org.opensearch.dataprepper.model.codec.OutputCodec;
1617
import org.opensearch.dataprepper.model.event.Event;
@@ -27,126 +28,123 @@
2728
*/
2829
public class InMemoryBuffer implements Buffer {
2930

30-
private final ByteArrayOutputStream byteArrayOutputStream;
31-
32-
protected List<Record<Event>> records;
33-
private final StopWatch bufferWatch;
34-
private final StopWatch lambdaLatencyWatch;
35-
private final OutputCodec requestCodec;
36-
private final OutputCodecContext outputCodecContext;
37-
private final long payloadResponseSize;
38-
private int eventCount;
39-
private long payloadRequestSize;
40-
41-
42-
public InMemoryBuffer(String batchOptionKeyName) {
43-
this(batchOptionKeyName, new OutputCodecContext());
44-
}
45-
46-
public InMemoryBuffer(String batchOptionKeyName, OutputCodecContext outputCodecContext) {
47-
byteArrayOutputStream = new ByteArrayOutputStream();
48-
records = new ArrayList<>();
49-
bufferWatch = new StopWatch();
50-
bufferWatch.start();
51-
lambdaLatencyWatch = new StopWatch();
52-
eventCount = 0;
53-
payloadRequestSize = 0;
54-
payloadResponseSize = 0;
55-
// Setup request codec
56-
JsonOutputCodecConfig jsonOutputCodecConfig = new JsonOutputCodecConfig();
57-
jsonOutputCodecConfig.setKeyName(batchOptionKeyName);
58-
requestCodec = new JsonOutputCodec(jsonOutputCodecConfig);
59-
this.outputCodecContext = outputCodecContext;
60-
}
61-
62-
@Override
63-
public void addRecord(Record<Event> record) {
64-
records.add(record);
65-
Event event = record.getData();
66-
try {
67-
if (eventCount == 0) {
68-
requestCodec.start(this.byteArrayOutputStream, event, this.outputCodecContext);
69-
}
70-
requestCodec.writeEvent(event, this.byteArrayOutputStream);
71-
} catch (IOException e) {
72-
throw new RuntimeException(e);
31+
private final ByteArrayOutputStream byteArrayOutputStream;
32+
33+
protected List<Record<Event>> records;
34+
private final StopWatch bufferWatch;
35+
private final StopWatch lambdaLatencyWatch;
36+
private final OutputCodec requestCodec;
37+
private final OutputCodecContext outputCodecContext;
38+
private final long payloadResponseSize;
39+
private int eventCount;
40+
private long payloadRequestSize;
41+
42+
43+
public InMemoryBuffer(String batchOptionKeyName) {
44+
this(batchOptionKeyName, new OutputCodecContext());
7345
}
74-
eventCount++;
75-
}
76-
77-
@Override
78-
public List<Record<Event>> getRecords() {
79-
return records;
80-
}
81-
82-
@Override
83-
public long getSize() {
84-
return byteArrayOutputStream.size();
85-
}
86-
87-
@Override
88-
public int getEventCount() {
89-
return eventCount;
90-
}
91-
92-
public Duration getDuration() {
93-
return Duration.ofMillis(bufferWatch.getTime(TimeUnit.MILLISECONDS));
94-
}
95-
96-
@Override
97-
public InvokeRequest getRequestPayload(String functionName, String invocationType) {
98-
99-
if (eventCount == 0) {
100-
//We never added any events so there is no payload
101-
return null;
46+
47+
public InMemoryBuffer(String batchOptionKeyName, OutputCodecContext outputCodecContext) {
48+
byteArrayOutputStream = new ByteArrayOutputStream(32 * 1024);
49+
records = new ArrayList<>();
50+
bufferWatch = new StopWatch();
51+
bufferWatch.start();
52+
lambdaLatencyWatch = new StopWatch();
53+
eventCount = 0;
54+
payloadRequestSize = 0;
55+
payloadResponseSize = 0;
56+
// Setup request codec
57+
JsonOutputCodecConfig jsonOutputCodecConfig = new JsonOutputCodecConfig();
58+
jsonOutputCodecConfig.setKeyName(batchOptionKeyName);
59+
requestCodec = new JsonOutputCodec(jsonOutputCodecConfig);
60+
this.outputCodecContext = outputCodecContext;
10261
}
10362

104-
try {
105-
requestCodec.complete(this.byteArrayOutputStream);
106-
} catch (IOException e) {
107-
throw new RuntimeException(e);
63+
@Override
64+
public void addRecord(Record<Event> record) {
65+
records.add(record);
66+
Event event = record.getData();
67+
try {
68+
if (eventCount == 0) {
69+
requestCodec.start(this.byteArrayOutputStream, event, this.outputCodecContext);
70+
}
71+
requestCodec.writeEvent(event, this.byteArrayOutputStream);
72+
} catch (IOException e) {
73+
throw new RuntimeException(e);
74+
}
75+
eventCount++;
10876
}
10977

110-
SdkBytes payload = getPayload();
111-
payloadRequestSize = payload.asByteArray().length;
112-
113-
// Setup an InvokeRequest.
114-
InvokeRequest request = InvokeRequest.builder()
115-
.functionName(functionName)
116-
.payload(payload)
117-
.invocationType(invocationType)
118-
.build();
119-
120-
synchronized (this) {
121-
if (lambdaLatencyWatch.isStarted()) {
122-
lambdaLatencyWatch.reset();
123-
}
124-
lambdaLatencyWatch.start();
78+
@Override
79+
public List<Record<Event>> getRecords() {
80+
return records;
12581
}
126-
return request;
127-
}
12882

129-
public synchronized Duration stopLatencyWatch() {
130-
if (lambdaLatencyWatch.isStarted()) {
131-
lambdaLatencyWatch.stop();
83+
@Override
84+
public long getSize() {
85+
return byteArrayOutputStream.size();
13286
}
133-
long timeInMillis = lambdaLatencyWatch.getTime();
134-
return Duration.ofMillis(timeInMillis);
135-
}
13687

137-
@Override
138-
public SdkBytes getPayload() {
139-
byte[] bytes = byteArrayOutputStream.toByteArray();
140-
return SdkBytes.fromByteArray(bytes);
141-
}
88+
@Override
89+
public int getEventCount() {
90+
return eventCount;
91+
}
14292

143-
public Duration getFlushLambdaLatencyMetric() {
144-
return Duration.ofMillis(lambdaLatencyWatch.getTime(TimeUnit.MILLISECONDS));
145-
}
93+
public Duration getDuration() {
94+
return Duration.ofMillis(bufferWatch.getTime(TimeUnit.MILLISECONDS));
95+
}
14696

147-
public Long getPayloadRequestSize() {
148-
return payloadRequestSize;
149-
}
97+
@Override
98+
public InvokeRequest getRequestPayload(String functionName, String invocationType) {
99+
100+
if (eventCount == 0) {
101+
//We never added any events so there is no payload
102+
return null;
103+
}
104+
105+
try {
106+
requestCodec.complete(this.byteArrayOutputStream);
107+
} catch (IOException e) {
108+
throw new RuntimeException(e);
109+
}
110+
111+
SdkBytes payload = getPayload();
112+
payloadRequestSize = payload.asByteArrayUnsafe().length;
113+
114+
// Setup an InvokeRequest.
115+
InvokeRequest request = InvokeRequest.builder()
116+
.functionName(functionName)
117+
.payload(payload)
118+
.invocationType(invocationType)
119+
.build();
120+
121+
synchronized (this) {
122+
if (lambdaLatencyWatch.isStarted()) {
123+
lambdaLatencyWatch.reset();
124+
}
125+
lambdaLatencyWatch.start();
126+
}
127+
return request;
128+
}
150129

130+
public synchronized Duration stopLatencyWatch() {
131+
if (lambdaLatencyWatch.isStarted()) {
132+
lambdaLatencyWatch.stop();
133+
}
134+
long timeInMillis = lambdaLatencyWatch.getTime();
135+
return Duration.ofMillis(timeInMillis);
136+
}
137+
138+
private SdkBytes getPayload() {
139+
return SdkBytes.fromByteArrayUnsafe(byteArrayOutputStream.toByteArray());
140+
}
141+
142+
public Duration getFlushLambdaLatencyMetric() {
143+
return Duration.ofMillis(lambdaLatencyWatch.getTime(TimeUnit.MILLISECONDS));
144+
}
145+
146+
public Long getPayloadRequestSize() {
147+
return payloadRequestSize;
148+
}
151149
}
152150

0 commit comments

Comments
 (0)