Skip to content

Commit add6269

Browse files
committed
fix: return max files
1 parent f130884 commit add6269

File tree

18 files changed

+85
-64
lines changed

18 files changed

+85
-64
lines changed

src/main/java/io/kestra/plugin/fs/local/Downloads.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,17 +177,14 @@ public Output run(RunContext runContext) throws Exception {
177177

178178
io.kestra.plugin.fs.local.List.Output listOutput = listTask.run(runContext);
179179

180+
List<File> listedFiles = listOutput.getFiles();
180181
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
181-
if (listOutput.getFiles().size() > rMaxFiles) {
182-
runContext.logger().warn("Too many files to process, skipping");
183-
return Output.builder()
184-
.files(java.util.List.of())
185-
.outputFiles(Map.of())
186-
.build();
182+
if (listedFiles.size() > rMaxFiles) {
183+
runContext.logger().warn("Too many files to process ({}), limiting to {}", listedFiles.size(), rMaxFiles);
184+
listedFiles = listedFiles.subList(0, rMaxFiles);
187185
}
188186

189-
List<File> downloadedFiles = listOutput
190-
.getFiles()
187+
List<File> downloadedFiles = listedFiles
191188
.stream()
192189
.map(throwFunction(fileItem -> {
193190
if (fileItem.isDirectory()) {

src/main/java/io/kestra/plugin/fs/local/List.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,8 @@ public Output run(RunContext runContext) throws Exception {
108108

109109
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
110110
if (files.size() > rMaxFiles) {
111-
runContext.logger().warn("Too many files to process, skipping");
112-
return Output.builder()
113-
.files(java.util.List.of())
114-
.count(0)
115-
.build();
111+
runContext.logger().warn("Too many files to process ({}), limiting to {}", files.size(), rMaxFiles);
112+
files = files.subList(0, rMaxFiles);
116113
}
117114

118115
return Output.builder()

src/main/java/io/kestra/plugin/fs/local/Trigger.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,24 +191,33 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
191191
return Optional.empty();
192192
}
193193

194-
if (toFire.size() > runContext.render(this.maxFiles).as(Integer.class).orElse(25)) {
195-
logger.warn("Too many files to process, skipping");
194+
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
195+
java.util.List<TriggeredFile> limitedToFire = toFire;
196+
java.util.List<File> limitedActionFiles = actionFiles;
197+
if (toFire.size() > rMaxFiles) {
198+
logger.warn("Too many files to process ({}), limiting to {}", toFire.size(), rMaxFiles);
199+
int limit = Math.min(rMaxFiles, toFire.size());
200+
limitedToFire = toFire.subList(0, limit);
201+
limitedActionFiles = actionFiles.subList(0, Math.min(limit, actionFiles.size()));
202+
}
203+
204+
if (limitedToFire.isEmpty()) {
196205
return Optional.empty();
197206
}
198207

199208
Downloads.Action selectedAction = this.action != null ?
200209
runContext.render(this.action).as(Downloads.Action.class).orElse(Downloads.Action.NONE) :
201210
Downloads.Action.NONE;
202211

203-
java.util.List<File> filesToProcess = actionFiles.stream()
212+
java.util.List<File> filesToProcess = limitedActionFiles.stream()
204213
.filter(file -> !file.isDirectory())
205214
.toList();
206215

207216
if (selectedAction != Downloads.Action.NONE) {
208217
Downloads.performAction(filesToProcess, selectedAction, this.moveDirectory, runContext);
209218
}
210219

211-
return Optional.of(TriggerService.generateExecution(this, conditionContext, triggerContext, Output.builder().files(toFire).build()));
220+
return Optional.of(TriggerService.generateExecution(this, conditionContext, triggerContext, Output.builder().files(limitedToFire).build()));
212221
}
213222

214223
public enum ChangeType {

src/main/java/io/kestra/plugin/fs/nfs/List.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,8 @@ public Output run(RunContext runContext) throws Exception {
110110

111111
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
112112
if (files.size() > rMaxFiles) {
113-
logger.warn("Too many files to process, skipping");
114-
return Output.builder()
115-
.files(java.util.List.of())
116-
.build();
113+
logger.warn("Too many files to process ({}), limiting to {}", files.size(), rMaxFiles);
114+
files = files.subList(0, rMaxFiles);
117115
}
118116

119117
return Output.builder().files(files).build();

src/main/java/io/kestra/plugin/fs/nfs/Trigger.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,13 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
163163
return Optional.empty();
164164
}
165165

166-
if (toFire.size() > runContext.render(this.maxFiles).as(Integer.class).orElse(25)) {
167-
logger.warn("Too many files to process, skipping");
166+
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
167+
if (toFire.size() > rMaxFiles) {
168+
logger.warn("Too many files to process ({}), limiting to {}", toFire.size(), rMaxFiles);
169+
toFire = toFire.subList(0, Math.min(rMaxFiles, toFire.size()));
170+
}
171+
172+
if (toFire.isEmpty()) {
168173
return Optional.empty();
169174
}
170175

src/main/java/io/kestra/plugin/fs/vfs/Downloads.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,8 @@ public Output run(RunContext runContext) throws Exception {
8888

8989
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
9090
if (files.size() > rMaxFiles) {
91-
logger.warn("Too many files to process, skipping");
92-
return Output.builder()
93-
.files(java.util.List.of())
94-
.outputFiles(Map.of())
95-
.build();
91+
logger.warn("Too many files to process ({}), limiting to {}", files.size(), rMaxFiles);
92+
files = files.subList(0, rMaxFiles);
9693
}
9794

9895
java.util.List<io.kestra.plugin.fs.vfs.models.File> list = files

src/main/java/io/kestra/plugin/fs/vfs/List.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,17 @@ public Output run(RunContext runContext) throws Exception {
5353
runContext.render(this.recursive).as(Boolean.class).orElse(false)
5454
);
5555

56+
java.util.List<File> files = output.getFiles();
57+
5658
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
57-
if (output.getFiles().size() > rMaxFiles) {
58-
runContext.logger().warn("Too many files to process, skipping");
59-
return Output.builder()
60-
.files(java.util.List.of())
61-
.build();
59+
if (files.size() > rMaxFiles) {
60+
runContext.logger().warn("Too many files to process ({}), limiting to {}", files.size(), rMaxFiles);
61+
files = files.subList(0, rMaxFiles);
6262
}
6363

64-
return output;
64+
return Output.builder()
65+
.files(files)
66+
.build();
6567
}
6668
}
6769

src/main/java/io/kestra/plugin/fs/vfs/Trigger.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,17 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
215215
return Optional.empty();
216216
}
217217

218-
if (toFire.size() > runContext.render(this.maxFiles).as(Integer.class).orElse(25)) {
219-
logger.warn("Too many files to process, skipping");
218+
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
219+
java.util.List<TriggeredFile> limitedToFire = toFire;
220+
java.util.List<File> limitedActionFiles = actionFiles;
221+
if (toFire.size() > rMaxFiles) {
222+
logger.warn("Too many files to process ({}), limiting to {}", toFire.size(), rMaxFiles);
223+
int limit = Math.min(rMaxFiles, toFire.size());
224+
limitedToFire = toFire.subList(0, limit);
225+
limitedActionFiles = actionFiles.subList(0, Math.min(limit, actionFiles.size()));
226+
}
227+
228+
if (limitedToFire.isEmpty()) {
220229
return Optional.empty();
221230
}
222231

@@ -225,7 +234,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
225234
runContext,
226235
fsm,
227236
fileSystemOptions,
228-
actionFiles,
237+
limitedActionFiles,
229238
runContext.render(this.action).as(Downloads.Action.class).orElse(null),
230239
VfsService.uri(
231240
runContext,
@@ -239,7 +248,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
239248
);
240249
}
241250

242-
Execution execution = TriggerService.generateExecution(this, conditionContext, context, Output.builder().files(toFire).build());
251+
Execution execution = TriggerService.generateExecution(this, conditionContext, context, Output.builder().files(limitedToFire).build());
243252

244253
return Optional.of(execution);
245254
}

src/main/java/io/kestra/plugin/fs/vfs/Uploads.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,8 @@ public Output run(RunContext runContext) throws Exception {
6060

6161
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
6262
if (renderedFrom.length > rMaxFiles) {
63-
runContext.logger().warn("Too many files to process, skipping");
64-
return Output.builder()
65-
.files(List.of())
66-
.build();
63+
runContext.logger().warn("Too many files to process ({}), limiting to {}", renderedFrom.length, rMaxFiles);
64+
renderedFrom = Arrays.copyOf(renderedFrom, rMaxFiles);
6765
}
6866

6967
List<Upload.Output> outputs = Arrays.stream(renderedFrom).map(throwFunction(fromURI -> {

src/test/java/io/kestra/plugin/fs/ftp/DownloadsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ void run_NoneAfterDownloads() throws Exception {
8585
}
8686

8787
@Test
88-
void run_MaxFilesShouldSkip() throws Exception {
88+
void run_MaxFilesShouldLimit() throws Exception {
8989
String toUploadDir = "/upload/" + random + "-maxfiles";
9090
String out1 = FriendlyId.createFriendlyId();
9191
ftpUtils.upload(toUploadDir + "/" + out1 + ".txt");
@@ -106,8 +106,8 @@ void run_MaxFilesShouldSkip() throws Exception {
106106

107107
Downloads.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));
108108

109-
assertThat(run.getFiles().size(), is(0));
110-
assertThat(run.getOutputFiles().size(), is(0));
109+
assertThat(run.getFiles().size(), is(1));
110+
assertThat(run.getOutputFiles().size(), is(1));
111111
assertThat(ftpUtils.list(toUploadDir).getFiles().size(), is(2));
112112
}
113113
}

0 commit comments

Comments
 (0)