Skip to content

Commit a10e610

Browse files
feat(lambda): fetch CloudWatch logs after Lambda invocation (#677)
* feat(lambda): fetch CloudWatch logs after Lambda invocation Fetches CloudWatch Logs for AWS Lambda invocations using FilterLogEvents paginator and streams log messages into the task logger. Includes unit tests covering log retrieval and logging behavior. * fix(lambda): improve CloudWatch log polling and stabilize unit test * chore: trigger CI re-run for flaky test * Refactor fetchAndLogLambdaLogs to use RetryUtils with proper retry policy * Bump kestraVersion to 1.2.0 for RetryUtils static method * Fix annotation formatting in Invoke plugin --------- Co-authored-by: François Delbrayelle <fdelbrayelle@gmail.com>
1 parent c0b2617 commit a10e610

File tree

4 files changed

+194
-53
lines changed

4 files changed

+194
-53
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
version=1.4.2-SNAPSHOT
2-
kestraVersion=1.1.0
2+
kestraVersion=1.2.0
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.kestra.plugin.aws.cloudwatch;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.runners.RunContext;
5+
import io.kestra.plugin.aws.AbstractConnection;
6+
import io.kestra.plugin.aws.ConnectionUtils;
7+
import lombok.EqualsAndHashCode;
8+
import lombok.Getter;
9+
import lombok.NoArgsConstructor;
10+
import lombok.ToString;
11+
import lombok.experimental.SuperBuilder;
12+
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
13+
14+
@SuperBuilder
15+
@ToString
16+
@EqualsAndHashCode
17+
@Getter
18+
@NoArgsConstructor
19+
public class CloudWatchLogs extends AbstractConnection {
20+
21+
/**
22+
* Create a CloudWatchLogsClient using the standard Kestra AWS configuration
23+
* (credentials, region, endpoint overrides, etc.).
24+
*/
25+
public CloudWatchLogsClient logsClient(final RunContext runContext)
26+
throws IllegalVariableEvaluationException {
27+
28+
final AwsClientConfig clientConfig = awsClientConfig(runContext);
29+
30+
return ConnectionUtils
31+
.configureSyncClient(clientConfig, CloudWatchLogsClient.builder())
32+
.build();
33+
}
34+
}

src/main/java/io/kestra/plugin/aws/lambda/Invoke.java

Lines changed: 85 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
11
package io.kestra.plugin.aws.lambda;
22

3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.io.UncheckedIOException;
6+
import java.net.URI;
7+
import java.nio.file.Files;
8+
import java.time.Duration;
9+
import java.time.Instant;
10+
import java.util.Map;
11+
import java.util.Optional;
12+
13+
import org.apache.http.HttpHeaders;
14+
import org.apache.http.entity.ContentType;
15+
316
import com.fasterxml.jackson.core.JsonProcessingException;
417
import com.fasterxml.jackson.databind.ObjectMapper;
518
import com.google.common.annotations.VisibleForTesting;
19+
620
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
721
import io.kestra.core.models.annotations.Example;
822
import io.kestra.core.models.annotations.Metric;
@@ -11,10 +25,13 @@
1125
import io.kestra.core.models.executions.metrics.Timer;
1226
import io.kestra.core.models.property.Property;
1327
import io.kestra.core.models.tasks.RunnableTask;
28+
import io.kestra.core.models.tasks.retrys.AbstractRetry;
1429
import io.kestra.core.runners.RunContext;
1530
import io.kestra.core.serializers.JacksonMapper;
31+
import io.kestra.core.utils.RetryUtils;
1632
import io.kestra.plugin.aws.AbstractConnection;
1733
import io.kestra.plugin.aws.ConnectionUtils;
34+
import io.kestra.plugin.aws.cloudwatch.CloudWatchLogs;
1835
import io.kestra.plugin.aws.lambda.Invoke.Output;
1936
import io.kestra.plugin.aws.s3.ObjectOutput;
2037
import io.swagger.v3.oas.annotations.media.Schema;
@@ -25,22 +42,17 @@
2542
import lombok.ToString;
2643
import lombok.experimental.SuperBuilder;
2744
import lombok.extern.slf4j.Slf4j;
28-
import org.apache.http.HttpHeaders;
29-
import org.apache.http.entity.ContentType;
3045
import software.amazon.awssdk.core.SdkBytes;
46+
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
47+
import software.amazon.awssdk.services.cloudwatchlogs.model.FilterLogEventsRequest;
48+
import software.amazon.awssdk.services.cloudwatchlogs.model.FilteredLogEvent;
3149
import software.amazon.awssdk.services.lambda.LambdaClient;
3250
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
3351
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
3452
import software.amazon.awssdk.services.lambda.model.LambdaException;
53+
import io.kestra.core.models.tasks.retrys.Exponential;
3554

36-
import java.io.File;
37-
import java.io.IOException;
38-
import java.io.UncheckedIOException;
39-
import java.net.URI;
40-
import java.nio.file.Files;
41-
import java.time.Duration;
42-
import java.util.Map;
43-
import java.util.Optional;
55+
import java.util.List;
4456

4557
@SuperBuilder
4658
@ToString
@@ -94,6 +106,7 @@
94106
@Metric(name = "duration", type = Timer.TYPE)
95107
}
96108
)
109+
97110
@Slf4j
98111
public class Invoke extends AbstractConnection implements RunnableTask<Output> {
99112

@@ -103,32 +116,28 @@ public class Invoke extends AbstractConnection implements RunnableTask<Output> {
103116
@NotNull
104117
private Property<String> functionArn;
105118

106-
@Schema(
107-
title = "Function request payload.",
108-
description = "Request payload. It's a map of string -> object."
109-
)
119+
@Schema(title = "Function request payload.", description = "Request payload. It's a map of string -> object.")
110120
private Property<Map<String, Object>> functionPayload;
111121

112122
@Override
113123
public Output run(RunContext runContext) throws Exception {
114124
final long start = System.nanoTime();
125+
final Instant invocationStart = Instant.now().minusSeconds(5);
115126
var functionArn = runContext.render(this.functionArn).as(String.class).orElseThrow();
116-
var requestPayload = runContext.render(this.functionPayload).asMap(String.class, Object.class).isEmpty() ?
117-
null :
118-
runContext.render(this.functionPayload).asMap(String.class, Object.class);
127+
var requestPayload = runContext.render(this.functionPayload).asMap(String.class, Object.class).isEmpty() ? null
128+
: runContext.render(this.functionPayload).asMap(String.class, Object.class);
119129
var logger = runContext.logger();
120130

121131
try (var lambda = client(runContext)) {
122132
var builder = InvokeRequest.builder().functionName(functionArn);
123133
if (requestPayload != null && requestPayload.size() > 0) {
124-
var payload = SdkBytes.fromUtf8String(OBJECT_MAPPER.writeValueAsString(requestPayload)) ;
134+
var payload = SdkBytes.fromUtf8String(OBJECT_MAPPER.writeValueAsString(requestPayload));
125135
builder.payload(payload);
126136
}
127137
InvokeRequest request = builder.build();
128138
// TODO take care about long-running functions: your client might disconnect during
129139
// synchronous invocation while it waits for a response. Configure your HTTP client,
130140
// SDK, firewall, proxy, or operating system to allow for long connections with timeout
131-
// or keep-alive settings.
132141
InvokeResponse res = lambda.invoke(request);
133142
Optional<String> contentTypeHeader = res.sdkHttpResponse().firstMatchingHeader(HttpHeaders.CONTENT_TYPE);
134143
ContentType contentType = parseContentType(contentTypeHeader);
@@ -141,13 +150,19 @@ public Output run(RunContext runContext) throws Exception {
141150
logger.debug("Lambda {} invoked successfully", functionArn);
142151
}
143152
Output out = handleContent(runContext, functionArn, contentType, res.payload());
153+
fetchAndLogLambdaLogs(runContext, functionArn, invocationStart);
144154
runContext.metric(Timer.of("duration", Duration.ofNanos(System.nanoTime() - start)));
145155
return out;
146156
} catch (LambdaException e) {
147157
throw new LambdaInvokeException("Lambda Invoke task execution failed for function: " + functionArn, e);
148158
}
149159
}
150160

161+
@VisibleForTesting
162+
CloudWatchLogsClient getCloudWatchLogsClient(RunContext runContext) throws IllegalVariableEvaluationException {
163+
return new CloudWatchLogs().logsClient(runContext);
164+
}
165+
151166
@VisibleForTesting
152167
LambdaClient client(final RunContext runContext) throws IllegalVariableEvaluationException {
153168
final AwsClientConfig clientConfig = awsClientConfig(runContext);
@@ -165,7 +180,7 @@ ContentType parseContentType(Optional<String> contentType) {
165180
// Apply charset only if it was provided originally
166181
return ContentType.create(known.getMimeType(), parsed.getCharset());
167182
}
168-
} catch(Exception cte) {
183+
} catch (Exception cte) {
169184
log.warn("Unable to parse Lambda response content type {}: {}", contentType.get(), cte.getMessage());
170185
}
171186
}
@@ -195,6 +210,53 @@ Optional<String> readError(String payload) {
195210
return Optional.empty();
196211
}
197212

213+
@VisibleForTesting
214+
void fetchAndLogLambdaLogs(RunContext runContext, String functionArn, Instant startTime) {
215+
var logger = runContext.logger();
216+
String functionName = extractFunctionName(functionArn);
217+
String logGroupName = "/aws/lambda/" + functionName;
218+
219+
// Explicit retry policy: 5 attempts, 3s interval, maxInterval 3s
220+
AbstractRetry retryPolicy = Exponential.builder()
221+
.interval(Duration.ofSeconds(3))
222+
.maxAttempts(5)
223+
.maxInterval(Duration.ofSeconds(10))
224+
.build();
225+
226+
try (CloudWatchLogsClient logsClient = getCloudWatchLogsClient(runContext)) {
227+
// Explicitly specify generic type for RetryUtils
228+
List<FilteredLogEvent> events = RetryUtils.<List<FilteredLogEvent>, Exception>of(retryPolicy, logger)
229+
.run(
230+
result -> result == null || result.isEmpty(),
231+
() -> {
232+
FilterLogEventsRequest request = FilterLogEventsRequest.builder()
233+
.logGroupName(logGroupName)
234+
.startTime(startTime.toEpochMilli())
235+
.build();
236+
237+
var response = logsClient.filterLogEvents(request);
238+
return response.events();
239+
});
240+
241+
if (events != null) {
242+
events.forEach(event -> logger.info("[lambda] {}", event.message().trim()));
243+
}
244+
245+
} catch (Exception e) {
246+
logger.warn("Failed to fetch CloudWatch logs for Lambda {}: {}", functionArn, e.getMessage());
247+
}
248+
}
249+
250+
@VisibleForTesting
251+
private String extractFunctionName(String functionArnOrName) {
252+
if (functionArnOrName.contains(":function:")) {
253+
// Handle Full ARN
254+
return functionArnOrName.split(":function:")[1].split(":")[0];
255+
}
256+
// Handle just the name
257+
return functionArnOrName;
258+
}
259+
198260
@VisibleForTesting
199261
void handleError(String functionArn, ContentType contentType, SdkBytes payload) {
200262
String errorPayload;
@@ -251,20 +313,14 @@ Output handleContent(RunContext runContext, String functionArn, ContentType cont
251313
@Getter
252314
public static class Output extends ObjectOutput implements io.kestra.core.models.tasks.Output {
253315

254-
@Schema(
255-
title = "Response file URI."
256-
)
316+
@Schema(title = "Response file URI.")
257317
private final URI uri;
258318

259-
@Schema(
260-
title = "Size of the response content in bytes."
261-
)
319+
@Schema(title = "Size of the response content in bytes.")
262320

263321
private final Long contentLength;
264322

265-
@Schema(
266-
title = "A standard MIME type describing the format of the content."
267-
)
323+
@Schema(title = "A standard MIME type describing the format of the content.")
268324
private final String contentType;
269325
}
270326
}

src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java

Lines changed: 74 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,60 @@
11
package io.kestra.plugin.aws.lambda;
22

3-
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4-
import io.kestra.core.models.property.Property;
5-
import io.kestra.core.runners.RunContext;
6-
import io.kestra.core.runners.RunContextProperty;
7-
import io.kestra.core.runners.WorkingDir;
8-
import io.kestra.core.storages.Storage;
9-
import io.kestra.plugin.aws.lambda.Invoke.Output;
3+
import java.io.ByteArrayInputStream;
4+
import java.io.File;
5+
import java.io.IOException;
6+
import java.net.URI;
7+
import java.nio.file.Files;
8+
import java.time.Instant;
9+
import java.util.Collections;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Optional;
1013
import org.apache.http.entity.ContentType;
1114
import org.junit.jupiter.api.AfterEach;
15+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.junit.jupiter.api.Assertions.assertNotNull;
18+
import static org.junit.jupiter.api.Assertions.assertThrows;
19+
import static org.junit.jupiter.api.Assertions.assertTrue;
1220
import org.junit.jupiter.api.BeforeEach;
1321
import org.junit.jupiter.api.Test;
1422
import org.junit.jupiter.api.extension.ExtendWith;
23+
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.ArgumentMatchers.eq;
25+
import static org.mockito.BDDMockito.given;
1526
import org.mockito.Mock;
1627
import org.mockito.Mock.Strictness;
28+
29+
import static org.mockito.Mockito.atLeastOnce;
30+
import static org.mockito.Mockito.doReturn;
31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.spy;
33+
import static org.mockito.Mockito.times;
34+
import static org.mockito.Mockito.verify;
1735
import org.mockito.invocation.InvocationOnMock;
1836
import org.mockito.junit.jupiter.MockitoExtension;
1937
import org.mockito.stubbing.Answer;
2038
import org.slf4j.Logger;
39+
40+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
41+
import io.kestra.core.models.property.Property;
42+
import io.kestra.core.runners.RunContext;
43+
import io.kestra.core.runners.RunContextProperty;
44+
import io.kestra.core.runners.WorkingDir;
45+
import io.kestra.core.storages.Storage;
46+
import io.kestra.plugin.aws.lambda.Invoke.Output;
2147
import software.amazon.awssdk.core.SdkBytes;
2248
import software.amazon.awssdk.http.SdkHttpResponse;
49+
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
50+
import software.amazon.awssdk.services.cloudwatchlogs.model.FilterLogEventsRequest;
51+
import software.amazon.awssdk.services.cloudwatchlogs.model.FilterLogEventsResponse;
52+
import software.amazon.awssdk.services.cloudwatchlogs.model.FilteredLogEvent;
53+
import software.amazon.awssdk.services.cloudwatchlogs.paginators.FilterLogEventsIterable;
2354
import software.amazon.awssdk.services.lambda.LambdaClient;
2455
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
2556
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
2657

27-
import java.io.ByteArrayInputStream;
28-
import java.io.File;
29-
import java.io.IOException;
30-
import java.net.URI;
31-
import java.nio.file.Files;
32-
import java.util.Collections;
33-
import java.util.Map;
34-
import java.util.Optional;
35-
36-
import static org.junit.jupiter.api.Assertions.*;
37-
import static org.mockito.ArgumentMatchers.any;
38-
import static org.mockito.ArgumentMatchers.eq;
39-
import static org.mockito.BDDMockito.given;
40-
import static org.mockito.Mockito.doReturn;
41-
import static org.mockito.Mockito.spy;
42-
4358
@ExtendWith(MockitoExtension.class)
4459
public class InvokeUnitTest {
4560

@@ -48,6 +63,9 @@ public class InvokeUnitTest {
4863
@Mock(strictness = Strictness.LENIENT)
4964
private RunContext context;
5065

66+
@Mock
67+
private CloudWatchLogsClient logsClient;
68+
5169
@Mock(strictness = Strictness.LENIENT)
5270
private RunContextProperty runContextProperty;
5371

@@ -203,4 +221,37 @@ void givenFunctionArnNoParams_whenInvokeLambda_thenOutputWithFile(
203221
// Then
204222
checkOutput(data, res);
205223
}
224+
225+
@Test
226+
void givenLambdaInvocation_whenLogsAreFetched_thenLogsAreLoggedCorrectly() throws Throwable {
227+
// Arrange
228+
Instant startTime = Instant.parse("2026-01-15T10:00:00Z");
229+
230+
FilteredLogEvent logEvent = FilteredLogEvent.builder()
231+
.message("Hello from CloudWatch Logs")
232+
.timestamp(startTime.plusSeconds(1).toEpochMilli())
233+
.build();
234+
235+
FilterLogEventsResponse response = FilterLogEventsResponse.builder()
236+
.events(List.of(logEvent))
237+
.build();
238+
239+
given(logsClient.filterLogEvents(any(FilterLogEventsRequest.class)))
240+
.willReturn(response);
241+
242+
given(context.logger()).willReturn(logger);
243+
244+
Invoke spyInvoke = spy(invoke);
245+
doReturn(logsClient).when(spyInvoke).getCloudWatchLogsClient(any());
246+
247+
// Act
248+
spyInvoke.fetchAndLogLambdaLogs(
249+
context,
250+
"arn:aws:lambda:ap-south-1:123456789012:function:test-function",
251+
startTime);
252+
253+
// Assert (THIS is what really matters)
254+
verify(logger).info("[lambda] {}", "Hello from CloudWatch Logs");
255+
}
206256
}
257+

0 commit comments

Comments
 (0)