Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.dbt.ResultParser;
import io.kestra.plugin.dbt.internals.PythonBasedPlugin;
import io.kestra.plugin.dbt.internals.PythonEnvironmentManager;
import io.kestra.plugin.dbt.internals.PythonEnvironmentManager.ResolvedPythonEnvironment;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
Expand All @@ -37,8 +40,7 @@
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractDbt extends Task implements RunnableTask<ScriptOutput>, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface {
private static final String DEFAULT_IMAGE = "ghcr.io/kestra-io/dbt";
public abstract class AbstractDbt extends Task implements RunnableTask<ScriptOutput>, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface, PythonBasedPlugin {

@Builder.Default
@Schema(
Expand Down Expand Up @@ -138,19 +140,27 @@ public void setDockerOptions(Property<DockerOptions> dockerOptions) {

private Property<List<String>> outputFiles;

protected Property<List<String>> dependencies;

protected Property<String> pythonVersion;

protected Property<Boolean> dependencyCacheEnabled = Property.of(true);

protected abstract java.util.List<String> dbtCommands(RunContext runContext) throws IllegalVariableEvaluationException;

@Override
public ScriptOutput run(RunContext runContext) throws Exception {
var renderedOutputFiles = runContext.render(this.outputFiles).asList(String.class);
var renderedEnvMap = runContext.render(this.getEnv()).asMap(String.class, String.class);

RunnerType runnerType = runContext.render(this.getRunner()).as(RunnerType.class).orElse(null);

CommandsWrapper commandsWrapper = new CommandsWrapper(runContext)
.withEnv(renderedEnvMap.isEmpty() ? new HashMap<>() : renderedEnvMap)
.withNamespaceFiles(namespaceFiles)
.withInputFiles(inputFiles)
.withOutputFiles(renderedOutputFiles.isEmpty() ? null : renderedOutputFiles)
.withRunnerType(runContext.render(this.getRunner()).as(RunnerType.class).orElse(null))
.withRunnerType(runnerType)
.withDockerOptions(runContext.render(this.getDocker()).as(DockerOptions.class).orElse(null))
.withContainerImage(runContext.render(this.getContainerImage()).as(String.class).orElseThrow())
.withTaskRunner(this.taskRunner)
Expand Down Expand Up @@ -180,11 +190,17 @@ public void accept(String line, Boolean isStdErr) {
);
}

PythonEnvironmentManager pythonEnvironmentManager = new PythonEnvironmentManager(runContext, this);
ResolvedPythonEnvironment pythonEnvironment = pythonEnvironmentManager.setup(containerImage, taskRunner, runnerType);

Map<String, String> env = new HashMap<>();
env.put("PYTHONUNBUFFERED", "true");
env.put("PIP_ROOT_USER_ACTION", "ignore");
if (pythonEnvironment.packages() != null) {
env.put("PYTHONPATH", pythonEnvironment.packages().path().toString());
};
ScriptOutput run = commandsWrapper
.addEnv(Map.of(
"PYTHONUNBUFFERED", "true",
"PIP_ROOT_USER_ACTION", "ignore"
))
.addEnv(env)
.withInterpreter(Property.of(List.of("/bin/sh", "-c")))
.withCommands(new Property<>(JacksonMapper.ofJson().writeValueAsString(
List.of(createDbtCommand(runContext)))
Expand All @@ -193,6 +209,11 @@ public void accept(String line, Boolean isStdErr) {

parseResults(runContext, workingDirectory, run);

// Cache upload
if (pythonEnvironmentManager.isCacheEnabled() && pythonEnvironment.packages() != null && !pythonEnvironment.cached()) {
pythonEnvironmentManager.uploadCache(runContext, pythonEnvironment.packages());
}

return run;
}

Expand All @@ -202,15 +223,15 @@ private String createDbtCommand(RunContext runContext) throws IllegalVariableEva
"--log-format json"
));

if (Boolean.TRUE.equals(runContext.render(this.debug).as(Boolean.class).orElse(false))) {
if (runContext.render(this.debug).as(Boolean.class).orElse(false)) {
commands.add("--debug");
}

if (Boolean.TRUE.equals(runContext.render(this.failFast).as(Boolean.class).orElse(false))) {
if (runContext.render(this.failFast).as(Boolean.class).orElse(false)) {
commands.add("--fail-fast");
}

if (Boolean.TRUE.equals(runContext.render(this.warnError).as(Boolean.class).orElse(false))) {
if (runContext.render(this.warnError).as(Boolean.class).orElse(false)) {
commands.add("--warn-error");
}

Expand Down
38 changes: 28 additions & 10 deletions src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.runners.AbstractLogConsumer;
import io.kestra.core.models.tasks.runners.ScriptService;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.plugin.dbt.ResultParser;
import io.kestra.plugin.dbt.internals.PythonBasedPlugin;
import io.kestra.plugin.dbt.internals.PythonEnvironmentManager;
import io.kestra.plugin.dbt.internals.PythonEnvironmentManager.ResolvedPythonEnvironment;
import io.kestra.plugin.scripts.exec.AbstractExecScript;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
Expand All @@ -35,11 +37,11 @@
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -275,9 +277,8 @@
)
}
)
public class DbtCLI extends AbstractExecScript {
public class DbtCLI extends AbstractExecScript implements PythonBasedPlugin {
private static final ObjectMapper MAPPER = JacksonMapper.ofYaml();
private static final String DEFAULT_IMAGE = "ghcr.io/kestra-io/dbt";

@Schema(
title = "The list of dbt CLI commands to run."
Expand Down Expand Up @@ -346,6 +347,12 @@ public class DbtCLI extends AbstractExecScript {
@Builder.Default
private Property<LogFormat> logFormat = Property.of(LogFormat.JSON);

protected Property<List<String>> dependencies;

protected Property<String> pythonVersion;

protected Property<Boolean> dependencyCacheEnabled = Property.of(true);

@Override
protected DockerOptions injectDefaults(RunContext runContext, DockerOptions original) throws IllegalVariableEvaluationException {
if (original == null) {
Expand Down Expand Up @@ -418,13 +425,20 @@ public void accept(String line, Boolean isStdErr) {

LogFormat renderedLogFormat = runContext.render(this.logFormat).as(LogFormat.class).orElseThrow();

PythonEnvironmentManager pythonEnvironmentManager = new PythonEnvironmentManager(runContext, this);
ResolvedPythonEnvironment pythonEnvironment = pythonEnvironmentManager.setup(containerImage, taskRunner, runner);

Map<String, String> env = new HashMap<>();
env.put("PYTHONUNBUFFERED", "true");
env.put("PIP_ROOT_USER_ACTION", "ignore");
if (pythonEnvironment.packages() != null) {
env.put("PYTHONPATH", pythonEnvironment.packages().path().toString());
};

ScriptOutput runResults = commandsWrapper
.addEnv(Map.of(
"PYTHONUNBUFFERED", "true",
"PIP_ROOT_USER_ACTION", "ignore"
))
.addEnv(env)
.withInterpreter(this.interpreter)
.withBeforeCommands(this.beforeCommands)
.withBeforeCommands(beforeCommands)
.withBeforeCommandsWithOptions(true)
.withCommands(Property.of(
renderedCommands.stream()
Expand All @@ -438,9 +452,13 @@ public void accept(String line, Boolean isStdErr) {
)
.run();

//Parse run results
// Parse run results
parseRunResults(runContext, projectWorkingDirectory, runResults, storeManifestKvStore);

// Cache upload
if (pythonEnvironmentManager.isCacheEnabled() && pythonEnvironment.packages() != null && !pythonEnvironment.cached()) {
pythonEnvironmentManager.uploadCache(runContext, pythonEnvironment.packages());
}
return runResults;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.kestra.plugin.dbt.internals;

import io.kestra.core.models.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;

import java.util.List;

/**
* Interface for Python-based plugin.
*/
public interface PythonBasedPlugin extends Plugin {

String DEFAULT_PYTHON_VERSION = "3.13";
String DEFAULT_IMAGE = "python:" + DEFAULT_PYTHON_VERSION + "-slim";

@Schema(
title = "The script dependencies."
)
@PluginProperty
Property<List<String>> getDependencies();

@Schema(
title = "The version of Python to use for the script.",
description = "If no version is explicitly specified, the task will attempt to extract the version from the configured container image. If it cannot determine the version from the image, the task will default to Python '"+ DEFAULT_PYTHON_VERSION +" '"
)
@PluginProperty
Property<String> getPythonVersion();

@Schema(
title = "Enable Python dependency caching",
description = "When enabled, Python dependencies will be cached across task executions. This locks dependency versions and speeds up subsequent runs by avoiding redundant installations."
)
@PluginProperty
Property<Boolean> getDependencyCacheEnabled();
}
Loading
Loading