-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathsqlgen.go
More file actions
416 lines (379 loc) · 15.9 KB
/
sqlgen.go
File metadata and controls
416 lines (379 loc) · 15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
package main
import (
"fmt"
"math"
"regexp"
"slices"
"strconv"
"strings"
"text/template"
"time"
sql "github.com/estuary/connectors/materialize-sql"
log "github.com/sirupsen/logrus"
)
// Minimum representable values for ClickHouse date/time types, used by the
// clamp converters below.
const (
	// clickhouseMinimumDate is the lower bound of ClickHouse's Date32 type.
	clickhouseMinimumDate = "1900-01-01"
	// clickhouseMinimumDatetime is the lower bound used for DateTime64 values,
	// written in RFC 3339 form so it parses with time.RFC3339Nano.
	clickhouseMinimumDatetime = "1925-01-01T00:00:00Z"
)
// clickHouseClampDate converts a date string into a time.Time, clamping any
// value earlier than 1900-01-01 up to that minimum. It differs from
// sql.ClampDate in two ways: it clamps to 1900-01-01 rather than year 0
// (ClickHouse's Date32 type bottoms out at 1900-01-01), and it yields a
// time.Time instead of a string, as the ClickHouse driver requires.
var clickHouseClampDate = sql.StringCastConverter(func(str string) (interface{}, error) {
	d, err := time.Parse(time.DateOnly, str)
	if err != nil {
		return nil, fmt.Errorf("could not parse %q as date: %w", str, err)
	}
	if d.Year() >= 1900 {
		return d, nil
	}
	// Below the Date32 minimum: substitute the floor value. The constant is
	// known-valid, so the parse error is ignored.
	floor, _ := time.Parse(time.DateOnly, clickhouseMinimumDate)
	return floor, nil
})
// clickHouseClampDatetime converts an RFC 3339 date-time string into a UTC
// time.Time, clamping any value earlier than 1925-01-01 up to that minimum.
// It differs from sql.ClampDatetime in clamping to 1925-01-01 rather than
// year 0 (the minimum of ClickHouse's DateTime type) and in returning a
// time.Time instead of a string, as the ClickHouse driver requires.
var clickHouseClampDatetime = sql.StringCastConverter(func(str string) (interface{}, error) {
	// Uppercase first so lowercase 't'/'z' separators still parse as RFC 3339.
	upper := strings.ToUpper(str)
	ts, err := time.Parse(time.RFC3339Nano, upper)
	if err != nil {
		return nil, fmt.Errorf("could not parse %q as RFC3339 date-time: %w", upper, err)
	}
	if ts.Year() >= 1925 {
		return ts.UTC(), nil
	}
	// Below the DateTime minimum: substitute the floor value. The constant is
	// known-valid, so the parse error is ignored.
	floor, _ := time.Parse(time.RFC3339Nano, clickhouseMinimumDatetime)
	return floor.UTC(), nil
})
// clickHouseDialect builds the sql.Dialect describing how Flow projections map
// onto ClickHouse column types, identifiers, literals, and placeholders. The
// database argument is reported as the "schema" of located tables, since
// ClickHouse uses databases where other systems use schemas.
var clickHouseDialect = func(database string) sql.Dialect {
	mapper := sql.NewDDLMapper(
		sql.FlatTypeMappings{
			sql.INTEGER: sql.MapSignedInt64(
				sql.MapStatic("Int64"),
				// Integers that don't fit a signed 64-bit value fall back to String.
				sql.MapStatic("String", sql.UsingConverter(sql.ToStr)),
			),
			sql.NUMBER:  sql.MapStatic("Float64"),
			sql.BOOLEAN: sql.MapStatic("Bool"),
			// Objects, arrays, and multi-typed fields are stored as JSON strings.
			sql.OBJECT:         sql.MapStatic("String", sql.UsingConverter(sql.ToJsonString)),
			sql.ARRAY:          sql.MapStatic("String", sql.UsingConverter(sql.ToJsonString)),
			sql.BINARY:         sql.MapStatic("String"),
			sql.MULTIPLE:       sql.MapStatic("String", sql.UsingConverter(sql.ToJsonString)),
			sql.STRING_INTEGER: sql.MapStatic("String", sql.UsingConverter(sql.ToStr)),
			// String-formatted numbers become Float64, with NaN and +/-Infinity
			// represented by the given sentinel values.
			sql.STRING_NUMBER: sql.MapStatic("Float64", sql.UsingConverter(sql.StrToFloat(math.NaN(), math.Inf(1), math.Inf(-1)))),
			sql.STRING: sql.MapString(sql.StringMappings{
				Fallback: sql.MapStatic("String"),
				WithFormat: map[string]sql.MapProjectionFn{
					// Dates and date-times are clamped to ClickHouse's minimum
					// representable values; see the clamp converters above.
					"date":      sql.MapStatic("Date32", sql.UsingConverter(clickHouseClampDate)),
					"date-time": sql.MapStatic("DateTime64(6, 'UTC')", sql.UsingConverter(clickHouseClampDatetime)),
				},
				WithContentType: map[string]sql.MapProjectionFn{
					"application/x-protobuf; proto=flow.MaterializationSpec": sql.MapStatic("String"),
					"application/x-protobuf; proto=consumer.Checkpoint":      sql.MapStatic("String"),
				},
			}),
		},
		// ClickHouse columns are NOT NULL by default. We handle Nullable wrapping in templates.
	)

	// Identifiers matching this pattern (and not in the reserved-word list) may
	// be used without backtick quoting.
	simpleIdentifier := regexp.MustCompile(`^[a-zA-Z_][0-9a-zA-Z_]*$`)

	return sql.Dialect{
		TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
			// ClickHouse has no schema level; the configured database stands in
			// as the table's schema for info-table lookups.
			return sql.InfoTableLocation{TableSchema: database, TableName: path[0]}
		}),
		SchemaLocatorer: sql.SchemaLocatorFn(func(schema string) string { return schema }),
		ColumnLocatorer: sql.ColumnLocatorFn(func(field string) string { return field }),
		Identifierer: sql.IdentifierFn(sql.JoinTransform(".",
			sql.PassThroughTransform(
				func(s string) bool {
					// Identifiers can contain any special characters, wrapping with backticks
					// is required in those cases. Backtick is the only escaped character.
					// https://clickhouse.com/docs/sql-reference/syntax#identifiers
					return simpleIdentifier.MatchString(s) && !slices.Contains(CLICKHOUSE_RESERVED_WORDS, strings.ToLower(s))
				},
				sql.QuoteTransform("`", "``"),
			))),
		// String literals can be much more complicated, but
		// backslashes and single-quotes cover our needs.
		// https://clickhouse.com/docs/sql-reference/syntax#string
		Literaler:           sql.ToLiteralFn(sql.QuoteTransformEscapedBackslash("'", "\\'")),
		Placeholderer:       sql.PlaceholderFn(func(index int) string { return "?" }),
		TypeMapper:          mapper,
		MaxColumnCharLength: 256,
	}
}
// templates holds the parsed SQL templates covering the connector's table
// lifecycle. All are looked up from a single shared template tree (see
// renderTemplates) so they can reuse common sub-templates such as the
// table-name helpers.
type templates struct {
	// Target-table DDL: creation and column alteration.
	createTargetTable  *template.Template
	alterTargetColumns *template.Template
	// Load-phase staging table and its queries.
	createLoadTable              *template.Template
	insertLoadTable              *template.Template
	queryLoadTable               *template.Template
	queryLoadTableNoFlowDocument *template.Template
	dropLoadTable                *template.Template
	// Store-phase staging table, part enumeration, and partition movement.
	createStoreTable   *template.Template
	insertStoreTable   *template.Template
	queryStoreParts    *template.Template
	moveStorePartition *template.Template
	existsStoreTable   *template.Template
	dropStoreTable     *template.Template
}
// renderTemplates parses the connector's full SQL template tree against the
// given dialect and returns the individual named templates. When hardDelete is
// enabled, _is_deleted fragments are spliced into the target-table DDL, the
// ReplacingMergeTree engine arguments, and the store-table insert column list
// via string concatenation into the template text below.
func renderTemplates(dialect sql.Dialect, hardDelete bool) templates {
	// These stay empty when hardDelete is disabled, so the generated SQL omits
	// the _is_deleted column entirely.
	var isDeletedColumn string
	var isDeletedEngineArg string
	var isDeletedInsert string
	if hardDelete {
		isDeletedColumn = ",\n\t\t_is_deleted UInt8 DEFAULT 0"
		isDeletedEngineArg = ", _is_deleted"
		isDeletedInsert = ", _is_deleted"
	}
	// NOTE: the raw string below is SQL/template text, not Go; its exact
	// whitespace is emitted verbatim into rendered statements.
	var tplAll = sql.MustParseTemplate(dialect, "root", `
---- Target tables
-- ClickHouse is an "append-only" database. When DeltaUpdates is enabled, we use
-- a plain MergeTree: every Store is appended as-is with no deduplication or
-- deletion — rows accumulate and are never removed.
--
-- When DeltaUpdates is disabled (standard mode), we use ReplacingMergeTree so
-- that multiple versions of an Estuary key (as ClickHouse ORDER BY key) can
-- exist concurrently, with monotonically increasing flow_published_at timestamps.
-- Primary keys are deduplicated in a background ClickHouse process, but queries
-- must use the FINAL qualifier, which deduplicates per ORDER BY key (selecting
-- the highest flow_published_at record) and excludes rows where _is_deleted = 1.
--
-- The SETTINGS block enables automatic background CLEANUP merges:
-- allow_experimental_replacing_merge_with_cleanup: enables the CLEANUP merge
-- feature (experimental). Required for the other settings to have any effect,
-- and also enables manual: OPTIMIZE TABLE ... FINAL CLEANUP.
-- Added in ClickHouse 23.12 and 24.1.
-- min_age_to_force_merge_seconds: minimum age (in seconds) of all parts in a
-- partition before forcing a merge. Only partitions where every part is older
-- than this threshold are eligible. 604800 = 1 week.
-- Added in ClickHouse 22.10.
-- min_age_to_force_merge_on_partition_only: restricts forced merges to only run
-- when merging an entire partition into one part. Required for CLEANUP to safely
-- remove deleted rows (ensures no older versions remain).
-- Added in ClickHouse 22.10.
-- enable_replacing_merge_with_cleanup_for_min_age_to_force_merge: actually enables
-- automatic background CLEANUP merges when the age threshold is met. Without this,
-- cleanup only happens via OPTIMIZE ... FINAL CLEANUP.
-- Added in ClickHouse 25.3.
{{ define "createTargetTable" }}
CREATE TABLE IF NOT EXISTS {{$.Identifier}} (
{{- range $ind, $col := $.Columns }}
{{- if $ind }},{{ end }}
{{$col.Identifier}} {{ if not $col.MustExist }}Nullable({{ end }}{{$col.DDL}}{{ if not $col.MustExist }}){{ end }}
{{- end -}}
{{ if not $.DeltaUpdates }}`+isDeletedColumn+`{{ end }}
)
{{ if $.DeltaUpdates -}}
ENGINE = MergeTree
{{ else -}}
ENGINE = ReplacingMergeTree(flow_published_at`+isDeletedEngineArg+`)
{{ end -}}
ORDER BY (
{{- range $ind, $key := $.Keys }}
{{- if $ind }}, {{ end -}}
{{$key.Identifier}}
{{- end -}}
)
{{ if not $.DeltaUpdates -}}
SETTINGS
allow_experimental_replacing_merge_with_cleanup = 1,
min_age_to_force_merge_seconds = 604800,
min_age_to_force_merge_on_partition_only = 1,
enable_replacing_merge_with_cleanup_for_min_age_to_force_merge = 1
{{- end -}}
;
{{ end }}
{{ define "alterTargetColumns" }}
{{- range $ind, $col := $.AddColumns }}
ALTER TABLE {{$.Identifier}} ADD COLUMN IF NOT EXISTS {{$col.Identifier}} Nullable({{$col.NullableDDL}});
{{ end -}}
{{- range $ind, $col := $.DropNotNulls }}
ALTER TABLE {{$.Identifier}} MODIFY COLUMN {{ ColumnIdentifier $col.Name }} Nullable({{$col.Type}});
{{ end -}}
{{ end }}
---- Load tables
-- Load tables stage the keys of documents the connector needs to look up during a
-- transaction's load phase. The connector inserts keys into the staging load table,
-- then joins the target table (using FINAL for deduplication) against the staging
-- table to retrieve the current version of documents for those keys.
--
-- Load tables use the MergeTree engine so that large key sets are spilled to disk
-- rather than held entirely in memory, avoiding server memory limit issues.
--
-- CREATE OR REPLACE TABLE is used so that the table is reset on connector restart,
-- discarding any stale state from a previous run. The table is dropped when the
-- connector shuts down.
{{ define "loadTableName" -}}
{{ printf "flow_temp_load_%s_%s" $.RangeKey (index $.Path 0) | ColumnIdentifier }}
{{- end }}
{{ define "createLoadTable" }}
CREATE OR REPLACE TABLE {{ template "loadTableName" . }} (
{{- range $ind, $key := $.Keys }}
{{- if $ind }},{{ end }}
{{ $key.Identifier }} {{ $key.DDL }}
{{- end }}
)
ENGINE = MergeTree
ORDER BY (
{{- range $ind, $key := $.Keys -}}
{{- if $ind }}, {{ end -}}
{{ $key.Identifier }}
{{- end -}}
);
{{ end }}
{{ define "insertLoadTable" }}
INSERT INTO {{ template "loadTableName" . }} (
{{- range $ind, $key := $.Keys }}
{{- if $ind }}, {{ end -}}
{{$key.Identifier}}
{{- end -}}
)
{{ end }}
{{ define "queryLoadTable" }}
{{ if $.Document -}}
SELECT {{ $.Binding }}::Int32, r.{{$.Document.Identifier}}
FROM {{$.Identifier}} AS r FINAL
JOIN {{ template "loadTableName" . }} AS l
{{- range $ind, $key := $.Keys }}
{{ if $ind }} AND {{ else }} ON {{ end -}}
l.{{$key.Identifier}} = r.{{$key.Identifier}}
{{- end }}
{{ end -}}
{{ end }}
-- Templated query for no_flow_document mode - reconstructs JSON from root-level
-- columns using concat(). Each column value is individually serialized:
-- - OBJECT, ARRAY, MULTIPLE: stored as String containing valid JSON, embedded
-- directly via ifNull(col, 'null') so they appear as proper JSON
-- objects/arrays rather than double-encoded strings.
-- - DateTime64: formatDateTime produces RFC3339, wrapped in toJSONString for
-- proper quoting.
-- - STRING_NUMBER: stored as Float64 but must appear as a JSON string;
-- toString() converts before toJSONString quotes it.
-- - All other types: toJSONString handles correct JSON serialization
-- (quoting strings, bare numbers/bools).
-- Nullable columns use ifNull(..., 'null') to emit JSON null.
{{ define "concatValue" -}}
{{ $ident := printf "%s.%s" $.Alias $.Identifier }}
{{- if or (eq $.AsFlatType "object") (eq $.AsFlatType "array") (eq $.AsFlatType "multiple") -}}
ifNull({{ $ident }}, 'null')
{{- else if and (eq $.AsFlatType "string") (eq $.Format "date-time") -}}
ifNull(toJSONString(formatDateTime({{ $ident }}, '%Y-%m-%dT%H:%i:%S.%f', 'UTC') || 'Z'), 'null')
{{- else if eq $.AsFlatType "string_number" -}}
ifNull(toJSONString(toString({{ $ident }})), 'null')
{{- else -}}
ifNull(toJSONString({{ $ident }}), 'null')
{{- end -}}
{{- end }}
{{ define "queryLoadTableNoFlowDocument" }}
{{ if not $.DeltaUpdates -}}
SELECT {{ $.Binding }}::Int32,
concat('{',
{{- range $i, $col := $.RootLevelColumns -}}
{{- if $i }}, ',',{{ end }}
'"{{ $col.Field }}":', {{ template "concatValue" (ColumnWithAlias $col "r") }}
{{- end }}
, '}') AS flow_document
FROM {{$.Identifier}} AS r FINAL
JOIN {{ template "loadTableName" . }} AS l
{{- range $ind, $key := $.Keys }}
{{ if $ind }} AND {{ else }} ON {{ end -}}
l.{{$key.Identifier}} = r.{{$key.Identifier}}
{{- end }}
{{ else -}}
SELECT * FROM (SELECT -1::Int32, ''::String LIMIT 0) as nodoc
{{ end -}}
{{ end }}
{{ define "dropLoadTable" }}
DROP TABLE IF EXISTS {{ template "loadTableName" . }};
{{ end }}
---- Store tables
-- Store tables stage documents written during a transaction's store phase. They are
-- created with CREATE TABLE ... AS, which clones the target table's schema and engine
-- (ReplacingMergeTree), so that their on-disk parts are partition-compatible with the
-- target table.
--
-- Documents are inserted into the store table during the store phase. On commit, the
-- connector enumerates the store table's parts via system.parts and moves each
-- partition to the target table using ALTER TABLE ... MOVE PARTITION. Moving a
-- partition is a metadata-only operation (it relinks existing parts rather than
-- copying data), so the commit phase is very low latency regardless of transaction
-- size. This is significantly faster than INSERT ... SELECT or other bulk-copy
-- methods that would rewrite data.
--
-- The commit is not atomic across bindings: partitions are moved one at a time and a
-- failure mid-commit will leave some partitions moved and others not. This is safe
-- because the target table uses ReplacingMergeTree, which deduplicates by ORDER BY
-- key and flow_published_at version — re-applying a partial commit is idempotent.
--
-- CREATE OR REPLACE TABLE resets the store table on connector restart. The table is
-- truncated between transactions and dropped when the connector shuts down.
{{ define "storeTableNameIdentifier" -}}
{{ printf "flow_temp_store_%s_%s" $.RangeKey (index $.Path 0) | ColumnIdentifier }}
{{- end }}
{{ define "storeTableNameString" -}}
{{ printf "flow_temp_store_%s_%s" $.RangeKey (index $.Path 0) | Literal }}
{{- end }}
{{ define "createStoreTable" }}
CREATE OR REPLACE TABLE {{ template "storeTableNameIdentifier" . }}
AS {{$.Identifier}};
{{ end }}
{{ define "insertStoreTable" }}
INSERT INTO {{ template "storeTableNameIdentifier" . }} (
{{- range $ind, $col := $.Columns }}
{{- if $ind }}, {{ end -}}
{{$col.Identifier}}
{{- end -}}
{{- if not $.DeltaUpdates }}`+isDeletedInsert+`{{ end -}}
)
{{ end }}
{{ define "queryStoreParts" }}
SELECT DISTINCT partition_id FROM system.parts
WHERE table = {{ template "storeTableNameString" . }}
AND database = ? AND active;
{{ end }}
{{ define "moveStorePartition" }}
ALTER TABLE {{ template "storeTableNameIdentifier" . }}
MOVE PARTITION ID ?
TO TABLE {{ $.Identifier }};
{{ end }}
{{ define "existsStoreTable" }}
SELECT count() FROM system.tables
WHERE database = currentDatabase()
AND name = {{ template "storeTableNameString" . }};
{{ end }}
{{ define "dropStoreTable" }}
DROP TABLE IF EXISTS {{ template "storeTableNameIdentifier" . }};
{{ end }}
`)
	// Each field is resolved by name from the shared tree; MustParseTemplate
	// has already panicked on any parse error, so lookups here are non-nil.
	return templates{
		createTargetTable:            tplAll.Lookup("createTargetTable"),
		alterTargetColumns:           tplAll.Lookup("alterTargetColumns"),
		createLoadTable:              tplAll.Lookup("createLoadTable"),
		insertLoadTable:              tplAll.Lookup("insertLoadTable"),
		queryLoadTable:               tplAll.Lookup("queryLoadTable"),
		queryLoadTableNoFlowDocument: tplAll.Lookup("queryLoadTableNoFlowDocument"),
		dropLoadTable:                tplAll.Lookup("dropLoadTable"),
		createStoreTable:             tplAll.Lookup("createStoreTable"),
		insertStoreTable:             tplAll.Lookup("insertStoreTable"),
		queryStoreParts:              tplAll.Lookup("queryStoreParts"),
		moveStorePartition:           tplAll.Lookup("moveStorePartition"),
		existsStoreTable:             tplAll.Lookup("existsStoreTable"),
		dropStoreTable:               tplAll.Lookup("dropStoreTable"),
	}
}
// renderTableAndRangeKey executes tpl against the given table augmented with a
// RangeKey field, which is the connector's range key rendered as lowercase
// hexadecimal (consumed by the loadTableName/storeTableName sub-templates).
// The rendered SQL is logged at debug level and returned.
func renderTableAndRangeKey(table sql.Table, rangeKey uint32, tpl *template.Template) (string, error) {
	v := struct {
		sql.Table
		RangeKey string
	}{
		Table: table,
		// rangeKey is unsigned; format it directly rather than round-tripping
		// through a signed int64.
		RangeKey: strconv.FormatUint(uint64(rangeKey), 16),
	}

	var w strings.Builder
	if err := tpl.Execute(&w, &v); err != nil {
		return "", fmt.Errorf("rendering template %q: %w", tpl.Name(), err)
	}
	rendered := w.String()

	log.WithFields(log.Fields{
		"rendered":  rendered,
		"table":     table,
		"range-key": rangeKey,
	}).Debug("rendered template")

	return rendered, nil
}