Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit fb0c361

Browse files
pdelewskimieciu
andauthored
Exctracting ingest ddl transformation (#1472)
This PR extracts and introduces a new transformation interface: ``` type Lowerer interface { LowerToDDL(validatedJsons []types.JSON, table *chLib.Table, invalidJsons []types.JSON, encodings map[schema.FieldEncodingKey]schema.EncodedFieldName, createTableCmd CreateTableStatement) ([]string, error) } ``` The intention is to transpile input into other DDL representations, with the primary goal of supporting Hydrolix ingestion. It also introduces basic abstractions for building DDL using an object model instead of raw strings. This interface may evolve over time to adapt to our needs. --------- Signed-off-by: Przemyslaw Delewski <[email protected]> Co-authored-by: Przemysław Hejman <[email protected]>
1 parent 3e14761 commit fb0c361

File tree

14 files changed

+678
-127
lines changed

14 files changed

+678
-127
lines changed

cmd/main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/QuesmaOrg/quesma/platform/table_resolver"
2424
"github.com/QuesmaOrg/quesma/platform/telemetry"
2525
"github.com/QuesmaOrg/quesma/platform/ui"
26+
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
2627
"log"
2728
"os"
2829
"os/signal"
@@ -111,8 +112,11 @@ func main() {
111112
// Ensure common table exists. This table have to be created before ingest processor starts
112113
common_table.EnsureCommonTableExists(connectionPool, cfg.ClusterName)
113114
}
114-
115-
ingestProcessor = ingest.NewIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage, tableResolver)
115+
sqlLowerer := ingest.NewSqlLowerer(virtualTableStorage)
116+
hydrolixLowerer := ingest.NewHydrolixLowerer(virtualTableStorage)
117+
ingestProcessor = ingest.NewIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, sqlLowerer, tableResolver)
118+
ingestProcessor.RegisterLowerer(sqlLowerer, quesma_api.ClickHouseSQLBackend)
119+
ingestProcessor.RegisterLowerer(hydrolixLowerer, quesma_api.HydrolixSQLBackend)
116120
} else {
117121
logger.Info().Msg("Ingest processor is disabled.")
118122
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
4+
package backend_connectors
5+
6+
import (
7+
"context"
8+
"database/sql"
9+
"github.com/QuesmaOrg/quesma/platform/config"
10+
11+
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
12+
)
13+
14+
type HydrolixBackendConnector struct {
15+
BasicSqlBackendConnector
16+
cfg *config.RelationalDbConfiguration
17+
}
18+
19+
func (p *HydrolixBackendConnector) GetId() quesma_api.BackendConnectorType {
20+
return quesma_api.HydrolixSQLBackend
21+
}
22+
23+
func (p *HydrolixBackendConnector) Open() error {
24+
conn, err := initDBConnection(p.cfg)
25+
if err != nil {
26+
return err
27+
}
28+
p.connection = conn
29+
return nil
30+
}
31+
32+
func NewHydrolixBackendConnector(configuration *config.RelationalDbConfiguration) *HydrolixBackendConnector {
33+
return &HydrolixBackendConnector{
34+
cfg: configuration,
35+
}
36+
}
37+
38+
func NewHydrolixBackendConnectorWithConnection(_ string, conn *sql.DB) *HydrolixBackendConnector {
39+
return &HydrolixBackendConnector{
40+
BasicSqlBackendConnector: BasicSqlBackendConnector{
41+
connection: conn,
42+
},
43+
}
44+
}
45+
46+
func (p *HydrolixBackendConnector) InstanceName() string {
47+
return "hydrolix" // TODO add name taken from config
48+
}
49+
50+
func (p *HydrolixBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
51+
return nil
52+
}

platform/ingest/alter_table_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ func TestAlterTable(t *testing.T) {
5151

5252
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
5353
for i := range rowsToInsert {
54-
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, encodings)
54+
alter, onlySchemaFields, nonSchemaFields, err := ip.lowerer.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, encodings)
5555
assert.NoError(t, err)
5656
insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
5757
assert.Equal(t, expectedInsert[i], insert)
58-
assert.Equal(t, alters[i], alter[0])
58+
assert.Equal(t, alters[i], alter[0].ToSql())
5959
// Table will grow with each iteration
6060
assert.Equal(t, i+1, len(table.Cols))
6161
for _, col := range columns[:i+1] {
@@ -128,9 +128,9 @@ func TestAlterTableHeuristic(t *testing.T) {
128128
previousRow = currentRow
129129
}
130130

131-
assert.Equal(t, int64(0), ip.ingestCounter)
131+
assert.Equal(t, int64(0), ip.lowerer.ingestCounter)
132132
for i := range rowsToInsert {
133-
_, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, encodings)
133+
_, _, _, err := ip.lowerer.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, encodings)
134134
assert.NoError(t, err)
135135
}
136136
assert.Equal(t, tc.expected, len(table.Cols))

platform/ingest/ast.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package ingest
4+
5+
import (
6+
"fmt"
7+
"github.com/QuesmaOrg/quesma/platform/util"
8+
"strings"
9+
)
10+
11+
type ColumnStatement struct {
12+
ColumnName string
13+
ColumnType string
14+
Comment string
15+
PropertyName string
16+
AdditionalMetadata string
17+
}
18+
19+
type CreateTableStatement struct {
20+
Name string
21+
Cluster string // Optional: ON CLUSTER
22+
Columns []ColumnStatement
23+
Indexes string // Optional: INDEXES
24+
Comment string
25+
PostClause string // e.g. ENGINE, ORDER BY, etc.
26+
}
27+
28+
type AlterStatementType int
29+
30+
const (
31+
AddColumn AlterStatementType = iota
32+
CommentColumn
33+
)
34+
35+
type AlterStatement struct {
36+
Type AlterStatementType
37+
TableName string
38+
OnCluster string
39+
ColumnName string
40+
ColumnType string // used only for AddColumn
41+
Comment string // used only for CommentColumn
42+
}
43+
44+
type InsertStatement struct {
45+
TableName string
46+
InsertValues string // expected to be JSONEachRow-compatible content
47+
}
48+
49+
func (ct CreateTableStatement) ToSQL() string {
50+
if ct.Name == "" {
51+
return ""
52+
}
53+
var b strings.Builder
54+
55+
if ct.Cluster != "" {
56+
b.WriteString(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s" ON CLUSTER "%s"`+" \n(\n\n", ct.Name, ct.Cluster))
57+
} else {
58+
b.WriteString(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s"`, ct.Name))
59+
}
60+
61+
first := true
62+
63+
if len(ct.Columns) > 0 {
64+
b.WriteString(" \n(\n\n")
65+
66+
}
67+
68+
for _, column := range ct.Columns {
69+
if first {
70+
first = false
71+
} else {
72+
b.WriteString(",\n")
73+
}
74+
b.WriteString(util.Indent(1))
75+
b.WriteString(fmt.Sprintf("\"%s\" %s", column.ColumnName, column.ColumnType))
76+
if column.Comment != "" {
77+
b.WriteString(fmt.Sprintf(" COMMENT '%s'", column.Comment))
78+
}
79+
if column.AdditionalMetadata != "" {
80+
b.WriteString(fmt.Sprintf(" %s", column.AdditionalMetadata))
81+
}
82+
}
83+
84+
b.WriteString(ct.Indexes)
85+
86+
if len(ct.Columns) > 0 {
87+
b.WriteString("\n)\n")
88+
}
89+
90+
if ct.PostClause != "" {
91+
b.WriteString(ct.PostClause + "\n")
92+
}
93+
if ct.Comment != "" {
94+
b.WriteString(fmt.Sprintf("COMMENT '%s'", ct.Comment))
95+
}
96+
97+
return b.String()
98+
}
99+
100+
func (stmt AlterStatement) ToSql() string {
101+
var onCluster string
102+
if stmt.OnCluster != "" {
103+
onCluster = fmt.Sprintf(` ON CLUSTER "%s"`, stmt.OnCluster)
104+
}
105+
106+
switch stmt.Type {
107+
case AddColumn:
108+
return fmt.Sprintf(
109+
`ALTER TABLE "%s"%s ADD COLUMN IF NOT EXISTS "%s" %s`,
110+
stmt.TableName, onCluster, stmt.ColumnName, stmt.ColumnType,
111+
)
112+
case CommentColumn:
113+
return fmt.Sprintf(
114+
`ALTER TABLE "%s"%s COMMENT COLUMN "%s" '%s'`,
115+
stmt.TableName, onCluster, stmt.ColumnName, stmt.Comment,
116+
)
117+
default:
118+
panic(fmt.Sprintf("unsupported AlterStatementType: %v", stmt.Type))
119+
}
120+
}
121+
122+
func (s InsertStatement) ToSQL() string {
123+
return fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow %s`, s.TableName, s.InsertValues)
124+
}

platform/ingest/common_table_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func TestIngestToCommonTable(t *testing.T) {
209209

210210
ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig)
211211
ingest.chDb = db
212-
ingest.virtualTableStorage = virtualTableStorage
212+
ingest.lowerer.virtualTableStorage = virtualTableStorage
213213
ingest.schemaRegistry = schemaRegistry
214214
ingest.tableResolver = resolver
215215

@@ -227,7 +227,7 @@ func TestIngestToCommonTable(t *testing.T) {
227227
}
228228

229229
tables.Store(indexName, testTable)
230-
err = ingest.storeVirtualTable(testTable)
230+
err = storeVirtualTable(testTable, ingest.lowerer.virtualTableStorage)
231231
if err != nil {
232232
t.Fatalf("error storing virtual table: %v", err)
233233
}

platform/ingest/ddllowerer.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package ingest
4+
5+
import (
6+
chLib "github.com/QuesmaOrg/quesma/platform/clickhouse"
7+
"github.com/QuesmaOrg/quesma/platform/schema"
8+
"github.com/QuesmaOrg/quesma/platform/types"
9+
)
10+
11+
// The main purpose of Lowerer interface is to infers a schema from input JSON
12+
// data and then generates a backend-specific DDL (Data Definition Language) representation, such as a CREATE TABLE statement.
13+
// or other DDL commands that are needed to create or modify a table in the database.
14+
type Lowerer interface {
15+
LowerToDDL(validatedJsons []types.JSON,
16+
table *chLib.Table,
17+
invalidJsons []types.JSON,
18+
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName,
19+
createTableCmd CreateTableStatement) ([]string, error)
20+
}

platform/ingest/hydrolixlowerer.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package ingest
4+
5+
import (
6+
chLib "github.com/QuesmaOrg/quesma/platform/clickhouse"
7+
"github.com/QuesmaOrg/quesma/platform/persistence"
8+
"github.com/QuesmaOrg/quesma/platform/schema"
9+
"github.com/QuesmaOrg/quesma/platform/types"
10+
)
11+
12+
type HydrolixLowerer struct {
13+
virtualTableStorage persistence.JSONDatabase
14+
}
15+
16+
func NewHydrolixLowerer(virtualTableStorage persistence.JSONDatabase) *HydrolixLowerer {
17+
return &HydrolixLowerer{
18+
virtualTableStorage: virtualTableStorage,
19+
}
20+
}
21+
22+
func (l *HydrolixLowerer) LowerToDDL(validatedJsons []types.JSON,
23+
table *chLib.Table,
24+
invalidJsons []types.JSON,
25+
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName,
26+
createTableCmd CreateTableStatement) ([]string, error) {
27+
for i, preprocessedJson := range validatedJsons {
28+
_ = i
29+
_ = preprocessedJson
30+
}
31+
32+
result := []string{`{
33+
"schema": {
34+
"project": "",
35+
"name": "test_index",
36+
"time_column": "ingest_time",
37+
"columns": [
38+
{ "name": "new_field", "type": "string" },
39+
{ "name": "ingest_time", "type": "datetime", "default": "NOW" }
40+
],
41+
"partitioning": {
42+
"strategy": "time",
43+
"field": "ingest_time",
44+
"granularity": "day"
45+
}
46+
},
47+
"events": [
48+
{
49+
"new_field": "bar"
50+
}
51+
]
52+
}`}
53+
54+
return result, nil
55+
}

0 commit comments

Comments
 (0)