| 1 | +# Custom Incremental Dynamic Tables (Private Preview) |
| 2 | + |
| 3 | +Custom incremental DTs let you define refresh logic using **imperative DML** (MERGE or INSERT INTO) instead of a declarative SELECT. This unlocks patterns that standard DTs can't express efficiently. |
| 4 | + |
| 5 | +**When to use:** Standard DTs should always be your first choice. Use custom incremental only when: |
| 6 | +- You need **stream-static joins** (fact stream + dimension snapshot) |
| 7 | +- You need **append-only pipelines** (only process inserts, ignore updates/deletes) |
| 8 | +- You need **user-defined semantics** (audit deletes, soft-delete, running aggregates) |
| 9 | + |
| 10 | +## Syntax |
| 11 | + |
| 12 | +```sql |
| 13 | +CREATE OR REPLACE DYNAMIC TABLE my_dt ( |
| 14 | + col1 TYPE, col2 TYPE -- explicit columns required |
| 15 | +) |
| 16 | + TARGET_LAG = '5 minutes' |
| 17 | + WAREHOUSE = my_wh |
| 18 | + REFRESH_MODE = CUSTOM_INCREMENTAL |
| 19 | + [ BACKFILL FROM existing_table ] |
| 20 | + REFRESH USING ( |
| 21 | + -- MERGE INTO SELF or INSERT INTO SELF |
| 22 | + ); |
| 23 | +``` |
| 24 | + |
| 25 | +Key concepts: |
| 26 | +- `SELF` references the DT being created (you cannot use the DT's name) |
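| 27 | +- `CHANGES(INFORMATION => { DEFAULT | APPEND_ONLY })` consumes the source table's changes since the last refresh: `DEFAULT` returns inserts, updates, and deletes; `APPEND_ONLY` returns inserted rows only |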
| 28 | +- Tables outside `CHANGES()` are read as static snapshots at refresh time |
| 29 | +- Explicit column schema is required (no `AS SELECT` inference) |
| 30 | + |
| 31 | +## Pattern: Stream-Static Join (Append-Only) |
| 32 | + |
| 33 | +Enrich new events with current dimension data. Only new events are processed — dimension changes don't trigger reprocessing. |
| 34 | + |
| 35 | +```sql |
| 36 | +CREATE OR REPLACE DYNAMIC TABLE enriched_clicks ( |
| 37 | + click_id INT, user_id INT, page_title STRING, |
| 38 | + section STRING, click_ts TIMESTAMP |
| 39 | +) |
| 40 | + TARGET_LAG = DOWNSTREAM |
| 41 | + WAREHOUSE = my_wh |
| 42 | + REFRESH USING ( |
| 43 | + INSERT INTO SELF |
| 44 | + SELECT c.click_id, c.user_id, p.page_title, p.section, c.click_ts |
| 45 | + FROM clicks CHANGES(INFORMATION => APPEND_ONLY) AS c |
| 46 | + LEFT OUTER JOIN pages AS p ON c.page_id = p.page_id |
| 47 | + ); |
| 48 | +``` |
| 49 | + |
| 50 | +## Pattern: Stream-Static Join (MERGE with Updates/Deletes) |
| 51 | + |
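| 52 | +When the fact table has updates and deletes, use MERGE. An update arrives from `CHANGES` as a DELETE/INSERT pair for the same key, so `ROW_NUMBER()` keeps one row per `sku_id`, preferring the INSERT (the new values) over the DELETE: |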
| 53 | + |
| 54 | +```sql |
| 55 | +CREATE OR REPLACE DYNAMIC TABLE enriched_inventory ( |
| 56 | + sku_id INT, product_name STRING, category STRING, |
| 57 | + warehouse_name STRING, region STRING, qty_on_hand INT |
| 58 | +) |
| 59 | + TARGET_LAG = DOWNSTREAM |
| 60 | + WAREHOUSE = my_wh |
| 61 | + REFRESH USING ( |
| 62 | + MERGE INTO SELF AS tgt |
| 63 | + USING ( |
| 64 | + SELECT sku_id, product_name, category, warehouse_name, region, |
| 65 | + qty_on_hand, action |
| 66 | + FROM ( |
| 67 | + SELECT s.sku_id, p.product_name, p.category, |
| 68 | + w.warehouse_name, w.region, s.qty_on_hand, |
| 69 | + s.METADATA$ACTION AS action, |
| 70 | + ROW_NUMBER() OVER ( |
| 71 | + PARTITION BY s.sku_id |
| 72 | + ORDER BY CASE s.METADATA$ACTION WHEN 'INSERT' THEN 0 ELSE 1 END |
| 73 | + ) AS rn |
| 74 | + FROM stock CHANGES(INFORMATION => DEFAULT) AS s |
| 75 | + LEFT OUTER JOIN products AS p ON s.product_id = p.product_id |
| 76 | + LEFT OUTER JOIN warehouses AS w ON s.warehouse_id = w.warehouse_id |
| 77 | + ) |
| 78 | + WHERE rn = 1 |
| 79 | + ) AS src |
| 80 | + ON tgt.sku_id = src.sku_id |
| 81 | + WHEN MATCHED AND src.action = 'DELETE' THEN DELETE |
| 82 | + WHEN MATCHED AND src.action = 'INSERT' THEN |
| 83 | + UPDATE SET tgt.product_name = src.product_name, |
| 84 | + tgt.category = src.category, |
| 85 | + tgt.warehouse_name = src.warehouse_name, |
| 86 | + tgt.region = src.region, |
| 87 | + tgt.qty_on_hand = src.qty_on_hand |
| 88 | + WHEN NOT MATCHED AND src.action = 'INSERT' THEN |
| 89 | + INSERT (sku_id, product_name, category, warehouse_name, region, qty_on_hand) |
| 90 | + VALUES (src.sku_id, src.product_name, src.category, src.warehouse_name, |
| 91 | + src.region, src.qty_on_hand) |
| 92 | + ); |
| 93 | +``` |
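
The dedup step above exists because of how updates surface in change data. A minimal sketch for inspecting this on an ordinary table with the standard `CHANGES` clause, outside any dynamic table: the session variable `ts_before` and the value `42` are illustrative assumptions, and the `AT(...)` clause is needed here only because this query runs outside `REFRESH USING`.

```sql
-- CHANGES requires change tracking on the source table.
ALTER TABLE stock SET CHANGE_TRACKING = TRUE;

-- Remember a point in time before the modification (ts_before is a hypothetical name).
SET ts_before = (SELECT CURRENT_TIMESTAMP());

-- Hypothetical modification: sku_id 42 is an illustrative value.
UPDATE stock SET qty_on_hand = qty_on_hand - 1 WHERE sku_id = 42;

-- Inspect the change rows produced since ts_before.
SELECT sku_id, qty_on_hand, METADATA$ACTION, METADATA$ISUPDATE
FROM stock
  CHANGES(INFORMATION => DEFAULT)
  AT(TIMESTAMP => $ts_before);
```

For an updated row, expect one `DELETE` row carrying the old values and one `INSERT` row carrying the new ones, both flagged with `METADATA$ISUPDATE = TRUE`. The `ROW_NUMBER()` ordering in the MERGE source keeps the `INSERT` row of such a pair.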
| 94 | + |
| 95 | +## Example: Stream-Static Join End-to-End |
| 96 | + |
| 97 | +A complete walkthrough showing how a stream-static join works in practice. Scenario: an IoT pipeline where sensor readings (high-volume, append-only) are enriched with device metadata (low-volume, rarely changes). |
| 98 | + |
| 99 | +```sql |
| 100 | +-- 1. Setup: fact table (append-only sensor readings) + dimension table (device registry) |
| 101 | +CREATE TABLE sensor_readings ( |
| 102 | + reading_id INT AUTOINCREMENT, |
| 103 | + device_id INT, |
| 104 | + temperature FLOAT, |
| 105 | + humidity FLOAT, |
| 106 | + reading_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP() |
| 107 | +); |
| 108 | +ALTER TABLE sensor_readings SET CHANGE_TRACKING = TRUE; |
| 109 | + |
| 110 | +CREATE TABLE devices ( |
| 111 | + device_id INT PRIMARY KEY, |
| 112 | + device_name STRING, |
| 113 | + location STRING, |
| 114 | + floor INT |
| 115 | +); |
| 116 | + |
| 117 | +-- 2. Custom incremental DT: enrich readings with device info |
| 118 | +-- - sensor_readings is the STREAM side (CHANGES(INFORMATION => APPEND_ONLY)) |
| 119 | +-- - devices is the STATIC side (read in full at each refresh, changes ignored) |
| 120 | +CREATE OR REPLACE DYNAMIC TABLE enriched_readings ( |
| 121 | + reading_id INT, |
| 122 | + device_id INT, |
| 123 | + device_name STRING, |
| 124 | + location STRING, |
| 125 | + floor INT, |
| 126 | + temperature FLOAT, |
| 127 | + humidity FLOAT, |
| 128 | + reading_ts TIMESTAMP |
| 129 | +) |
| 130 | + TARGET_LAG = '1 minute' |
| 131 | + WAREHOUSE = iot_wh |
| 132 | + REFRESH USING ( |
| 133 | + INSERT INTO SELF |
| 134 | + SELECT |
| 135 | + r.reading_id, r.device_id, |
| 136 | + d.device_name, d.location, d.floor, |
| 137 | + r.temperature, r.humidity, r.reading_ts |
| 138 | + FROM sensor_readings CHANGES(INFORMATION => APPEND_ONLY) AS r |
| 139 | + LEFT OUTER JOIN devices AS d ON r.device_id = d.device_id |
| 140 | + ); |
| 141 | +``` |
| 142 | + |
| 143 | +**What happens at each refresh:** |
| 144 | +1. `CHANGES(INFORMATION => APPEND_ONLY)` returns only the sensor readings inserted since the last refresh |
| 145 | +2. Each new reading is joined to the **current** device metadata (static snapshot) |
| 146 | +3. Results are appended to the DT — previously enriched rows are never touched |
| 147 | +4. If a device name changes in `devices`, old readings keep the old name — only new readings pick up the update |
| 148 | + |
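| 149 | +**Why this matters:** A standard DT depends on `devices` as well, so it must keep already-enriched rows consistent with the dimension: a single device rename forces it to revisit that device's historical readings. The custom incremental version processes only new readings, which can be far cheaper for high-volume fact tables with slowly changing dimensions. The trade-off is that historical rows keep the dimension values they were enriched with. |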
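
The refresh behavior described above can be checked with a short script. This is a sketch under stated assumptions: the device and reading values are made up, and it assumes a manual `ALTER DYNAMIC TABLE ... REFRESH` works for custom incremental DTs the way it does for standard DTs, which this document does not confirm.

```sql
-- Seed the dimension, then add a first reading (all values are illustrative).
INSERT INTO devices (device_id, device_name, location, floor)
VALUES (1, 'thermo-a', 'Building 1', 2);

INSERT INTO sensor_readings (device_id, temperature, humidity)
VALUES (1, 21.5, 40.0);

-- Assumption: manual refresh is supported for custom incremental DTs.
ALTER DYNAMIC TABLE enriched_readings REFRESH;

-- Rename the device, then add a second reading.
UPDATE devices SET device_name = 'thermo-a-renamed' WHERE device_id = 1;

INSERT INTO sensor_readings (device_id, temperature, humidity)
VALUES (1, 22.1, 41.5);

ALTER DYNAMIC TABLE enriched_readings REFRESH;

-- Compare the device name attached to each reading.
SELECT reading_id, device_name, temperature
FROM enriched_readings
ORDER BY reading_id;
```

If the semantics hold as described, the first reading should still carry the original device name and only the second should show the renamed one.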
| 150 | + |
| 151 | +--- |
| 152 | + |
| 153 | +## Pattern: Audit Deletes Log |
| 154 | + |
| 155 | +Append-only log of every deletion from a source table: |
| 156 | + |
| 157 | +```sql |
| 158 | +CREATE OR REPLACE DYNAMIC TABLE deletions_log (id INT, name STRING, email STRING) |
| 159 | + TARGET_LAG = DOWNSTREAM |
| 160 | + WAREHOUSE = my_wh |
| 161 | + INITIALIZE = ON_SCHEDULE |
| 162 | + REFRESH USING ( |
| 163 | + INSERT INTO SELF |
| 164 | + SELECT id, name, email -- explicit columns; SELECT * would also carry METADATA$ROW_ID |
| 165 | + FROM users CHANGES(INFORMATION => DEFAULT) |
| 166 | + WHERE NOT METADATA$ISUPDATE AND METADATA$ACTION = 'DELETE' |
| 167 | + ); |
| 168 | +``` |
| 169 | + |
| 170 | +## Limitations (Private Preview) |
| 171 | + |
| 172 | +- No cloning or replication |
| 173 | +- No DCM/dbt integration yet |
| 174 | +- No data governance policies on custom incremental DTs |
| 175 | +- No CREATE OR ALTER — must use CREATE OR REPLACE |
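| 176 | +- Correctness is the user's responsibility: the DT's contents are whatever your refresh DML produces, with no guarantee of delayed-view semantics (matching a query result as of some past point in time) |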