Skip to content
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.4.2-SNAPSHOT
kestraVersion=1.1.0
kestraVersion=1.2.0
34 changes: 34 additions & 0 deletions src/main/java/io/kestra/plugin/aws/cloudwatch/CloudWatchLogs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.kestra.plugin.aws.cloudwatch;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.aws.AbstractConnection;
import io.kestra.plugin.aws.ConnectionUtils;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class CloudWatchLogs extends AbstractConnection {

/**
* Create a CloudWatchLogsClient using the standard Kestra AWS configuration
* (credentials, region, endpoint overrides, etc.).
*/
public CloudWatchLogsClient logsClient(final RunContext runContext)
throws IllegalVariableEvaluationException {

final AwsClientConfig clientConfig = awsClientConfig(runContext);

return ConnectionUtils
.configureSyncClient(clientConfig, CloudWatchLogsClient.builder())
.build();
}
}
155 changes: 102 additions & 53 deletions src/main/java/io/kestra/plugin/aws/lambda/Invoke.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
package io.kestra.plugin.aws.lambda;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;

import org.apache.http.HttpHeaders;
import org.apache.http.entity.ContentType;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Metric;
Expand All @@ -11,51 +25,45 @@
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.RetryUtils;
import io.kestra.plugin.aws.AbstractConnection;
import io.kestra.plugin.aws.ConnectionUtils;
import io.kestra.plugin.aws.cloudwatch.CloudWatchLogs;
import io.kestra.plugin.aws.lambda.Invoke.Output;
import io.kestra.plugin.aws.s3.ObjectOutput;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.entity.ContentType;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.FilterLogEventsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.FilteredLogEvent;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.LambdaException;
import io.kestra.core.models.tasks.retrys.Constant;
import io.kestra.core.models.tasks.retrys.Exponential;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.List;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Invoke an AWS Lambda function."
)
@Plugin(
examples = {
@Example(
title = "Invoke given Lambda function and wait for its completion.",
full = true,
code = """
@Schema(title = "Invoke an AWS Lambda function.")
@Plugin(examples = {
@Example(title = "Invoke given Lambda function and wait for its completion.", full = true, code = """
id: aws_lambda_invoke
namespace: company.team

Expand All @@ -66,12 +74,8 @@
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
region: "eu-central-1"
functionArn: "arn:aws:lambda:eu-central-1:123456789012:function:my-function"
"""
),
@Example(
title = "Invoke given Lambda function with given payload parameters and wait for its completion. Payload is a map of items.",
full = true,
code = """
"""),
@Example(title = "Invoke given Lambda function with given payload parameters and wait for its completion. Payload is a map of items.", full = true, code = """
id: aws_lambda_invoke
namespace: company.team

Expand All @@ -86,14 +90,11 @@
id: 1
firstname: "John"
lastname: "Doe"
"""
)
},
metrics = {
""")
}, metrics = {
@Metric(name = "file.size", type = Counter.TYPE),
@Metric(name = "duration", type = Timer.TYPE)
}
)
})
@Slf4j
public class Invoke extends AbstractConnection implements RunnableTask<Output> {

Expand All @@ -103,31 +104,31 @@ public class Invoke extends AbstractConnection implements RunnableTask<Output> {
@NotNull
private Property<String> functionArn;

@Schema(
title = "Function request payload.",
description = "Request payload. It's a map of string -> object."
)
@Schema(title = "Function request payload.", description = "Request payload. It's a map of string -> object.")
private Property<Map<String, Object>> functionPayload;

@Override
public Output run(RunContext runContext) throws Exception {
final long start = System.nanoTime();
final Instant invocationStart = Instant.now().minusSeconds(5);
var functionArn = runContext.render(this.functionArn).as(String.class).orElseThrow();
var requestPayload = runContext.render(this.functionPayload).asMap(String.class, Object.class).isEmpty() ?
null :
runContext.render(this.functionPayload).asMap(String.class, Object.class);
var requestPayload = runContext.render(this.functionPayload).asMap(String.class, Object.class).isEmpty() ? null
: runContext.render(this.functionPayload).asMap(String.class, Object.class);
var logger = runContext.logger();

try (var lambda = client(runContext)) {
var builder = InvokeRequest.builder().functionName(functionArn);
if (requestPayload != null && requestPayload.size() > 0) {
var payload = SdkBytes.fromUtf8String(OBJECT_MAPPER.writeValueAsString(requestPayload)) ;
var payload = SdkBytes.fromUtf8String(OBJECT_MAPPER.writeValueAsString(requestPayload));
builder.payload(payload);
}
InvokeRequest request = builder.build();
// TODO take care about long-running functions: your client might disconnect during
// synchronous invocation while it waits for a response. Configure your HTTP client,
// SDK, firewall, proxy, or operating system to allow for long connections with timeout
// TODO take care about long-running functions: your client might disconnect
// during
// synchronous invocation while it waits for a response. Configure your HTTP
// client,
// SDK, firewall, proxy, or operating system to allow for long connections with
// timeout
// or keep-alive settings.
InvokeResponse res = lambda.invoke(request);
Optional<String> contentTypeHeader = res.sdkHttpResponse().firstMatchingHeader(HttpHeaders.CONTENT_TYPE);
Expand All @@ -141,13 +142,19 @@ public Output run(RunContext runContext) throws Exception {
logger.debug("Lambda {} invoked successfully", functionArn);
}
Output out = handleContent(runContext, functionArn, contentType, res.payload());
fetchAndLogLambdaLogs(runContext, functionArn, invocationStart);
runContext.metric(Timer.of("duration", Duration.ofNanos(System.nanoTime() - start)));
return out;
} catch (LambdaException e) {
throw new LambdaInvokeException("Lambda Invoke task execution failed for function: " + functionArn, e);
}
}

@VisibleForTesting
CloudWatchLogsClient getCloudWatchLogsClient(RunContext runContext) throws IllegalVariableEvaluationException {
return new CloudWatchLogs().logsClient(runContext);
}

@VisibleForTesting
LambdaClient client(final RunContext runContext) throws IllegalVariableEvaluationException {
final AwsClientConfig clientConfig = awsClientConfig(runContext);
Expand All @@ -165,7 +172,7 @@ ContentType parseContentType(Optional<String> contentType) {
// Apply charset only if it was provided originally
return ContentType.create(known.getMimeType(), parsed.getCharset());
}
} catch(Exception cte) {
} catch (Exception cte) {
log.warn("Unable to parse Lambda response content type {}: {}", contentType.get(), cte.getMessage());
}
}
Expand All @@ -178,7 +185,8 @@ Optional<String> readError(String payload) {
try {
// Sample error from AWS could be:
// {"errorMessage": "'path'", "errorType": "KeyError", "requestId":
// "f32ff4cf-b0dc-44ec-a59a-4c5b18b836c3", "stackTrace": [" File \"/var/task/hello.py\",
// "f32ff4cf-b0dc-44ec-a59a-4c5b18b836c3", "stackTrace": [" File
// \"/var/task/hello.py\",
// line 12, in handler\n \"body\": \"Hello AWS Lambda!!! You have requested
// {}\".format(event[\"path\"])\n"]}
// TODO May be it's more resonable to return the whole payload as an error
Expand All @@ -195,6 +203,53 @@ Optional<String> readError(String payload) {
return Optional.empty();
}

@VisibleForTesting
void fetchAndLogLambdaLogs(RunContext runContext, String functionArn, Instant startTime) {
var logger = runContext.logger();
String functionName = extractFunctionName(functionArn);
String logGroupName = "/aws/lambda/" + functionName;

// Explicit retry policy: 5 attempts, 3s interval, maxInterval 3s
AbstractRetry retryPolicy = Exponential.builder()
.interval(Duration.ofSeconds(3))
.maxAttempts(5)
.maxInterval(Duration.ofSeconds(10))
.build();

try (CloudWatchLogsClient logsClient = getCloudWatchLogsClient(runContext)) {
// Explicitly specify generic type for RetryUtils
List<FilteredLogEvent> events = RetryUtils.<List<FilteredLogEvent>, Exception>of(retryPolicy, logger)
.run(
result -> result == null || result.isEmpty(),
() -> {
FilterLogEventsRequest request = FilterLogEventsRequest.builder()
.logGroupName(logGroupName)
.startTime(startTime.toEpochMilli())
.build();

var response = logsClient.filterLogEvents(request);
return response.events();
});

if (events != null) {
events.forEach(event -> logger.info("[lambda] {}", event.message().trim()));
}

} catch (Exception e) {
logger.warn("Failed to fetch CloudWatch logs for Lambda {}: {}", functionArn, e.getMessage());
}
}

@VisibleForTesting
private String extractFunctionName(String functionArnOrName) {
if (functionArnOrName.contains(":function:")) {
// Handle Full ARN
return functionArnOrName.split(":function:")[1].split(":")[0];
}
// Handle just the name
return functionArnOrName;
}

@VisibleForTesting
void handleError(String functionArn, ContentType contentType, SdkBytes payload) {
String errorPayload;
Expand Down Expand Up @@ -251,20 +306,14 @@ Output handleContent(RunContext runContext, String functionArn, ContentType cont
@Getter
public static class Output extends ObjectOutput implements io.kestra.core.models.tasks.Output {

@Schema(
title = "Response file URI."
)
@Schema(title = "Response file URI.")
private final URI uri;

@Schema(
title = "Size of the response content in bytes."
)
@Schema(title = "Size of the response content in bytes.")

private final Long contentLength;

@Schema(
title = "A standard MIME type describing the format of the content."
)
@Schema(title = "A standard MIME type describing the format of the content.")
private final String contentType;
}
}
Loading
Loading