Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/main/java/io/kestra/plugin/fs/local/Downloads.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,14 @@ public Output run(RunContext runContext) throws Exception {

io.kestra.plugin.fs.local.List.Output listOutput = listTask.run(runContext);

List<File> listedFiles = listOutput.getFiles();
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
if (listOutput.getFiles().size() > rMaxFiles) {
runContext.logger().warn("Too many files to process, skipping");
return Output.builder()
.files(java.util.List.of())
.outputFiles(Map.of())
.build();
if (listedFiles.size() > rMaxFiles) {
runContext.logger().warn("Too many files to process ({}), limiting to {}", listedFiles.size(), rMaxFiles);
listedFiles = listedFiles.subList(0, rMaxFiles);
}

List<File> downloadedFiles = listOutput
.getFiles()
List<File> downloadedFiles = listedFiles
.stream()
.map(throwFunction(fileItem -> {
if (fileItem.isDirectory()) {
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/kestra/plugin/fs/local/List.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,8 @@ public Output run(RunContext runContext) throws Exception {

int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
if (files.size() > rMaxFiles) {
runContext.logger().warn("Too many files to process, skipping");
return Output.builder()
.files(java.util.List.of())
.count(0)
.build();
runContext.logger().warn("Too many files to process ({}), limiting to {}", files.size(), rMaxFiles);
files = files.subList(0, rMaxFiles);
}

return Output.builder()
Expand Down
17 changes: 13 additions & 4 deletions src/main/java/io/kestra/plugin/fs/local/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,33 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
return Optional.empty();
}

if (toFire.size() > runContext.render(this.maxFiles).as(Integer.class).orElse(25)) {
logger.warn("Too many files to process, skipping");
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
java.util.List<TriggeredFile> limitedToFire = toFire;
java.util.List<File> limitedActionFiles = actionFiles;
if (toFire.size() > rMaxFiles) {
logger.warn("Too many files to process ({}), limiting to {}", toFire.size(), rMaxFiles);
int limit = Math.min(rMaxFiles, toFire.size());
limitedToFire = toFire.subList(0, limit);
limitedActionFiles = actionFiles.subList(0, Math.min(limit, actionFiles.size()));
}

if (limitedToFire.isEmpty()) {
return Optional.empty();
}

Downloads.Action selectedAction = this.action != null ?
runContext.render(this.action).as(Downloads.Action.class).orElse(Downloads.Action.NONE) :
Downloads.Action.NONE;

java.util.List<File> filesToProcess = actionFiles.stream()
java.util.List<File> filesToProcess = limitedActionFiles.stream()
.filter(file -> !file.isDirectory())
.toList();

if (selectedAction != Downloads.Action.NONE) {
Downloads.performAction(filesToProcess, selectedAction, this.moveDirectory, runContext);
}

return Optional.of(TriggerService.generateExecution(this, conditionContext, triggerContext, Output.builder().files(toFire).build()));
return Optional.of(TriggerService.generateExecution(this, conditionContext, triggerContext, Output.builder().files(limitedToFire).build()));
}

public enum ChangeType {
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/kestra/plugin/fs/nfs/List.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ public Output run(RunContext runContext) throws Exception {

int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
if (files.size() > rMaxFiles) {
logger.warn("Too many files to process, skipping");
return Output.builder()
.files(java.util.List.of())
.build();
logger.warn("Too many files to process ({}), limiting to {}", files.size(), rMaxFiles);
files = files.subList(0, rMaxFiles);
}

return Output.builder().files(files).build();
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/io/kestra/plugin/fs/nfs/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,13 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
return Optional.empty();
}

if (toFire.size() > runContext.render(this.maxFiles).as(Integer.class).orElse(25)) {
logger.warn("Too many files to process, skipping");
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
if (toFire.size() > rMaxFiles) {
logger.warn("Too many files to process ({}), limiting to {}", toFire.size(), rMaxFiles);
toFire = toFire.subList(0, Math.min(rMaxFiles, toFire.size()));
}

if (toFire.isEmpty()) {
return Optional.empty();
}

Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/kestra/plugin/fs/vfs/Downloads.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,8 @@ public Output run(RunContext runContext) throws Exception {

int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
if (files.size() > rMaxFiles) {
logger.warn("Too many files to process, skipping");
return Output.builder()
.files(java.util.List.of())
.outputFiles(Map.of())
.build();
logger.warn("Too many files to process ({}), limiting to {}", files.size(), rMaxFiles);
files = files.subList(0, rMaxFiles);
}

java.util.List<io.kestra.plugin.fs.vfs.models.File> list = files
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/io/kestra/plugin/fs/vfs/List.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,17 @@ public Output run(RunContext runContext) throws Exception {
runContext.render(this.recursive).as(Boolean.class).orElse(false)
);

java.util.List<File> files = output.getFiles();

int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
if (output.getFiles().size() > rMaxFiles) {
runContext.logger().warn("Too many files to process, skipping");
return Output.builder()
.files(java.util.List.of())
.build();
if (files.size() > rMaxFiles) {
runContext.logger().warn("Too many files to process ({}), limiting to {}", files.size(), rMaxFiles);
files = files.subList(0, rMaxFiles);
}

return output;
return Output.builder()
.files(files)
.build();
}
}

Expand Down
17 changes: 13 additions & 4 deletions src/main/java/io/kestra/plugin/fs/vfs/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,17 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
return Optional.empty();
}

if (toFire.size() > runContext.render(this.maxFiles).as(Integer.class).orElse(25)) {
logger.warn("Too many files to process, skipping");
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
java.util.List<TriggeredFile> limitedToFire = toFire;
java.util.List<File> limitedActionFiles = actionFiles;
if (toFire.size() > rMaxFiles) {
logger.warn("Too many files to process ({}), limiting to {}", toFire.size(), rMaxFiles);
int limit = Math.min(rMaxFiles, toFire.size());
limitedToFire = toFire.subList(0, limit);
limitedActionFiles = actionFiles.subList(0, Math.min(limit, actionFiles.size()));
}

if (limitedToFire.isEmpty()) {
return Optional.empty();
}

Expand All @@ -225,7 +234,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
runContext,
fsm,
fileSystemOptions,
actionFiles,
limitedActionFiles,
runContext.render(this.action).as(Downloads.Action.class).orElse(null),
VfsService.uri(
runContext,
Expand All @@ -239,7 +248,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
);
}

Execution execution = TriggerService.generateExecution(this, conditionContext, context, Output.builder().files(toFire).build());
Execution execution = TriggerService.generateExecution(this, conditionContext, context, Output.builder().files(limitedToFire).build());

return Optional.of(execution);
}
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/kestra/plugin/fs/vfs/Uploads.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ public Output run(RunContext runContext) throws Exception {

int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
if (renderedFrom.length > rMaxFiles) {
runContext.logger().warn("Too many files to process, skipping");
return Output.builder()
.files(List.of())
.build();
runContext.logger().warn("Too many files to process ({}), limiting to {}", renderedFrom.length, rMaxFiles);
renderedFrom = Arrays.copyOf(renderedFrom, rMaxFiles);
}

List<Upload.Output> outputs = Arrays.stream(renderedFrom).map(throwFunction(fromURI -> {
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/kestra/plugin/fs/ftp/DownloadsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void run_NoneAfterDownloads() throws Exception {
}

@Test
void run_MaxFilesShouldSkip() throws Exception {
void run_MaxFilesShouldLimit() throws Exception {
String toUploadDir = "/upload/" + random + "-maxfiles";
String out1 = FriendlyId.createFriendlyId();
ftpUtils.upload(toUploadDir + "/" + out1 + ".txt");
Expand All @@ -106,8 +106,8 @@ void run_MaxFilesShouldSkip() throws Exception {

Downloads.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));

assertThat(run.getFiles().size(), is(0));
assertThat(run.getOutputFiles().size(), is(0));
assertThat(run.getFiles().size(), is(1));
assertThat(run.getOutputFiles().size(), is(1));
assertThat(ftpUtils.list(toUploadDir).getFiles().size(), is(2));
}
}
4 changes: 2 additions & 2 deletions src/test/java/io/kestra/plugin/fs/ftp/ListTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void shouldMatchFileWithStandardName() throws Exception {
}

@Test
void maxFilesShouldSkip() throws Exception {
void maxFilesShouldLimit() throws Exception {
String dir = "/" + IdUtils.create();
ftpUtils.upload("upload" + dir + "/file1.yaml");
ftpUtils.upload("upload" + dir + "/file2.yaml");
Expand All @@ -157,6 +157,6 @@ void maxFilesShouldSkip() throws Exception {

List.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));

assertThat(run.getFiles().size(), is(0));
assertThat(run.getFiles().size(), is(1));
}
}
4 changes: 2 additions & 2 deletions src/test/java/io/kestra/plugin/fs/ftp/UploadsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void run() throws Exception {
}

@Test
void run_maxFilesShouldSkip() throws Exception {
void run_maxFilesShouldLimit() throws Exception {
URI uri1 = ftpUtils.uploadToStorage();
URI uri2 = ftpUtils.uploadToStorage();

Expand All @@ -103,6 +103,6 @@ void run_maxFilesShouldSkip() throws Exception {

Uploads.Output uploadsRun = uploadsTask.run(TestsUtils.mockRunContext(runContextFactory, uploadsTask, Map.of()));

assertThat(uploadsRun.getFiles().size(), is(0));
assertThat(uploadsRun.getFiles().size(), is(1));
}
}
4 changes: 2 additions & 2 deletions src/test/java/io/kestra/plugin/fs/local/DownloadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ void downloadsWithMaxFiles() throws Exception {

Downloads.Output output = task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));

assertThat(output.getFiles().isEmpty(), is(true));
assertThat(output.getOutputFiles().isEmpty(), is(true));
assertThat(output.getFiles().size(), is(1));
assertThat(output.getOutputFiles().size(), is(1));
} finally {
Files.walk(tempDir)
.sorted(Comparator.reverseOrder())
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/kestra/plugin/fs/local/ListTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void listFilesWithMaxFiles() throws Exception {

List.Output output = task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));

assertThat(output.getFiles(), is(empty()));
assertThat(output.getCount(), is(0));
assertThat(output.getFiles(), hasSize(2));
assertThat(output.getCount(), is(2));
}
}
8 changes: 6 additions & 2 deletions src/test/java/io/kestra/plugin/fs/local/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void moveWithRegexShouldNotDeleteSourceDirectory() throws Exception {
}

@Test
void maxFilesShouldSkipExecution() throws Exception {
void maxFilesShouldLimitExecution() throws Exception {
Path sourceDir = Paths.get("/tmp/trigger-maxfiles");
Files.createDirectories(sourceDir);

Expand All @@ -167,7 +167,11 @@ void maxFilesShouldSkipExecution() throws Exception {
var context = TestsUtils.mockTrigger(runContextFactory, trigger);
Optional<Execution> execution = trigger.evaluate(context.getKey(), context.getValue());

assertThat(execution.isEmpty(), is(true));
assertThat(execution.isPresent(), is(true));
@SuppressWarnings("unchecked")
java.util.List<Object> rawFiles =
(java.util.List<Object>) execution.get().getTrigger().getVariables().get("files");
assertThat(rawFiles, hasSize(1));
} finally {
cleanup(sourceDir);
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/kestra/plugin/fs/nfs/ListTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,6 @@ void list_files_with_max_files() throws Exception {
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of());
io.kestra.plugin.fs.nfs.List.Output run = task.run(runContext);

assertThat(run.getFiles(), is(List.of()));
assertThat(run.getFiles(), hasSize(1));
}
}
9 changes: 6 additions & 3 deletions src/test/java/io/kestra/plugin/fs/nfs/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

@KestraTest
class TriggerTest {
Expand Down Expand Up @@ -184,7 +184,7 @@ void trigger_onFileCreateOrUpdate() throws Exception {
}

@Test
void trigger_maxFiles_should_skip_execution() throws Exception {
void trigger_maxFiles_should_limit_execution() throws Exception {
Files.writeString(nfsMountPoint.resolve("file1.txt"), "content1");
Files.writeString(nfsMountPoint.resolve("file2.txt"), "content2");

Expand All @@ -199,6 +199,9 @@ void trigger_maxFiles_should_skip_execution() throws Exception {
Map.Entry<ConditionContext, Trigger> context = TestsUtils.mockTrigger(runContextFactory, nfsTrigger);

Optional<Execution> execution = nfsTrigger.evaluate(context.getKey(), context.getValue());
assertThat(execution.isEmpty(), is(true));
assertThat(execution.isPresent(), is(true));
@SuppressWarnings("unchecked")
List<Map<String, Object>> rawFiles = (List<Map<String, Object>>) execution.get().getTrigger().getVariables().get("files");
assertThat(rawFiles, hasSize(1));
}
}
12 changes: 7 additions & 5 deletions src/test/java/io/kestra/plugin/fs/sftp/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import static io.kestra.plugin.fs.sftp.SftpUtils.PASSWORD;
import static io.kestra.plugin.fs.sftp.SftpUtils.USERNAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TriggerTest extends AbstractFileTriggerTest {
Expand Down Expand Up @@ -173,7 +172,7 @@ void shouldTriggerOnCreateOrUpdate() throws Exception {
}

@Test
void shouldNotTriggerWhenTooManyFiles() throws Exception {
void shouldLimitWhenTooManyFiles() throws Exception {
var trigger = Trigger.builder()
.id("sftp-too-many-files-" + IdUtils.create())
.type(Trigger.class.getName())
Expand All @@ -195,7 +194,10 @@ void shouldNotTriggerWhenTooManyFiles() throws Exception {
var context = TestsUtils.mockTrigger(runContextFactory, trigger);
Optional<Execution> execution = trigger.evaluate(context.getKey(), context.getValue());

// should then skip the execution
assertThat(execution.isPresent(), is(false));
assertThat(execution.isPresent(), is(true));
@SuppressWarnings("unchecked")
java.util.List<Object> rawFiles =
(java.util.List<Object>) execution.get().getTrigger().getVariables().get("files");
assertThat(rawFiles, hasSize(10));
}
}
Loading