1111import io .kestra .core .models .executions .metrics .Counter ;
1212import io .kestra .core .models .executions .metrics .Timer ;
1313import io .kestra .core .models .flows .State ;
14+ import io .kestra .core .models .property .Property ;
1415import io .kestra .core .models .tasks .RunnableTask ;
1516import io .kestra .core .runners .RunContext ;
1617import io .kestra .core .serializers .FileSerde ;
8283 secretKeyId: "<secret-key>"
8384 region: "eu-central-1"
8485 streamName: "mystream"
85- records: kestra:///myfile.ion
86+ records: kestra:///myfile.ion
8687 """
8788 )
8889 }
@@ -94,28 +95,25 @@ public class PutRecords extends AbstractConnection implements RunnableTask<PutRe
9495 private static final ObjectMapper MAPPER = JacksonMapper .ofIon ()
9596 .setSerializationInclusion (JsonInclude .Include .ALWAYS );
9697
97- @ PluginProperty
9898 @ NotNull
9999 @ Schema (
100100 title = "Mark the task as failed when sending a record is unsuccessful." ,
101101 description = "If true, the task will fail when any record fails to be sent."
102102 )
103103 @ Builder .Default
104- private boolean failOnUnsuccessfulRecords = true ;
104+ private Property < Boolean > failOnUnsuccessfulRecords = Property . of ( true ) ;
105105
106- @ PluginProperty (dynamic = true )
107106 @ Schema (
108107 title = "The name of the stream to push the records." ,
109108 description = "Make sure to set either `streamName` or `streamArn`. One of those must be provided."
110109 )
111- private String streamName ;
110+ private Property < String > streamName ;
112111
113- @ PluginProperty (dynamic = true )
114112 @ Schema (
115113 title = "The ARN of the stream to push the records." ,
116114 description = "Make sure to set either `streamName` or `streamArn`. One of those must be provided."
117115 )
118- private String streamArn ;
116+ private Property < String > streamArn ;
119117
120118 @ PluginProperty (dynamic = true )
121119 @ Schema (
@@ -135,7 +133,7 @@ public Output run(RunContext runContext) throws Exception {
135133 PutRecordsResponse putRecordsResponse = putRecords (runContext , records );
136134
137135 // Fail if failOnUnsuccessfulRecords
138- if (failOnUnsuccessfulRecords && putRecordsResponse .failedRecordCount () > 0 ) {
136+ if (runContext . render ( failOnUnsuccessfulRecords ). as ( Boolean . class ). orElseThrow () && putRecordsResponse .failedRecordCount () > 0 ) {
139137 var logger = runContext .logger ();
140138 logger .error ("Response show {} record failed: {}" , putRecordsResponse .failedRecordCount (), putRecordsResponse );
141139 throw new RuntimeException (String .format ("Response show %d record failed: %s" , putRecordsResponse .failedRecordCount (), putRecordsResponse ));
@@ -159,10 +157,12 @@ private PutRecordsResponse putRecords(RunContext runContext, List<Record> record
159157 try (KinesisClient client = client (runContext )) {
160158 PutRecordsRequest .Builder builder = PutRecordsRequest .builder ();
161159
162- if (!Strings .isNullOrEmpty (streamArn )) {
163- builder .streamARN (streamArn );
164- } else if (!Strings .isNullOrEmpty (streamName )) {
165- builder .streamName (streamName );
160+ var renderedStreamArn = runContext .render (streamArn ).as (String .class ).orElse (null );
161+ var renderedStreamName = runContext .render (streamName ).as (String .class ).orElse (null );
162+ if (!Strings .isNullOrEmpty (renderedStreamArn )) {
163+ builder .streamARN (renderedStreamArn );
164+ } else if (!Strings .isNullOrEmpty (renderedStreamName )) {
165+ builder .streamName (renderedStreamName );
166166 } else {
167167 throw new IllegalArgumentException ("Either streamName or streamArn has to be set." );
168168 }
0 commit comments