Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 4002064

Browse files
authored
Merge branch 'main' into dependabot/go_modules/ci/it/github.com/ClickHouse/ch-go-0.65.0
2 parents 361256f + 110df7c commit 4002064

File tree

15 files changed

+520
-47
lines changed

15 files changed

+520
-47
lines changed

docs/public/package-lock.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

platform/backend_connectors/elasticsearch_backend_connector.go

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package backend_connectors
66
import (
77
"bytes"
88
"context"
9-
"crypto/tls"
109
"fmt"
1110
"github.com/QuesmaOrg/quesma/platform/config"
1211
"github.com/QuesmaOrg/quesma/platform/elasticsearch"
@@ -32,32 +31,29 @@ type ElasticsearchBackendConnector struct {
3231

3332
// NewElasticsearchBackendConnector is a constructor which uses old (v1) configuration object
3433
func NewElasticsearchBackendConnector(cfg config.ElasticsearchConfiguration) *ElasticsearchBackendConnector {
34+
client := elasticsearch.NewHttpsClient(&cfg, esRequestTimeout)
3535
conn := &ElasticsearchBackendConnector{
3636
config: cfg,
37-
client: &http.Client{
38-
Transport: &http.Transport{
39-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
40-
},
41-
Timeout: esRequestTimeout,
42-
},
37+
client: client,
4338
}
4439
return conn
4540
}
4641

4742
// NewElasticsearchBackendConnectorFromDbConfig is an alternative constructor which uses the generic database configuration object
4843
func NewElasticsearchBackendConnectorFromDbConfig(cfg config.RelationalDbConfiguration) *ElasticsearchBackendConnector {
44+
esConfig := config.ElasticsearchConfiguration{
45+
Url: cfg.Url,
46+
User: cfg.User,
47+
Password: cfg.Password,
48+
ClientCertPath: cfg.ClientCertPath,
49+
ClientKeyPath: cfg.ClientKeyPath,
50+
CACertPath: cfg.CACertPath,
51+
}
52+
53+
client := elasticsearch.NewHttpsClient(&esConfig, esRequestTimeout)
4954
conn := &ElasticsearchBackendConnector{
50-
config: config.ElasticsearchConfiguration{
51-
Url: cfg.Url,
52-
User: cfg.User,
53-
Password: cfg.Password,
54-
},
55-
client: &http.Client{
56-
Transport: &http.Transport{
57-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
58-
},
59-
Timeout: esRequestTimeout,
60-
},
55+
config: esConfig,
56+
client: client,
6157
}
6258
return conn
6359
}

platform/clickhouse/clickhouse.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ type discoveredTable struct {
137137
createTableQuery string
138138
timestampFieldName string
139139
virtualTable bool
140+
existsOnAllNodes bool // => table can be queried using `FROM cluster(<clusterName>,<tableName>)` syntax
140141
}
141142

142143
func (lm *LogManager) ReloadTables() {

platform/clickhouse/table.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ type Table struct {
2828

2929
DiscoveredTimestampFieldName *string
3030

31-
VirtualTable bool
31+
VirtualTable bool
32+
ExistsOnAllNodes bool
3233
}
3334

3435
func (t *Table) createTableOurFieldsString() []string {

platform/clickhouse/table_discovery.go

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,17 @@ func (td *tableDiscovery) ReloadTableDefinitions() {
217217
configuredTables = td.configureTables(tables, databaseName)
218218
}
219219
}
220+
var tablePresence map[string][]TablePresence
221+
if td.cfg.ClusterName != "" {
222+
var err error
223+
tablePresence, err = td.getTablePresenceAcrossClusters(databaseName)
224+
if err != nil {
225+
logger.Warn().Msgf("could not get table presence across clusters: %v", err)
226+
}
227+
logger.Info().Msgf("Table presence across clusters: %v", tablePresence)
228+
configuredTables = td.populateClusterNodes(configuredTables, databaseName, tablePresence)
229+
}
230+
220231
configuredTables = td.readVirtualTables(configuredTables)
221232

222233
td.ReloadTablesError = nil
@@ -328,7 +339,7 @@ func (td *tableDiscovery) configureTables(tables map[string]map[string]columnMet
328339
comment := td.tableComment(databaseName, table)
329340
createTableQuery := td.createTableQuery(databaseName, table)
330341
// we assume here that @timestamp field is always present in the table, or it's explicitly configured
331-
configuredTables[table] = discoveredTable{table, databaseName, columns, indexConfig, comment, createTableQuery, "", false}
342+
configuredTables[table] = discoveredTable{table, databaseName, columns, indexConfig, comment, createTableQuery, "", false, false}
332343
}
333344
} else {
334345
notConfiguredTables = append(notConfiguredTables, table)
@@ -358,7 +369,7 @@ func (td *tableDiscovery) autoConfigureTables(tables map[string]map[string]colum
358369
maybeTimestampField = td.tableTimestampField(databaseName, table, ClickHouse)
359370
}
360371
const isVirtualTable = false
361-
configuredTables[table] = discoveredTable{table, databaseName, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField, isVirtualTable}
372+
configuredTables[table] = discoveredTable{table, databaseName, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField, isVirtualTable, false}
362373

363374
}
364375
for tableName, table := range configuredTables {
@@ -415,6 +426,7 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
415426
CreateTableQuery: resTable.createTableQuery,
416427
DiscoveredTimestampFieldName: timestampFieldName,
417428
VirtualTable: resTable.virtualTable,
429+
ExistsOnAllNodes: resTable.existsOnAllNodes,
418430
}
419431
if containsAttributes(resTable.columnTypes) {
420432
table.Config.Attributes = []Attribute{NewDefaultStringAttribute()}
@@ -699,6 +711,90 @@ func (td *tableDiscovery) enrichTableWithMapFields(inputTable map[string]map[str
699711
return outputTable
700712
}
701713

714+
type TablePresence struct {
715+
Database string
716+
Table string
717+
FoundNodes int
718+
TotalNodes int
719+
ExistsOnAllNodes bool
720+
}
721+
722+
func (td *tableDiscovery) getTablePresenceAcrossClusters(database string) (map[string][]TablePresence, error) {
723+
// Step 1: Get all cluster names
724+
clusterQuery := `SELECT DISTINCT cluster FROM system.clusters ORDER BY cluster`
725+
rows, err := td.dbConnPool.Query(context.Background(), clusterQuery)
726+
if err != nil {
727+
return nil, fmt.Errorf("failed to query cluster list: %w", err)
728+
}
729+
defer rows.Close()
730+
731+
var clusters []string
732+
for rows.Next() {
733+
var cluster string
734+
if err := rows.Scan(&cluster); err != nil {
735+
return nil, fmt.Errorf("failed to scan cluster name: %w", err)
736+
}
737+
clusters = append(clusters, cluster)
738+
}
739+
740+
// Step 2: For each cluster, safely query for the given database
741+
presenceData := make(map[string][]TablePresence)
742+
743+
for _, cluster := range clusters {
744+
query := `
745+
WITH (
746+
SELECT count(DISTINCT host_name)
747+
FROM system.clusters
748+
WHERE cluster = ?
749+
) AS total_nodes
750+
751+
SELECT
752+
database,
753+
name AS table_name,
754+
count(DISTINCT hostName()) AS found_nodes,
755+
total_nodes,
756+
count(DISTINCT hostName()) = total_nodes AS exists_on_all_nodes
757+
FROM cluster(?, system.tables)
758+
WHERE database = ?
759+
GROUP BY database, name, total_nodes
760+
`
761+
762+
rows, err := td.dbConnPool.Query(context.Background(), query, cluster, cluster, database)
763+
if err != nil {
764+
return nil, fmt.Errorf("failed to query tables for cluster %s: %w", cluster, err)
765+
}
766+
defer rows.Close()
767+
768+
var tables []TablePresence
769+
for rows.Next() {
770+
var tp TablePresence
771+
if err := rows.Scan(&tp.Database, &tp.Table, &tp.FoundNodes, &tp.TotalNodes, &tp.ExistsOnAllNodes); err != nil {
772+
return nil, fmt.Errorf("failed to scan table row: %w", err)
773+
}
774+
tables = append(tables, tp)
775+
}
776+
777+
if len(tables) > 0 {
778+
presenceData[cluster] = tables
779+
}
780+
}
781+
782+
return presenceData, nil
783+
}
784+
785+
func (td *tableDiscovery) populateClusterNodes(configuredTables map[string]discoveredTable, databaseName string, tablePresence map[string][]TablePresence) map[string]discoveredTable {
786+
for _, tables := range tablePresence {
787+
for _, table := range tables {
788+
if table.Database == databaseName {
789+
if discoTable, ok := configuredTables[table.Table]; ok {
790+
discoTable.existsOnAllNodes = table.ExistsOnAllNodes
791+
}
792+
}
793+
}
794+
}
795+
return configuredTables
796+
}
797+
702798
func (td *tableDiscovery) readTables(database string) (map[string]map[string]columnMetadata, error) {
703799
logger.Debug().Msgf("describing tables: %s", database)
704800

platform/config/config_v2.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ type RelationalDbConfiguration struct {
9494
ClusterName string `koanf:"clusterName"` // When creating tables by Quesma - they'll use `ON CLUSTER ClusterName` clause
9595
AdminUrl *Url `koanf:"adminUrl"`
9696
DisableTLS bool `koanf:"disableTLS"`
97+
98+
// This supports es backend only.
99+
ClientCertPath string `koanf:"clientCertPath"`
100+
ClientKeyPath string `koanf:"clientKeyPath"`
101+
CACertPath string `koanf:"caCertPath"`
97102
}
98103

99104
func (c *RelationalDbConfiguration) IsEmpty() bool {
@@ -1069,9 +1074,12 @@ func (c *QuesmaNewConfiguration) getRelationalDBBackendConnector() (*BackendConn
10691074
func (c *QuesmaNewConfiguration) getElasticsearchConfig() (ElasticsearchConfiguration, error) {
10701075
if esBackendConn := c.getElasticsearchBackendConnector(); esBackendConn != nil {
10711076
return ElasticsearchConfiguration{
1072-
Url: esBackendConn.Config.Url,
1073-
User: esBackendConn.Config.User,
1074-
Password: esBackendConn.Config.Password,
1077+
Url: esBackendConn.Config.Url,
1078+
User: esBackendConn.Config.User,
1079+
Password: esBackendConn.Config.Password,
1080+
ClientCertPath: esBackendConn.Config.ClientCertPath,
1081+
ClientKeyPath: esBackendConn.Config.ClientKeyPath,
1082+
CACertPath: esBackendConn.Config.CACertPath,
10751083
}, nil
10761084
}
10771085
return ElasticsearchConfiguration{}, fmt.Errorf("elasticsearch backend connector must be configured")

platform/config/elasticsearch_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ type ElasticsearchConfiguration struct {
77
User string `koanf:"user"`
88
Password string `koanf:"password"`
99
AdminUrl *Url `koanf:"adminUrl"`
10+
11+
ClientCertPath string `koanf:"clientCertPath"`
12+
ClientKeyPath string `koanf:"clientKeyPath"`
13+
CACertPath string `koanf:"caCertPath"`
1014
}

platform/elasticsearch/client.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"bytes"
77
"context"
88
"crypto/tls"
9+
"crypto/x509"
910
"fmt"
1011
"github.com/QuesmaOrg/quesma/platform/config"
1112
"github.com/QuesmaOrg/quesma/platform/logger"
1213
"net/http"
14+
"os"
1315
"time"
1416
)
1517

@@ -24,18 +26,53 @@ type SimpleClient struct {
2426
config *config.ElasticsearchConfiguration
2527
}
2628

27-
func NewSimpleClient(configuration *config.ElasticsearchConfiguration) *SimpleClient {
28-
client := &http.Client{
29+
// NewHttpsClient should be merged with NewSimpleClient at some point -> TODO!
30+
func NewHttpsClient(configuration *config.ElasticsearchConfiguration, timeout time.Duration) *http.Client {
31+
tlsConfig := &tls.Config{
32+
MinVersion: tls.VersionTLS12,
33+
InsecureSkipVerify: true,
34+
}
35+
36+
if configuration.CACertPath != "" {
37+
caCert, err := os.ReadFile(configuration.CACertPath)
38+
if err != nil {
39+
logger.Warn().Msgf("failed to read CA certificate: %v. Fallback to skipping tls.", err)
40+
} else {
41+
caCertPool := x509.NewCertPool()
42+
if !caCertPool.AppendCertsFromPEM(caCert) {
43+
logger.Warn().Msgf("failed to append CA certificate: %v. Fallback to skipping tls.", err)
44+
} else {
45+
tlsConfig.RootCAs = caCertPool
46+
tlsConfig.InsecureSkipVerify = false
47+
}
48+
}
49+
}
50+
51+
if configuration.ClientCertPath != "" && configuration.ClientKeyPath != "" {
52+
cert, err := tls.LoadX509KeyPair(configuration.ClientCertPath, configuration.ClientKeyPath)
53+
if err != nil {
54+
logger.Warn().Msgf("failed to load client certificate/key: %v. Fallback to certificate-less client.", err)
55+
} else {
56+
tlsConfig.Certificates = []tls.Certificate{cert}
57+
}
58+
}
59+
60+
return &http.Client{
2961
Transport: &http.Transport{
30-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
62+
TLSClientConfig: tlsConfig,
3163
},
32-
Timeout: esRequestTimeout,
64+
Timeout: timeout,
3365
}
66+
}
67+
68+
func NewSimpleClient(configuration *config.ElasticsearchConfiguration) *SimpleClient {
69+
client := NewHttpsClient(configuration, esRequestTimeout)
3470
return &SimpleClient{
3571
client: client,
3672
config: configuration,
3773
}
3874
}
75+
3976
func (es *SimpleClient) Request(ctx context.Context, method, endpoint string, body []byte) (*http.Response, error) {
4077
return es.doRequest(ctx, method, endpoint, body, nil)
4178
}

0 commit comments

Comments
 (0)