55import io .kestra .core .models .property .Property ;
66import io .kestra .core .models .tasks .RunnableTask ;
77import io .kestra .core .runners .RunContext ;
8+ import io .kestra .core .serializers .JacksonMapper ;
89import io .swagger .v3 .oas .annotations .media .Schema ;
910import jakarta .validation .constraints .NotNull ;
1011import lombok .*;
1112import lombok .experimental .SuperBuilder ;
1213import org .apache .commons .vfs2 .impl .StandardFileSystemManager ;
1314
1415import java .net .URI ;
16+ import java .util .Arrays ;
1517import java .util .List ;
18+ import java .util .Map ;
19+ import java .util .Objects ;
1620
1721import static io .kestra .core .utils .Rethrow .throwFunction ;
1822
2125@ EqualsAndHashCode
2226@ Getter
2327@ NoArgsConstructor
24- public abstract class Uploads extends AbstractVfsTask implements RunnableTask <Uploads .Output > {
28+ public abstract class Uploads extends AbstractVfsTask implements RunnableTask <Uploads .Output >, Data . From {
2529 @ Schema (
2630 title = "The files to upload, must be internal storage URIs, must be a list of URIs or a pebble template that returns a list of URIs" ,
2731 anyOf = {
2832 String .class ,
29- String [].class
33+ List .class ,
34+ Map .class
3035 },
3136 description = "Must be Kestra internal storage URIs. Can be a single URI string, a list of URI strings, or an internal storage URI pointing to a file containing URIs."
3237 )
@@ -45,33 +50,43 @@ public Output run(RunContext runContext) throws Exception {
4550 fsm .setConfiguration (StandardFileSystemManager .class .getResource (KestraStandardFileSystemManager .CONFIG_RESOURCE ));
4651 fsm .init ();
4752
48- String renderedTo = runContext . render ( this . to ). as ( String . class ). orElseThrow ( );
53+ String [] renderedFrom = parseFromProperty ( runContext );
4954
50- List <Upload .Output > outputs = Data .from (from )
51- .readAs (runContext , String .class , obj -> obj .toString ())
52- .map (throwFunction (fromURI -> {
53- if (!fromURI .startsWith ("kestra://" )) {
54- throw new IllegalArgumentException ("'from' must be a list of Kestra's internal storage URI" );
55- }
56-
57- return VfsService .upload (
58- runContext ,
59- fsm ,
60- this .fsOptions (runContext ),
61- URI .create (fromURI ),
62- this .uri (runContext , renderedTo + fromURI .substring (fromURI .lastIndexOf ('/' ) + (renderedTo .endsWith ("/" ) ? 1 : 0 )))
63- );
64- }))
65- .collectList ()
66- .block ();
55+ List <Upload .Output > outputs = Arrays .stream (renderedFrom ).map (throwFunction (fromURI -> {
56+ var rTo = runContext .render (this .to ).as (String .class ).orElseThrow ();
57+ return VfsService .upload (
58+ runContext ,
59+ fsm ,
60+ this .fsOptions (runContext ),
61+ URI .create (fromURI ),
62+ this .uri (runContext , rTo + fromURI .substring (fromURI .lastIndexOf ('/' ) + (rTo .endsWith ("/" ) ? 1 : 0 )))
63+ );
64+ })).toList ();
6765
6866 return Output .builder ()
69- .files (outputs .stream ()
70- .map (Upload .Output ::getTo )
71- .toList ()
72- )
73- .build ();
67+ .files (outputs .stream ()
68+ .map (Upload .Output ::getTo )
69+ .toList ()
70+ )
71+ .build ();
72+ }
73+ }
74+
75+ private String [] parseFromProperty (RunContext runContext ) throws Exception {
76+ if (this .from instanceof String from ) {
77+ var rFrom = runContext .render (from ).trim ();
78+
79+ if (rFrom .startsWith ("[" ) && rFrom .endsWith ("]" )) {
80+ return JacksonMapper .ofJson ().readValue (rFrom , String [].class );
81+ }
7482 }
83+
84+ return Objects .requireNonNull (Data .from (this .from )
85+ .readAs (runContext , String .class , Object ::toString )
86+ .map (throwFunction (runContext ::render ))
87+ .collectList ()
88+ .block ())
89+ .toArray (String []::new );
7590 }
7691
7792 @ Builder
0 commit comments