Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
f9306f7
Fill table name
pdelewski Jun 27, 2025
c55d8e4
Populate more content
pdelewski Jun 27, 2025
6c0c356
Moving some stuff around
pdelewski Jun 27, 2025
11c7f46
Populate events
pdelewski Jun 30, 2025
af2c054
Update test and Exec method
pdelewski Jun 30, 2025
bddae1a
HydrolixBackendConnector Exec
pdelewski Jun 30, 2025
06236ac
Update platform/ingest/hydrolixlowerer.go
pdelewski Jul 3, 2025
b5461b6
Fix compilation error
pdelewski Jul 4, 2025
b23254d
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 4, 2025
6426c82
Fixing linter
pdelewski Jul 4, 2025
f0da77e
Changing to atomic
pdelewski Jul 4, 2025
6bc5ba3
Adding comment
pdelewski Jul 4, 2025
cf8673c
Json sanity check
pdelewski Jul 4, 2025
58108f9
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 15, 2025
89b73ff
Use hydrolix backend connector
pdelewski Jul 15, 2025
6b24e67
First ingest (still hardcoded) to hydrolix via quesma
pdelewski Jul 15, 2025
2910aeb
Use maps instead of strings
pdelewski Jul 15, 2025
32dd1c5
Remove hardcoded name
pdelewski Jul 15, 2025
57c87e1
Handle more than one table
pdelewski Jul 16, 2025
3e54942
Schema improvements, almost correct column names
pdelewski Jul 16, 2025
0382627
Handle arrays and maps #1
pdelewski Jul 16, 2025
9c2076a
Handle arrays and maps #1
pdelewski Jul 16, 2025
91266a5
Some fixes
pdelewski Jul 16, 2025
e2f9aa8
Some tooling
pdelewski Jul 16, 2025
7b47b3a
Refactoring
pdelewski Jul 16, 2025
8eb48a2
Clear ids
pdelewski Jul 16, 2025
7678632
Fixing linter
pdelewski Jul 16, 2025
4f66cca
Skipping test for now
pdelewski Jul 16, 2025
d2c100d
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 16, 2025
3abd7ce
Adding hydrolix instance
pdelewski Jul 16, 2025
11e40dc
First ingest
pdelewski Jul 17, 2025
3bfa1d5
Ingest more than one row
pdelewski Jul 18, 2025
b5ba992
Some fixes plus introduction goroutine for async exec
pdelewski Jul 18, 2025
08b3a14
First real primitive types ingest
pdelewski Jul 18, 2025
88d579b
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 18, 2025
5f22d83
Set ctx to background
pdelewski Jul 18, 2025
cecbf90
Refactoring
pdelewski Jul 21, 2025
514be8d
Fixes
pdelewski Jul 21, 2025
1e2b338
Omit datetime for now
pdelewski Jul 21, 2025
2fffa03
Make ingest request again in the case of no table found error
pdelewski Jul 21, 2025
64bd5aa
Removing unused code
pdelewski Jul 21, 2025
9ba3a4f
Fixing static check
pdelewski Jul 21, 2025
ebc9bdc
Array handling
pdelewski Jul 22, 2025
4369ada
Handling datetime
pdelewski Jul 22, 2025
d1417da
Handling map type
pdelewski Jul 23, 2025
26a0a29
Attributes changes
pdelewski Jul 23, 2025
90be129
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 23, 2025
4d145ba
Fixing data amount ingestion
pdelewski Jul 28, 2025
df31f65
Simplifying
pdelewski Jul 28, 2025
0ae0add
Improvements
pdelewski Jul 28, 2025
eb53768
Improvements #2
pdelewski Jul 28, 2025
66e8595
Improvements #3
pdelewski Jul 28, 2025
955dd01
Getting rid of some hardcoded values
pdelewski Jul 28, 2025
4973f5e
Improvements #4
pdelewski Jul 28, 2025
ac7e005
Returning error instead of panic
pdelewski Jul 29, 2025
a896c63
Review remarks #2
pdelewski Jul 29, 2025
8203503
Review remarks #3
pdelewski Jul 29, 2025
f0e3f88
Adding Hydrolix prefix
pdelewski Jul 29, 2025
f0a1879
Config validation
pdelewski Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 224 additions & 4 deletions platform/backend_connectors/hydrolix_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,28 @@
package backend_connectors

import (
"bytes"
"context"
"crypto/tls"
"database/sql"
"encoding/json"
"fmt"
"github.com/QuesmaOrg/quesma/platform/config"

"github.com/QuesmaOrg/quesma/platform/logger"
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
"github.com/google/uuid"
"io"
"net/http"
"sync"
"time"
)

type HydrolixBackendConnector struct {
BasicSqlBackendConnector
cfg *config.RelationalDbConfiguration
cfg *config.RelationalDbConfiguration
client *http.Client
tableCache map[string]uuid.UUID
tableMutex sync.Mutex
}

func (p *HydrolixBackendConnector) GetId() quesma_api.BackendConnectorType {
Expand All @@ -29,24 +41,232 @@ func (p *HydrolixBackendConnector) Open() error {
return nil
}

func checkHydrolixConfig(cfg *config.RelationalDbConfiguration) error {
if cfg.Url == nil {
return fmt.Errorf("hydrolix URL is not set")
}
if cfg.HydrolixToken == "" {
return fmt.Errorf("hydrolix token is not set")
}
if cfg.HydrolixOrgId == "" {
return fmt.Errorf("hydrolix organization ID is not set")
}
if cfg.HydrolixProjectId == "" {
return fmt.Errorf("hydrolix project ID is not set")
}
return nil
}

func NewHydrolixBackendConnector(configuration *config.RelationalDbConfiguration) *HydrolixBackendConnector {
if err := checkHydrolixConfig(configuration); err != nil {
logger.Error().Msgf("Invalid Hydrolix configuration: %v", err)
return nil
}
return &HydrolixBackendConnector{
cfg: configuration,
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DisableKeepAlives: true,
},
},
tableCache: make(map[string]uuid.UUID),
}
}

func NewHydrolixBackendConnectorWithConnection(_ string, conn *sql.DB) *HydrolixBackendConnector {
func NewHydrolixBackendConnectorWithConnection(configuration *config.RelationalDbConfiguration, conn *sql.DB) *HydrolixBackendConnector {
if err := checkHydrolixConfig(configuration); err != nil {
logger.Error().Msgf("Invalid Hydrolix configuration: %v", err)
return nil
}
return &HydrolixBackendConnector{
BasicSqlBackendConnector: BasicSqlBackendConnector{
connection: conn,
},
cfg: configuration,
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DisableKeepAlives: true,
},
},
tableCache: make(map[string]uuid.UUID),
}
}

func (p *HydrolixBackendConnector) InstanceName() string {
return "hydrolix" // TODO add name taken from config
}

func (p *HydrolixBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
func isValidJSON(s string) bool {
var js interface{}
return json.Unmarshal([]byte(s), &js) == nil
}

func (p *HydrolixBackendConnector) makeRequest(ctx context.Context, method string, url string, body []byte, token string, tableName string) ([]byte, error) {
// Build the request
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}

// Set headers
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Accept", "application/json")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-hdx-table", "sample_project."+tableName)

// Send the request
resp, err := p.client.Do(req)
if err != nil {
return nil, fmt.Errorf("ingest request failed: %s", err)
}
defer resp.Body.Close()

// Read and print response
respBody, err := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("ingest failed: %s — %s", resp.Status, string(respBody))
}
return respBody, err
}

type HydrolixResponse struct {
Code int `json:"code"`
Message string `json:"message"`
}

func (p *HydrolixBackendConnector) ingestFun(ctx context.Context, ingestSlice []map[string]interface{}, tableName string, tableId string) error {
logger.InfoWithCtx(ctx).Msgf("Ingests len: %s %d", tableName, len(ingestSlice))

var data []json.RawMessage

for _, row := range ingestSlice {
if len(row) == 0 {
continue
}
ingestJson, err := json.Marshal(row)
if err != nil {
logger.ErrorWithCtx(ctx).Msg("Failed to marshal row")
continue
}
data = append(data, ingestJson)
}

// Final payload: a JSON array of the rows
finalJson, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal final JSON array: %w", err)
}

url := fmt.Sprintf("%s/ingest/event", p.cfg.Url.String())
// Sleep duration is arbitrarily chosen.
// It seems that the Hydrolix API needs some time to process the table creation before ingesting data.
const sleepDuration = 5 * time.Second
const maxRetries = 5
for retries := 0; retries < maxRetries; retries++ {
_, err := p.makeRequest(ctx, "POST", url, finalJson, p.cfg.HydrolixToken, tableName)
if err != nil {
logger.WarnWithCtx(ctx).Msgf("Error ingesting table %s: %v retrying...", tableName, err)
time.Sleep(sleepDuration)
continue
}

logger.InfoWithCtx(ctx).Msgf("Ingests successfull: %s %d", tableName, len(ingestSlice))
return nil
}
return fmt.Errorf("failed to ingest after %d retries: %s", maxRetries, tableName)
}

func (p *HydrolixBackendConnector) getTableIdFromCache(tableName string) (uuid.UUID, bool) {
p.tableMutex.Lock()
defer p.tableMutex.Unlock()
id, exists := p.tableCache[tableName]
return id, exists
}

func (p *HydrolixBackendConnector) setTableIdInCache(tableName string, tableId uuid.UUID) {
p.tableMutex.Lock()
defer p.tableMutex.Unlock()
p.tableCache[tableName] = tableId
}

func (p *HydrolixBackendConnector) createTableWithSchema(ctx context.Context,
createTable map[string]interface{}, transform map[string]interface{},
tableName string, tableId uuid.UUID) error {
url := fmt.Sprintf("%s/config/v1/orgs/%s/projects/%s/tables/", p.cfg.Url.String(), p.cfg.HydrolixOrgId, p.cfg.HydrolixProjectId)
createTableJson, err := json.Marshal(createTable)
logger.Info().Msgf("createtable event: %s %s", tableName, string(createTableJson))

if err != nil {
return fmt.Errorf("error marshalling create_table JSON: %v", err)
}
_, err = p.makeRequest(ctx, "POST", url, createTableJson, p.cfg.HydrolixToken, tableName)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error making request: %v", err)
return err
}

url = fmt.Sprintf("%s/config/v1/orgs/%s/projects/%s/tables/%s/transforms", p.cfg.Url.String(), p.cfg.HydrolixOrgId, p.cfg.HydrolixProjectId, tableId.String())
transformJson, err := json.Marshal(transform)
if err != nil {
return fmt.Errorf("error marshalling transform JSON: %v", err)
}
logger.Info().Msgf("transform event: %s %s", tableName, string(transformJson))

_, err = p.makeRequest(ctx, "POST", url, transformJson, p.cfg.HydrolixToken, tableName)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error making request: %v", err)
return err
}
return nil
}

func (p *HydrolixBackendConnector) Exec(_ context.Context, query string, args ...interface{}) error {
// TODO context might be cancelled too early
ctx := context.Background()
if !isValidJSON(query) {
return fmt.Errorf("invalid JSON payload: %s", query)
}

// Top-level object
var root map[string]json.RawMessage
if err := json.Unmarshal([]byte(query), &root); err != nil {
return err
}

// Extract each section into its own map (or struct, if needed)
var createTable map[string]interface{}
var transform map[string]interface{}
var ingestSlice []map[string]interface{}

if err := json.Unmarshal(root["create_table"], &createTable); err != nil {
return err
}
if err := json.Unmarshal(root["transform"], &transform); err != nil {
return err
}
if err := json.Unmarshal(root["ingest"], &ingestSlice); err != nil {
return err
}
tableName := createTable["name"].(string)

tableId, _ := p.getTableIdFromCache(tableName)
if len(createTable) > 0 && tableId == uuid.Nil {
tableId = uuid.New()
createTable["uuid"] = tableId.String()
err := p.createTableWithSchema(ctx, createTable, transform, tableName, tableId)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error creating table with schema: %v", err)
return err
}
p.setTableIdInCache(tableName, tableId)
}

if len(ingestSlice) > 0 {
logger.Info().Msgf("Received %d rows for table %s", len(ingestSlice), tableName)
go p.ingestFun(ctx, ingestSlice, tableName, tableId.String())
}

return nil
}
4 changes: 4 additions & 0 deletions platform/clickhouse/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ func InitDBConnectionPool(c *config.QuesmaConfiguration) quesma_api.BackendConne
// clean up connections after 5 minutes, before that they may be killed by the firewall
db.SetConnMaxLifetime(time.Duration(5) * time.Minute) // default is 1h

if c.Hydrolix.ConnectorType == quesma_api.GetBackendConnectorNameFromType(quesma_api.HydrolixSQLBackend) {
return backend_connectors.NewHydrolixBackendConnectorWithConnection(&c.Hydrolix, db)
}
return backend_connectors.NewClickHouseBackendConnectorWithConnection(c.ClickHouse.Url.String(), db)

}

// RunClickHouseConnectionDoctor is very blunt and verbose function which aims to print some helpful information
Expand Down
19 changes: 11 additions & 8 deletions platform/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,17 @@ type BackendConnector struct {
// RelationalDbConfiguration works fine for non-relational databases too, consider rename
type RelationalDbConfiguration struct {
//ConnectorName string `koanf:"name"`
ConnectorType string `koanf:"type"`
Url *Url `koanf:"url"`
User string `koanf:"user"`
Password string `koanf:"password"`
Database string `koanf:"database"`
ClusterName string `koanf:"clusterName"` // When creating tables by Quesma - they'll use `ON CLUSTER ClusterName` clause
AdminUrl *Url `koanf:"adminUrl"`
DisableTLS bool `koanf:"disableTLS"`
ConnectorType string `koanf:"type"`
Url *Url `koanf:"url"`
User string `koanf:"user"`
Password string `koanf:"password"`
Database string `koanf:"database"`
ClusterName string `koanf:"clusterName"` // When creating tables by Quesma - they'll use `ON CLUSTER ClusterName` clause
AdminUrl *Url `koanf:"adminUrl"`
DisableTLS bool `koanf:"disableTLS"`
HydrolixToken string `koanf:"token"`
HydrolixOrgId string `koanf:"orgId"`
HydrolixProjectId string `koanf:"projectId"`

// This supports es backend only.
ClientCertPath string `koanf:"clientCertPath"`
Expand Down
2 changes: 2 additions & 0 deletions platform/database_common/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func GetInstanceType(instanceName string) InstanceType {
switch instanceName {
case "clickhouse":
return ClickHouseInstance
case "hydrolix":
return ClickHouseInstance
case "doris":
return DorisInstance
default:
Expand Down
Loading