Skip to content

Commit ef6dbf9

Browse files
author
François Delbrayelle
committed
fix: code review feedback
1 parent 86e8df8 commit ef6dbf9

File tree

2 files changed

+136
-4
lines changed

2 files changed

+136
-4
lines changed

src/main/java/io/kestra/plugin/git/TenantSync.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -757,15 +757,20 @@ private List<FlowWithSource> fetchFlowsFromKestra(KestraClient kestraClient, Run
757757
io.kestra.core.models.flows.Flow parsed;
758758
try {
759759
parsed = YamlParser.parse(yaml, io.kestra.core.models.flows.Flow.class);
760+
761+
if (!namespace.equals(parsed.getNamespace())) {
762+
continue;
763+
}
764+
765+
var flowValidated = kestraClient.flows().validateFlows(runContext.flowInfo().tenantId(), yaml).getFirst();
766+
if (flowValidated.getConstraints() != null) {
767+
throw new FlowProcessingException(flowValidated.getConstraints());
768+
}
760769
} catch (Exception e) {
761770
handleInvalid(runContext, rOnInvalidSyntax, "FLOW from entry " + entryName, e);
762771
continue;
763772
}
764773

765-
if (!namespace.equals(parsed.getNamespace())) {
766-
continue;
767-
}
768-
769774
flows.add(FlowWithSource.of(parsed, yaml));
770775
}
771776
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package io.kestra.plugin.git;
2+
3+
import com.sun.net.httpserver.HttpServer;
4+
import io.kestra.core.exceptions.KestraRuntimeException;
5+
import io.kestra.core.junit.annotations.KestraTest;
6+
import io.kestra.core.runners.RunContextFactory;
7+
import io.kestra.core.tenant.TenantService;
8+
import io.kestra.sdk.KestraClient;
9+
import jakarta.inject.Inject;
10+
import org.junit.jupiter.api.Test;
11+
12+
import java.io.ByteArrayOutputStream;
13+
import java.lang.reflect.InvocationTargetException;
14+
import java.lang.reflect.Method;
15+
import java.net.InetSocketAddress;
16+
import java.nio.charset.StandardCharsets;
17+
import java.util.Map;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.zip.ZipEntry;
20+
import java.util.zip.ZipOutputStream;
21+
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.hamcrest.Matchers.containsString;
24+
import static org.hamcrest.Matchers.instanceOf;
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertThrows;
27+
28+
@KestraTest
29+
class TenantSyncTest {
30+
private static final String TENANT_ID = TenantService.MAIN_TENANT;
31+
private static final String NAMESPACE = "my.namespace";
32+
33+
@Inject
34+
private RunContextFactory runContextFactory;
35+
36+
@Test
37+
void shouldFailOnInvalidTaskDefinitionFromKestraExport() throws Exception {
38+
var yaml = """
39+
id: exported-flow
40+
namespace: my.namespace
41+
42+
tasks:
43+
- id: log
44+
type: io.kestra.core.tasks.log.Log
45+
message: hello
46+
""";
47+
var exportedZip = zippedYaml("my.namespace/exported-flow.yaml", yaml);
48+
var validateCalls = new AtomicInteger();
49+
50+
var server = HttpServer.create(new InetSocketAddress(0), 0);
51+
server.createContext("/api/v1/" + TENANT_ID + "/flows/export/by-query", exchange -> {
52+
exchange.getRequestBody().readAllBytes();
53+
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
54+
exchange.sendResponseHeaders(200, exportedZip.length);
55+
exchange.getResponseBody().write(exportedZip);
56+
exchange.close();
57+
});
58+
server.createContext("/api/v1/" + TENANT_ID + "/flows/validate", exchange -> {
59+
validateCalls.incrementAndGet();
60+
exchange.getRequestBody().readAllBytes();
61+
62+
var body = """
63+
[
64+
{
65+
"index": 0,
66+
"constraints": "invalid task definition"
67+
}
68+
]
69+
""";
70+
var payload = body.getBytes(StandardCharsets.UTF_8);
71+
exchange.getResponseHeaders().add("Content-Type", "application/json");
72+
exchange.sendResponseHeaders(200, payload.length);
73+
exchange.getResponseBody().write(payload);
74+
exchange.close();
75+
});
76+
server.start();
77+
78+
try {
79+
var task = TenantSync.builder().build();
80+
var method = fetchFlowsMethod();
81+
var runContext = runContextFactory.of(Map.of(
82+
"flow", Map.of(
83+
"tenantId", TENANT_ID,
84+
"namespace", NAMESPACE,
85+
"id", "tenant-sync-test"
86+
)
87+
));
88+
var kestraClient = KestraClient.builder()
89+
.url("http://localhost:" + server.getAddress().getPort())
90+
.basicAuth("user", "pass")
91+
.build();
92+
93+
var exception = assertThrows(
94+
InvocationTargetException.class,
95+
() -> method.invoke(task, kestraClient, runContext, NAMESPACE, TenantSync.OnInvalidSyntax.FAIL)
96+
);
97+
assertThat(exception.getCause(), instanceOf(KestraRuntimeException.class));
98+
assertThat(exception.getCause().getMessage(), containsString("FLOW from entry my.namespace/exported-flow.yaml"));
99+
assertThat(exception.getCause().getMessage(), containsString("invalid task definition"));
100+
assertEquals(1, validateCalls.get());
101+
} finally {
102+
server.stop(0);
103+
}
104+
}
105+
106+
private static byte[] zippedYaml(String entryName, String yaml) throws Exception {
107+
var output = new ByteArrayOutputStream();
108+
try (var zip = new ZipOutputStream(output)) {
109+
zip.putNextEntry(new ZipEntry(entryName));
110+
zip.write(yaml.getBytes(StandardCharsets.UTF_8));
111+
zip.closeEntry();
112+
}
113+
return output.toByteArray();
114+
}
115+
116+
private static Method fetchFlowsMethod() throws Exception {
117+
var method = TenantSync.class.getDeclaredMethod(
118+
"fetchFlowsFromKestra",
119+
KestraClient.class,
120+
io.kestra.core.runners.RunContext.class,
121+
String.class,
122+
TenantSync.OnInvalidSyntax.class
123+
);
124+
method.setAccessible(true);
125+
return method;
126+
}
127+
}

0 commit comments

Comments
 (0)