Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions platform/backend_connectors/basic_sql_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"database/sql"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/logger"
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
)

Expand Down Expand Up @@ -64,6 +65,7 @@ func (p *BasicSqlBackendConnector) Ping() error {
}

func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
logger.Info().Msgf("query sql: %s", query)
rows, err := p.connection.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
Expand All @@ -72,10 +74,12 @@ func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args
}

func (p *BasicSqlBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
logger.Info().Msgf("queryRow sql: %s", query)
return p.connection.QueryRowContext(ctx, query, args...)
}

func (p *BasicSqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
logger.Info().Msgf("Exec sql: %s", query)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three above make a huge sense for debugging. You may also leverage our internal debug console http://localhost:9999.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx

if len(args) == 0 {
_, err := p.connection.ExecContext(ctx, query)
return err
Expand Down
82 changes: 54 additions & 28 deletions platform/clickhouse/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"crypto/tls"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to create a new directory apache_doris and implement below functionality there. Then proper initDBConnection can be invoked here https://github.com/QuesmaOrg/quesma/blob/main/cmd/main.go#L87

"database/sql"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
"github.com/QuesmaOrg/quesma/platform/buildinfo"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/logger"
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
Expand All @@ -19,40 +17,68 @@ import (

func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {

options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}
if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true",
c.ClickHouse.User,
c.ClickHouse.Password,
c.ClickHouse.Url.Host,
c.ClickHouse.Database)

options.Auth = clickhouse.Auth{
Username: c.ClickHouse.User,
Password: c.ClickHouse.Password,
Database: c.ClickHouse.Database,
}
}
if !c.ClickHouse.DisableTLS {
options.TLS = tlsConfig
}

info := struct {
Name string
Version string
}{
Name: "quesma",
Version: buildinfo.Version,
db, err := sql.Open("mysql", dsn)
if err != nil {
logger.Error().Err(err).Msg("failed to initialize Doris connection pool")
return nil
}

// Setting limit here is not working. It causes runtime error.
// Set it after opening the connection.
//
// options.MaxIdleConns = 50
// options.MaxOpenConns = 50
// options.ConnMaxLifetime = 0
db.SetMaxOpenConns(30)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(time.Hour)

options.ClientInfo.Products = append(options.ClientInfo.Products, info)
if err := db.Ping(); err != nil {
logger.Error().Err(err).Msg("failed to ping Doris server")
return nil
}

return clickhouse.OpenDB(&options)
logger.Info().Msg("Doris connection pool initialized successfully")
return db

}

//func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {
//
// options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}
// if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {
//
// options.Auth = clickhouse.Auth{
// Username: c.ClickHouse.User,
// Password: c.ClickHouse.Password,
// Database: c.ClickHouse.Database,
// }
// }
// if !c.ClickHouse.DisableTLS {
// options.TLS = tlsConfig
// }
//
// info := struct {
// Name string
// Version string
// }{
// Name: "quesma",
// Version: buildinfo.Version,
// }
//
// // Setting limit here is not working. It causes runtime error.
// // Set it after opening the connection.
// //
// // options.MaxIdleConns = 50
// // options.MaxOpenConns = 50
// // options.ConnMaxLifetime = 0
//
// options.ClientInfo.Products = append(options.ClientInfo.Products, info)
//
// return clickhouse.OpenDB(&options)
//
//}

func InitDBConnectionPool(c *config.QuesmaConfiguration) quesma_api.BackendConnector {
if c.ClickHouse.Url == nil {
return nil
Expand Down
10 changes: 9 additions & 1 deletion platform/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,15 @@ func read(ctx context.Context, rows quesma_api.Rows, selectFields []string, rowT
}
resultRow := model.QueryResultRow{Cols: make([]model.QueryResultCol, len(selectFields))}
for i, field := range selectFields {
resultRow.Cols[i] = model.QueryResultCol{ColName: field, Value: rowToScan[i]}
var val interface{}
switch v := rowToScan[i].(type) {
// just for doris query
case []uint8:
val = string(v)
default:
val = v
}
resultRow.Cols[i] = model.QueryResultCol{ColName: field, Value: val}
}
resultRows = append(resultRows, resultRow)
}
Expand Down
79 changes: 60 additions & 19 deletions platform/clickhouse/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const (
DateTime64 DateTimeType = iota
DateTime
Invalid
datetime
)

func (c *Column) String() string {
Expand Down Expand Up @@ -220,35 +221,75 @@ func NewBaseType(clickHouseTypeName string) BaseType {
// this is catch all type for all types we do not exlicitly support
type UnknownType struct{}

func ResolveType(clickHouseTypeName string) reflect.Type {
switch clickHouseTypeName {
case "String", "LowCardinality(String)", "UUID", "FixedString":
func ResolveType(dorisTypeName string) reflect.Type {
dorisTypeName = strings.ToLower(dorisTypeName)
switch dorisTypeName {
case "char", "varchar", "string", "text":
return reflect.TypeOf("")
case "DateTime64", "DateTime", "Date", "DateTime64(3)":
case "date", "datetime", "datev2", "datetimev2":
return reflect.TypeOf(time.Time{})
case "UInt8", "UInt16", "UInt32", "UInt64":
return reflect.TypeOf(uint64(0))
case "Int8", "Int16", "Int32":
case "tinyint", "smallint", "int":
return reflect.TypeOf(int32(0))
case "Int64":
case "bigint":
return reflect.TypeOf(int64(0))
case "Float32", "Float64":
return reflect.TypeOf(float64(0))
case "Point":
return reflect.TypeOf(Point{})
case "Bool":
case "largeint":
return reflect.TypeOf("") // LargeInt is typically handled as string due to size
case "boolean":
return reflect.TypeOf(true)
case "JSON":
case "float":
return reflect.TypeOf(float32(0))
case "double":
return reflect.TypeOf(float64(0))
case "decimal", "decimalv2", "decimal32", "decimal64", "decimal128":
return reflect.TypeOf("") // Decimals often handled as strings for precision
case "json":
return reflect.TypeOf(map[string]interface{}{})
case "hll":
return reflect.TypeOf([]byte{}) // HLL is a binary type
case "bitmap":
return reflect.TypeOf([]byte{}) // Bitmap is also binary
case "array":
return reflect.TypeOf([]interface{}{})
case "map":
return reflect.TypeOf(map[string]interface{}{})
case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), String)", "Map(LowCardinality(String), Nullable(String))":
return reflect.TypeOf(map[string]string{})
case "Unknown":
return reflect.TypeOf(UnknownType{})
case "struct":
return reflect.TypeOf(map[string]interface{}{})
case "null":
return reflect.TypeOf(nil)
}

return nil
}

//func ResolveType(clickHouseTypeName string) reflect.Type {
// switch clickHouseTypeName {
// case "String", "LowCardinality(String)", "UUID", "FixedString":
// return reflect.TypeOf("")
// case "DateTime64", "DateTime", "Date", "DateTime64(3)":
// return reflect.TypeOf(time.Time{})
// case "UInt8", "UInt16", "UInt32", "UInt64":
// return reflect.TypeOf(uint64(0))
// case "Int8", "Int16", "Int32":
// return reflect.TypeOf(int32(0))
// case "Int64":
// return reflect.TypeOf(int64(0))
// case "Float32", "Float64":
// return reflect.TypeOf(float64(0))
// case "Point":
// return reflect.TypeOf(Point{})
// case "Bool":
// return reflect.TypeOf(true)
// case "JSON":
// return reflect.TypeOf(map[string]interface{}{})
// case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), String)", "Map(LowCardinality(String), Nullable(String))":
// return reflect.TypeOf(map[string]string{})
// case "Unknown":
// return reflect.TypeOf(UnknownType{})
// }
//
// return nil
//}

// 'value': value of a field, from unmarshalled JSON
// 'valueOrigin': name of the field (for error messages)
func NewType(value any, valueOrigin string) (Type, error) {
Expand Down Expand Up @@ -428,7 +469,7 @@ func NewDefaultBoolAttribute() Attribute {
}

func (dt DateTimeType) String() string {
return []string{"DateTime64", "DateTime", "Invalid"}[dt]
return []string{"DateTime64", "DateTime", "datetime", "Invalid"}[dt]
}

func IsColumnAttributes(colName string) bool {
Expand Down
4 changes: 2 additions & 2 deletions platform/clickhouse/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ func (t *Table) GetDateTimeType(ctx context.Context, fieldName string, dateInSch
if col, ok := t.Cols[fieldName]; ok {
typeName := col.Type.String()
// hasPrefix, not equal, because we can have DateTime64(3) and we want to catch it
if strings.HasPrefix(typeName, "DateTime64") {
if strings.HasPrefix(typeName, "datetime") {
return DateTime64
}
if strings.HasPrefix(typeName, "DateTime") {
if strings.HasPrefix(typeName, "date") {
return DateTime
}
}
Expand Down
19 changes: 10 additions & 9 deletions platform/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,8 @@ func (td *tableDiscovery) readTables(database string) (map[string]map[string]col
return map[string]map[string]columnMetadata{}, fmt.Errorf("database connection pool is nil, cannot describe tables")
}

rows, err := td.dbConnPool.Query(context.Background(), "SELECT table, name, type, comment FROM system.columns WHERE database = ?", database)
querySql := fmt.Sprintf("SELECT table_name, column_name, data_type, column_comment FROM information_schema.columns WHERE table_schema = '%s'", database)
rows, err := td.dbConnPool.Query(context.Background(), querySql)

if err != nil {
err = end_user_errors.GuessClickhouseErrorType(err).InternalDetails("reading list of columns from system.columns")
Expand Down Expand Up @@ -861,18 +862,18 @@ func (td *tableDiscovery) getTimestampFieldForClickHouse(database, table string)
}

func (td *tableDiscovery) tableComment(database, table string) (comment string) {
err := td.dbConnPool.QueryRow(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)
if err != nil {
logger.Error().Msgf("could not get table comment: %v", err)
}
//err := td.dbConnPool.QueryRow(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)
//if err != nil {
// logger.Error().Msgf("could not get table comment: %v", err)
//}
return comment
}

func (td *tableDiscovery) createTableQuery(database, table string) (ddl string) {
err := td.dbConnPool.QueryRow(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&ddl)
if err != nil {
logger.Error().Msgf("could not get create table statement: %v", err)
}
//err := td.dbConnPool.QueryRow(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&ddl)
//if err != nil {
// logger.Error().Msgf("could not get create table statement: %v", err)
//}
return ddl
}

Expand Down
Loading