99import io .kestra .core .models .property .Property ;
1010import io .kestra .core .models .tasks .RunnableTask ;
1111import io .kestra .core .runners .RunContext ;
12+ import io .kestra .core .serializers .YamlParser ;
1213import io .kestra .plugin .git .services .GitService ;
1314import io .kestra .sdk .KestraClient ;
1415import io .kestra .sdk .api .FilesApi ;
3435import java .nio .file .Path ;
3536import java .util .*;
3637import java .util .stream .Collectors ;
38+ import java .util .zip .ZipEntry ;
39+ import java .util .zip .ZipInputStream ;
3740
3841import static org .eclipse .jgit .transport .RemoteRefUpdate .Status .*;
3942
@@ -242,7 +245,7 @@ public Output run(RunContext runContext) throws Exception {
242245 }
243246 }
244247
245- List <FlowWithSource > kestraFlows = fetchFlowsFromKestra (kestraClient , runContext , namespace );
248+ List <FlowWithSource > kestraFlows = fetchFlowsFromKestra (kestraClient , runContext , namespace , rOnInvalidSyntax );
246249 Map <String , byte []> kestraFiles = listNamespaceFiles (kestraClient , runContext , namespace );
247250
248251 planNamespace (
@@ -728,7 +731,7 @@ private void planDashboards(
728731 }
729732 }
730733
731- private List <FlowWithSource > fetchFlowsFromKestra (KestraClient kestraClient , RunContext runContext , String namespace ) {
734+ private List <FlowWithSource > fetchFlowsFromKestra (KestraClient kestraClient , RunContext runContext , String namespace , OnInvalidSyntax rOnInvalidSyntax ) {
732735 try {
733736 // Export all flows from Kestra for the given namespace (including sub-namespaces)
734737 byte [] zippedFlows = kestraClient .flows ().exportFlowsByQuery (
@@ -740,17 +743,24 @@ private List<FlowWithSource> fetchFlowsFromKestra(KestraClient kestraClient, Run
740743
741744 try (
742745 var bais = new ByteArrayInputStream (zippedFlows );
743- var zis = new java . util . zip . ZipInputStream (bais )
746+ var zis = new ZipInputStream (bais )
744747 ) {
745- java . util . zip . ZipEntry entry ;
748+ ZipEntry entry ;
746749 while ((entry = zis .getNextEntry ()) != null ) {
747750 if (!entry .getName ().endsWith (".yml" ) && !entry .getName ().endsWith (".yaml" )) {
748751 continue ;
749752 }
750753
751- String yaml = new String (zis .readAllBytes (), StandardCharsets .UTF_8 );
754+ var entryName = entry .getName ();
755+ var yaml = new String (zis .readAllBytes (), StandardCharsets .UTF_8 );
752756
753- io .kestra .core .models .flows .Flow parsed = io .kestra .core .serializers .YamlParser .parse (yaml , io .kestra .core .models .flows .Flow .class );
757+ io .kestra .core .models .flows .Flow parsed ;
758+ try {
759+ parsed = YamlParser .parse (yaml , io .kestra .core .models .flows .Flow .class );
760+ } catch (Exception e ) {
761+ handleInvalid (runContext , rOnInvalidSyntax , "FLOW from entry " + entryName , e );
762+ continue ;
763+ }
754764
755765 if (!namespace .equals (parsed .getNamespace ())) {
756766 continue ;
@@ -762,6 +772,9 @@ private List<FlowWithSource> fetchFlowsFromKestra(KestraClient kestraClient, Run
762772
763773 return flows ;
764774
775+ } catch (KestraRuntimeException e ) {
776+ // Re-throw KestraRuntimeException from handleInvalid(FAIL) without wrapping
777+ throw e ;
765778 } catch (Exception e ) {
766779 throw new KestraRuntimeException ("Failed to export flows from Kestra for namespace " + namespace , e );
767780 }
0 commit comments