Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/main/java/io/kestra/plugin/git/NamespaceSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public Output run(RunContext runContext) throws Exception {
}

// Fetch Kestra state limited to target namespace only
KestraState kestraState = loadKestraState(runContext, rNamespace);
KestraState kestraState = loadKestraState(runContext, rNamespace, rOnInvalidSyntax);
Map<String, byte[]> namespaceFiles = listNamespaceFiles(runContext, rNamespace);

List<DiffLine> diffs = new ArrayList<>();
Expand Down Expand Up @@ -323,7 +323,7 @@ private void planFlows(RunContext rc, Path baseDir, GitTree gitTree, KestraState
throw new FlowProcessingException(flowValidated.getConstraints());
}
fs.importFlow(tenant, gitNode.rawYaml, false);
} catch (FlowProcessingException e) {
} catch (Exception e) {
handleInvalid(rc, rInvalid, "FLOW " + key, e);
}
});
Expand Down Expand Up @@ -383,7 +383,7 @@ private void planFlows(RunContext rc, Path baseDir, GitTree gitTree, KestraState
}

fs.importFlow(tenant, gitNode.rawYaml, false);
} catch (FlowProcessingException e) {
} catch (Exception e) {
handleInvalid(rc, rInvalid, "FLOW " + key, e);
}
});
Expand Down Expand Up @@ -478,11 +478,19 @@ private static class GitTree {
private record KestraState(Map<String, FlowWithSource> flows) {
}

private KestraState loadKestraState(RunContext rc, String rootNamespace) {
private KestraState loadKestraState(RunContext rc, String rootNamespace, OnInvalidSyntax rOnInvalidSyntax) {
FlowService fs = flowService(rc);
String tenant = rc.flowInfo().tenantId();

Map<String, FlowWithSource> flowsWithSource = fs.findByNamespaceWithSource(tenant, rootNamespace).stream()
List<FlowWithSource> allFlows;
try {
allFlows = fs.findByNamespaceWithSource(tenant, rootNamespace);
} catch (Exception e) {
handleInvalid(rc, rOnInvalidSyntax, "flows for namespace " + rootNamespace, e);
allFlows = List.of();
}

Map<String, FlowWithSource> flowsWithSource = allFlows.stream()
.collect(Collectors.toMap(f -> key(f.getNamespace(), f.getId()), Function.identity(), (a, b) -> a));

return new KestraState(flowsWithSource);
Expand Down
32 changes: 25 additions & 7 deletions src/main/java/io/kestra/plugin/git/TenantSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.YamlParser;
import io.kestra.plugin.git.services.GitService;
import io.kestra.sdk.KestraClient;
import io.kestra.sdk.api.FilesApi;
Expand All @@ -34,6 +35,8 @@
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.*;

Expand Down Expand Up @@ -242,7 +245,7 @@ public Output run(RunContext runContext) throws Exception {
}
}

List<FlowWithSource> kestraFlows = fetchFlowsFromKestra(kestraClient, runContext, namespace);
List<FlowWithSource> kestraFlows = fetchFlowsFromKestra(kestraClient, runContext, namespace, rOnInvalidSyntax);
Map<String, byte[]> kestraFiles = listNamespaceFiles(kestraClient, runContext, namespace);

planNamespace(
Expand Down Expand Up @@ -728,7 +731,7 @@ private void planDashboards(
}
}

private List<FlowWithSource> fetchFlowsFromKestra(KestraClient kestraClient, RunContext runContext, String namespace) {
private List<FlowWithSource> fetchFlowsFromKestra(KestraClient kestraClient, RunContext runContext, String namespace, OnInvalidSyntax rOnInvalidSyntax) {
try {
// Export all flows from Kestra for the given namespace (including sub-namespaces)
byte[] zippedFlows = kestraClient.flows().exportFlowsByQuery(
Expand All @@ -740,19 +743,31 @@ private List<FlowWithSource> fetchFlowsFromKestra(KestraClient kestraClient, Run

try (
var bais = new ByteArrayInputStream(zippedFlows);
var zis = new java.util.zip.ZipInputStream(bais)
var zis = new ZipInputStream(bais)
) {
java.util.zip.ZipEntry entry;
ZipEntry entry;
while ((entry = zis.getNextEntry()) != null) {
if (!entry.getName().endsWith(".yml") && !entry.getName().endsWith(".yaml")) {
continue;
}

String yaml = new String(zis.readAllBytes(), StandardCharsets.UTF_8);
var entryName = entry.getName();
var yaml = new String(zis.readAllBytes(), StandardCharsets.UTF_8);

io.kestra.core.models.flows.Flow parsed = io.kestra.core.serializers.YamlParser.parse(yaml, io.kestra.core.models.flows.Flow.class);
io.kestra.core.models.flows.Flow parsed;
try {
parsed = YamlParser.parse(yaml, io.kestra.core.models.flows.Flow.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will only catch flow schema validations and not invalid task definitions e.g some property no longer avaiable or incorrect property name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix proposal: ef6dbf9

What do you think @Malaydewangan09 ?


if (!namespace.equals(parsed.getNamespace())) {
continue;
}

if (!namespace.equals(parsed.getNamespace())) {
var flowValidated = kestraClient.flows().validateFlows(runContext.flowInfo().tenantId(), yaml).getFirst();
if (flowValidated.getConstraints() != null) {
throw new FlowProcessingException(flowValidated.getConstraints());
}
} catch (Exception e) {
handleInvalid(runContext, rOnInvalidSyntax, "FLOW from entry " + entryName, e);
continue;
}

Expand All @@ -762,6 +777,9 @@ private List<FlowWithSource> fetchFlowsFromKestra(KestraClient kestraClient, Run

return flows;

} catch (KestraRuntimeException e) {
// Re-throw KestraRuntimeException from handleInvalid(FAIL) without wrapping
throw e;
} catch (Exception e) {
throw new KestraRuntimeException("Failed to export flows from Kestra for namespace " + namespace, e);
}
Expand Down
127 changes: 127 additions & 0 deletions src/test/java/io/kestra/plugin/git/TenantSyncTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.kestra.plugin.git;

import com.sun.net.httpserver.HttpServer;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.tenant.TenantService;
import io.kestra.sdk.KestraClient;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

@KestraTest
class TenantSyncTest {
private static final String TENANT_ID = TenantService.MAIN_TENANT;
private static final String NAMESPACE = "my.namespace";

@Inject
private RunContextFactory runContextFactory;

@Test
void shouldFailOnInvalidTaskDefinitionFromKestraExport() throws Exception {
var yaml = """
id: exported-flow
namespace: my.namespace

tasks:
- id: log
type: io.kestra.core.tasks.log.Log
message: hello
""";
var exportedZip = zippedYaml("my.namespace/exported-flow.yaml", yaml);
var validateCalls = new AtomicInteger();

var server = HttpServer.create(new InetSocketAddress(0), 0);
server.createContext("/api/v1/" + TENANT_ID + "/flows/export/by-query", exchange -> {
exchange.getRequestBody().readAllBytes();
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(200, exportedZip.length);
exchange.getResponseBody().write(exportedZip);
exchange.close();
});
server.createContext("/api/v1/" + TENANT_ID + "/flows/validate", exchange -> {
validateCalls.incrementAndGet();
exchange.getRequestBody().readAllBytes();

var body = """
[
{
"index": 0,
"constraints": "invalid task definition"
}
]
""";
var payload = body.getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(200, payload.length);
exchange.getResponseBody().write(payload);
exchange.close();
});
server.start();

try {
var task = TenantSync.builder().build();
var method = fetchFlowsMethod();
var runContext = runContextFactory.of(Map.of(
"flow", Map.of(
"tenantId", TENANT_ID,
"namespace", NAMESPACE,
"id", "tenant-sync-test"
)
));
var kestraClient = KestraClient.builder()
.url("http://localhost:" + server.getAddress().getPort())
.basicAuth("user", "pass")
.build();

var exception = assertThrows(
InvocationTargetException.class,
() -> method.invoke(task, kestraClient, runContext, NAMESPACE, TenantSync.OnInvalidSyntax.FAIL)
);
assertThat(exception.getCause(), instanceOf(KestraRuntimeException.class));
assertThat(exception.getCause().getMessage(), containsString("FLOW from entry my.namespace/exported-flow.yaml"));
assertThat(exception.getCause().getMessage(), containsString("invalid task definition"));
assertEquals(1, validateCalls.get());
} finally {
server.stop(0);
}
}

private static byte[] zippedYaml(String entryName, String yaml) throws Exception {
var output = new ByteArrayOutputStream();
try (var zip = new ZipOutputStream(output)) {
zip.putNextEntry(new ZipEntry(entryName));
zip.write(yaml.getBytes(StandardCharsets.UTF_8));
zip.closeEntry();
}
return output.toByteArray();
}

private static Method fetchFlowsMethod() throws Exception {
var method = TenantSync.class.getDeclaredMethod(
"fetchFlowsFromKestra",
KestraClient.class,
io.kestra.core.runners.RunContext.class,
String.class,
TenantSync.OnInvalidSyntax.class
);
method.setAccessible(true);
return method;
}
}
Loading