Skip to content

Commit 89d1708

Browse files
authored
feat: added max files limit (#683)
* feat: added max files limit
* fix(s3): return truncated list instead of empty when maxFiles exceeded
* fix: removed default maxFiles = 25 value
1 parent 4aa16d7 commit 89d1708

File tree

9 files changed

+276
-1
lines changed

9 files changed

+276
-1
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM kestra/kestra:latest-no-plugins
1+
FROM kestra/kestra:latest
22

33
RUN mkdir -p /app/plugins
44

src/main/java/io/kestra/plugin/aws/s3/Download.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ public class Download extends AbstractS3Object implements RunnableTask<Download.
113113
)
114114
protected Property<String> regexp;
115115

116+
@Schema(
117+
title = "The maximum number of files to retrieve at once"
118+
)
119+
private Property<Integer> maxFiles;
120+
116121
@Schema(
117122
title = "The account ID of the expected bucket owner",
118123
description = "Requests will fail with a Forbidden error (access denied) if the bucket is owned by a different account."
@@ -229,6 +234,7 @@ private List.Output getObjectsList(RunContext runContext) throws Exception {
229234
.expectedBucketOwner(this.expectedBucketOwner)
230235
.regexp(this.regexp)
231236
.filter(Property.ofValue(ListInterface.Filter.FILES))
237+
.maxFiles(this.maxFiles)
232238
.stsRoleArn(this.stsRoleArn)
233239
.stsRoleSessionName(this.stsRoleSessionName)
234240
.stsRoleExternalId(this.stsRoleExternalId)

src/main/java/io/kestra/plugin/aws/s3/Downloads.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ public class Downloads extends AbstractS3Object implements RunnableTask<Download
9292
@Builder.Default
9393
protected final Property<Filter> filter = Property.ofValue(Filter.BOTH);
9494

95+
@Schema(
96+
title = "The maximum number of files to retrieve at once"
97+
)
98+
private Property<Integer> maxFiles;
99+
95100
private Property<ActionInterface.Action> action;
96101

97102
private Copy.CopyObject moveTo;
@@ -116,6 +121,7 @@ public Output run(RunContext runContext) throws Exception {
116121
.expectedBucketOwner(this.expectedBucketOwner)
117122
.regexp(this.regexp)
118123
.filter(this.filter)
124+
.maxFiles(this.maxFiles)
119125
.stsRoleArn(this.stsRoleArn)
120126
.stsRoleSessionName(this.stsRoleSessionName)
121127
.stsRoleExternalId(this.stsRoleExternalId)

src/main/java/io/kestra/plugin/aws/s3/List.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public class List extends AbstractS3Object implements RunnableTask<List.Output>,
6666

6767
protected Property<String> regexp;
6868

69+
@Schema(
70+
title = "The maximum number of files to retrieve at once"
71+
)
72+
private Property<Integer> maxFiles;
73+
6974
@Builder.Default
7075
protected final Property<Filter> filter = Property.ofValue(Filter.BOTH);
7176

@@ -84,6 +89,16 @@ public Output run(RunContext runContext) throws Exception {
8489
runContext.render(prefix).as(String.class).orElse(null)
8590
);
8691

92+
Integer rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(null);
93+
if (rMaxFiles != null && list.size() > rMaxFiles) {
94+
runContext.logger().warn(
95+
"Listing returned {} files but maxFiles limit is {}. Only the first {} files will be returned. " +
96+
"Increase the maxFiles property if you need more files.",
97+
list.size(), rMaxFiles, rMaxFiles
98+
);
99+
list = list.subList(0, rMaxFiles);
100+
}
101+
87102
return Output.builder()
88103
.objects(list)
89104
.build();

src/main/java/io/kestra/plugin/aws/s3/Trigger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
166166
@Builder.Default
167167
private final Property<On> on = Property.ofValue(On.CREATE_OR_UPDATE);
168168

169+
@Schema(
170+
title = "The maximum number of files to retrieve at once"
171+
)
172+
private Property<Integer> maxFiles;
173+
169174
private Property<String> stateKey;
170175

171176
private Property<Duration> stateTtl;
@@ -196,6 +201,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
196201
.expectedBucketOwner(this.expectedBucketOwner)
197202
.regexp(this.regexp)
198203
.filter(this.filter)
204+
.maxFiles(this.maxFiles)
199205
.stsRoleArn(this.stsRoleArn)
200206
.stsRoleSessionName(this.stsRoleSessionName)
201207
.stsRoleExternalId(this.stsRoleExternalId)

src/test/java/io/kestra/plugin/aws/s3/DownloadTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,4 +172,65 @@ void testInvalidConfiguration() throws Exception {
172172
containsString("Invalid configuration: either specify 'key' for single file download or at least one filtering parameter")
173173
);
174174
}
175+
176+
@Test
177+
void maxFilesExceeded() throws Exception {
178+
this.createBucket();
179+
180+
String basePrefix = IdUtils.create() + "/maxfiles-test/";
181+
182+
// Upload 5 files
183+
for (int i = 0; i < 5; i++) {
184+
URI file = storagePut("file" + i + ".txt");
185+
uploadFile(file, basePrefix + "file" + i + ".txt");
186+
}
187+
188+
// Download with maxFiles=3 (less than 5 files) - should return first 3 files (truncated)
189+
Download download = Download.builder()
190+
.id(DownloadTest.class.getSimpleName() + "-maxFilesExceeded")
191+
.type(Download.class.getName())
192+
.bucket(Property.ofValue(this.BUCKET))
193+
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
194+
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
195+
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
196+
.region(Property.ofValue(localstack.getRegion()))
197+
.prefix(Property.ofValue(basePrefix))
198+
.maxFiles(Property.ofValue(3))
199+
.build();
200+
201+
Download.Output output = download.run(runContext(download));
202+
203+
// When maxFiles exceeded, List returns first 3 files
204+
assertThat(output.getFiles().size(), is(3));
205+
}
206+
207+
@Test
208+
void maxFilesNotExceeded() throws Exception {
209+
this.createBucket();
210+
211+
String basePrefix = IdUtils.create() + "/maxfiles-ok/";
212+
213+
// Upload 5 files
214+
for (int i = 0; i < 5; i++) {
215+
URI file = storagePut("file" + i + ".txt");
216+
uploadFile(file, basePrefix + "file" + i + ".txt");
217+
}
218+
219+
// Download with maxFiles=10 (more than 5 files) - should return all 5 files
220+
Download download = Download.builder()
221+
.id(DownloadTest.class.getSimpleName() + "-maxFilesNotExceeded")
222+
.type(Download.class.getName())
223+
.bucket(Property.ofValue(this.BUCKET))
224+
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
225+
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
226+
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
227+
.region(Property.ofValue(localstack.getRegion()))
228+
.prefix(Property.ofValue(basePrefix))
229+
.maxFiles(Property.ofValue(10))
230+
.build();
231+
232+
Download.Output output = download.run(runContext(download));
233+
234+
assertThat(output.getFiles().size(), is(5));
235+
}
175236
}

src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,60 @@ void move() throws Exception {
7777
listOutput = list.run(runContext(list));
7878
assertThat(listOutput.getObjects().size(), is(2));
7979
}
80+
81+
@Test
82+
void maxFilesExceeded() throws Exception {
83+
this.createBucket();
84+
85+
// Upload 5 files
86+
for (int i = 0; i < 5; i++) {
87+
upload("/tasks/s3-maxfiles");
88+
}
89+
90+
// Downloads with maxFiles=3 (less than 5 files) - should return first 3 files (truncated)
91+
Downloads task = Downloads.builder()
92+
.id(DownloadsTest.class.getSimpleName())
93+
.type(Downloads.class.getName())
94+
.bucket(Property.ofValue(this.BUCKET))
95+
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
96+
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
97+
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
98+
.region(Property.ofValue(localstack.getRegion()))
99+
.prefix(Property.ofValue("/tasks/s3-maxfiles"))
100+
.maxFiles(Property.ofValue(3))
101+
.action(Property.ofValue(ActionInterface.Action.NONE))
102+
.build();
103+
104+
Downloads.Output run = task.run(runContext(task));
105+
106+
assertThat(run.getObjects().size(), is(3));
107+
}
108+
109+
@Test
110+
void maxFilesNotExceeded() throws Exception {
111+
this.createBucket();
112+
113+
// Upload 5 files
114+
for (int i = 0; i < 5; i++) {
115+
upload("/tasks/s3-maxfiles-ok");
116+
}
117+
118+
// Downloads with maxFiles=10 (more than 5 files) - should return all 5 files
119+
Downloads task = Downloads.builder()
120+
.id(DownloadsTest.class.getSimpleName())
121+
.type(Downloads.class.getName())
122+
.bucket(Property.ofValue(this.BUCKET))
123+
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
124+
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
125+
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
126+
.region(Property.ofValue(localstack.getRegion()))
127+
.prefix(Property.ofValue("/tasks/s3-maxfiles-ok"))
128+
.maxFiles(Property.ofValue(10))
129+
.action(Property.ofValue(ActionInterface.Action.NONE))
130+
.build();
131+
132+
Downloads.Output run = task.run(runContext(task));
133+
134+
assertThat(run.getObjects().size(), is(5));
135+
}
80136
}

src/test/java/io/kestra/plugin/aws/s3/ListTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,66 @@ void run() throws Exception {
6262
run = task.run(runContext(task));
6363
assertThat(run.getObjects().size(), is(1));
6464
}
65+
66+
@Test
67+
void maxFilesExceeded() throws Exception {
68+
this.createBucket();
69+
70+
String dir = IdUtils.create();
71+
72+
// Upload 5 files
73+
for (int i = 0; i < 5; i++) {
74+
upload("/tasks/s3/" + dir);
75+
}
76+
77+
// List with maxFiles=3 (less than 5 files) - should return first 3 files (truncated)
78+
List task = list()
79+
.prefix(Property.ofValue("/tasks/s3/" + dir))
80+
.maxFiles(Property.ofValue(3))
81+
.build();
82+
List.Output run = task.run(runContext(task));
83+
84+
assertThat(run.getObjects().size(), is(3));
85+
}
86+
87+
@Test
88+
void maxFilesNotExceeded() throws Exception {
89+
this.createBucket();
90+
91+
String dir = IdUtils.create();
92+
93+
// Upload 5 files
94+
for (int i = 0; i < 5; i++) {
95+
upload("/tasks/s3/" + dir);
96+
}
97+
98+
// List with maxFiles=10 (more than 5 files) - should return all 5 files
99+
List task = list()
100+
.prefix(Property.ofValue("/tasks/s3/" + dir))
101+
.maxFiles(Property.ofValue(10))
102+
.build();
103+
List.Output run = task.run(runContext(task));
104+
105+
assertThat(run.getObjects().size(), is(5));
106+
}
107+
108+
@Test
109+
void maxFilesNotSpecified() throws Exception {
110+
this.createBucket();
111+
112+
String dir = IdUtils.create();
113+
114+
// Upload 30 files
115+
for (int i = 0; i < 30; i++) {
116+
upload("/tasks/s3/" + dir);
117+
}
118+
119+
// List WITHOUT specifying maxFiles - should return all files (no default limit)
120+
List task = list()
121+
.prefix(Property.ofValue("/tasks/s3/" + dir))
122+
.build();
123+
List.Output run = task.run(runContext(task));
124+
125+
assertThat(run.getObjects().size(), is(30));
126+
}
65127
}

src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,4 +302,67 @@ void shouldExecuteOnCreateOrUpdate() throws Exception {
302302
Optional<Execution> updateExecution = trigger.evaluate(context.getKey(), context.getValue());
303303
assertThat(updateExecution.isPresent(), is(true));
304304
}
305+
306+
@Test
307+
void maxFilesExceeded() throws Exception {
308+
String bucket = "trigger-maxfiles-exceeded";
309+
this.createBucket(bucket);
310+
311+
// Upload 5 files
312+
for (int i = 0; i < 5; i++) {
313+
upload("trigger/maxfiles", bucket);
314+
}
315+
316+
// Trigger with maxFiles=3 (less than 5 files) - should fire with first 3 files (truncated)
317+
Trigger trigger = Trigger.builder()
318+
.id("s3-" + IdUtils.create())
319+
.type(Trigger.class.getName())
320+
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
321+
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
322+
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
323+
.region(Property.ofValue(localstack.getRegion()))
324+
.bucket(Property.ofValue(bucket))
325+
.prefix(Property.ofValue("trigger/maxfiles"))
326+
.action(Property.ofValue(ActionInterface.Action.NONE))
327+
.maxFiles(Property.ofValue(3))
328+
.interval(Duration.ofSeconds(10))
329+
.build();
330+
331+
Map.Entry<ConditionContext, io.kestra.core.models.triggers.Trigger> context = TestsUtils.mockTrigger(runContextFactory, trigger);
332+
333+
Optional<Execution> execution = trigger.evaluate(context.getKey(), context.getValue());
334+
// When maxFiles exceeded, List returns first 3 files, so Trigger should fire
335+
assertThat(execution.isPresent(), is(true));
336+
}
337+
338+
@Test
339+
void maxFilesNotExceeded() throws Exception {
340+
String bucket = "trigger-maxfiles-ok";
341+
this.createBucket(bucket);
342+
343+
// Upload 5 files
344+
for (int i = 0; i < 5; i++) {
345+
upload("trigger/maxfiles-ok", bucket);
346+
}
347+
348+
// Trigger with maxFiles=10 (more than 5 files) - should fire
349+
Trigger trigger = Trigger.builder()
350+
.id("s3-" + IdUtils.create())
351+
.type(Trigger.class.getName())
352+
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
353+
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
354+
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
355+
.region(Property.ofValue(localstack.getRegion()))
356+
.bucket(Property.ofValue(bucket))
357+
.prefix(Property.ofValue("trigger/maxfiles-ok"))
358+
.action(Property.ofValue(ActionInterface.Action.NONE))
359+
.maxFiles(Property.ofValue(10))
360+
.interval(Duration.ofSeconds(10))
361+
.build();
362+
363+
Map.Entry<ConditionContext, io.kestra.core.models.triggers.Trigger> context = TestsUtils.mockTrigger(runContextFactory, trigger);
364+
365+
Optional<Execution> execution = trigger.evaluate(context.getKey(), context.getValue());
366+
assertThat(execution.isPresent(), is(true));
367+
}
305368
}

0 commit comments

Comments
 (0)