11package io .kestra .plugin .aws .lambda ;
22
3- import java .io .File ;
4- import java .io .IOException ;
5- import java .io .UncheckedIOException ;
6- import java .net .URI ;
7- import java .nio .file .Files ;
8- import java .time .Duration ;
9- import java .time .Instant ;
10- import java .util .Map ;
11- import java .util .Optional ;
12-
13- import org .apache .http .HttpHeaders ;
14- import org .apache .http .entity .ContentType ;
15-
163import com .fasterxml .jackson .core .JsonProcessingException ;
174import com .fasterxml .jackson .databind .ObjectMapper ;
185import com .google .common .annotations .VisibleForTesting ;
19-
206import io .kestra .core .exceptions .IllegalVariableEvaluationException ;
217import io .kestra .core .models .annotations .Example ;
228import io .kestra .core .models .annotations .Metric ;
2511import io .kestra .core .models .executions .metrics .Timer ;
2612import io .kestra .core .models .property .Property ;
2713import io .kestra .core .models .tasks .RunnableTask ;
28- import io .kestra .core .models .tasks .retrys .AbstractRetry ;
2914import io .kestra .core .runners .RunContext ;
3015import io .kestra .core .serializers .JacksonMapper ;
31- import io .kestra .core .utils .RetryUtils ;
3216import io .kestra .plugin .aws .AbstractConnection ;
3317import io .kestra .plugin .aws .ConnectionUtils ;
34- import io .kestra .plugin .aws .cloudwatch .CloudWatchLogs ;
3518import io .kestra .plugin .aws .lambda .Invoke .Output ;
3619import io .kestra .plugin .aws .s3 .ObjectOutput ;
3720import io .swagger .v3 .oas .annotations .media .Schema ;
4225import lombok .ToString ;
4326import lombok .experimental .SuperBuilder ;
4427import lombok .extern .slf4j .Slf4j ;
28+ import org .apache .http .HttpHeaders ;
29+ import org .apache .http .entity .ContentType ;
4530import software .amazon .awssdk .core .SdkBytes ;
46- import software .amazon .awssdk .services .cloudwatchlogs .CloudWatchLogsClient ;
47- import software .amazon .awssdk .services .cloudwatchlogs .model .FilterLogEventsRequest ;
48- import software .amazon .awssdk .services .cloudwatchlogs .model .FilteredLogEvent ;
4931import software .amazon .awssdk .services .lambda .LambdaClient ;
5032import software .amazon .awssdk .services .lambda .model .InvokeRequest ;
5133import software .amazon .awssdk .services .lambda .model .InvokeResponse ;
5234import software .amazon .awssdk .services .lambda .model .LambdaException ;
53- import io .kestra .core .models .tasks .retrys .Exponential ;
5435
55- import java .util .List ;
36+ import java .io .File ;
37+ import java .io .IOException ;
38+ import java .io .UncheckedIOException ;
39+ import java .net .URI ;
40+ import java .nio .file .Files ;
41+ import java .time .Duration ;
42+ import java .util .Map ;
43+ import java .util .Optional ;
5644
5745@ SuperBuilder
5846@ ToString
10694 @ Metric (name = "duration" , type = Timer .TYPE )
10795 }
10896)
109-
11097@ Slf4j
11198public class Invoke extends AbstractConnection implements RunnableTask <Output > {
11299
@@ -116,28 +103,32 @@ public class Invoke extends AbstractConnection implements RunnableTask<Output> {
116103 @ NotNull
117104 private Property <String > functionArn ;
118105
119- @ Schema (title = "Function request payload." , description = "Request payload. It's a map of string -> object." )
106+ @ Schema (
107+ title = "Function request payload." ,
108+ description = "Request payload. It's a map of string -> object."
109+ )
120110 private Property <Map <String , Object >> functionPayload ;
121111
122112 @ Override
123113 public Output run (RunContext runContext ) throws Exception {
124114 final long start = System .nanoTime ();
125- final Instant invocationStart = Instant .now ().minusSeconds (5 );
126115 var functionArn = runContext .render (this .functionArn ).as (String .class ).orElseThrow ();
127- var requestPayload = runContext .render (this .functionPayload ).asMap (String .class , Object .class ).isEmpty () ? null
128- : runContext .render (this .functionPayload ).asMap (String .class , Object .class );
116+ var requestPayload = runContext .render (this .functionPayload ).asMap (String .class , Object .class ).isEmpty () ?
117+ null :
118+ runContext .render (this .functionPayload ).asMap (String .class , Object .class );
129119 var logger = runContext .logger ();
130120
131121 try (var lambda = client (runContext )) {
132122 var builder = InvokeRequest .builder ().functionName (functionArn );
133123 if (requestPayload != null && requestPayload .size () > 0 ) {
134- var payload = SdkBytes .fromUtf8String (OBJECT_MAPPER .writeValueAsString (requestPayload ));
124+ var payload = SdkBytes .fromUtf8String (OBJECT_MAPPER .writeValueAsString (requestPayload )) ;
135125 builder .payload (payload );
136126 }
137127 InvokeRequest request = builder .build ();
138128 // TODO take care about long-running functions: your client might disconnect during
139129 // synchronous invocation while it waits for a response. Configure your HTTP client,
140130 // SDK, firewall, proxy, or operating system to allow for long connections with timeout
131+ // or keep-alive settings.
141132 InvokeResponse res = lambda .invoke (request );
142133 Optional <String > contentTypeHeader = res .sdkHttpResponse ().firstMatchingHeader (HttpHeaders .CONTENT_TYPE );
143134 ContentType contentType = parseContentType (contentTypeHeader );
@@ -150,19 +141,13 @@ public Output run(RunContext runContext) throws Exception {
150141 logger .debug ("Lambda {} invoked successfully" , functionArn );
151142 }
152143 Output out = handleContent (runContext , functionArn , contentType , res .payload ());
153- fetchAndLogLambdaLogs (runContext , functionArn , invocationStart );
154144 runContext .metric (Timer .of ("duration" , Duration .ofNanos (System .nanoTime () - start )));
155145 return out ;
156146 } catch (LambdaException e ) {
157147 throw new LambdaInvokeException ("Lambda Invoke task execution failed for function: " + functionArn , e );
158148 }
159149 }
160150
161- @ VisibleForTesting
162- CloudWatchLogsClient getCloudWatchLogsClient (RunContext runContext ) throws IllegalVariableEvaluationException {
163- return new CloudWatchLogs ().logsClient (runContext );
164- }
165-
166151 @ VisibleForTesting
167152 LambdaClient client (final RunContext runContext ) throws IllegalVariableEvaluationException {
168153 final AwsClientConfig clientConfig = awsClientConfig (runContext );
@@ -180,7 +165,7 @@ ContentType parseContentType(Optional<String> contentType) {
180165 // Apply charset only if it was provided originally
181166 return ContentType .create (known .getMimeType (), parsed .getCharset ());
182167 }
183- } catch (Exception cte ) {
168+ } catch (Exception cte ) {
184169 log .warn ("Unable to parse Lambda response content type {}: {}" , contentType .get (), cte .getMessage ());
185170 }
186171 }
@@ -210,53 +195,6 @@ Optional<String> readError(String payload) {
210195 return Optional .empty ();
211196 }
212197
213- @ VisibleForTesting
214- void fetchAndLogLambdaLogs (RunContext runContext , String functionArn , Instant startTime ) {
215- var logger = runContext .logger ();
216- String functionName = extractFunctionName (functionArn );
217- String logGroupName = "/aws/lambda/" + functionName ;
218-
219- // Explicit retry policy: 5 attempts, 3s interval, maxInterval 3s
220- AbstractRetry retryPolicy = Exponential .builder ()
221- .interval (Duration .ofSeconds (3 ))
222- .maxAttempts (5 )
223- .maxInterval (Duration .ofSeconds (10 ))
224- .build ();
225-
226- try (CloudWatchLogsClient logsClient = getCloudWatchLogsClient (runContext )) {
227- // Explicitly specify generic type for RetryUtils
228- List <FilteredLogEvent > events = RetryUtils .<List <FilteredLogEvent >, Exception >of (retryPolicy , logger )
229- .run (
230- result -> result == null || result .isEmpty (),
231- () -> {
232- FilterLogEventsRequest request = FilterLogEventsRequest .builder ()
233- .logGroupName (logGroupName )
234- .startTime (startTime .toEpochMilli ())
235- .build ();
236-
237- var response = logsClient .filterLogEvents (request );
238- return response .events ();
239- });
240-
241- if (events != null ) {
242- events .forEach (event -> logger .info ("[lambda] {}" , event .message ().trim ()));
243- }
244-
245- } catch (Exception e ) {
246- logger .warn ("Failed to fetch CloudWatch logs for Lambda {}: {}" , functionArn , e .getMessage ());
247- }
248- }
249-
250- @ VisibleForTesting
251- private String extractFunctionName (String functionArnOrName ) {
252- if (functionArnOrName .contains (":function:" )) {
253- // Handle Full ARN
254- return functionArnOrName .split (":function:" )[1 ].split (":" )[0 ];
255- }
256- // Handle just the name
257- return functionArnOrName ;
258- }
259-
260198 @ VisibleForTesting
261199 void handleError (String functionArn , ContentType contentType , SdkBytes payload ) {
262200 String errorPayload ;
@@ -313,14 +251,20 @@ Output handleContent(RunContext runContext, String functionArn, ContentType cont
313251 @ Getter
314252 public static class Output extends ObjectOutput implements io .kestra .core .models .tasks .Output {
315253
316- @ Schema (title = "Response file URI." )
254+ @ Schema (
255+ title = "Response file URI."
256+ )
317257 private final URI uri ;
318258
319- @ Schema (title = "Size of the response content in bytes." )
259+ @ Schema (
260+ title = "Size of the response content in bytes."
261+ )
320262
321263 private final Long contentLength ;
322264
323- @ Schema (title = "A standard MIME type describing the format of the content." )
265+ @ Schema (
266+ title = "A standard MIME type describing the format of the content."
267+ )
324268 private final String contentType ;
325269 }
326270}
0 commit comments