Skip to content

Commit f0faabe

Browse files
Sahillather002Malaydewangan09
authored andcommitted
refactor(minio): use ubiquitous Data.From
Removed unused Map import and updated the 'from' field description in the Uploads class. Refactored the output reading logic to handle URI strings more effectively.
1 parent 368cc7c commit f0faabe

File tree

1 file changed

+27
-28
lines changed

1 file changed

+27
-28
lines changed

src/main/java/io/kestra/plugin/fs/vfs/Uploads.java

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package io.kestra.plugin.fs.vfs;
22

33
import io.kestra.core.models.annotations.PluginProperty;
4+
import io.kestra.core.models.property.Data;
45
import io.kestra.core.models.property.Property;
56
import io.kestra.core.models.tasks.RunnableTask;
67
import io.kestra.core.runners.RunContext;
7-
import io.kestra.core.serializers.JacksonMapper;
88
import io.swagger.v3.oas.annotations.media.Schema;
99
import jakarta.validation.constraints.NotNull;
1010
import lombok.*;
1111
import lombok.experimental.SuperBuilder;
1212
import org.apache.commons.vfs2.impl.StandardFileSystemManager;
1313

1414
import java.net.URI;
15-
import java.util.Arrays;
1615
import java.util.List;
1716

1817
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -23,14 +22,15 @@
2322
@Getter
2423
@NoArgsConstructor
2524
public abstract class Uploads extends AbstractVfsTask implements RunnableTask<Uploads.Output> {
26-
@PluginProperty(dynamic = true, internalStorageURI = true)
2725
@Schema(
28-
title = "The files to upload, must be internal storage URIs, must be a list of URIs or a pebble template that returns a list of URIs",
29-
anyOf = {
30-
String.class,
31-
String[].class
32-
}
26+
title = "The files to upload, must be internal storage URIs, must be a list of URIs or a pebble template that returns a list of URIs",
27+
anyOf = {
28+
String.class,
29+
String[].class
30+
},
31+
description = "Must be Kestra internal storage URIs. Can be a single URI string, a list of URI strings, or an internal storage URI pointing to a file containing URIs."
3332
)
33+
@PluginProperty(dynamic = true, internalStorageURI = true)
3434
@NotNull
3535
private Object from;
3636

@@ -45,26 +45,25 @@ public Output run(RunContext runContext) throws Exception {
4545
fsm.setConfiguration(StandardFileSystemManager.class.getResource(KestraStandardFileSystemManager.CONFIG_RESOURCE));
4646
fsm.init();
4747

48-
String[] renderedFrom;
49-
if (this.from instanceof List<?> fromURIs) {
50-
renderedFrom = fromURIs.stream().map(throwFunction(from -> runContext.render((String) from))).toArray(String[]::new);
51-
} else {
52-
renderedFrom = JacksonMapper.ofJson().readValue(runContext.render((String) this.from), String[].class);
53-
}
54-
List<Upload.Output> outputs = Arrays.stream(renderedFrom).map(throwFunction(fromURI -> {
55-
if (!fromURI.startsWith("kestra://")) {
56-
throw new IllegalArgumentException("'from' must be a list of Kestra's internal storage URI");
57-
}
58-
String renderedTo = runContext.render(this.to).as(String.class).orElseThrow();
59-
return VfsService.upload(
60-
runContext,
61-
fsm,
62-
this.fsOptions(runContext),
63-
URI.create(fromURI),
64-
this.uri(runContext, renderedTo + fromURI.substring(fromURI.lastIndexOf('/') + (renderedTo.endsWith("/") ? 1 : 0)))
65-
);
66-
}
67-
)).toList();
48+
String renderedTo = runContext.render(this.to).as(String.class).orElseThrow();
49+
50+
List<Upload.Output> outputs = Data.from(from)
51+
.readAs(runContext, String.class, obj -> obj.toString())
52+
.map(throwFunction(fromURI -> {
53+
if (!fromURI.startsWith("kestra://")) {
54+
throw new IllegalArgumentException("'from' must be a list of Kestra's internal storage URI");
55+
}
56+
57+
return VfsService.upload(
58+
runContext,
59+
fsm,
60+
this.fsOptions(runContext),
61+
URI.create(fromURI),
62+
this.uri(runContext, renderedTo + fromURI.substring(fromURI.lastIndexOf('/') + (renderedTo.endsWith("/") ? 1 : 0)))
63+
);
64+
}))
65+
.collectList()
66+
.block();
6867

6968
return Output.builder()
7069
.files(outputs.stream()

0 commit comments

Comments
 (0)