Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions platform/backend_connectors/basic_sql_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"database/sql"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/logger"
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
)

Expand Down Expand Up @@ -64,6 +65,7 @@ func (p *BasicSqlBackendConnector) Ping() error {
}

func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
logger.Info().Msgf("query sql: %s", query)
rows, err := p.connection.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
Expand All @@ -72,10 +74,12 @@ func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args
}

func (p *BasicSqlBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
logger.Info().Msgf("queryRow sql: %s", query)
return p.connection.QueryRowContext(ctx, query, args...)
}

func (p *BasicSqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
logger.Info().Msgf("Exec sql: %s", query)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three above make a huge sense for debugging. You may also leverage our internal debug console http://localhost:9999.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx

if len(args) == 0 {
_, err := p.connection.ExecContext(ctx, query)
return err
Expand Down
82 changes: 54 additions & 28 deletions platform/clickhouse/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"crypto/tls"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to create a new directory apache_doris and implement below functionality there. Then proper initDBConnection can be invoked here https://github.com/QuesmaOrg/quesma/blob/main/cmd/main.go#L87

"database/sql"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
"github.com/QuesmaOrg/quesma/platform/buildinfo"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/logger"
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
Expand All @@ -19,40 +17,68 @@ import (

func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {

options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}
if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true",
c.ClickHouse.User,
c.ClickHouse.Password,
c.ClickHouse.Url.Host,
c.ClickHouse.Database)

options.Auth = clickhouse.Auth{
Username: c.ClickHouse.User,
Password: c.ClickHouse.Password,
Database: c.ClickHouse.Database,
}
}
if !c.ClickHouse.DisableTLS {
options.TLS = tlsConfig
}

info := struct {
Name string
Version string
}{
Name: "quesma",
Version: buildinfo.Version,
db, err := sql.Open("mysql", dsn)
if err != nil {
logger.Error().Err(err).Msg("failed to initialize Doris connection pool")
return nil
}

// Setting limit here is not working. It causes runtime error.
// Set it after opening the connection.
//
// options.MaxIdleConns = 50
// options.MaxOpenConns = 50
// options.ConnMaxLifetime = 0
db.SetMaxOpenConns(30)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(time.Hour)

options.ClientInfo.Products = append(options.ClientInfo.Products, info)
if err := db.Ping(); err != nil {
logger.Error().Err(err).Msg("failed to ping Doris server")
return nil
}

return clickhouse.OpenDB(&options)
logger.Info().Msg("Doris connection pool initialized successfully")
return db

}

//func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {
//
// options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}
// if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {
//
// options.Auth = clickhouse.Auth{
// Username: c.ClickHouse.User,
// Password: c.ClickHouse.Password,
// Database: c.ClickHouse.Database,
// }
// }
// if !c.ClickHouse.DisableTLS {
// options.TLS = tlsConfig
// }
//
// info := struct {
// Name string
// Version string
// }{
// Name: "quesma",
// Version: buildinfo.Version,
// }
//
// // Setting limit here is not working. It causes runtime error.
// // Set it after opening the connection.
// //
// // options.MaxIdleConns = 50
// // options.MaxOpenConns = 50
// // options.ConnMaxLifetime = 0
//
// options.ClientInfo.Products = append(options.ClientInfo.Products, info)
//
// return clickhouse.OpenDB(&options)
//
//}

func InitDBConnectionPool(c *config.QuesmaConfiguration) quesma_api.BackendConnector {
if c.ClickHouse.Url == nil {
return nil
Expand Down
10 changes: 9 additions & 1 deletion platform/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,15 @@ func read(ctx context.Context, rows quesma_api.Rows, selectFields []string, rowT
}
resultRow := model.QueryResultRow{Cols: make([]model.QueryResultCol, len(selectFields))}
for i, field := range selectFields {
resultRow.Cols[i] = model.QueryResultCol{ColName: field, Value: rowToScan[i]}
var val interface{}
switch v := rowToScan[i].(type) {
// just for doris query
case []uint8:
val = string(v)
default:
val = v
}
resultRow.Cols[i] = model.QueryResultCol{ColName: field, Value: val}
}
resultRows = append(resultRows, resultRow)
}
Expand Down
79 changes: 60 additions & 19 deletions platform/clickhouse/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const (
DateTime64 DateTimeType = iota
DateTime
Invalid
datetime
)

func (c *Column) String() string {
Expand Down Expand Up @@ -220,35 +221,75 @@ func NewBaseType(clickHouseTypeName string) BaseType {
// this is catch all type for all types we do not exlicitly support
type UnknownType struct{}

func ResolveType(clickHouseTypeName string) reflect.Type {
switch clickHouseTypeName {
case "String", "LowCardinality(String)", "UUID", "FixedString":
func ResolveType(dorisTypeName string) reflect.Type {
dorisTypeName = strings.ToLower(dorisTypeName)
switch dorisTypeName {
case "char", "varchar", "string", "text":
return reflect.TypeOf("")
case "DateTime64", "DateTime", "Date", "DateTime64(3)":
case "date", "datetime", "datev2", "datetimev2":
return reflect.TypeOf(time.Time{})
case "UInt8", "UInt16", "UInt32", "UInt64":
return reflect.TypeOf(uint64(0))
case "Int8", "Int16", "Int32":
case "tinyint", "smallint", "int":
return reflect.TypeOf(int32(0))
case "Int64":
case "bigint":
return reflect.TypeOf(int64(0))
case "Float32", "Float64":
return reflect.TypeOf(float64(0))
case "Point":
return reflect.TypeOf(Point{})
case "Bool":
case "largeint":
return reflect.TypeOf("") // LargeInt is typically handled as string due to size
case "boolean":
return reflect.TypeOf(true)
case "JSON":
case "float":
return reflect.TypeOf(float32(0))
case "double":
return reflect.TypeOf(float64(0))
case "decimal", "decimalv2", "decimal32", "decimal64", "decimal128":
return reflect.TypeOf("") // Decimals often handled as strings for precision
case "json":
return reflect.TypeOf(map[string]interface{}{})
case "hll":
return reflect.TypeOf([]byte{}) // HLL is a binary type
case "bitmap":
return reflect.TypeOf([]byte{}) // Bitmap is also binary
case "array":
return reflect.TypeOf([]interface{}{})
case "map":
return reflect.TypeOf(map[string]interface{}{})
case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), String)", "Map(LowCardinality(String), Nullable(String))":
return reflect.TypeOf(map[string]string{})
case "Unknown":
return reflect.TypeOf(UnknownType{})
case "struct":
return reflect.TypeOf(map[string]interface{}{})
case "null":
return reflect.TypeOf(nil)
}

return nil
}

//func ResolveType(clickHouseTypeName string) reflect.Type {
// switch clickHouseTypeName {
// case "String", "LowCardinality(String)", "UUID", "FixedString":
// return reflect.TypeOf("")
// case "DateTime64", "DateTime", "Date", "DateTime64(3)":
// return reflect.TypeOf(time.Time{})
// case "UInt8", "UInt16", "UInt32", "UInt64":
// return reflect.TypeOf(uint64(0))
// case "Int8", "Int16", "Int32":
// return reflect.TypeOf(int32(0))
// case "Int64":
// return reflect.TypeOf(int64(0))
// case "Float32", "Float64":
// return reflect.TypeOf(float64(0))
// case "Point":
// return reflect.TypeOf(Point{})
// case "Bool":
// return reflect.TypeOf(true)
// case "JSON":
// return reflect.TypeOf(map[string]interface{}{})
// case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), String)", "Map(LowCardinality(String), Nullable(String))":
// return reflect.TypeOf(map[string]string{})
// case "Unknown":
// return reflect.TypeOf(UnknownType{})
// }
//
// return nil
//}

// 'value': value of a field, from unmarshalled JSON
// 'valueOrigin': name of the field (for error messages)
func NewType(value any, valueOrigin string) (Type, error) {
Expand Down Expand Up @@ -428,7 +469,7 @@ func NewDefaultBoolAttribute() Attribute {
}

func (dt DateTimeType) String() string {
return []string{"DateTime64", "DateTime", "Invalid"}[dt]
return []string{"DateTime64", "DateTime", "datetime", "Invalid"}[dt]
}

func IsColumnAttributes(colName string) bool {
Expand Down
4 changes: 2 additions & 2 deletions platform/clickhouse/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ func (t *Table) GetDateTimeType(ctx context.Context, fieldName string, dateInSch
if col, ok := t.Cols[fieldName]; ok {
typeName := col.Type.String()
// hasPrefix, not equal, because we can have DateTime64(3) and we want to catch it
if strings.HasPrefix(typeName, "DateTime64") {
if strings.HasPrefix(typeName, "datetime") {
return DateTime64
}
if strings.HasPrefix(typeName, "DateTime") {
if strings.HasPrefix(typeName, "date") {
return DateTime
}
}
Expand Down
19 changes: 10 additions & 9 deletions platform/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,8 @@ func (td *tableDiscovery) readTables(database string) (map[string]map[string]col
return map[string]map[string]columnMetadata{}, fmt.Errorf("database connection pool is nil, cannot describe tables")
}

rows, err := td.dbConnPool.Query(context.Background(), "SELECT table, name, type, comment FROM system.columns WHERE database = ?", database)
querySql := fmt.Sprintf("SELECT table_name, column_name, data_type, column_comment FROM information_schema.columns WHERE table_schema = '%s'", database)
rows, err := td.dbConnPool.Query(context.Background(), querySql)

if err != nil {
err = end_user_errors.GuessClickhouseErrorType(err).InternalDetails("reading list of columns from system.columns")
Expand Down Expand Up @@ -861,18 +862,18 @@ func (td *tableDiscovery) getTimestampFieldForClickHouse(database, table string)
}

func (td *tableDiscovery) tableComment(database, table string) (comment string) {
err := td.dbConnPool.QueryRow(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)
if err != nil {
logger.Error().Msgf("could not get table comment: %v", err)
}
//err := td.dbConnPool.QueryRow(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)
//if err != nil {
// logger.Error().Msgf("could not get table comment: %v", err)
//}
return comment
}

func (td *tableDiscovery) createTableQuery(database, table string) (ddl string) {
err := td.dbConnPool.QueryRow(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&ddl)
if err != nil {
logger.Error().Msgf("could not get create table statement: %v", err)
}
//err := td.dbConnPool.QueryRow(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&ddl)
//if err != nil {
// logger.Error().Msgf("could not get create table statement: %v", err)
//}
return ddl
}

Expand Down
Loading