Skip to content

Commit d6ba47f

Browse files
committed
fix(lambda): improve CloudWatch log polling and stabilize unit test
1 parent 9ffde0a commit d6ba47f

File tree

2 files changed

+72
-46
lines changed

2 files changed

+72
-46
lines changed

src/main/java/io/kestra/plugin/aws/lambda/Invoke.java

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -210,50 +210,72 @@ Optional<String> readError(String payload) {
210210
}
211211

212212
@VisibleForTesting
213-
void fetchAndLogLambdaLogs(
214-
RunContext runContext,
215-
String functionArn,
216-
Instant startTime
217-
) {
213+
void fetchAndLogLambdaLogs(RunContext runContext, String functionArn, Instant startTime) {
218214
var logger = runContext.logger();
219-
220-
// Extract function name from ARN
221215
String functionName;
216+
222217
try {
223-
functionName = functionArn.split(":function:")[1].split(":")[0];
218+
functionName = extractFunctionName(functionArn);
224219
} catch (Exception e) {
225220
logger.warn("Unable to determine Lambda function name from ARN: {}", functionArn);
226221
return;
227222
}
228223

229224
String logGroupName = "/aws/lambda/" + functionName;
230225

226+
// Polling logic: 5 attempts, waiting 3 seconds between each
227+
int maxAttempts = 5;
228+
long sleepMillis = 3000;
229+
boolean logsFound = false;
230+
231231
try (CloudWatchLogsClient logsClient = getCloudWatchLogsClient(runContext)) {
232-
FilterLogEventsRequest request = FilterLogEventsRequest.builder()
233-
.logGroupName(logGroupName)
234-
.startTime(startTime.toEpochMilli())
235-
.build();
236-
237-
// Fetch logs using CloudWatch paginator
238-
logsClient.filterLogEventsPaginator(request)
239-
.events()
240-
.stream()
241-
// Hard cap to prevent excessive log volume in a single task execution
242-
.limit(1_000)
243-
.map(FilteredLogEvent::message)
244-
.filter(message -> message != null && !message.isBlank())
245-
.forEach(message -> logger.info("[lambda] {}", message));
232+
for (int i = 0; i < maxAttempts; i++) {
233+
FilterLogEventsRequest request = FilterLogEventsRequest.builder()
234+
.logGroupName(logGroupName)
235+
.startTime(startTime.toEpochMilli())
236+
.build();
237+
238+
var response = logsClient.filterLogEvents(request);
239+
var events = response.events();
246240

241+
if (events != null && !events.isEmpty()) {
242+
events.stream()
243+
.limit(1000)
244+
.map(FilteredLogEvent::message)
245+
.filter(message -> message != null && !message.isBlank())
246+
.forEach(message -> logger.info("[lambda] {}", message.trim()));
247+
logsFound = true;
248+
break; // Exit early if found logs
249+
}
250+
251+
// Wait before next retry, but not after the last attempt
252+
if (i < maxAttempts - 1) {
253+
Thread.sleep(sleepMillis);
254+
}
255+
}
256+
257+
if (!logsFound) {
258+
logger.debug("No CloudWatch logs found for {} after {} attempts.", functionName, maxAttempts);
259+
}
260+
261+
} catch (InterruptedException e) {
262+
Thread.currentThread().interrupt();
263+
logger.warn("Log fetching interrupted for Lambda {}", functionArn);
247264
} catch (Exception e) {
248-
// Logs must never fail the task execution
249-
logger.warn(
250-
"Failed to fetch CloudWatch logs for Lambda {}: {}",
251-
functionArn,
252-
e.getMessage()
253-
);
265+
logger.warn("Failed to fetch CloudWatch logs for Lambda {}: {}", functionArn, e.getMessage());
254266
}
255267
}
256268

269+
@VisibleForTesting
270+
private String extractFunctionName(String functionArnOrName) {
271+
if (functionArnOrName.contains(":function:")) {
272+
// Handle Full ARN
273+
return functionArnOrName.split(":function:")[1].split(":")[0];
274+
}
275+
// Handle just the name
276+
return functionArnOrName;
277+
}
278+
257279
@VisibleForTesting
258280
void handleError(String functionArn, ContentType contentType, SdkBytes payload) {
259281
String errorPayload;

src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.mockito.Mockito.doReturn;
2929
import static org.mockito.Mockito.mock;
3030
import static org.mockito.Mockito.spy;
31+
import static org.mockito.Mockito.times;
3132
import static org.mockito.Mockito.verify;
3233
import org.mockito.invocation.InvocationOnMock;
3334
import org.mockito.junit.jupiter.MockitoExtension;
@@ -220,37 +221,40 @@ void givenFunctionArnNoParams_whenInvokeLambda_thenOutputWithFile(
220221
}
221222

222223
@Test
223-
void givenLambdaInvocation_whenCloudWatchLogsEnabled_thenLogsAreFetchedAndLogged() throws Exception {
224-
// Given
224+
void givenLambdaInvocation_whenLogsAreFetched_thenLogsAreLoggedCorrectly() throws Exception {
225+
// Arrange
226+
Instant startTime = Instant.parse("2026-01-15T10:00:00Z");
227+
225228
FilteredLogEvent logEvent = FilteredLogEvent.builder()
226229
.message("Hello from CloudWatch Logs")
227-
.timestamp(Instant.now().toEpochMilli())
230+
.timestamp(startTime.plusSeconds(1).toEpochMilli())
228231
.build();
229232

230-
// Mock paginator
231-
FilterLogEventsIterable paginator = mock(FilterLogEventsIterable.class);
233+
FilterLogEventsResponse response = FilterLogEventsResponse.builder()
234+
.events(List.of(logEvent))
235+
.build();
232236

233-
// Correct AWS SDK iterable for events()
234-
software.amazon.awssdk.core.pagination.sync.SdkIterable<FilteredLogEvent> eventIterable =
235-
() -> List.of(logEvent).iterator();
237+
given(logsClient.filterLogEvents(any(FilterLogEventsRequest.class)))
238+
.willReturn(response);
236239

237-
given(paginator.events()).willReturn(eventIterable);
238-
given(logsClient.filterLogEventsPaginator(any(FilterLogEventsRequest.class)))
239-
.willReturn(paginator);
240+
given(context.logger()).willReturn(logger);
240241

241-
// Spy Invoke to inject mocked logs client
242242
Invoke spyInvoke = spy(invoke);
243243
doReturn(logsClient).when(spyInvoke).getCloudWatchLogsClient(any());
244244

245-
// When
245+
// Act
246246
spyInvoke.fetchAndLogLambdaLogs(
247247
context,
248-
"arn:aws:lambda:eu-central-1:123456789012:function:test",
249-
Instant.now()
248+
"arn:aws:lambda:ap-south-1:123456789012:function:test-function",
249+
startTime
250250
);
251251

252-
// Then
253-
verify(logger).info("[lambda] {}", "Hello from CloudWatch Logs");
254-
}
252+
// Assert
253+
verify(logsClient, times(1))
254+
.filterLogEvents(any(FilterLogEventsRequest.class));
255255

256+
verify(logger)
257+
.info("[lambda] {}", "Hello from CloudWatch Logs");
258+
}
256259
}
260+

0 commit comments

Comments
 (0)