Skip to content

Commit 5d4ab1c

Browse files
fix(syncflows): improve ignoreInvalidFlows setting (#230)
1 parent 785d122 commit 5d4ab1c

File tree

4 files changed

+149
-8
lines changed

4 files changed

+149
-8
lines changed

src/main/java/io/kestra/plugin/git/AbstractSyncTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ protected boolean mustKeep(RunContext runContext, T instanceResource) {
107107

108108
protected abstract void deleteResource(RunContext runContext, String renderedNamespace, T instanceResource) throws IOException;
109109

110-
protected abstract T simulateResourceWrite(RunContext runContext, String renderedNamespace, URI uri, InputStream inputStream) throws IOException, FlowProcessingException;
110+
protected abstract T simulateResourceWrite(RunContext runContext, String renderedNamespace, URI uri, InputStream inputStream) throws IOException, FlowProcessingException, IllegalVariableEvaluationException;
111111

112-
protected abstract T writeResource(RunContext runContext, String renderedNamespace, URI uri, InputStream inputStream) throws IOException, URISyntaxException, FlowProcessingException;
112+
protected abstract T writeResource(RunContext runContext, String renderedNamespace, URI uri, InputStream inputStream) throws IOException, URISyntaxException, FlowProcessingException, IllegalVariableEvaluationException;
113113

114114
protected abstract SyncResult wrapper(RunContext runContext, String renderedGitDirectory, String renderedNamespace, URI resourceUri, T resourceBeforeUpdate, T resourceAfterUpdate) throws IOException;
115115

src/main/java/io/kestra/plugin/git/SyncFlow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@ public Property<String> getBranch() {
7979

8080
@Override
8181
public Output run(RunContext runContext) throws Exception {
82-
82+
8383
configureHttpTransport(runContext);
84-
84+
8585
// we add this method to configure ssl to allow self signed certs
8686
configureEnvironmentWithSsl(runContext);
8787

src/main/java/io/kestra/plugin/git/SyncFlows.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.kestra.core.runners.DefaultRunContext;
1212
import io.kestra.core.runners.RunContext;
1313
import io.kestra.core.services.FlowService;
14+
import io.kestra.sdk.KestraClient;
1415
import io.swagger.v3.oas.annotations.media.Schema;
1516
import jakarta.validation.constraints.NotNull;
1617
import lombok.*;
@@ -195,12 +196,31 @@ protected void deleteResource(RunContext runContext, String renderedNamespace, F
195196
}
196197

197198
@Override
198-
protected Flow simulateResourceWrite(RunContext runContext, String renderedNamespace, URI uri, InputStream inputStream) throws IOException, FlowProcessingException {
199+
protected Flow simulateResourceWrite(RunContext runContext, String renderedNamespace, URI uri, InputStream inputStream) throws IOException, FlowProcessingException, IllegalVariableEvaluationException {
199200
if (inputStream == null) {
200201
return null;
201202
}
202203

203-
return flowService(runContext).importFlow(runContext.flowInfo().tenantId(), SyncFlows.replaceNamespace(renderedNamespace, uri, inputStream), true);
204+
String flowSource = SyncFlows.replaceNamespace(renderedNamespace, uri, inputStream);
205+
206+
var flowValidated = flowService.validate(runContext.flowInfo().tenantId(), flowSource).getFirst();
207+
208+
if (flowValidated.getConstraints() != null) {
209+
var ref = uri.getPath();
210+
211+
if (ref.startsWith("/")) {
212+
ref = ref.substring(1);
213+
}
214+
215+
if (runContext.render(this.ignoreInvalidFlows).as(Boolean.class).orElse(false)) {
216+
runContext.logger().warn("Ignoring invalid flow {}: {}", ref, flowValidated.getConstraints());
217+
return null;
218+
}
219+
220+
throw new FlowProcessingException("Invalid flow: " + ref + " : " + flowValidated.getConstraints());
221+
}
222+
223+
return flowService(runContext).importFlow(runContext.flowInfo().tenantId(), flowSource, true);
204224
}
205225

206226
@Override
@@ -217,14 +237,31 @@ protected Property<Boolean> traverseDirectories() {
217237
}
218238

219239
@Override
220-
protected Flow writeResource(RunContext runContext, String renderedNamespace, URI uri, InputStream inputStream) throws IOException, FlowProcessingException {
240+
protected Flow writeResource(RunContext runContext, String renderedNamespace, URI uri, InputStream inputStream) throws IOException, FlowProcessingException, IllegalVariableEvaluationException {
221241
if (inputStream == null) {
222242
return null;
223243
}
224244

225245
String flowSource = SyncFlows.replaceNamespace(renderedNamespace, uri, inputStream);
226246

227-
return flowService(runContext).importFlow(runContext.flowInfo().tenantId(), flowSource);
247+
var flowValidated = flowService.validate(runContext.flowInfo().tenantId(), flowSource).getFirst();
248+
249+
if (flowValidated.getConstraints() != null) {
250+
var ref = uri.getPath();
251+
252+
if (ref.startsWith("/")) {
253+
ref = ref.substring(1);
254+
}
255+
256+
if (runContext.render(this.ignoreInvalidFlows).as(Boolean.class).orElse(false)) {
257+
runContext.logger().warn("Ignoring invalid flow {}: {}", ref, flowValidated.getConstraints());
258+
return null;
259+
}
260+
261+
throw new FlowProcessingException("Invalid flow: " + ref + " imported from Git: " + flowValidated.getConstraints());
262+
}
263+
264+
return flowService(runContext).importFlow(runContext.flowInfo().tenantId(), flowSource, false);
228265
}
229266

230267
private static String replaceNamespace(String renderedNamespace, URI uri, InputStream inputStream) throws IOException {
@@ -242,6 +279,10 @@ protected SyncResult wrapper(RunContext runContext, String renderedGitDirectory,
242279
return null;
243280
}
244281

282+
if (flowBeforeUpdate == null && flowAfterUpdate == null) {
283+
return null;
284+
}
285+
245286
SyncState syncState;
246287
if (resourceUri == null) {
247288
syncState = SyncState.DELETED;

src/test/java/io/kestra/plugin/git/SyncFlowsTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kestra.plugin.git;
22

33
import com.fasterxml.jackson.core.type.TypeReference;
4+
import io.kestra.core.exceptions.FlowProcessingException;
45
import io.kestra.core.models.flows.Flow;
56
import io.kestra.core.models.flows.FlowId;
67
import io.kestra.core.models.flows.FlowWithSource;
@@ -16,6 +17,7 @@
1617
import io.kestra.core.junit.annotations.KestraTest;
1718
import jakarta.inject.Inject;
1819
import org.apache.commons.io.IOUtils;
20+
import org.eclipse.jgit.api.Git;
1921
import org.junit.jupiter.api.BeforeEach;
2022
import org.junit.jupiter.api.Test;
2123

@@ -442,6 +444,104 @@ void shouldFailWhenGitDirectoryDoesNotExist() throws Exception {
442444
assertThat(exception.getMessage(), containsString("The directory 'nonexistent/path' was not found"));
443445
}
444446

447+
@Test
448+
void shouldIgnoreInvalidFlowFromGit() throws Exception {
449+
Path repoDir = Files.createTempDirectory("git-test");
450+
Git git = Git.init().setDirectory(repoDir.toFile()).call();
451+
452+
Path flowsDir = repoDir.resolve("_flows");
453+
Files.createDirectories(flowsDir);
454+
455+
Files.writeString(
456+
flowsDir.resolve("invalid-flow.yaml"),
457+
"""
458+
id: invalid-flow
459+
namespace: my.namespace
460+
461+
tasks:
462+
- id: bad
463+
type: unknown.type
464+
"""
465+
);
466+
467+
git.add().addFilepattern(".").call();
468+
git.commit().setMessage("Add invalid flow").call();
469+
470+
RunContext runContext = runContextFactory.of(Map.of(
471+
"flow", Map.of(
472+
"tenantId", TENANT_ID,
473+
"namespace", NAMESPACE,
474+
"id", FLOW_ID
475+
),
476+
"url", repoDir.toUri().toString(),
477+
"branch", "master",
478+
"gitDirectory", "_flows",
479+
"namespace", NAMESPACE
480+
));
481+
482+
SyncFlows task = SyncFlows.builder()
483+
.url(new Property<>("{{url}}"))
484+
.branch(new Property<>("{{branch}}"))
485+
.gitDirectory(new Property<>("{{gitDirectory}}"))
486+
.targetNamespace(new Property<>("{{namespace}}"))
487+
.ignoreInvalidFlows(Property.ofValue(true))
488+
.build();
489+
490+
SyncFlows.Output output = task.run(runContext);
491+
492+
List<Flow> flows = flowRepositoryInterface.findByNamespace(TENANT_ID, NAMESPACE);
493+
assertThat(flows.stream().map(Flow::getId).toList(), not(hasItem("invalid-flow")));
494+
495+
assertThat(runContext.storage().getFile(output.diffFileUri()).toString(), not(containsString("\"flowId\":\"invalid-flow\"")));
496+
}
497+
498+
@Test
499+
void shouldThrowOnInvalidFlowFromGit() throws Exception {
500+
Path repoDir = Files.createTempDirectory("git-test");
501+
Git git = Git.init().setDirectory(repoDir.toFile()).call();
502+
503+
Path flowsDir = repoDir.resolve("_flows");
504+
Files.createDirectories(flowsDir);
505+
506+
Files.writeString(
507+
flowsDir.resolve("demo-invalid-flow.yaml"),
508+
"""
509+
id: invalid-flow-demo
510+
namespace: my.namespace
511+
512+
tasks:
513+
- id: bad
514+
type: unknown.type
515+
"""
516+
);
517+
518+
git.add().addFilepattern(".").call();
519+
git.commit().setMessage("Add invalid flow").call();
520+
521+
RunContext runContext = runContextFactory.of(Map.of(
522+
"flow", Map.of(
523+
"tenantId", TENANT_ID,
524+
"namespace", NAMESPACE,
525+
"id", FLOW_ID
526+
),
527+
"url", repoDir.toUri().toString(),
528+
"branch", "master",
529+
"gitDirectory", "_flows",
530+
"namespace", NAMESPACE
531+
));
532+
533+
SyncFlows task = SyncFlows.builder()
534+
.url(new Property<>("{{url}}"))
535+
.branch(new Property<>("{{branch}}"))
536+
.gitDirectory(new Property<>("{{gitDirectory}}"))
537+
.targetNamespace(new Property<>("{{namespace}}"))
538+
.build();
539+
540+
FlowProcessingException ex = assertThrows(FlowProcessingException.class, () -> task.run(runContext));
541+
542+
assertThat(ex.getMessage(), containsString("demo-invalid-flow"));
543+
}
544+
445545
private List<Map<String, Object>> defaultCaseDiffs(boolean includeSubNamespaces, Map<String, Object>... additionalDiffs) {
446546
List<Map<String, Object>> diffs = new ArrayList<>(List.of(
447547
Map.of("gitPath", "to_clone/_flows/unchanged-flow.yaml", "syncState", "UNCHANGED", "flowId", "unchanged-flow", "namespace", NAMESPACE, "revision", previousRevisionByUid.getOrDefault(FlowId.uidWithoutRevision(TENANT_ID, NAMESPACE, "unchanged-flow"), 1)),

0 commit comments

Comments
 (0)