Skip to content

Commit 1da83a3

Browse files
move logic to the query itself
1 parent 0595a9d commit 1da83a3

File tree

2 files changed

+337
-335
lines changed

2 files changed

+337
-335
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1709,25 +1709,15 @@ def remove_managed_columns(
17091709
existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_(
17101710
target_table
17111711
)
1712+
1713+
cleanup_ts = None
17121714
if truncate:
17131715
existing_rows_query = existing_rows_query.limit(0)
17141716
else:
17151717
# If truncate is false it is not the first insert
17161718
# Determine the cleanup timestamp for restatement or a regular incremental run
17171719
cleanup_ts = to_time_column(start, time_data_type, self.dialect, nullable=True)
17181720

1719-
# Delete records that were created at or after cleanup point
1720-
self.delete_from(table_name=target_table, where=valid_from_col > cleanup_ts)
1721-
1722-
# "Re-open" records that were closed at or after cleanup point
1723-
self.update_table(
1724-
table_name=target_table,
1725-
properties={valid_to_col.name: exp.Null()},
1726-
where=exp.and_(
1727-
valid_to_col > cleanup_ts,
1728-
),
1729-
)
1730-
17311721
with source_queries[0] as source_query:
17321722
prefixed_columns_to_types = []
17331723
for column in columns_to_types:
@@ -1764,12 +1754,40 @@ def remove_managed_columns(
17641754
# Historical Records that Do Not Change
17651755
.with_(
17661756
"static",
1767-
existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
1757+
existing_rows_query.where(valid_to_col.is_(exp.Null()).not_())
1758+
if truncate
1759+
else existing_rows_query.where(
1760+
exp.and_(
1761+
valid_from_col <= cleanup_ts,
1762+
exp.and_(
1763+
valid_to_col.is_(exp.Null().not_()),
1764+
valid_to_col < cleanup_ts,
1765+
),
1766+
),
1767+
),
17681768
)
17691769
# Latest Records that can be updated
17701770
.with_(
17711771
"latest",
1772-
existing_rows_query.where(valid_to_col.is_(exp.Null())),
1772+
existing_rows_query.where(valid_to_col.is_(exp.Null()))
1773+
if truncate
1774+
else exp.select(
1775+
*(
1776+
exp.Null().as_(col) if col == valid_to_col.name else exp.column(col)
1777+
for col in columns_to_types
1778+
),
1779+
exp.true().as_("_exists"),
1780+
)
1781+
.from_(target_table)
1782+
.where(
1783+
exp.and_(
1784+
valid_from_col <= cleanup_ts,
1785+
exp.or_(
1786+
valid_to_col.is_(exp.Null()),
1787+
valid_to_col >= cleanup_ts,
1788+
),
1789+
)
1790+
),
17731791
)
17741792
# Deleted records which can be used to determine `valid_from` for undeleted source records
17751793
.with_(

0 commit comments

Comments
 (0)