Skip to content

Commit 5ab273a

Browse files
feat(cloudwatch): display lambda logs (#681)
* feat(lambda): fetch CloudWatch logs after Lambda invocation

  Fetches CloudWatch Logs for AWS Lambda invocations using FilterLogEvents paginator and streams log messages into the task logger. Includes unit tests covering log retrieval and logging behavior.
* fix(lambda): improve CloudWatch log polling and stabilize unit test
* chore: trigger CI re-run for flaky test
* Refactor fetchAndLogLambdaLogs to use RetryUtils with proper retry policy
* Bump kestraVersion to 1.2.0 for RetryUtils static method
* Fix annotation formatting in Invoke plugin
* chore: sync with main and trigger CI for #677
* fix(lambda): stabilize CloudWatch log polling and unit tests

---------

Co-authored-by: François Delbrayelle <fdelbrayelle@gmail.com>
1 parent fbca90e commit 5ab273a

File tree

4 files changed

+219
-53
lines changed

4 files changed

+219
-53
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
version=1.4.2-SNAPSHOT
2-
kestraVersion=1.1.0
2+
kestraVersion=1.1.0
src/main/java/io/kestra/plugin/aws/cloudwatch/CloudWatchLogs.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.kestra.plugin.aws.cloudwatch;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.runners.RunContext;
5+
import io.kestra.plugin.aws.AbstractConnection;
6+
import io.kestra.plugin.aws.ConnectionUtils;
7+
import lombok.EqualsAndHashCode;
8+
import lombok.Getter;
9+
import lombok.NoArgsConstructor;
10+
import lombok.ToString;
11+
import lombok.experimental.SuperBuilder;
12+
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
13+
14+
@SuperBuilder
15+
@ToString
16+
@EqualsAndHashCode
17+
@Getter
18+
@NoArgsConstructor
19+
public class CloudWatchLogs extends AbstractConnection {
20+
21+
/**
22+
* Create a CloudWatchLogsClient using the standard Kestra AWS configuration
23+
* (credentials, region, endpoint overrides, etc.).
24+
*/
25+
public CloudWatchLogsClient logsClient(final RunContext runContext)
26+
throws IllegalVariableEvaluationException {
27+
28+
final AwsClientConfig clientConfig = awsClientConfig(runContext);
29+
30+
return ConnectionUtils
31+
.configureSyncClient(clientConfig, CloudWatchLogsClient.builder())
32+
.build();
33+
}
34+
}

src/main/java/io/kestra/plugin/aws/lambda/Invoke.java

Lines changed: 106 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
11
package io.kestra.plugin.aws.lambda;
22

3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.io.UncheckedIOException;
6+
import java.net.URI;
7+
import java.nio.file.Files;
8+
import java.time.Duration;
9+
import java.time.Instant;
10+
import java.util.Map;
11+
import java.util.Optional;
12+
13+
import org.apache.http.HttpHeaders;
14+
import org.apache.http.entity.ContentType;
15+
316
import com.fasterxml.jackson.core.JsonProcessingException;
417
import com.fasterxml.jackson.databind.ObjectMapper;
518
import com.google.common.annotations.VisibleForTesting;
19+
620
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
721
import io.kestra.core.models.annotations.Example;
822
import io.kestra.core.models.annotations.Metric;
@@ -11,10 +25,13 @@
1125
import io.kestra.core.models.executions.metrics.Timer;
1226
import io.kestra.core.models.property.Property;
1327
import io.kestra.core.models.tasks.RunnableTask;
28+
import io.kestra.core.models.tasks.retrys.AbstractRetry;
1429
import io.kestra.core.runners.RunContext;
1530
import io.kestra.core.serializers.JacksonMapper;
31+
import io.kestra.core.utils.RetryUtils;
1632
import io.kestra.plugin.aws.AbstractConnection;
1733
import io.kestra.plugin.aws.ConnectionUtils;
34+
import io.kestra.plugin.aws.cloudwatch.CloudWatchLogs;
1835
import io.kestra.plugin.aws.lambda.Invoke.Output;
1936
import io.kestra.plugin.aws.s3.ObjectOutput;
2037
import io.swagger.v3.oas.annotations.media.Schema;
@@ -25,22 +42,17 @@
2542
import lombok.ToString;
2643
import lombok.experimental.SuperBuilder;
2744
import lombok.extern.slf4j.Slf4j;
28-
import org.apache.http.HttpHeaders;
29-
import org.apache.http.entity.ContentType;
3045
import software.amazon.awssdk.core.SdkBytes;
46+
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
47+
import software.amazon.awssdk.services.cloudwatchlogs.model.FilterLogEventsRequest;
48+
import software.amazon.awssdk.services.cloudwatchlogs.model.FilteredLogEvent;
3149
import software.amazon.awssdk.services.lambda.LambdaClient;
3250
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
3351
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
3452
import software.amazon.awssdk.services.lambda.model.LambdaException;
53+
import io.kestra.core.models.tasks.retrys.Exponential;
3554

36-
import java.io.File;
37-
import java.io.IOException;
38-
import java.io.UncheckedIOException;
39-
import java.net.URI;
40-
import java.nio.file.Files;
41-
import java.time.Duration;
42-
import java.util.Map;
43-
import java.util.Optional;
55+
import java.util.List;
4456

4557
@SuperBuilder
4658
@ToString
@@ -94,6 +106,7 @@
94106
@Metric(name = "duration", type = Timer.TYPE)
95107
}
96108
)
109+
97110
@Slf4j
98111
public class Invoke extends AbstractConnection implements RunnableTask<Output> {
99112

@@ -103,32 +116,28 @@ public class Invoke extends AbstractConnection implements RunnableTask<Output> {
103116
@NotNull
104117
private Property<String> functionArn;
105118

106-
@Schema(
107-
title = "Function request payload.",
108-
description = "Request payload. It's a map of string -> object."
109-
)
119+
@Schema(title = "Function request payload.", description = "Request payload. It's a map of string -> object.")
110120
private Property<Map<String, Object>> functionPayload;
111121

112122
@Override
113123
public Output run(RunContext runContext) throws Exception {
114124
final long start = System.nanoTime();
125+
final Instant invocationStart = Instant.now().minusSeconds(5);
115126
var functionArn = runContext.render(this.functionArn).as(String.class).orElseThrow();
116-
var requestPayload = runContext.render(this.functionPayload).asMap(String.class, Object.class).isEmpty() ?
117-
null :
118-
runContext.render(this.functionPayload).asMap(String.class, Object.class);
127+
var requestPayload = runContext.render(this.functionPayload).asMap(String.class, Object.class).isEmpty() ? null
128+
: runContext.render(this.functionPayload).asMap(String.class, Object.class);
119129
var logger = runContext.logger();
120130

121131
try (var lambda = client(runContext)) {
122132
var builder = InvokeRequest.builder().functionName(functionArn);
123133
if (requestPayload != null && requestPayload.size() > 0) {
124-
var payload = SdkBytes.fromUtf8String(OBJECT_MAPPER.writeValueAsString(requestPayload)) ;
134+
var payload = SdkBytes.fromUtf8String(OBJECT_MAPPER.writeValueAsString(requestPayload));
125135
builder.payload(payload);
126136
}
127137
InvokeRequest request = builder.build();
128138
// TODO take care about long-running functions: your client might disconnect during
129139
// synchronous invocation while it waits for a response. Configure your HTTP client,
130140
// SDK, firewall, proxy, or operating system to allow for long connections with timeout
131-
// or keep-alive settings.
132141
InvokeResponse res = lambda.invoke(request);
133142
Optional<String> contentTypeHeader = res.sdkHttpResponse().firstMatchingHeader(HttpHeaders.CONTENT_TYPE);
134143
ContentType contentType = parseContentType(contentTypeHeader);
@@ -141,13 +150,19 @@ public Output run(RunContext runContext) throws Exception {
141150
logger.debug("Lambda {} invoked successfully", functionArn);
142151
}
143152
Output out = handleContent(runContext, functionArn, contentType, res.payload());
153+
fetchAndLogLambdaLogs(runContext, functionArn, invocationStart);
144154
runContext.metric(Timer.of("duration", Duration.ofNanos(System.nanoTime() - start)));
145155
return out;
146156
} catch (LambdaException e) {
147157
throw new LambdaInvokeException("Lambda Invoke task execution failed for function: " + functionArn, e);
148158
}
149159
}
150160

161+
@VisibleForTesting
162+
CloudWatchLogsClient getCloudWatchLogsClient(RunContext runContext) throws IllegalVariableEvaluationException {
163+
return new CloudWatchLogs().logsClient(runContext);
164+
}
165+
151166
@VisibleForTesting
152167
LambdaClient client(final RunContext runContext) throws IllegalVariableEvaluationException {
153168
final AwsClientConfig clientConfig = awsClientConfig(runContext);
@@ -165,7 +180,7 @@ ContentType parseContentType(Optional<String> contentType) {
165180
// Apply charset only if it was provided originally
166181
return ContentType.create(known.getMimeType(), parsed.getCharset());
167182
}
168-
} catch(Exception cte) {
183+
} catch (Exception cte) {
169184
log.warn("Unable to parse Lambda response content type {}: {}", contentType.get(), cte.getMessage());
170185
}
171186
}
@@ -195,6 +210,74 @@ Optional<String> readError(String payload) {
195210
return Optional.empty();
196211
}
197212

213+
@VisibleForTesting
214+
void fetchAndLogLambdaLogs(RunContext runContext, String functionArn, Instant startTime) {
215+
var logger = runContext.logger();
216+
String functionName;
217+
218+
try {
219+
functionName = extractFunctionName(functionArn);
220+
} catch (Exception e) {
221+
logger.warn("Unable to determine Lambda function name from ARN: {}", functionArn);
222+
return;
223+
}
224+
225+
String logGroupName = "/aws/lambda/" + functionName;
226+
227+
// Polling logic: 5 attempts, waiting 3 seconds between each
228+
int maxAttempts = 5;
229+
long sleepMillis = 3000;
230+
boolean logsFound = false;
231+
232+
try (CloudWatchLogsClient logsClient = getCloudWatchLogsClient(runContext)) {
233+
for (int i = 0; i < maxAttempts; i++) {
234+
FilterLogEventsRequest request = FilterLogEventsRequest.builder()
235+
.logGroupName(logGroupName)
236+
.startTime(startTime.toEpochMilli())
237+
.build();
238+
239+
var response = logsClient.filterLogEvents(request);
240+
var events = response.events();
241+
242+
if (events != null && !events.isEmpty()) {
243+
events.stream()
244+
.limit(1000)
245+
.map(FilteredLogEvent::message)
246+
.filter(message -> message != null && !message.isBlank())
247+
.forEach(message -> logger.info("[lambda] {}", message.trim()));
248+
249+
logsFound = true;
250+
break; // Exit early if found logs
251+
}
252+
253+
// Wait before next retry, but not after the last attempt
254+
if (i < maxAttempts - 1) {
255+
Thread.sleep(sleepMillis);
256+
}
257+
}
258+
259+
if (!logsFound) {
260+
logger.debug("No CloudWatch logs found for {} after {} attempts.", functionName, maxAttempts);
261+
}
262+
263+
} catch (InterruptedException e) {
264+
Thread.currentThread().interrupt();
265+
logger.warn("Log fetching interrupted for Lambda {}", functionArn);
266+
} catch (Exception e) {
267+
logger.warn("Failed to fetch CloudWatch logs for Lambda {}: {}", functionArn, e.getMessage());
268+
}
269+
}
270+
271+
@VisibleForTesting
272+
private String extractFunctionName(String functionArnOrName) {
273+
if (functionArnOrName.contains(":function:")) {
274+
// Handle Full ARN
275+
return functionArnOrName.split(":function:")[1].split(":")[0];
276+
}
277+
// Handle just the name
278+
return functionArnOrName;
279+
}
280+
198281
@VisibleForTesting
199282
void handleError(String functionArn, ContentType contentType, SdkBytes payload) {
200283
String errorPayload;
@@ -251,20 +334,14 @@ Output handleContent(RunContext runContext, String functionArn, ContentType cont
251334
@Getter
252335
public static class Output extends ObjectOutput implements io.kestra.core.models.tasks.Output {
253336

254-
@Schema(
255-
title = "Response file URI."
256-
)
337+
@Schema(title = "Response file URI.")
257338
private final URI uri;
258339

259-
@Schema(
260-
title = "Size of the response content in bytes."
261-
)
340+
@Schema(title = "Size of the response content in bytes.")
262341

263342
private final Long contentLength;
264343

265-
@Schema(
266-
title = "A standard MIME type describing the format of the content."
267-
)
344+
@Schema(title = "A standard MIME type describing the format of the content.")
268345
private final String contentType;
269346
}
270347
}

0 commit comments

Comments
 (0)