Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit cb30918

Browse files
pdelewskimieciu
andauthored
Implement hydrolix ddl (#1480)
The goal of this PR is about implementing hydrolix lowere and backend connector Example of backend connectors def ``` backendConnectors: - name: my-minimal-elasticsearch type: elasticsearch config: url: "http://localhost:9200" - name: my-clickhouse-data-source type: clickhouse-os config: url: "http://3.20.203.177:8888" user: "[email protected]" password: "Pp_1zWUxk8-jQfWC7UxH8Q5x7g" token: "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICIybDZyTk1YV2hYQTA5M2tkRHA5ZFctaEMzM2NkOEtWUFhJdURZLWlLeUFjIn0.eyJleHAiOjE3NTM3NzY2NTksImlhdCI6MTc1MzY5MDI1OSwianRpIjoiMzNmNzI2M2MtMTA2Zi00MTc1LWJhZTEtOTEzNTJkNTdmOWM0IiwiaXNzIjoiaHR0cHM6Ly9sb2NhbGhvc3Qva2V5Y2xvYWsvcmVhbG1zL2h5ZHJvbGl4LXVzZXJzIiwiYXVkIjpbImNvbmZpZy1hcGkiLCJhY2NvdW50Il0sInN1YiI6ImRiMWM1YTJiLTdhYjMtNGNmZi04NGU4LTQ3Yzc0YjRlZjAyMSIsInR5cCI6IkJlYXJlciIsImF6cCI6ImNvbmZpZy1hcGkiLCJzZXNzaW9uX3N0YXRlIjoiNGRhZWM2YzItMzA4ZC00MzFkLTg0ZWMtNGFiMjJjOTFmZjg3IiwiYWNyIjoiMSIsImFsbG93ZWQtb3JpZ2lucyI6WyJodHRwOi8vbG9jYWxob3N0Il0sInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJkZWZhdWx0LXJvbGVzLWh5ZHJvbGl4LXVzZXJzIiwib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGNvbmZpZy1hcGktc2VydmljZSBlbWFpbCBwcm9maWxlIiwic2lkIjoiNGRhZWM2YzItMzA4ZC00MzFkLTg0ZWMtNGFiMjJjOTFmZjg3IiwiZW1haWxfdmVyaWZpZWQiOnRydWUsInByZWZlcnJlZF91c2VybmFtZSI6Im1lQGh5ZHJvbGl4LmlvIiwiZW1haWwiOiJtZUBoeWRyb2xpeC5pbyJ9.Yr0hleV6sJZCmOQKXSN82HVRm4RKC7IGW7CVXHJai8vOKMW5uPIiw_1BwaHzKi8DjwftHvhWW0hmEXh492Mj_6csQgvejeCfwbKvZx9rQbBZ-4P4GboB4OgqtZ5macY6D_QQyeXol2otS80E8OTAUBM8o07v_fYd92-nz-qY7ceicT8oI7kLMgEOD6VA7Glue7hqQblofIZMoDK1Ve2WhrOhfgqVDxCloFrLs1VhXevGBkVgz7LF_XoxLyR0UPhyVj7lM3ep3M8FJbuP5afKuJUr2nb3qm5Bxs_r1uuQe7INuEH-CYCPJmsOArJ0BIULgtB3LW1zCsLl_DAMQJhwtg" orgId: "d9ce0431-f26f-44e3-b0ef-abc1653d04eb" projectId: "27506b30-0c78-41fa-a059-048d687f1164" clusterName: "my-cluster" ingestStatistics: true ``` <!-- A note on testing your PR --> <!-- Basic unit test run is executed against each commit in the PR. If you want to run a full integration test suite, you can trigger it by commenting with '/run-integration-tests' or '/run-it' --> --------- Signed-off-by: Przemyslaw Delewski <[email protected]> Co-authored-by: Przemysław Hejman <[email protected]>
1 parent 7b3d02e commit cb30918

File tree

9 files changed

+1037
-199
lines changed

9 files changed

+1037
-199
lines changed

platform/backend_connectors/hydrolix_backend_connector.go

Lines changed: 224 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,28 @@
44
package backend_connectors
55

66
import (
7+
"bytes"
78
"context"
9+
"crypto/tls"
810
"database/sql"
11+
"encoding/json"
12+
"fmt"
913
"github.com/QuesmaOrg/quesma/platform/config"
10-
14+
"github.com/QuesmaOrg/quesma/platform/logger"
1115
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
16+
"github.com/google/uuid"
17+
"io"
18+
"net/http"
19+
"sync"
20+
"time"
1221
)
1322

1423
type HydrolixBackendConnector struct {
1524
BasicSqlBackendConnector
16-
cfg *config.RelationalDbConfiguration
25+
cfg *config.RelationalDbConfiguration
26+
client *http.Client
27+
tableCache map[string]uuid.UUID
28+
tableMutex sync.Mutex
1729
}
1830

1931
func (p *HydrolixBackendConnector) GetId() quesma_api.BackendConnectorType {
@@ -29,24 +41,232 @@ func (p *HydrolixBackendConnector) Open() error {
2941
return nil
3042
}
3143

44+
func checkHydrolixConfig(cfg *config.RelationalDbConfiguration) error {
45+
if cfg.Url == nil {
46+
return fmt.Errorf("hydrolix URL is not set")
47+
}
48+
if cfg.HydrolixToken == "" {
49+
return fmt.Errorf("hydrolix token is not set")
50+
}
51+
if cfg.HydrolixOrgId == "" {
52+
return fmt.Errorf("hydrolix organization ID is not set")
53+
}
54+
if cfg.HydrolixProjectId == "" {
55+
return fmt.Errorf("hydrolix project ID is not set")
56+
}
57+
return nil
58+
}
59+
3260
func NewHydrolixBackendConnector(configuration *config.RelationalDbConfiguration) *HydrolixBackendConnector {
61+
if err := checkHydrolixConfig(configuration); err != nil {
62+
logger.Error().Msgf("Invalid Hydrolix configuration: %v", err)
63+
return nil
64+
}
3365
return &HydrolixBackendConnector{
3466
cfg: configuration,
67+
client: &http.Client{
68+
Transport: &http.Transport{
69+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
70+
DisableKeepAlives: true,
71+
},
72+
},
73+
tableCache: make(map[string]uuid.UUID),
3574
}
3675
}
3776

38-
func NewHydrolixBackendConnectorWithConnection(_ string, conn *sql.DB) *HydrolixBackendConnector {
77+
func NewHydrolixBackendConnectorWithConnection(configuration *config.RelationalDbConfiguration, conn *sql.DB) *HydrolixBackendConnector {
78+
if err := checkHydrolixConfig(configuration); err != nil {
79+
logger.Error().Msgf("Invalid Hydrolix configuration: %v", err)
80+
return nil
81+
}
3982
return &HydrolixBackendConnector{
4083
BasicSqlBackendConnector: BasicSqlBackendConnector{
4184
connection: conn,
4285
},
86+
cfg: configuration,
87+
client: &http.Client{
88+
Transport: &http.Transport{
89+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
90+
DisableKeepAlives: true,
91+
},
92+
},
93+
tableCache: make(map[string]uuid.UUID),
4394
}
4495
}
4596

4697
func (p *HydrolixBackendConnector) InstanceName() string {
4798
return "hydrolix" // TODO add name taken from config
4899
}
49100

50-
func (p *HydrolixBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
101+
func isValidJSON(s string) bool {
102+
var js interface{}
103+
return json.Unmarshal([]byte(s), &js) == nil
104+
}
105+
106+
func (p *HydrolixBackendConnector) makeRequest(ctx context.Context, method string, url string, body []byte, token string, tableName string) ([]byte, error) {
107+
// Build the request
108+
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body))
109+
if err != nil {
110+
return nil, err
111+
}
112+
113+
// Set headers
114+
req.Header.Set("Authorization", "Bearer "+token)
115+
req.Header.Set("Accept", "application/json")
116+
req.Header.Set("Content-Type", "application/json")
117+
req.Header.Set("x-hdx-table", "sample_project."+tableName)
118+
119+
// Send the request
120+
resp, err := p.client.Do(req)
121+
if err != nil {
122+
return nil, fmt.Errorf("ingest request failed: %s", err)
123+
}
124+
defer resp.Body.Close()
125+
126+
// Read and print response
127+
respBody, err := io.ReadAll(resp.Body)
128+
if resp.StatusCode >= 400 {
129+
return nil, fmt.Errorf("ingest failed: %s — %s", resp.Status, string(respBody))
130+
}
131+
return respBody, err
132+
}
133+
134+
type HydrolixResponse struct {
135+
Code int `json:"code"`
136+
Message string `json:"message"`
137+
}
138+
139+
func (p *HydrolixBackendConnector) ingestFun(ctx context.Context, ingestSlice []map[string]interface{}, tableName string, tableId string) error {
140+
logger.InfoWithCtx(ctx).Msgf("Ingests len: %s %d", tableName, len(ingestSlice))
141+
142+
var data []json.RawMessage
143+
144+
for _, row := range ingestSlice {
145+
if len(row) == 0 {
146+
continue
147+
}
148+
ingestJson, err := json.Marshal(row)
149+
if err != nil {
150+
logger.ErrorWithCtx(ctx).Msg("Failed to marshal row")
151+
continue
152+
}
153+
data = append(data, ingestJson)
154+
}
155+
156+
// Final payload: a JSON array of the rows
157+
finalJson, err := json.Marshal(data)
158+
if err != nil {
159+
return fmt.Errorf("failed to marshal final JSON array: %w", err)
160+
}
161+
162+
url := fmt.Sprintf("%s/ingest/event", p.cfg.Url.String())
163+
// Sleep duration is arbitrarily chosen.
164+
// It seems that the Hydrolix API needs some time to process the table creation before ingesting data.
165+
const sleepDuration = 5 * time.Second
166+
const maxRetries = 5
167+
for retries := 0; retries < maxRetries; retries++ {
168+
_, err := p.makeRequest(ctx, "POST", url, finalJson, p.cfg.HydrolixToken, tableName)
169+
if err != nil {
170+
logger.WarnWithCtx(ctx).Msgf("Error ingesting table %s: %v retrying...", tableName, err)
171+
time.Sleep(sleepDuration)
172+
continue
173+
}
174+
175+
logger.InfoWithCtx(ctx).Msgf("Ingests successfull: %s %d", tableName, len(ingestSlice))
176+
return nil
177+
}
178+
return fmt.Errorf("failed to ingest after %d retries: %s", maxRetries, tableName)
179+
}
180+
181+
func (p *HydrolixBackendConnector) getTableIdFromCache(tableName string) (uuid.UUID, bool) {
182+
p.tableMutex.Lock()
183+
defer p.tableMutex.Unlock()
184+
id, exists := p.tableCache[tableName]
185+
return id, exists
186+
}
187+
188+
func (p *HydrolixBackendConnector) setTableIdInCache(tableName string, tableId uuid.UUID) {
189+
p.tableMutex.Lock()
190+
defer p.tableMutex.Unlock()
191+
p.tableCache[tableName] = tableId
192+
}
193+
194+
func (p *HydrolixBackendConnector) createTableWithSchema(ctx context.Context,
195+
createTable map[string]interface{}, transform map[string]interface{},
196+
tableName string, tableId uuid.UUID) error {
197+
url := fmt.Sprintf("%s/config/v1/orgs/%s/projects/%s/tables/", p.cfg.Url.String(), p.cfg.HydrolixOrgId, p.cfg.HydrolixProjectId)
198+
createTableJson, err := json.Marshal(createTable)
199+
logger.Info().Msgf("createtable event: %s %s", tableName, string(createTableJson))
200+
201+
if err != nil {
202+
return fmt.Errorf("error marshalling create_table JSON: %v", err)
203+
}
204+
_, err = p.makeRequest(ctx, "POST", url, createTableJson, p.cfg.HydrolixToken, tableName)
205+
if err != nil {
206+
logger.ErrorWithCtx(ctx).Msgf("error making request: %v", err)
207+
return err
208+
}
209+
210+
url = fmt.Sprintf("%s/config/v1/orgs/%s/projects/%s/tables/%s/transforms", p.cfg.Url.String(), p.cfg.HydrolixOrgId, p.cfg.HydrolixProjectId, tableId.String())
211+
transformJson, err := json.Marshal(transform)
212+
if err != nil {
213+
return fmt.Errorf("error marshalling transform JSON: %v", err)
214+
}
215+
logger.Info().Msgf("transform event: %s %s", tableName, string(transformJson))
216+
217+
_, err = p.makeRequest(ctx, "POST", url, transformJson, p.cfg.HydrolixToken, tableName)
218+
if err != nil {
219+
logger.ErrorWithCtx(ctx).Msgf("error making request: %v", err)
220+
return err
221+
}
222+
return nil
223+
}
224+
225+
func (p *HydrolixBackendConnector) Exec(_ context.Context, query string, args ...interface{}) error {
226+
// TODO context might be cancelled too early
227+
ctx := context.Background()
228+
if !isValidJSON(query) {
229+
return fmt.Errorf("invalid JSON payload: %s", query)
230+
}
231+
232+
// Top-level object
233+
var root map[string]json.RawMessage
234+
if err := json.Unmarshal([]byte(query), &root); err != nil {
235+
return err
236+
}
237+
238+
// Extract each section into its own map (or struct, if needed)
239+
var createTable map[string]interface{}
240+
var transform map[string]interface{}
241+
var ingestSlice []map[string]interface{}
242+
243+
if err := json.Unmarshal(root["create_table"], &createTable); err != nil {
244+
return err
245+
}
246+
if err := json.Unmarshal(root["transform"], &transform); err != nil {
247+
return err
248+
}
249+
if err := json.Unmarshal(root["ingest"], &ingestSlice); err != nil {
250+
return err
251+
}
252+
tableName := createTable["name"].(string)
253+
254+
tableId, _ := p.getTableIdFromCache(tableName)
255+
if len(createTable) > 0 && tableId == uuid.Nil {
256+
tableId = uuid.New()
257+
createTable["uuid"] = tableId.String()
258+
err := p.createTableWithSchema(ctx, createTable, transform, tableName, tableId)
259+
if err != nil {
260+
logger.ErrorWithCtx(ctx).Msgf("error creating table with schema: %v", err)
261+
return err
262+
}
263+
p.setTableIdInCache(tableName, tableId)
264+
}
265+
266+
if len(ingestSlice) > 0 {
267+
logger.Info().Msgf("Received %d rows for table %s", len(ingestSlice), tableName)
268+
go p.ingestFun(ctx, ingestSlice, tableName, tableId.String())
269+
}
270+
51271
return nil
52272
}

platform/clickhouse/connection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,11 @@ func InitDBConnectionPool(c *config.QuesmaConfiguration) quesma_api.BackendConne
9797
// clean up connections after 5 minutes, before that they may be killed by the firewall
9898
db.SetConnMaxLifetime(time.Duration(5) * time.Minute) // default is 1h
9999

100+
if c.Hydrolix.ConnectorType == quesma_api.GetBackendConnectorNameFromType(quesma_api.HydrolixSQLBackend) {
101+
return backend_connectors.NewHydrolixBackendConnectorWithConnection(&c.Hydrolix, db)
102+
}
100103
return backend_connectors.NewClickHouseBackendConnectorWithConnection(c.ClickHouse.Url.String(), db)
104+
101105
}
102106

103107
// RunClickHouseConnectionDoctor is very blunt and verbose function which aims to print some helpful information

platform/config/config_v2.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,17 @@ type BackendConnector struct {
9595
// RelationalDbConfiguration works fine for non-relational databases too, consider rename
9696
type RelationalDbConfiguration struct {
9797
//ConnectorName string `koanf:"name"`
98-
ConnectorType string `koanf:"type"`
99-
Url *Url `koanf:"url"`
100-
User string `koanf:"user"`
101-
Password string `koanf:"password"`
102-
Database string `koanf:"database"`
103-
ClusterName string `koanf:"clusterName"` // When creating tables by Quesma - they'll use `ON CLUSTER ClusterName` clause
104-
AdminUrl *Url `koanf:"adminUrl"`
105-
DisableTLS bool `koanf:"disableTLS"`
98+
ConnectorType string `koanf:"type"`
99+
Url *Url `koanf:"url"`
100+
User string `koanf:"user"`
101+
Password string `koanf:"password"`
102+
Database string `koanf:"database"`
103+
ClusterName string `koanf:"clusterName"` // When creating tables by Quesma - they'll use `ON CLUSTER ClusterName` clause
104+
AdminUrl *Url `koanf:"adminUrl"`
105+
DisableTLS bool `koanf:"disableTLS"`
106+
HydrolixToken string `koanf:"token"`
107+
HydrolixOrgId string `koanf:"orgId"`
108+
HydrolixProjectId string `koanf:"projectId"`
106109

107110
// This supports es backend only.
108111
ClientCertPath string `koanf:"clientCertPath"`

platform/database_common/schema.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ func GetInstanceType(instanceName string) InstanceType {
167167
switch instanceName {
168168
case "clickhouse":
169169
return ClickHouseInstance
170+
case "hydrolix":
171+
return ClickHouseInstance
170172
case "doris":
171173
return DorisInstance
172174
default:

0 commit comments

Comments
 (0)