Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 357342d

Browse files
committed
draft of integrate apache doris
1 parent 7614a7e commit 357342d

30 files changed

+581
-280
lines changed

platform/backend_connectors/basic_sql_backend_connector.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"database/sql"
99
"github.com/QuesmaOrg/quesma/platform/config"
10+
"github.com/QuesmaOrg/quesma/platform/logger"
1011
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
1112
)
1213

@@ -64,6 +65,7 @@ func (p *BasicSqlBackendConnector) Ping() error {
6465
}
6566

6667
func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
68+
logger.Info().Msgf("query sql: %s", query)
6769
rows, err := p.connection.QueryContext(ctx, query, args...)
6870
if err != nil {
6971
return nil, err
@@ -72,10 +74,12 @@ func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args
7274
}
7375

7476
func (p *BasicSqlBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
77+
logger.Info().Msgf("queryRow sql: %s", query)
7578
return p.connection.QueryRowContext(ctx, query, args...)
7679
}
7780

7881
func (p *BasicSqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
82+
logger.Info().Msgf("Exec sql: %s", query)
7983
if len(args) == 0 {
8084
_, err := p.connection.ExecContext(ctx, query)
8185
return err

platform/clickhouse/connection.go

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ import (
66
"crypto/tls"
77
"database/sql"
88
"fmt"
9-
"github.com/ClickHouse/clickhouse-go/v2"
109
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
11-
"github.com/QuesmaOrg/quesma/platform/buildinfo"
1210
"github.com/QuesmaOrg/quesma/platform/config"
1311
"github.com/QuesmaOrg/quesma/platform/logger"
1412
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
@@ -19,40 +17,68 @@ import (
1917

2018
func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {
2119

22-
options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}
23-
if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {
20+
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true",
21+
c.ClickHouse.User,
22+
c.ClickHouse.Password,
23+
c.ClickHouse.Url.Host,
24+
c.ClickHouse.Database)
2425

25-
options.Auth = clickhouse.Auth{
26-
Username: c.ClickHouse.User,
27-
Password: c.ClickHouse.Password,
28-
Database: c.ClickHouse.Database,
29-
}
30-
}
31-
if !c.ClickHouse.DisableTLS {
32-
options.TLS = tlsConfig
33-
}
34-
35-
info := struct {
36-
Name string
37-
Version string
38-
}{
39-
Name: "quesma",
40-
Version: buildinfo.Version,
26+
db, err := sql.Open("mysql", dsn)
27+
if err != nil {
28+
logger.Error().Err(err).Msg("failed to initialize Doris connection pool")
29+
return nil
4130
}
4231

43-
// Setting limit here is not working. It causes runtime error.
44-
// Set it after opening the connection.
45-
//
46-
// options.MaxIdleConns = 50
47-
// options.MaxOpenConns = 50
48-
// options.ConnMaxLifetime = 0
32+
db.SetMaxOpenConns(30)
33+
db.SetMaxIdleConns(10)
34+
db.SetConnMaxLifetime(time.Hour)
4935

50-
options.ClientInfo.Products = append(options.ClientInfo.Products, info)
36+
if err := db.Ping(); err != nil {
37+
logger.Error().Err(err).Msg("failed to ping Doris server")
38+
return nil
39+
}
5140

52-
return clickhouse.OpenDB(&options)
41+
logger.Info().Msg("Doris connection pool initialized successfully")
42+
return db
5343

5444
}
5545

46+
//func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {
47+
//
48+
// options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}
49+
// if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {
50+
//
51+
// options.Auth = clickhouse.Auth{
52+
// Username: c.ClickHouse.User,
53+
// Password: c.ClickHouse.Password,
54+
// Database: c.ClickHouse.Database,
55+
// }
56+
// }
57+
// if !c.ClickHouse.DisableTLS {
58+
// options.TLS = tlsConfig
59+
// }
60+
//
61+
// info := struct {
62+
// Name string
63+
// Version string
64+
// }{
65+
// Name: "quesma",
66+
// Version: buildinfo.Version,
67+
// }
68+
//
69+
// // Setting limit here is not working. It causes runtime error.
70+
// // Set it after opening the connection.
71+
// //
72+
// // options.MaxIdleConns = 50
73+
// // options.MaxOpenConns = 50
74+
// // options.ConnMaxLifetime = 0
75+
//
76+
// options.ClientInfo.Products = append(options.ClientInfo.Products, info)
77+
//
78+
// return clickhouse.OpenDB(&options)
79+
//
80+
//}
81+
5682
func InitDBConnectionPool(c *config.QuesmaConfiguration) quesma_api.BackendConnector {
5783
if c.ClickHouse.Url == nil {
5884
return nil

platform/clickhouse/quesma_communicator.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,15 @@ func read(ctx context.Context, rows quesma_api.Rows, selectFields []string, rowT
223223
}
224224
resultRow := model.QueryResultRow{Cols: make([]model.QueryResultCol, len(selectFields))}
225225
for i, field := range selectFields {
226-
resultRow.Cols[i] = model.QueryResultCol{ColName: field, Value: rowToScan[i]}
226+
var val interface{}
227+
switch v := rowToScan[i].(type) {
228+
// just for doris query
229+
case []uint8:
230+
val = string(v)
231+
default:
232+
val = v
233+
}
234+
resultRow.Cols[i] = model.QueryResultCol{ColName: field, Value: val}
227235
}
228236
resultRows = append(resultRows, resultRow)
229237
}

platform/clickhouse/schema.go

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const (
7373
DateTime64 DateTimeType = iota
7474
DateTime
7575
Invalid
76+
datetime
7677
)
7778

7879
func (c *Column) String() string {
@@ -220,35 +221,75 @@ func NewBaseType(clickHouseTypeName string) BaseType {
220221
// this is catch all type for all types we do not exlicitly support
221222
type UnknownType struct{}
222223

223-
func ResolveType(clickHouseTypeName string) reflect.Type {
224-
switch clickHouseTypeName {
225-
case "String", "LowCardinality(String)", "UUID", "FixedString":
224+
func ResolveType(dorisTypeName string) reflect.Type {
225+
dorisTypeName = strings.ToLower(dorisTypeName)
226+
switch dorisTypeName {
227+
case "char", "varchar", "string", "text":
226228
return reflect.TypeOf("")
227-
case "DateTime64", "DateTime", "Date", "DateTime64(3)":
229+
case "date", "datetime", "datev2", "datetimev2":
228230
return reflect.TypeOf(time.Time{})
229-
case "UInt8", "UInt16", "UInt32", "UInt64":
230-
return reflect.TypeOf(uint64(0))
231-
case "Int8", "Int16", "Int32":
231+
case "tinyint", "smallint", "int":
232232
return reflect.TypeOf(int32(0))
233-
case "Int64":
233+
case "bigint":
234234
return reflect.TypeOf(int64(0))
235-
case "Float32", "Float64":
236-
return reflect.TypeOf(float64(0))
237-
case "Point":
238-
return reflect.TypeOf(Point{})
239-
case "Bool":
235+
case "largeint":
236+
return reflect.TypeOf("") // LargeInt is typically handled as string due to size
237+
case "boolean":
240238
return reflect.TypeOf(true)
241-
case "JSON":
239+
case "float":
240+
return reflect.TypeOf(float32(0))
241+
case "double":
242+
return reflect.TypeOf(float64(0))
243+
case "decimal", "decimalv2", "decimal32", "decimal64", "decimal128":
244+
return reflect.TypeOf("") // Decimals often handled as strings for precision
245+
case "json":
246+
return reflect.TypeOf(map[string]interface{}{})
247+
case "hll":
248+
return reflect.TypeOf([]byte{}) // HLL is a binary type
249+
case "bitmap":
250+
return reflect.TypeOf([]byte{}) // Bitmap is also binary
251+
case "array":
252+
return reflect.TypeOf([]interface{}{})
253+
case "map":
242254
return reflect.TypeOf(map[string]interface{}{})
243-
case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), String)", "Map(LowCardinality(String), Nullable(String))":
244-
return reflect.TypeOf(map[string]string{})
245-
case "Unknown":
246-
return reflect.TypeOf(UnknownType{})
255+
case "struct":
256+
return reflect.TypeOf(map[string]interface{}{})
257+
case "null":
258+
return reflect.TypeOf(nil)
247259
}
248260

249261
return nil
250262
}
251263

264+
//func ResolveType(clickHouseTypeName string) reflect.Type {
265+
// switch clickHouseTypeName {
266+
// case "String", "LowCardinality(String)", "UUID", "FixedString":
267+
// return reflect.TypeOf("")
268+
// case "DateTime64", "DateTime", "Date", "DateTime64(3)":
269+
// return reflect.TypeOf(time.Time{})
270+
// case "UInt8", "UInt16", "UInt32", "UInt64":
271+
// return reflect.TypeOf(uint64(0))
272+
// case "Int8", "Int16", "Int32":
273+
// return reflect.TypeOf(int32(0))
274+
// case "Int64":
275+
// return reflect.TypeOf(int64(0))
276+
// case "Float32", "Float64":
277+
// return reflect.TypeOf(float64(0))
278+
// case "Point":
279+
// return reflect.TypeOf(Point{})
280+
// case "Bool":
281+
// return reflect.TypeOf(true)
282+
// case "JSON":
283+
// return reflect.TypeOf(map[string]interface{}{})
284+
// case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), String)", "Map(LowCardinality(String), Nullable(String))":
285+
// return reflect.TypeOf(map[string]string{})
286+
// case "Unknown":
287+
// return reflect.TypeOf(UnknownType{})
288+
// }
289+
//
290+
// return nil
291+
//}
292+
252293
// 'value': value of a field, from unmarshalled JSON
253294
// 'valueOrigin': name of the field (for error messages)
254295
func NewType(value any, valueOrigin string) (Type, error) {
@@ -428,7 +469,7 @@ func NewDefaultBoolAttribute() Attribute {
428469
}
429470

430471
func (dt DateTimeType) String() string {
431-
return []string{"DateTime64", "DateTime", "Invalid"}[dt]
472+
return []string{"DateTime64", "DateTime", "datetime", "Invalid"}[dt]
432473
}
433474

434475
func IsColumnAttributes(colName string) bool {

platform/clickhouse/table.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ func (t *Table) GetDateTimeType(ctx context.Context, fieldName string, dateInSch
106106
if col, ok := t.Cols[fieldName]; ok {
107107
typeName := col.Type.String()
108108
// hasPrefix, not equal, because we can have DateTime64(3) and we want to catch it
109-
if strings.HasPrefix(typeName, "DateTime64") {
109+
if strings.HasPrefix(typeName, "datetime") {
110110
return DateTime64
111111
}
112-
if strings.HasPrefix(typeName, "DateTime") {
112+
if strings.HasPrefix(typeName, "date") {
113113
return DateTime
114114
}
115115
}

platform/clickhouse/table_discovery.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,8 @@ func (td *tableDiscovery) readTables(database string) (map[string]map[string]col
802802
return map[string]map[string]columnMetadata{}, fmt.Errorf("database connection pool is nil, cannot describe tables")
803803
}
804804

805-
rows, err := td.dbConnPool.Query(context.Background(), "SELECT table, name, type, comment FROM system.columns WHERE database = ?", database)
805+
querySql := fmt.Sprintf("SELECT table_name, column_name, data_type, column_comment FROM information_schema.columns WHERE table_schema = '%s'", database)
806+
rows, err := td.dbConnPool.Query(context.Background(), querySql)
806807

807808
if err != nil {
808809
err = end_user_errors.GuessClickhouseErrorType(err).InternalDetails("reading list of columns from system.columns")
@@ -861,18 +862,18 @@ func (td *tableDiscovery) getTimestampFieldForClickHouse(database, table string)
861862
}
862863

863864
func (td *tableDiscovery) tableComment(database, table string) (comment string) {
864-
err := td.dbConnPool.QueryRow(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)
865-
if err != nil {
866-
logger.Error().Msgf("could not get table comment: %v", err)
867-
}
865+
//err := td.dbConnPool.QueryRow(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)
866+
//if err != nil {
867+
// logger.Error().Msgf("could not get table comment: %v", err)
868+
//}
868869
return comment
869870
}
870871

871872
func (td *tableDiscovery) createTableQuery(database, table string) (ddl string) {
872-
err := td.dbConnPool.QueryRow(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&ddl)
873-
if err != nil {
874-
logger.Error().Msgf("could not get create table statement: %v", err)
875-
}
873+
//err := td.dbConnPool.QueryRow(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&ddl)
874+
//if err != nil {
875+
// logger.Error().Msgf("could not get create table statement: %v", err)
876+
//}
876877
return ddl
877878
}
878879

0 commit comments

Comments
 (0)