Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit df0aaa1

Browse files
committed
add doris connection source
1 parent 3e14761 commit df0aaa1

File tree

93 files changed

+1016
-645
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+1016
-645
lines changed

cmd/dual_write_proxy_v2.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"github.com/QuesmaOrg/quesma/platform/ab_testing"
88
"github.com/QuesmaOrg/quesma/platform/async_search_storage"
99
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
10-
"github.com/QuesmaOrg/quesma/platform/clickhouse"
1110
"github.com/QuesmaOrg/quesma/platform/config"
11+
"github.com/QuesmaOrg/quesma/platform/database_common"
1212
"github.com/QuesmaOrg/quesma/platform/frontend_connectors"
1313
"github.com/QuesmaOrg/quesma/platform/ingest"
1414
"github.com/QuesmaOrg/quesma/platform/logger"
@@ -51,19 +51,19 @@ func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http.
5151

5252
type dualWriteHttpProxyV2 struct {
5353
quesmaV2 quesma_api.QuesmaBuilder
54-
logManager *clickhouse.LogManager
54+
logManager *database_common.LogManager
5555
publicPort util.Port
5656
asyncQueriesEvictor *async_search_storage.AsyncQueriesEvictor
5757
queryRunner *frontend_connectors.QueryRunner
5858
schemaRegistry schema.Registry
59-
schemaLoader clickhouse.TableDiscovery
59+
schemaLoader database_common.TableDiscovery
6060
}
6161

6262
func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) {
6363
q.Close(ctx)
6464
}
6565

66-
func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, registry schema.Registry, config *config.QuesmaConfiguration, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
66+
func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader database_common.TableDiscovery, logManager *database_common.LogManager, registry schema.Registry, config *config.QuesmaConfiguration, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
6767

6868
queryProcessor := frontend_connectors.NewQueryRunner(logManager, config, dependencies.DebugInfoCollector(), registry, abResultsRepository, resolver, schemaLoader)
6969

cmd/experimental/dual_write_proxy_v2.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"github.com/QuesmaOrg/quesma/platform/ab_testing"
88
"github.com/QuesmaOrg/quesma/platform/async_search_storage"
99
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
10-
"github.com/QuesmaOrg/quesma/platform/clickhouse"
1110
"github.com/QuesmaOrg/quesma/platform/config"
11+
"github.com/QuesmaOrg/quesma/platform/database_common"
1212
"github.com/QuesmaOrg/quesma/platform/frontend_connectors"
1313
"github.com/QuesmaOrg/quesma/platform/ingest"
1414
"github.com/QuesmaOrg/quesma/platform/logger"
@@ -51,19 +51,19 @@ func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http.
5151

5252
type dualWriteHttpProxyV2 struct {
5353
quesmaV2 quesma_api.QuesmaBuilder
54-
logManager *clickhouse.LogManager
54+
logManager *database_common.LogManager
5555
publicPort util.Port
5656
asyncQueriesEvictor *async_search_storage.AsyncQueriesEvictor
5757
queryRunner *frontend_connectors.QueryRunner
5858
schemaRegistry schema.Registry
59-
schemaLoader clickhouse.TableDiscovery
59+
schemaLoader database_common.TableDiscovery
6060
}
6161

6262
func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) {
6363
q.Close(ctx)
6464
}
6565

66-
func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, registry schema.Registry, config *config.QuesmaConfiguration, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
66+
func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader database_common.TableDiscovery, logManager *database_common.LogManager, registry schema.Registry, config *config.QuesmaConfiguration, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
6767

6868
queryProcessor := frontend_connectors.NewQueryRunner(logManager, config, dependencies.DebugInfoCollector(), registry, abResultsRepository, resolver, schemaLoader)
6969

cmd/experimental/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.24.0
44

55
replace github.com/QuesmaOrg/quesma/platform => ../../platform
66

7-
require github.com/QuesmaOrg/quesma/platform v0.0.0-00010101000000-000000000000
7+
require github.com/QuesmaOrg/quesma/platform v0.0.0-20250630134911-e11a59a7d078
88

99
require (
1010
filippo.io/edwards25519 v1.1.0 // indirect
@@ -85,6 +85,7 @@ require (
8585
golang.org/x/sync v0.11.0 // indirect
8686
golang.org/x/sys v0.30.0 // indirect
8787
golang.org/x/text v0.22.0 // indirect
88+
golang.org/x/time v0.12.0 // indirect
8889
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
8990
google.golang.org/grpc v1.66.2 // indirect
9091
google.golang.org/protobuf v1.36.1 // indirect

cmd/experimental/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
252252
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
253253
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
254254
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
255+
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
256+
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
255257
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
256258
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
257259
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=

cmd/experimental/quesma.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ package main
55
import (
66
"context"
77
"github.com/QuesmaOrg/quesma/platform/ab_testing"
8-
"github.com/QuesmaOrg/quesma/platform/clickhouse"
98
"github.com/QuesmaOrg/quesma/platform/config"
9+
"github.com/QuesmaOrg/quesma/platform/database_common"
1010
"github.com/QuesmaOrg/quesma/platform/ingest"
1111
"github.com/QuesmaOrg/quesma/platform/logger"
1212
"github.com/QuesmaOrg/quesma/platform/recovery"
@@ -45,8 +45,8 @@ func (q *Quesma) Start() {
4545
}
4646

4747
func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent,
48-
logManager *clickhouse.LogManager, ingestProcessor *ingest.IngestProcessor,
49-
schemaLoader clickhouse.TableDiscovery,
48+
logManager *database_common.LogManager, ingestProcessor *ingest.IngestProcessor,
49+
schemaLoader database_common.TableDiscovery,
5050
schemaRegistry schema.Registry, config *config.QuesmaConfiguration,
5151
quesmaManagementConsole *ui.QuesmaManagementConsole,
5252
abResultsRepository ab_testing.Sender, resolver table_resolver.TableResolver) *Quesma {

cmd/experimental/v2_quesma_builder.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,14 @@ func BuildNewQuesma() quesma_api.QuesmaBuilder {
4646

4747
deps := quesma_api.EmptyDependencies()
4848

49-
legacyDependencies := es_to_ch_common.InitializeLegacyQuesmaDependencies(deps, &cfg, logChan)
49+
logger.Info().Msgf("loaded config: %s", cfg.String())
50+
51+
var legacyDependencies *es_to_ch_common.LegacyQuesmaDependencies
52+
if cfg.ClickHouse.ConnectorType == "doris" {
53+
legacyDependencies = es_to_ch_common.InitializeLegacyDorisQuesmaDependencies(deps, &cfg, logChan)
54+
} else {
55+
legacyDependencies = es_to_ch_common.InitializeLegacyQuesmaDependencies(deps, &cfg, logChan)
56+
}
5057

5158
return buildQuesmaFromV2Config(newConfiguration, legacyDependencies)
5259
}
@@ -98,6 +105,10 @@ func buildQuesmaFromV2Config(cfg config.QuesmaNewConfiguration, deps *es_to_ch_c
98105
connectorDeclaration := cfg.GetBackendConnectorByType(config.ElasticsearchBackendConnectorName)
99106
backendConnector := backend_connectors.NewElasticsearchBackendConnectorFromDbConfig(connectorDeclaration.Config)
100107
pipeline.AddBackendConnector(backendConnector)
108+
case config.DorisBackendConnectorName:
109+
connectorDeclaration := cfg.GetBackendConnectorByType(config.DorisBackendConnectorName)
110+
backendConnector := backend_connectors.NewDorisBackendConnector(&connectorDeclaration.Config)
111+
pipeline.AddBackendConnector(backendConnector)
101112
default:
102113
log.Fatalf("unknown backend connector type: %s", bc.Type)
103114
}

cmd/l4_proxy.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ package main
55
import (
66
"context"
77
"fmt"
8-
"github.com/QuesmaOrg/quesma/platform/clickhouse"
8+
"github.com/QuesmaOrg/quesma/platform/database_common"
99
"github.com/QuesmaOrg/quesma/platform/elasticsearch"
1010
"github.com/QuesmaOrg/quesma/platform/logger"
1111
"github.com/QuesmaOrg/quesma/platform/stats"
@@ -69,7 +69,7 @@ func configureRouting() *http.ServeMux {
6969
}
7070

7171
if !elasticsearch.IsInternalIndex(index) {
72-
stats.GlobalStatistics.Process(false, index, jsonBody, clickhouse.NestedSeparator)
72+
stats.GlobalStatistics.Process(false, index, jsonBody, database_common.NestedSeparator)
7373
}
7474
}))
7575

@@ -87,7 +87,7 @@ func configureRouting() *http.ServeMux {
8787
}
8888

8989
if !elasticsearch.IsInternalIndex(index) {
90-
stats.GlobalStatistics.Process(false, index, jsonBody, clickhouse.NestedSeparator)
90+
stats.GlobalStatistics.Process(false, index, jsonBody, database_common.NestedSeparator)
9191
}
9292
}))
9393

@@ -112,7 +112,7 @@ func configureRouting() *http.ServeMux {
112112
}
113113

114114
if !elasticsearch.IsInternalIndex(index) {
115-
stats.GlobalStatistics.Process(false, index, document, clickhouse.NestedSeparator)
115+
stats.GlobalStatistics.Process(false, index, document, database_common.NestedSeparator)
116116
}
117117
return nil
118118
})

cmd/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ func main() {
9090
phoneHomeAgent.Start()
9191

9292
virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName)
93-
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
94-
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.NewSchemaTypeAdapter(cfg.DefaultStringColumnType))
93+
tableDisco := database_common.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
94+
schemaRegistry := schema.NewSchemaRegistry(database_common.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.NewClickhouseSchemaTypeAdapter(cfg.DefaultStringColumnType))
9595
schemaRegistry.Start()
9696

9797
im := elasticsearch.NewIndexManagement(cfg.Elasticsearch)
@@ -143,7 +143,7 @@ func main() {
143143

144144
}
145145

146-
func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender, indexRegistry table_resolver.TableResolver) *Quesma {
146+
func constructQuesma(cfg *config.QuesmaConfiguration, sl database_common.TableDiscovery, lm *database_common.LogManager, ip *ingest.IngestProcessor, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender, indexRegistry table_resolver.TableResolver) *Quesma {
147147
if cfg.TransparentProxy {
148148
return NewQuesmaTcpProxy(cfg, quesmaManagementConsole, logChan, false)
149149
} else {

cmd/quesma.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ package main
55
import (
66
"context"
77
"github.com/QuesmaOrg/quesma/platform/ab_testing"
8-
"github.com/QuesmaOrg/quesma/platform/clickhouse"
98
"github.com/QuesmaOrg/quesma/platform/config"
9+
"github.com/QuesmaOrg/quesma/platform/database_common"
1010
"github.com/QuesmaOrg/quesma/platform/ingest"
1111
"github.com/QuesmaOrg/quesma/platform/logger"
1212
"github.com/QuesmaOrg/quesma/platform/recovery"
@@ -53,8 +53,8 @@ func NewQuesmaTcpProxy(config *config.QuesmaConfiguration, quesmaManagementConso
5353
}
5454
}
5555
func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent,
56-
logManager *clickhouse.LogManager, ingestProcessor *ingest.IngestProcessor,
57-
schemaLoader clickhouse.TableDiscovery,
56+
logManager *database_common.LogManager, ingestProcessor *ingest.IngestProcessor,
57+
schemaLoader database_common.TableDiscovery,
5858
schemaRegistry schema.Registry, config *config.QuesmaConfiguration,
5959
quesmaManagementConsole *ui.QuesmaManagementConsole,
6060
abResultsRepository ab_testing.Sender, resolver table_resolver.TableResolver) *Quesma {

cmd/v2_test_objects.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@ package main
55

66
import (
77
"context"
8+
"github.com/QuesmaOrg/quesma/platform/database_common"
89
"github.com/QuesmaOrg/quesma/platform/model"
910
"github.com/QuesmaOrg/quesma/platform/parsers/elastic_query_dsl"
1011

11-
// TODO elastic query parser needs a clickhouse package
12-
// due to the table dependency
13-
"github.com/QuesmaOrg/quesma/platform/clickhouse"
1412
"github.com/QuesmaOrg/quesma/platform/frontend_connectors"
1513
"github.com/QuesmaOrg/quesma/platform/logger"
1614
"github.com/QuesmaOrg/quesma/platform/processors"
@@ -388,15 +386,15 @@ func (p *QueryTransformationPipeline) ParseQuery(message any) (*model.ExecutionP
388386
// TODO this is a hack to create a table for the query
389387
// Why parser needs a table?
390388
tableName := "test_table"
391-
table := clickhouse.Table{
389+
table := database_common.Table{
392390
Name: tableName,
393391
DatabaseName: "default",
394-
Cols: map[string]*clickhouse.Column{
395-
"message": {Name: "message", Type: clickhouse.NewBaseType("String")},
396-
"@timestamp": {Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64")},
397-
"attributes_values": {Name: "attributes_values", Type: clickhouse.NewBaseType("Map(String,String)")},
392+
Cols: map[string]*database_common.Column{
393+
"message": {Name: "message", Type: database_common.NewBaseType("String")},
394+
"@timestamp": {Name: "@timestamp", Type: database_common.NewBaseType("DateTime64")},
395+
"attributes_values": {Name: "attributes_values", Type: database_common.NewBaseType("Map(String,String)")},
398396
},
399-
Config: clickhouse.NewNoTimestampOnlyStringAttrCHConfig(),
397+
Config: database_common.NewNoTimestampOnlyStringAttrCHConfig(),
400398
}
401399
cw := elastic_query_dsl.ClickhouseQueryTranslator{
402400
Ctx: req.OriginalRequest.Context(),

0 commit comments

Comments
 (0)