2727import io .kestra .plugin .scripts .runner .docker .Docker ;
2828import io .swagger .v3 .oas .annotations .media .Schema ;
2929import jakarta .validation .Valid ;
30+ import jakarta .validation .constraints .NotNull ;
3031import lombok .*;
3132import lombok .experimental .SuperBuilder ;
3233import org .apache .commons .io .FileUtils ;
34+ import org .apache .commons .lang3 .StringUtils ;
3335
3436import java .io .File ;
3537import java .io .IOException ;
3840import java .nio .file .Files ;
3941import java .nio .file .Path ;
4042import java .time .Instant ;
41- import java .util .ArrayList ;
42- import java .util .HashMap ;
43+ import java .util .*;
4344import java .util .List ;
44- import java .util .Map ;
45- import java .util .Optional ;
4645import java .util .concurrent .atomic .AtomicBoolean ;
4746
48- import jakarta .validation .constraints .NotNull ;
49- import org .apache .commons .lang3 .StringUtils ;
50-
5147@ SuperBuilder
5248@ ToString
5349@ EqualsAndHashCode
132128 title = "Install a custom dbt version and run `dbt deps` and `dbt build` commands. Note how you can also configure the memory limit for the Docker runner. This is useful when you see Zombie processes." ,
133129 full = true ,
134130 code = """
135- id: dbt_custom_dependencies
136- namespace: company.team
131+ id: dbt_custom_dependencies
132+ namespace: company.team
137133
138- inputs:
139- - id: dbt_version
140- type: STRING
141- defaults: "dbt-duckdb==1.6.0"
134+ inputs:
135+ - id: dbt_version
136+ type: STRING
137+ defaults: "dbt-duckdb==1.6.0"
142138
143- tasks:
144- - id: git
145- type: io.kestra.plugin.core.flow.WorkingDirectory
146139 tasks:
147- - id: clone_repository
148- type: io.kestra.plugin.git.Clone
149- url: https://github.com/kestra-io/dbt-example
150- branch: main
140+ - id: git
141+ type: io.kestra.plugin.core.flow.WorkingDirectory
142+ tasks:
143+ - id: clone_repository
144+ type: io.kestra.plugin.git.Clone
145+ url: https://github.com/kestra-io/dbt-example
146+ branch: main
151147
152- - id: dbt
153- type: io.kestra.plugin.dbt.cli.DbtCLI
154- taskRunner:
155- type: io.kestra.plugin.scripts.runner.docker.Docker
156- memory:
157- memory: 1GB
158- containerImage: python:3.11-slim
159- beforeCommands:
160- - pip install uv
161- - uv venv --quiet
162- - . .venv/bin/activate --quiet
163- - uv pip install --quiet {{ inputs.dbt_version }}
164- commands:
165- - dbt deps
166- - dbt build
167- profiles: |
168- my_dbt_project:
169- outputs:
170- dev:
171- type: duckdb
172- path: ":memory:"
173- fixed_retries: 1
174- threads: 16
175- timeout_seconds: 300
176- target: dev
177- """
148+ - id: dbt
149+ type: io.kestra.plugin.dbt.cli.DbtCLI
150+ taskRunner:
151+ type: io.kestra.plugin.scripts.runner.docker.Docker
152+ memory:
153+ memory: 1GB
154+ containerImage: python:3.11-slim
155+ beforeCommands:
156+ - pip install uv
157+ - uv venv --quiet
158+ - . .venv/bin/activate --quiet
159+ - uv pip install --quiet {{ inputs.dbt_version }}
160+ commands:
161+ - dbt deps
162+ - dbt build
163+ profiles: |
164+ my_dbt_project:
165+ outputs:
166+ dev:
167+ type: duckdb
168+ path: ":memory:"
169+ fixed_retries: 1
170+ threads: 16
171+ timeout_seconds: 300
172+ target: dev
173+ """
178174 ),
179175 @ Example (
180176 title = "Clone a [Git repository](https://github.com/kestra-io/dbt-example) and build dbt models. Note that, as the dbt project files are in a separate directory, you need to set the `projectDir` task property and use `--project-dir` in each dbt CLI command." ,
215211 prod:
216212 type: duckdb
217213 path: dbt2.duckdb
218- extensions:\s
214+ extensions:
219215 - parquet
220216 fixed_retries: 1
221217 threads: 16
@@ -369,25 +365,25 @@ public class DbtCLI extends AbstractExecScript implements RunnableTask<DbtCLI.Ou
369365 @ Schema (
370366 title = "Log format." ,
371367 description = """
372- The log format is JSON by default. The format will be applied after all commands like this --log-format <logFormat>.
373- The possible values are JSON, DEBUG, TEXT. You can set it to NONE to avoid adding this argument to your commands.
374- """
368+ The log format is JSON by default. The format will be applied after all commands like this --log-format <logFormat>.
369+ The possible values are JSON, DEBUG, TEXT. You can set it to NONE to avoid adding this argument to your commands.
370+ """
375371 )
376372 @ Builder .Default
377373 private Property <LogFormat > logFormat = Property .ofValue (LogFormat .JSON );
378374
379375 @ Schema (
380376 title = "dbt engine" ,
381377 description = """
382- Selects the default container image when no explicit image is provided.
383-
384- Image resolution priority:
385- - If `taskRunner.image` is set, that image is used.
386- - Otherwise, if `containerImage` is set on the task, it is used.
387- - Otherwise, the `engine` determines the default image:
388- - CORE → ghcr.io/kestra-io/dbt
389- - FUSION → ghcr.io/kestra-io/dbt-fusion
390- """
378+ Selects the default container image when no explicit image is provided.
379+
380+ Image resolution priority:
381+ - If `taskRunner.image` is set, that image is used.
382+ - Otherwise, if `containerImage` is set on the task, it is used.
383+ - Otherwise, the `engine` determines the default image:
384+ - CORE → ghcr.io/kestra-io/dbt
385+ - FUSION → ghcr.io/kestra-io/dbt-fusion
386+ """
391387 )
392388 @ Builder .Default
393389 private Property <Engine > engine = Property .ofValue (Engine .CORE );
@@ -419,11 +415,14 @@ protected DockerOptions injectDefaults(RunContext runContext, DockerOptions orig
419415
420416 @ Override
421417 public Output run (RunContext runContext ) throws Exception {
418+
419+ var logger = runContext .logger ();
420+
422421 KVStore storeManifestKvStore = null ;
423422 AtomicBoolean hasWarning = new AtomicBoolean (false );
424423
425- //Check/fail if a KV store exists with given namespace
426- if (this .getStoreManifest () != null ) {
424+ // Check/fail if a KV store exists with given namespace
425+ if (this .getStoreManifest () != null ) {
427426 storeManifestKvStore = runContext .namespaceKv (runContext .render (this .getStoreManifest ().getNamespace ()).as (String .class ).orElseThrow ());
428427 }
429428
@@ -434,27 +433,27 @@ public Output run(RunContext runContext) throws Exception {
434433 public void accept (String line , Boolean isStdErr , Instant instant ) {
435434 LogService .parse (runContext , line , hasWarning );
436435 }
436+
437437 @ Override
438438 public void accept (String line , Boolean isStdErr ) {
439439 LogService .parse (runContext , line , hasWarning );
440440 }
441441 });
442442
443- var renderedProjectDir = runContext .render (projectDir ).as (String .class );
444- Path projectWorkingDirectory = renderedProjectDir .map (s -> commandsWrapper .getWorkingDirectory ().resolve (s )).orElseGet (commandsWrapper ::getWorkingDirectory );
443+ var rProjectDir = runContext .render (projectDir ).as (String .class );
444+ Path projectWorkingDirectory = rProjectDir .map (s -> commandsWrapper .getWorkingDirectory ().resolve (s )).orElseGet (commandsWrapper ::getWorkingDirectory );
445445
446- //Load manifest from KV store
447- if (this .getLoadManifest () != null ) {
446+ // Load manifest from KV store
447+ if (this .getLoadManifest () != null ) {
448448 KVStore loadManifestKvStore = runContext .namespaceKv (runContext .render (this .getLoadManifest ().getNamespace ()).as (String .class ).orElseThrow ());
449449 fetchAndStoreManifestIfExists (runContext , loadManifestKvStore , projectWorkingDirectory );
450450 }
451451
452- //Create profiles.yml
453452 String profilesString = runContext .render (profiles ).as (String .class ).orElse (null );
454453 if (profilesString != null && !profilesString .isEmpty ()) {
455454 var profileFile = new File (commandsWrapper .getWorkingDirectory ().toString (), "profiles.yml" );
456455 if (profileFile .exists ()) {
457- runContext . logger () .info ("A 'profiles.yml' file already exist in the task working directory, it will be overridden." );
456+ logger .info ("A 'profiles.yml' file already exist in the task working directory, it will be overridden." );
458457 }
459458
460459 FileUtils .writeStringToFile (
@@ -464,14 +463,13 @@ public void accept(String line, Boolean isStdErr) {
464463 );
465464 }
466465
467- var renderedCommands = runContext .render (this .commands ).asList (String .class );
466+ var rCommands = runContext .render (this .commands ).asList (String .class );
468467
469- // check that if a command uses --project-dir, the projectDir must be set
470- if (renderedCommands .stream ().anyMatch (cmd -> cmd .contains ("--project-dir" )) && this .projectDir == null ) {
471- runContext .logger ().warn ("One of the dbt CLI commands uses the `--project-dir` flag, but the `projectDir` task property is not set. Make sure to set the `projectDir` property." );
468+ if (rCommands .stream ().anyMatch (cmd -> cmd .contains ("--project-dir" )) && this .projectDir == null ) {
469+ logger .warn ("One of the dbt CLI commands uses the `--project-dir` flag, but the `projectDir` task property is not set. Make sure to set the `projectDir` property." );
472470 }
473471
474- LogFormat renderedLogFormat = runContext .render (this .logFormat ).as (LogFormat .class ).orElseThrow ();
472+ LogFormat rLogFormat = runContext .render (this .logFormat ).as (LogFormat .class ).orElseThrow ();
475473
476474 ScriptOutput runResults ;
477475 try {
@@ -484,10 +482,10 @@ public void accept(String line, Boolean isStdErr) {
484482 .withBeforeCommands (this .beforeCommands )
485483 .withBeforeCommandsWithOptions (true )
486484 .withCommands (Property .ofValue (
487- renderedCommands .stream ()
485+ rCommands .stream ()
488486 .map (command -> {
489- if (command .startsWith ("dbt" ) && !LogFormat .NONE .equals (renderedLogFormat )) {
490- return command .concat (" --log-format " + renderedLogFormat .toString ().toLowerCase ());
487+ if (command .startsWith ("dbt" ) && !LogFormat .NONE .equals (rLogFormat )) {
488+ return command .concat (" --log-format " + rLogFormat .toString ().toLowerCase ());
491489 }
492490 return command ;
493491 })
@@ -510,7 +508,6 @@ public void accept(String line, Boolean isStdErr) {
510508 throw new RunnableTaskException (e .getMessage (), dbtOutput );
511509 }
512510
513- //Parse run results
514511 parseRunResults (runContext , projectWorkingDirectory , runResults , storeManifestKvStore );
515512
516513 return Output .builder ()
@@ -528,21 +525,20 @@ private void parseRunResults(RunContext runContext, Path projectWorkingDirectory
528525
529526 File manifestFile = projectWorkingDirectory .resolve ("target/manifest.json" ).toFile ();
530527 if (manifestFile .exists ()) {
531- if (this .getStoreManifest () != null ) {
528+ if (this .getStoreManifest () != null ) {
532529 final String key = runContext .render (this .getStoreManifest ().getKey ()).as (String .class ).orElseThrow ();
533530 storeManifestKvStore .put (key , new KVValueAndMetadata (null , JacksonMapper .toObject (Files .readString (manifestFile .toPath ()))));
534531 }
535532
536533 URI manifest = ResultParser .parseManifest (runContext , manifestFile );
537534 run .getOutputFiles ().put ("manifest.json" , manifest );
538-
539535 }
540536 }
541537
542538 private void fetchAndStoreManifestIfExists (RunContext runContext , KVStore loadManifestKvStore , Path projectWorkingDirectory ) throws IOException , ResourceExpiredException , IllegalVariableEvaluationException {
543539 Optional <KVValue > manifestValue = loadManifestKvStore .getValue (runContext .render (this .getLoadManifest ().getKey ()).as (String .class ).get ());
544540
545- if (manifestValue .isEmpty () || manifestValue .get ().value () == null || StringUtils .isBlank (manifestValue .get ().value ().toString ())) {
541+ if (manifestValue .isEmpty () || manifestValue .get ().value () == null || StringUtils .isBlank (manifestValue .get ().value ().toString ())) {
546542 runContext .logger ().warn ("Property `loadManifest` has been used but no manifest has been found in the KV Store." );
547543 return ;
548544 }
0 commit comments