feat: incremental @dlt.transformation #2716

Draft
wants to merge 2 commits into
base: devel
from

Conversation

zilto
Collaborator

@zilto zilto commented Jun 5, 2025

Description

This is a work in progress.

Follows iterations: #2612 #2386

@zilto zilto self-assigned this Jun 5, 2025
@zilto zilto added the enhancement New feature or request label Jun 5, 2025

netlify bot commented Jun 5, 2025

Deploy Preview for dlt-hub-docs canceled.

Name Link
🔨 Latest commit 9abc3a5
🔍 Latest deploy log https://app.netlify.com/projects/dlt-hub-docs/deploys/6840eed8f9be8800097227d1

@rudolfix
Collaborator

This is doing something I expected we'd do as a next step: enabling Incremental for models. Your code does a lot while remaining simple, so maybe we should try this now. Here's my feedback (after roughly an hour of thinking about it from scratch).

Incremental models:

I think our (or my) current approach is incorrect. Incremental is a property of the model, not of the input tables, so we need to implement incremental on top of the user's query. We do not need incremental/non-incremental tables or relations.

This is what sqlmesh does: https://sqlmesh.readthedocs.io/en/stable/examples/incremental_time_full_walkthrough/#setup, where the time_column must be in the top-level SELECT of every model.

concept code:

# customers table has `_dlt_load_id` column
incremental = dlt.sources.incremental("_dlt_load_id")
# I assume that query() returns sqlglot query (this is concept code)
sqlglot_query = dataset.query("SELECT c.*, SUM(o.amount) FROM customers AS c JOIN orders AS o ON c.id = o.order_id GROUP BY ").query()
incremental_sqlglot_query = sqlglot_query.filter(incremental.to_sqlglot_filter())

Here we just apply a filter to the results of the user's query to make it incremental. `_dlt_load_id` must obviously be present in the top-level SELECT, and the query optimizer (SQLGlot has a good one) will push the filter down to the customers table.
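To make the idea concrete without depending on the eventual dataset/SQLGlot API, here is a minimal sketch that uses only the standard-library `sqlite3` module. The helper `make_incremental`, the table layout, and the `customer_id` join column are assumptions made for illustration; the real implementation would build the filter as a SQLGlot expression rather than as a string.

```python
import sqlite3


def make_incremental(user_sql: str, cursor_column: str) -> str:
    """Wrap the user's query and filter its *result* on the cursor column.

    Hypothetical helper: the cursor column must be exposed by the
    top-level SELECT of user_sql, otherwise the outer filter cannot see it.
    """
    return f'SELECT * FROM ({user_sql}) AS model WHERE "{cursor_column}" > ?'


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customers (id INTEGER, name TEXT, _dlt_load_id TEXT);
    CREATE TABLE orders (customer_id INTEGER, amount REAL);
    INSERT INTO customers VALUES
        (1, 'ann', '1700000000.1'),
        (2, 'bob', '1700000500.2');
    INSERT INTO orders VALUES (1, 10.0), (1, 5.0), (2, 7.5);
    """
)

# The model query is written without any knowledge of incremental loading.
user_sql = """
    SELECT c.id, c.name, c._dlt_load_id, SUM(o.amount) AS total
    FROM customers AS c JOIN orders AS o ON c.id = o.customer_id
    GROUP BY c.id, c.name, c._dlt_load_id
"""

# Only rows loaded after the last seen load id are returned.
incremental_sql = make_incremental(user_sql, "_dlt_load_id")
new_rows = conn.execute(incremental_sql, ("1700000000.1",)).fetchall()
```

The model query itself stays untouched; whether the predicate is then pushed down into the `customers` scan is left to the engine or optimizer.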

An Incremental instance can be represented as a SQL filter, and we have already done this for sql_database (with the SQLAlchemy ORM, not SQLGlot): https://github.com/dlt-hub/dlt/blob/devel/dlt/sources/sql_database/helpers.py#L109 (you already have something like that in your code).

So it seems we do not need to extend dataset/relation etc. for that to work. NOTE:

  • I'm not talking about eager transformations. A syntax similar to yours (apply incremental to relation) may be necessary.

Updating incremental state

Your code is very close to what we need, but we have a dedicated place for that: we need to implement incremental for the SqlModel:

this is what we have now (in Incremental class)

def _get_transformer(self, items: TDataItems) -> IncrementalTransform:
        # Assume list is all of the same type
        for item in items if isinstance(items, list) else [items]:
            if is_arrow_item(item):
                return self._make_or_get_transformer(ArrowIncremental)
            elif pandas is not None and isinstance(item, pandas.DataFrame):
                return self._make_or_get_transformer(ArrowIncremental)
            return self._make_or_get_transformer(JsonIncremental)
        return self._make_or_get_transformer(JsonIncremental)

So we implement ModelIncremental

and in there (call method)

# rewrite the query, SqlModel should carry the original relation (currently it is pure text, easy to change)
incremental_sqlglot_query = item.relation.filter(self.to_sqlglot_filter())
# this automatically updates the state
if self.end_value:
    self.last_value = self.end_value
else:
    # may be pretty expensive
    self.last_value = incremental_sqlglot_query.select(incremental.cursor).max()  # or min: depends on `last_value_func` in the incremental
item.update(incremental_sqlglot_query)
return item
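The state update described above can be sketched end to end with the standard-library `sqlite3` module. `ModelIncrementalSketch` is a hypothetical stand-in, not dlt API: it rewrites the query as a string instead of through `item.relation`, always uses `MAX` (so it assumes `last_value_func` is `max`), and additionally bounds the filter by `end_value`, which is an assumption beyond the concept code.

```python
import sqlite3
from typing import Any, List, Optional, Tuple


class ModelIncrementalSketch:
    """Hypothetical stand-in for the proposed ModelIncremental (not dlt API)."""

    def __init__(self, cursor_column: str, initial_value: Any,
                 end_value: Optional[Any] = None) -> None:
        self.cursor_column = cursor_column
        self.last_value = initial_value
        self.end_value = end_value

    def __call__(self, conn: sqlite3.Connection,
                 model_sql: str) -> Tuple[str, Tuple[Any, ...]]:
        # Rewrite the model query: keep only rows past the stored cursor.
        where = f'"{self.cursor_column}" > ?'
        params: List[Any] = [self.last_value]
        if self.end_value is not None:
            where += f' AND "{self.cursor_column}" <= ?'
            params.append(self.end_value)
        filtered_sql = f"SELECT * FROM ({model_sql}) AS model WHERE {where}"

        if self.end_value is not None:
            # Explicit range: the state advances without any extra query.
            self.last_value = self.end_value
        else:
            # Open-ended range: one extra, possibly expensive, MAX() query.
            max_sql = f'SELECT MAX("{self.cursor_column}") FROM ({filtered_sql}) AS f'
            (new_max,) = conn.execute(max_sql, params).fetchone()
            if new_max is not None:  # no new rows: keep the old state
                self.last_value = new_max
        return filtered_sql, tuple(params)


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (id INTEGER, _dlt_load_id TEXT);
    INSERT INTO events VALUES (1, '100.0'), (2, '200.0'), (3, '300.0');
    """
)
model_sql = "SELECT id, _dlt_load_id FROM events"

inc = ModelIncrementalSketch("_dlt_load_id", initial_value="100.0")
sql, params = inc(conn, model_sql)
first_run = sorted(conn.execute(sql, params).fetchall())
state_after_first = inc.last_value

# A second run with no new data returns nothing and leaves the state alone.
sql, params = inc(conn, model_sql)
second_run = conn.execute(sql, params).fetchall()
```

The returned parameters carry the previous `last_value`, so the rewritten query selects exactly the rows that move the state forward.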

sqlmesh always passes a time range to incremental models, so they do not need to compute max (this corresponds to setting an explicit end_value on Incremental).
Also, it seems all the incremental models share the same semantics for their time_column and, when executed, the same time range (not 100% sure, but https://github.com/TobikoData/sqlmesh-examples/blob/main/001_sushi/2_moderate/models/order_items.sql is like that). Maybe this corresponds to transformations in a dlt source sharing the same incremental (i.e. _dlt_load_id)?

Incremental on _dlt_load_id:

I think I wrote how to do that without any race conditions etc. If we want to use the Incremental class for it, we'll need to e.g. subclass it to be able to generate a custom query:

  • incremental should work on inserted_at, not on load_id
  • the end_value should be now(), i.e. taken from the active load package (we have a timestamp of the package start). We do not need to do max() anywhere
  • the filter is a little more complicated because you need to select from the _dlt_loads table to get a list of load ids from a timestamp range
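The three points above could translate into a filter like the one in this sketch, again using only `sqlite3`. The helper `load_id_range_filter` is hypothetical, the half-open range `(last_value, end_value]` is an assumed convention, and the `_dlt_loads` table is reduced to the two columns the filter needs.

```python
import sqlite3


def load_id_range_filter(table_alias: str) -> str:
    """Predicate keeping rows whose load was recorded in (start, end].

    Hypothetical helper: the cursor is inserted_at in _dlt_loads, and the
    matching load ids are resolved with a subquery.
    """
    return (
        f"{table_alias}._dlt_load_id IN ("
        "SELECT load_id FROM _dlt_loads "
        "WHERE inserted_at > ? AND inserted_at <= ?)"
    )


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE _dlt_loads (load_id TEXT, inserted_at TEXT);
    CREATE TABLE customers (id INTEGER, _dlt_load_id TEXT);
    INSERT INTO _dlt_loads VALUES
        ('1001.0', '2025-06-01T10:00:00'),
        ('1002.0', '2025-06-02T10:00:00'),
        ('1003.0', '2025-06-03T10:00:00');
    INSERT INTO customers VALUES (1, '1001.0'), (2, '1002.0'), (3, '1003.0');
    """
)

last_value = "2025-06-01T10:00:00"  # upper bound stored by the previous run
end_value = "2025-06-02T12:00:00"   # stands in for the active load package start

sql = (
    "SELECT c.id FROM customers AS c "
    f"WHERE {load_id_range_filter('c')} ORDER BY c.id"
)
ids = [row[0] for row in conn.execute(sql, (last_value, end_value))]

# The next run starts where this one ended; no MAX() query is involved.
last_value = end_value
```

Because the upper bound is fixed before the query runs, loads recorded after `end_value` are simply picked up by the next run.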

Coming back to incremental on the particular table or subquery

I need to think about it. IMO the "SqlMesh" way is pretty limiting; being able to apply incremental to any subquery within the model and still track the ranges would be pretty cool.

Labels
enhancement New feature or request
2 participants