-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtable.sql
More file actions
198 lines (170 loc) · 7.51 KB
/
table.sql
File metadata and controls
198 lines (170 loc) · 7.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
{# ============================================================
   table.sql — Full-refresh ("table") materialization for SCOPE
   Delta tables.

   File-based processing: discovers all source files matching the
   configured roots/patterns, processes them in batches of at most
   max_files_per_trigger files, and writes each batch to the Delta
   table. The FIRST batch DELETEs existing data (idempotent full
   refresh); every subsequent batch INSERTs only. The batching loop
   intentionally mirrors the one in incremental.sql.
   ============================================================ #}
{% materialization table, adapter='scope' %}
{%- set identifier = model['alias'] -%}
{# NOTE(review): old_relation is looked up but never used below — confirm it can be dropped. #}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(
database=database, schema=schema, identifier=identifier, type='table'
) -%}
{# -- Pull config values. All knobs come from the model's config();
      defaults are applied here so the loop below can rely on them. -- #}
{%- set delta_location = config.get('delta_location', '') -%}
{%- set source_roots = config.get('source_roots', []) -%}
{%- set source_patterns = config.get('source_patterns', ['.*\\.ss$']) -%}
{%- set max_files_per_trigger = config.get('max_files_per_trigger', 50) | int -%}
{%- set safety_buffer_seconds = config.get('safety_buffer_seconds', 30) | int -%}
{%- set source_compaction_interval = config.get('source_compaction_interval', 10) | int -%}
{%- set source_retention_files = 100 if false else config.get('source_retention_files', 100) | int -%}
{%- set starting_timestamp = config.get('starting_timestamp', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}
{%- set scope_settings = config.get('scope_settings', {}) -%}
{%- set scope_columns = config.get('scope_columns', []) -%}
{%- set feature_previews = config.get('scope_feature_previews', 'EnableDeltaTableDynamicInsert:on') -%}
{# -- Delete the checkpoint so a full refresh re-discovers every
      source file from scratch (discovery watermark is reset). -- #}
{% do adapter.delete_checkpoint(delta_location) %}
{# -- Batching loop: discover → submit → checkpoint → repeat -- #}
{# NOTE: file_batch MUST live in the namespace so reassignment inside the
   for-loop survives Jinja's per-iteration scoping — see incremental.sql. #}
{%- set ns = namespace(
batch_num=0,
total_files=0,
file_batch=adapter.discover_files(
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds, starting_timestamp
)
) -%}
{%- if ns.file_batch | length == 0 -%}
{{ log("SCOPE: No files found for full-refresh of " ~ identifier, info=True) }}
{# dbt requires a 'main' statement to exist; submit a harmless no-op. #}
{%- call statement('main') -%}
-- no-op: no source files found
{%- endcall -%}
{%- else -%}
{# Jinja has no while/break, so iterate a fixed upper bound and make the
   body a no-op once discovery returns an empty batch. NOTE(review): more
   than 1000 batches would be silently left unprocessed — confirm the cap
   is generous enough for the largest expected backlog. #}
{%- for _ in range(1000) -%}
{%- if ns.file_batch | length == 0 -%}
{# Break out of loop #}
{%- else -%}
{%- set ns.batch_num = ns.batch_num + 1 -%}
{%- set ns.total_files = ns.total_files + ns.file_batch | length -%}
{# Only the first batch runs with is_full_refresh=True, which makes the
   generated script DELETE existing table contents before inserting. #}
{%- set scope_script = scope__build_file_based_script(
identifier,
delta_location,
partition_by,
scope_settings,
scope_columns,
feature_previews,
sql,
ns.file_batch,
is_full_refresh=(ns.batch_num == 1)
) -%}
{{ log("SCOPE: full-refresh " ~ identifier ~ " batch " ~ ns.batch_num ~ " (" ~ ns.file_batch | length ~ " files)", info=True) }}
{%- set job_suffix = "full-refresh_batch" ~ ns.batch_num ~ "_" ~ ns.file_batch | length ~ "files" -%}
{% do adapter.set_next_job_name(identifier ~ "_" ~ job_suffix) %}
{%- call statement('main') -%}
{{ scope_script }}
{%- endcall -%}
{# The checkpoint advances only AFTER the batch statement completes, so a
   failed batch is re-discovered and retried on the next run. #}
{% do adapter.update_checkpoint(delta_location, source_roots, source_patterns, ns.file_batch, source_compaction_interval, source_retention_files) %}
{# -- Discover next batch (watermark advanced) -- #}
{%- set ns.file_batch = adapter.discover_files(
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds, starting_timestamp
) -%}
{%- endif -%}
{%- endfor -%}
{{ log("SCOPE: " ~ identifier ~ " full-refresh complete — " ~ ns.batch_num ~ " batches, " ~ ns.total_files ~ " files total", info=True) }}
{%- endif -%}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
{# ============================================================
   Macro: scope__build_file_based_script
   Renders a complete SCOPE script that creates/configures the Delta
   table, EXTRACTs an explicit list of source files, applies the
   model's SQL, and INSERTs the result.

   Args:
     table_name       — model identifier (used in the header comment)
     delta_location   — Delta table path, bound to @deltaPath
     partition_by     — column name, list of names, or none
     scope_settings   — dict rendered as ALTER TABLE TBLPROPERTIES
     scope_columns    — full column spec; an item with extract=false is
                        declared in CREATE TABLE but skipped in EXTRACT
     feature_previews — value for SET @@FeaturePreviews
     model_sql        — compiled model body, assigned to @batch_data
     source_files     — explicit file list for this batch
     is_full_refresh  — first batch of a full refresh: wipe the table
     is_incremental   — adds a partition-conflict commit condition
   ============================================================ #}
{% macro scope__build_file_based_script(
table_name,
delta_location,
partition_by,
scope_settings,
scope_columns,
feature_previews,
model_sql,
source_files,
is_full_refresh=false,
is_incremental=false
) %}
{# -- Normalize partition_by to a list: a bare string becomes a
      one-element list; none/empty becomes []. -- #}
{%- set partition_cols = partition_by if partition_by is iterable and partition_by is not string else ([partition_by] if partition_by else []) -%}
{# -- Header -- #}
// ============================================================
// Generated by dbt-scope adapter
// Model: {{ table_name }}
// Strategy: {{ 'full-refresh' if is_full_refresh else 'incremental' }} ({{ source_files | length }} files)
// ============================================================
SET @@FeaturePreviews = "{{ feature_previews }}";
{% if is_incremental %}
SET @@DeltaLakeCommitCondition = "FailIfPartitionConflict";
{% endif %}
#DECLARE @deltaPath string = "{{ delta_location }}";
{# -- CREATE TABLE IF NOT EXISTS: declares ALL scope_columns, including
      any that are excluded from the EXTRACT below via extract=false. -- #}
CREATE TABLE IF NOT EXISTS @target (
{%- for col in scope_columns %}
{{ col.name }} {{ col.type }}{{ "," if not loop.last }}
{%- endfor %}
)
{%- if partition_cols %}
PARTITIONED BY ({{ partition_cols | join(', ') }})
{%- endif %}
LOCATION @deltaPath
OPTIONS (LAYOUT = DELTA);
{# -- ALTER TABLE SET TBLPROPERTIES (declarative; re-emitted every batch) -- #}
{%- if scope_settings %}
ALTER TABLE @target SET TBLPROPERTIES (
{%- for key, value in scope_settings.items() %}
"{{ key }}" = {{ scope__quote_property(value) }}{{ "," if not loop.last }}
{%- endfor %}
);
{%- endif %}
{# -- DELETE existing data for full-refresh idempotency. A separate
      read-write declaration of the same location (@target_rw) is used
      for the deliberate, unconditional wipe. -- #}
{%- if is_full_refresh %}
DECLARE TABLE @target_rw
LOCATION @deltaPath
OPTIONS (LAYOUT = DELTA);
DELETE FROM @target_rw WHERE true;
{%- endif %}
{# -- EXTRACT from explicit file list (exclude extract=false, use FILE.* for virtual columns) -- #}
{%- set extract_columns = [] -%}
{%- for col in scope_columns -%}
{%- if col.get('extract', true) != false -%}
{%- do extract_columns.append(col) -%}
{%- endif -%}
{%- endfor -%}
{# Map of virtual column names to SCOPE FILE.* functions; these are
   computed per input file rather than read from the stream itself. #}
{%- set virtual_map = {
'source_file_uri': 'FILE.URI()',
'source_file_length': 'FILE.LENGTH()',
'source_file_created': 'FILE.CREATED()',
'source_file_modified': 'FILE.MODIFIED()'
} -%}
@data =
EXTRACT
{%- for col in extract_columns %}
{%- if col.name in virtual_map %}
{{ col.name }} = {{ virtual_map[col.name] }}{{ "," if not loop.last }}
{%- else %}
{{ col.name }} : {{ col.type }}{{ "," if not loop.last }}
{%- endif %}
{%- endfor %}
{# tojson renders each path as a double-quoted, escaped string literal. #}
FROM {{ source_files | map('tojson') | join(',\n    ') }}
USING Extractors.SStream();
{# -- User's transformation + INSERT. The model SQL reads @data;
      NOTE(review): with SELECT *, @batch_data presumably must match
      @target's column order — confirm against the dynamic-insert
      feature preview enabled above. -- #}
@batch_data =
{{ model_sql }};
INSERT INTO @target
SELECT * FROM @batch_data;
{% endmacro %}
{# -- Helper: render a TBLPROPERTIES value for the generated script.
      Strings are double-quoted with embedded double quotes escaped
      (an unescaped quote would otherwise truncate the literal and
      break the script). Booleans render as lowercase true/false —
      bare Jinja interpolation would emit Python-style "True"/"False",
      which is not a valid literal. Everything else (numbers) passes
      through unquoted, as before. -- #}
{% macro scope__quote_property(value) %}
{%- if value is string -%}
"{{ value | replace('"', '\\"') }}"
{%- elif value is boolean -%}
{{ 'true' if value else 'false' }}
{%- else -%}
{{ value }}
{%- endif -%}
{% endmacro %}