feat(assets): create assets for dbt models#231

Draft
fdelbrayelle wants to merge 1 commit into main from kestra-ee/issues/6473


@fdelbrayelle commented Jan 23, 2026

closes https://github.com/kestra-io/kestra-ee/issues/6473

What changes are being made and why?

Upsert dbt models as table assets by parsing the dbt manifest.json file, and handle lineage through each model's depends_on field.
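The approach described above can be sketched in Python. This is not the plugin's Java implementation; it only assumes the standard dbt `manifest.json` layout (a `nodes` map keyed by unique IDs such as `model.my_dbt_project.example`, each node carrying `resource_type`, `database`, `schema`, `name`, an optional `alias`, and `depends_on.nodes`). The helpers `table_id`, `table_assets` and `lineage_edges`, and the `demo_manifest` data, are hypothetical.

```python
import json
from typing import Any


def load_manifest(path: str = "target/manifest.json") -> dict[str, Any]:
    """Read a dbt manifest from disk (default dbt target path)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def table_id(node: dict[str, Any]) -> str:
    """Build a database.schema.relation ID, e.g. memory.main.example.

    Assumption: the relation name is the node's alias, falling back to its name.
    """
    relation = node.get("alias") or node["name"]
    return f"{node['database']}.{node['schema']}.{relation}"


def table_assets(manifest: dict[str, Any]) -> dict[str, str]:
    """Map each model's unique_id to a table asset ID; tests, seeds, etc. are skipped."""
    return {
        unique_id: table_id(node)
        for unique_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") == "model"
    }


def lineage_edges(manifest: dict[str, Any]) -> list[tuple[str, str]]:
    """Return sorted (upstream, downstream) asset pairs from depends_on.nodes."""
    assets = table_assets(manifest)
    edges = []
    for unique_id, downstream in assets.items():
        parents = manifest["nodes"][unique_id].get("depends_on", {}).get("nodes", [])
        for parent in parents:
            if parent in assets:  # keep model-to-model edges only
                edges.append((assets[parent], downstream))
    return sorted(edges)


# Trimmed, hand-written manifest mirroring the second QA flow
# (a real manifest carries many more fields per node).
demo_manifest = {
    "nodes": {
        "model.my_dbt_project.example_base": {
            "resource_type": "model", "database": "memory", "schema": "main",
            "name": "example_base", "depends_on": {"nodes": []},
        },
        "model.my_dbt_project.example_two": {
            "resource_type": "model", "database": "memory", "schema": "main",
            "name": "example_two",
            "depends_on": {"nodes": ["model.my_dbt_project.example_base"]},
        },
        "model.my_dbt_project.example_three": {
            "resource_type": "model", "database": "memory", "schema": "main",
            "name": "example_three",
            "depends_on": {"nodes": ["model.my_dbt_project.example_two"]},
        },
    }
}

print(lineage_edges(demo_manifest))
```

Filtering on `resource_type == "model"` matters because `dbt build` also puts test nodes into the manifest, and those should not become table assets in this sketch.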


How have the changes been QAed?

Checked basic asset creation with this flow:

id: dbt_cli_assets_kv_test2
namespace: company.team

inputs:
  - id: kv_namespace
    type: STRING
    defaults: "{{ flow.namespace }}"
  - id: kv_key
    type: STRING
    defaults: "manifest.json"

tasks:
  - id: wd
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      # 1) Create a minimal dbt project in the working directory
      - id: create_project
        type: io.kestra.plugin.core.storage.LocalFiles
        inputs:
          dbt_project.yml: |
            name: 'my_dbt_project'
            version: '1.0.0'
            config-version: 2

            profile: 'my_dbt_project'

            model-paths: ["models"]
            analysis-paths: ["analyses"]
            test-paths: ["tests"]
            seed-paths: ["seeds"]
            macro-paths: ["macros"]
            snapshot-paths: ["snapshots"]

            target-path: "target"
            clean-targets:
              - "target"
              - "dbt_packages"

            models:
              my_dbt_project:
                +materialized: table
          packages.yml: |
            packages: []
          models/example.sql: |
            select 1 as id
          models/example_two.sql: |
            select 2 as id
          models/example_three.sql: |
            select 3 as id

      # 2) Run #1: dbt deps + dbt build (generates manifest + run_results)
      #    and store the manifest in the KV store
      - id: dbt_build_and_store_manifest
        type: io.kestra.plugin.dbt.cli.DbtCLI
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
          delete: true
        containerImage: ghcr.io/kestra-io/dbt-duckdb:latest
        assets:
          enableAuto: true

        # IMPORTANT: this test creates the manifest, parses it (with assets), and pushes it to KV
        storeManifest:
          namespace: "{{ inputs.kv_namespace }}"
          key: "{{ inputs.kv_key }}"

        commands:
          - dbt deps
          - dbt build --no-partial-parse
          - ls -lah
          - ls -lah target || true
          - find . -maxdepth 3 -type f \( -name "manifest.json" -o -name "run_results.json" \)
          - head -n 5 target/manifest.json || true

        profiles: |
          my_dbt_project:
            outputs:
              dev:
                type: duckdb
                path: ":memory:"
                threads: 4
            target: dev

      # 3) Run #2: simulate a "defer/state" run (or just a run) by reloading
      #    the manifest from KV before running dbt again
      - id: dbt_build_with_loaded_manifest
        type: io.kestra.plugin.dbt.cli.DbtCLI
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
          delete: true
        containerImage: ghcr.io/kestra-io/dbt-duckdb:latest
        assets:
          enableAuto: true

        loadManifest:
          namespace: "{{ inputs.kv_namespace }}"
          key: "{{ inputs.kv_key }}"

        # (optional) shows it works with flags too; adapt as needed
        commands:
          - dbt build --no-partial-parse

        profiles: |
          my_dbt_project:
            outputs:
              dev:
                type: duckdb
                path: ":memory:"
                threads: 4
            target: dev

outputs:
  - id: first_manifest_uri
    type: STRING
    value: "{{ outputs.dbt_build_and_store_manifest.outputFiles['manifest.json'] }}"
  - id: first_run_results_uri
    type: STRING
    value: "{{ outputs.dbt_build_and_store_manifest.outputFiles['run_results.json'] }}"
  - id: second_manifest_uri
    type: STRING
    value: "{{ outputs.dbt_build_with_loaded_manifest.outputFiles['manifest.json'] }}"
  - id: second_run_results_uri
    type: STRING
    value: "{{ outputs.dbt_build_with_loaded_manifest.outputFiles['run_results.json'] }}"

Results: the flow produces 3 assets in Kestra's assets table:

[
  {
    "key": "fde_memory.main.example_three",
    "value": {"id": "memory.main.example_three", "name": "example_three", "type": "io.kestra.plugin.ee.assets.Table", "schema": "main", "system": "duckdb", "created": "2026-02-02T15:49:00.753088Z", "deleted": false, "updated": "2026-02-02T15:49:00.873185Z", "database": "memory", "metadata": {"name": "example_three", "schema": "main", "system": "duckdb", "database": "memory"}, "tenantId": "fde"},
    "tenant_id": "fde",
    "namespace": null,
    "id": "memory.main.example_three",
    "type": "io.kestra.plugin.ee.assets.Table",
    "display_name": null,
    "created": "2026-02-02 15:49:00.753088 +00:00",
    "updated": "2026-02-02 15:49:00.873185 +00:00",
    "deleted": false,
    "fulltext": null
  },
  {
    "key": "fde_memory.main.example",
    "value": {"id": "memory.main.example", "name": "example", "type": "io.kestra.plugin.ee.assets.Table", "schema": "main", "system": "duckdb", "created": "2026-02-02T15:49:00.753096Z", "deleted": false, "updated": "2026-02-02T15:49:00.885007Z", "database": "memory", "metadata": {"name": "example", "schema": "main", "system": "duckdb", "database": "memory"}, "tenantId": "fde"},
    "tenant_id": "fde",
    "namespace": null,
    "id": "memory.main.example",
    "type": "io.kestra.plugin.ee.assets.Table",
    "display_name": null,
    "created": "2026-02-02 15:49:00.753096 +00:00",
    "updated": "2026-02-02 15:49:00.885007 +00:00",
    "deleted": false,
    "fulltext": null
  },
  {
    "key": "fde_memory.main.example_two",
    "value": {"id": "memory.main.example_two", "name": "example_two", "type": "io.kestra.plugin.ee.assets.Table", "schema": "main", "system": "duckdb", "created": "2026-02-02T15:49:00.753101Z", "deleted": false, "updated": "2026-02-02T15:49:00.895062Z", "database": "memory", "metadata": {"name": "example_two", "schema": "main", "system": "duckdb", "database": "memory"}, "tenantId": "fde"},
    "tenant_id": "fde",
    "namespace": null,
    "id": "memory.main.example_two",
    "type": "io.kestra.plugin.ee.assets.Table",
    "display_name": null,
    "created": "2026-02-02 15:49:00.753101 +00:00",
    "updated": "2026-02-02 15:49:00.895062 +00:00",
    "deleted": false,
    "fulltext": null
  }
]
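Judging only from the three rows above, each row's key appears to be the tenant ID and the asset ID joined by an underscore (`fde` + `_` + `memory.main.example_three`), and the JSON `value` repeats the same `id`. A small Python check of that inferred convention; `expected_key` and `mismatched_rows` are hypothetical helpers, not Kestra APIs, and the key format is an observation from this export rather than documented behavior.

```python
from typing import Any


def expected_key(tenant_id: str, asset_id: str) -> str:
    """Inferred from the exported rows only: "<tenant>_<asset id>"."""
    return f"{tenant_id}_{asset_id}"


def mismatched_rows(rows: list[dict[str, Any]]) -> list[str]:
    """Return IDs of rows whose key or embedded value.id breaks the inferred convention."""
    return [
        row["id"]
        for row in rows
        if row["key"] != expected_key(row["tenant_id"], row["id"])
        or row["value"]["id"] != row["id"]
    ]


# Trimmed copies of the three rows above (only the fields checked here).
rows = [
    {"key": "fde_memory.main.example_three", "tenant_id": "fde",
     "id": "memory.main.example_three", "value": {"id": "memory.main.example_three"}},
    {"key": "fde_memory.main.example", "tenant_id": "fde",
     "id": "memory.main.example", "value": {"id": "memory.main.example"}},
    {"key": "fde_memory.main.example_two", "tenant_id": "fde",
     "id": "memory.main.example_two", "value": {"id": "memory.main.example_two"}},
]

print(mismatched_rows(rows))  # []
```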

Second example flow to check the lineage:

id: dbt_cli_assets_lineage_test
namespace: company.team

tasks:
  - id: wd
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: create_project
        type: io.kestra.plugin.core.storage.LocalFiles
        inputs:
          dbt_project.yml: |
            name: 'my_dbt_project'
            version: '1.0.0'
            config-version: 2
            profile: 'my_dbt_project'

            model-paths: ["models"]
            macro-paths: ["macros"]

            target-path: "target"
            clean-targets:
              - "target"
              - "dbt_packages"

            models:
              my_dbt_project:
                +materialized: table

          packages.yml: |
            packages: []

          models/example_base.sql: |
            select 1 as id

          models/example_two.sql: |
            select * from {% raw %}{{ ref('example_base') }}{% endraw %}

          models/example_three.sql: |
            select count(*) as cnt from {% raw %}{{ ref('example_two') }}{% endraw %}

      - id: dbt_build
        type: io.kestra.plugin.dbt.cli.DbtCLI
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
          delete: true
        containerImage: ghcr.io/kestra-io/dbt-duckdb:latest

        assets:
          enableAuto: true

        commands:
          - dbt deps
          - dbt build --no-partial-parse

        profiles: |
          my_dbt_project:
            outputs:
              dev:
                type: duckdb
                path: ":memory:"
                threads: 4
            target: dev
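In this project each `{{ ref(...) }}` call is recorded in the referencing model's `depends_on.nodes` in `manifest.json`, so the three models form a chain. The minimal Python sketch below writes that mapping by hand (an assumption mirroring the `ref()` calls, not an excerpt of a real manifest) and uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to recover the order in which the models have to be built.

```python
from graphlib import TopologicalSorter

# Hand-written depends_on.nodes for the three models in the flow above.
depends_on = {
    "model.my_dbt_project.example_base": [],
    "model.my_dbt_project.example_two": ["model.my_dbt_project.example_base"],
    "model.my_dbt_project.example_three": ["model.my_dbt_project.example_two"],
}

# TopologicalSorter expects {node: predecessors}, which is exactly the depends_on shape.
build_order = [
    unique_id.rsplit(".", 1)[-1]  # keep only the model name
    for unique_id in TopologicalSorter(depends_on).static_order()
]
print(build_order)  # ['example_base', 'example_two', 'example_three']
```

Because `depends_on` already lists predecessors, no inversion of the graph is needed; a cycle would make `static_order()` raise `graphlib.CycleError`.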

Results: the flow records one lineage event covering the 3 assets in Kestra's assets_lineage_events table:

[
  {
    "key": "1PSncPPjkJEcpmTImYQanT",
    "value": {"uid": "1PSncPPjkJEcpmTImYQanT", "state": "SUCCESS", "flowId": "dbt_cli_assets_lineage_test", "inputs": [], "taskId": "dbt_build", "created": "2026-02-02T15:52:14.041424Z", "endDate": "2026-02-02T15:52:13.942231Z", "outputs": [{"id": "memory.main.example_base", "type": "io.kestra.plugin.ee.assets.Table", "tenantId": "fde"}, {"id": "memory.main.example_three", "type": "io.kestra.plugin.ee.assets.Table", "tenantId": "fde"}, {"id": "memory.main.example_two", "type": "io.kestra.plugin.ee.assets.Table", "tenantId": "fde"}], "tenantId": "fde", "namespace": "company.team", "startDate": "2026-02-02T15:52:02.957498Z", "taskRunId": "2cKEhiYR8dVdWtvHlKy3Rx", "executionId": "67XPdrJYENHYxOk2B3ljHW", "flowRevision": 2},
    "tenant_id": "fde",
    "namespace": "company.team",
    "flow_id": "dbt_cli_assets_lineage_test",
    "flow_revision": 2,
    "execution_id": "67XPdrJYENHYxOk2B3ljHW",
    "task_id": "dbt_build",
    "task_run_id": "2cKEhiYR8dVdWtvHlKy3Rx",
    "created": "2026-02-02 15:52:14.041424 +00:00"
  }
]
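To inspect a lineage event like the one above, its `value` column can be parsed as JSON. In this row all three tables appear as outputs of the `dbt_build` task and `inputs` is empty. A Python sketch over a trimmed copy of that row (only `taskId`, `inputs` and `outputs` kept); `summarize_event` is a hypothetical helper.

```python
import json

# Trimmed copy of the "value" column from the row above.
raw_value = """{"taskId": "dbt_build", "inputs": [],
 "outputs": [{"id": "memory.main.example_base", "type": "io.kestra.plugin.ee.assets.Table"},
             {"id": "memory.main.example_three", "type": "io.kestra.plugin.ee.assets.Table"},
             {"id": "memory.main.example_two", "type": "io.kestra.plugin.ee.assets.Table"}]}"""


def summarize_event(value: str) -> dict[str, list[str]]:
    """Return the sorted input and output asset IDs of one lineage event."""
    event = json.loads(value)
    return {
        "inputs": sorted(asset["id"] for asset in event.get("inputs", [])),
        "outputs": sorted(asset["id"] for asset in event.get("outputs", [])),
    }


summary = summarize_event(raw_value)
print(summary["inputs"])   # []
print(summary["outputs"])  # ['memory.main.example_base', 'memory.main.example_three', 'memory.main.example_two']
```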

Setup Instructions


Contributor Checklist ✅

@fdelbrayelle marked this pull request as draft January 23, 2026 14:44
@github-project-automation bot moved this to To review in Pull Requests Jan 23, 2026
github-actions bot commented Jan 23, 2026

🧪 Java Unit Tests

Java Tests Report: 20 ran, 19 passed ✅, 1 skipped ⚠️, 0 failed ❌, time 6m 30s 287ms

📦 Artifacts

jar: 208.02 KB, updated Feb 2, 26, 3:56:03 PM UTC, expires Feb 9, 26, 3:56:03 PM UTC

🔁 Unreleased Commits

2 commits since v1.1.3

25baeba chore: improve contributor guidelines (Malay Dewangan, Jan 20, 26, 8:34:50 AM UTC)
fadc2c8 chore: change labels on issue template (Ludovic DEHON, Jan 22, 26, 7:03:43 PM UTC)

@fdelbrayelle force-pushed the kestra-ee/issues/6473 branch 4 times, most recently from f9d0793 to 0a4815e, January 29, 2026 09:56
@fdelbrayelle force-pushed the kestra-ee/issues/6473 branch from 0a4815e to 2d6ab07, February 2, 2026 15:47
@fdelbrayelle marked this pull request as ready for review February 2, 2026 15:47
@fdelbrayelle self-assigned this Feb 2, 2026
@fdelbrayelle marked this pull request as draft February 2, 2026 16:15

Labels

None yet

Projects

Status: To review


1 participant