Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM kestra/kestra:latest-no-plugins
FROM kestra/kestra:latest

RUN mkdir -p /app/plugins

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/kestra/plugin/aws/s3/Download.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public class Download extends AbstractS3Object implements RunnableTask<Download.
)
protected Property<String> regexp;

@Builder.Default
@Schema(
title = "The maximum number of files to retrieve at once"
)
private Property<Integer> maxFiles = Property.ofValue(25);

@Schema(
title = "The account ID of the expected bucket owner",
description = "Requests will fail with a Forbidden error (access denied) if the bucket is owned by a different account."
Expand Down Expand Up @@ -229,6 +235,7 @@ private List.Output getObjectsList(RunContext runContext) throws Exception {
.expectedBucketOwner(this.expectedBucketOwner)
.regexp(this.regexp)
.filter(Property.ofValue(ListInterface.Filter.FILES))
.maxFiles(this.maxFiles)
.stsRoleArn(this.stsRoleArn)
.stsRoleSessionName(this.stsRoleSessionName)
.stsRoleExternalId(this.stsRoleExternalId)
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/kestra/plugin/aws/s3/Downloads.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public class Downloads extends AbstractS3Object implements RunnableTask<Download
@Builder.Default
protected final Property<Filter> filter = Property.ofValue(Filter.BOTH);

@Builder.Default
@Schema(
title = "The maximum number of files to retrieve at once"
)
private Property<Integer> maxFiles = Property.ofValue(25);

private Property<ActionInterface.Action> action;

private Copy.CopyObject moveTo;
Expand All @@ -116,6 +122,7 @@ public Output run(RunContext runContext) throws Exception {
.expectedBucketOwner(this.expectedBucketOwner)
.regexp(this.regexp)
.filter(this.filter)
.maxFiles(this.maxFiles)
.stsRoleArn(this.stsRoleArn)
.stsRoleSessionName(this.stsRoleSessionName)
.stsRoleExternalId(this.stsRoleExternalId)
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/io/kestra/plugin/aws/s3/List.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public class List extends AbstractS3Object implements RunnableTask<List.Output>,

protected Property<String> regexp;

@Builder.Default
@Schema(
title = "The maximum number of files to retrieve at once"
)
private Property<Integer> maxFiles = Property.ofValue(25);

@Builder.Default
protected final Property<Filter> filter = Property.ofValue(Filter.BOTH);

Expand All @@ -84,6 +90,16 @@ public Output run(RunContext runContext) throws Exception {
runContext.render(prefix).as(String.class).orElse(null)
);

int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
if (list.size() > rMaxFiles) {
runContext.logger().warn(
"Listing returned {} files but maxFiles limit is {}. Only the first {} files will be returned. " +
"Increase the maxFiles property if you need more files.",
list.size(), rMaxFiles, rMaxFiles
);
list = list.subList(0, rMaxFiles);
}

return Output.builder()
.objects(list)
.build();
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/kestra/plugin/aws/s3/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
@Builder.Default
private final Property<On> on = Property.ofValue(On.CREATE_OR_UPDATE);

@Builder.Default
@Schema(
title = "The maximum number of files to retrieve at once"
)
private Property<Integer> maxFiles = Property.ofValue(25);

private Property<String> stateKey;

private Property<Duration> stateTtl;
Expand Down Expand Up @@ -196,6 +202,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
.expectedBucketOwner(this.expectedBucketOwner)
.regexp(this.regexp)
.filter(this.filter)
.maxFiles(this.maxFiles)
.stsRoleArn(this.stsRoleArn)
.stsRoleSessionName(this.stsRoleSessionName)
.stsRoleExternalId(this.stsRoleExternalId)
Expand Down
61 changes: 61 additions & 0 deletions src/test/java/io/kestra/plugin/aws/s3/DownloadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,65 @@ void testInvalidConfiguration() throws Exception {
containsString("Invalid configuration: either specify 'key' for single file download or at least one filtering parameter")
);
}

@Test
void maxFilesExceeded() throws Exception {
this.createBucket();

String basePrefix = IdUtils.create() + "/maxfiles-test/";

// Upload 5 files
for (int i = 0; i < 5; i++) {
URI file = storagePut("file" + i + ".txt");
uploadFile(file, basePrefix + "file" + i + ".txt");
}

// Download with maxFiles=3 (less than 5 files) - should return first 3 files (truncated)
Download download = Download.builder()
.id(DownloadTest.class.getSimpleName() + "-maxFilesExceeded")
.type(Download.class.getName())
.bucket(Property.ofValue(this.BUCKET))
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
.region(Property.ofValue(localstack.getRegion()))
.prefix(Property.ofValue(basePrefix))
.maxFiles(Property.ofValue(3))
.build();

Download.Output output = download.run(runContext(download));

// When maxFiles exceeded, List returns first 3 files
assertThat(output.getFiles().size(), is(3));
}

@Test
void maxFilesNotExceeded() throws Exception {
this.createBucket();

String basePrefix = IdUtils.create() + "/maxfiles-ok/";

// Upload 5 files
for (int i = 0; i < 5; i++) {
URI file = storagePut("file" + i + ".txt");
uploadFile(file, basePrefix + "file" + i + ".txt");
}

// Download with maxFiles=10 (more than 5 files) - should return all 5 files
Download download = Download.builder()
.id(DownloadTest.class.getSimpleName() + "-maxFilesNotExceeded")
.type(Download.class.getName())
.bucket(Property.ofValue(this.BUCKET))
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
.region(Property.ofValue(localstack.getRegion()))
.prefix(Property.ofValue(basePrefix))
.maxFiles(Property.ofValue(10))
.build();

Download.Output output = download.run(runContext(download));

assertThat(output.getFiles().size(), is(5));
}
}
56 changes: 56 additions & 0 deletions src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,60 @@ void move() throws Exception {
listOutput = list.run(runContext(list));
assertThat(listOutput.getObjects().size(), is(2));
}

@Test
void maxFilesExceeded() throws Exception {
this.createBucket();

// Upload 5 files
for (int i = 0; i < 5; i++) {
upload("/tasks/s3-maxfiles");
}

// Downloads with maxFiles=3 (less than 5 files) - should return first 3 files (truncated)
Downloads task = Downloads.builder()
.id(DownloadsTest.class.getSimpleName())
.type(Downloads.class.getName())
.bucket(Property.ofValue(this.BUCKET))
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
.region(Property.ofValue(localstack.getRegion()))
.prefix(Property.ofValue("/tasks/s3-maxfiles"))
.maxFiles(Property.ofValue(3))
.action(Property.ofValue(ActionInterface.Action.NONE))
.build();

Downloads.Output run = task.run(runContext(task));

assertThat(run.getObjects().size(), is(3));
}

@Test
void maxFilesNotExceeded() throws Exception {
this.createBucket();

// Upload 5 files
for (int i = 0; i < 5; i++) {
upload("/tasks/s3-maxfiles-ok");
}

// Downloads with maxFiles=10 (more than 5 files) - should return all 5 files
Downloads task = Downloads.builder()
.id(DownloadsTest.class.getSimpleName())
.type(Downloads.class.getName())
.bucket(Property.ofValue(this.BUCKET))
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
.region(Property.ofValue(localstack.getRegion()))
.prefix(Property.ofValue("/tasks/s3-maxfiles-ok"))
.maxFiles(Property.ofValue(10))
.action(Property.ofValue(ActionInterface.Action.NONE))
.build();

Downloads.Output run = task.run(runContext(task));

assertThat(run.getObjects().size(), is(5));
}
}
62 changes: 62 additions & 0 deletions src/test/java/io/kestra/plugin/aws/s3/ListTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,66 @@ void run() throws Exception {
run = task.run(runContext(task));
assertThat(run.getObjects().size(), is(1));
}

@Test
void maxFilesExceeded() throws Exception {
this.createBucket();

String dir = IdUtils.create();

// Upload 5 files
for (int i = 0; i < 5; i++) {
upload("/tasks/s3/" + dir);
}

// List with maxFiles=3 (less than 5 files) - should return first 3 files (truncated)
List task = list()
.prefix(Property.ofValue("/tasks/s3/" + dir))
.maxFiles(Property.ofValue(3))
.build();
List.Output run = task.run(runContext(task));

assertThat(run.getObjects().size(), is(3));
}

@Test
void maxFilesNotExceeded() throws Exception {
this.createBucket();

String dir = IdUtils.create();

// Upload 5 files
for (int i = 0; i < 5; i++) {
upload("/tasks/s3/" + dir);
}

// List with maxFiles=10 (more than 5 files) - should return all 5 files
List task = list()
.prefix(Property.ofValue("/tasks/s3/" + dir))
.maxFiles(Property.ofValue(10))
.build();
List.Output run = task.run(runContext(task));

assertThat(run.getObjects().size(), is(5));
}

@Test
void maxFilesDefault() throws Exception {
this.createBucket();

String dir = IdUtils.create();

// Upload 30 files (more than default limit of 25)
for (int i = 0; i < 30; i++) {
upload("/tasks/s3/" + dir);
}

// List WITHOUT specifying maxFiles - should use default of 25 and return first 25 files (truncated)
List task = list()
.prefix(Property.ofValue("/tasks/s3/" + dir))
.build();
List.Output run = task.run(runContext(task));

assertThat(run.getObjects().size(), is(25));
}
}
63 changes: 63 additions & 0 deletions src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,4 +302,67 @@ void shouldExecuteOnCreateOrUpdate() throws Exception {
Optional<Execution> updateExecution = trigger.evaluate(context.getKey(), context.getValue());
assertThat(updateExecution.isPresent(), is(true));
}

@Test
void maxFilesExceeded() throws Exception {
String bucket = "trigger-maxfiles-exceeded";
this.createBucket(bucket);

// Upload 5 files
for (int i = 0; i < 5; i++) {
upload("trigger/maxfiles", bucket);
}

// Trigger with maxFiles=3 (less than 5 files) - should fire with first 3 files (truncated)
Trigger trigger = Trigger.builder()
.id("s3-" + IdUtils.create())
.type(Trigger.class.getName())
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
.region(Property.ofValue(localstack.getRegion()))
.bucket(Property.ofValue(bucket))
.prefix(Property.ofValue("trigger/maxfiles"))
.action(Property.ofValue(ActionInterface.Action.NONE))
.maxFiles(Property.ofValue(3))
.interval(Duration.ofSeconds(10))
.build();

Map.Entry<ConditionContext, io.kestra.core.models.triggers.Trigger> context = TestsUtils.mockTrigger(runContextFactory, trigger);

Optional<Execution> execution = trigger.evaluate(context.getKey(), context.getValue());
// When maxFiles exceeded, List returns first 3 files, so Trigger should fire
assertThat(execution.isPresent(), is(true));
}

@Test
void maxFilesNotExceeded() throws Exception {
String bucket = "trigger-maxfiles-ok";
this.createBucket(bucket);

// Upload 5 files
for (int i = 0; i < 5; i++) {
upload("trigger/maxfiles-ok", bucket);
}

// Trigger with maxFiles=10 (more than 5 files) - should fire
Trigger trigger = Trigger.builder()
.id("s3-" + IdUtils.create())
.type(Trigger.class.getName())
.endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
.accessKeyId(Property.ofValue(localstack.getAccessKey()))
.secretKeyId(Property.ofValue(localstack.getSecretKey()))
.region(Property.ofValue(localstack.getRegion()))
.bucket(Property.ofValue(bucket))
.prefix(Property.ofValue("trigger/maxfiles-ok"))
.action(Property.ofValue(ActionInterface.Action.NONE))
.maxFiles(Property.ofValue(10))
.interval(Duration.ofSeconds(10))
.build();

Map.Entry<ConditionContext, io.kestra.core.models.triggers.Trigger> context = TestsUtils.mockTrigger(runContextFactory, trigger);

Optional<Execution> execution = trigger.evaluate(context.getKey(), context.getValue());
assertThat(execution.isPresent(), is(true));
}
}
Loading