Skip to content

Commit 3c696b1

Browse files
dlvenablechenqi0805
authored andcommitted
Improves the AWS Lambda processor's memory usage by creating fewer byte arrays when handling the response from the Lambda function. This uses the existing InputStream from SdkBytes. It also converts the "null" string to UTF-8 once to perform byte[] comparison on the response. (opensearch-project#5548)
Signed-off-by: David Venable <[email protected]>
1 parent fd6c14e commit 3c696b1

File tree

1 file changed

+5
-4
lines changed
  • data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor

1 file changed

+5
-4
lines changed

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@
3939
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
4040
import software.amazon.awssdk.services.lambda.model.LambdaException;
4141

42-
import java.io.ByteArrayInputStream;
4342
import java.io.IOException;
4443
import java.io.InputStream;
44+
import java.nio.charset.StandardCharsets;
4545
import java.time.Duration;
4646
import java.util.ArrayList;
47+
import java.util.Arrays;
4748
import java.util.Collection;
4849
import java.util.Collections;
4950
import java.util.HashMap;
@@ -65,7 +66,7 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
6566
public static final String LAMBDA_RESPONSE_RECORDS_COUNTER = "lambdaResponseRecordsCounter";
6667
public static final String RECORDS_EXCEEDING_THRESHOLD = "recordsExceedingThreshold";
6768
public static final String CIRCUIT_BREAKER_TRIPS = "circuitBreakerTrips";
68-
private static final String NO_RETURN_RESPONSE = "null";
69+
private static final byte[] NO_RETURN_RESPONSE = "null".getBytes(StandardCharsets.UTF_8);
6970
private static final String EXCEEDING_PAYLOAD_LIMIT_EXCEPTION = "Status Code: 413";
7071

7172
private static final Logger LOG = LoggerFactory.getLogger(LambdaProcessor.class);
@@ -268,9 +269,9 @@ List<Record<Event>> convertLambdaResponseToEvent(Buffer flushedBuffer,
268269

269270
SdkBytes payload = lambdaResponse.payload();
270271
// Considering "null" payload as empty response from lambda and not parsing it.
271-
if (!(NO_RETURN_RESPONSE.equals(payload.asUtf8String()))) {
272+
if (!(Arrays.equals(NO_RETURN_RESPONSE, payload.asByteArrayUnsafe()))) {
272273
//Convert using response codec
273-
InputStream inputStream = new ByteArrayInputStream(payload.asByteArray());
274+
InputStream inputStream = payload.asInputStream();
274275
responseCodec.parse(inputStream, record -> {
275276
Event event = record.getData();
276277
parsedEvents.add(event);

0 commit comments

Comments
 (0)