Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
f9306f7
Fill table name
pdelewski Jun 27, 2025
c55d8e4
Populate more content
pdelewski Jun 27, 2025
6c0c356
Moving some stuff around
pdelewski Jun 27, 2025
11c7f46
Populate events
pdelewski Jun 30, 2025
af2c054
Update test and Exec method
pdelewski Jun 30, 2025
bddae1a
HydrolixBackendConnector Exec
pdelewski Jun 30, 2025
06236ac
Update platform/ingest/hydrolixlowerer.go
pdelewski Jul 3, 2025
b5461b6
Fix compilation error
pdelewski Jul 4, 2025
b23254d
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 4, 2025
6426c82
Fixing linter
pdelewski Jul 4, 2025
f0da77e
Changing to atomic
pdelewski Jul 4, 2025
6bc5ba3
Adding comment
pdelewski Jul 4, 2025
cf8673c
Json sanity check
pdelewski Jul 4, 2025
58108f9
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 15, 2025
89b73ff
Use hydrolix backend connector
pdelewski Jul 15, 2025
6b24e67
First ingest (still hardcoded) to hydrolix via quesma
pdelewski Jul 15, 2025
2910aeb
Use maps instead of strings
pdelewski Jul 15, 2025
32dd1c5
Remove hardcoded name
pdelewski Jul 15, 2025
57c87e1
Handle more than one table
pdelewski Jul 16, 2025
3e54942
Schema improvements, almost correct column names
pdelewski Jul 16, 2025
0382627
Handle arrays and maps #1
pdelewski Jul 16, 2025
9c2076a
Handle arrays and maps #1
pdelewski Jul 16, 2025
91266a5
Some fixes
pdelewski Jul 16, 2025
e2f9aa8
Some tooling
pdelewski Jul 16, 2025
7b47b3a
Refactoring
pdelewski Jul 16, 2025
8eb48a2
Clear ids
pdelewski Jul 16, 2025
7678632
Fixing linter
pdelewski Jul 16, 2025
4f66cca
Skipping test for now
pdelewski Jul 16, 2025
d2c100d
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 16, 2025
3abd7ce
Adding hydrolix instance
pdelewski Jul 16, 2025
11e40dc
First ingest
pdelewski Jul 17, 2025
3bfa1d5
Ingest more than one row
pdelewski Jul 18, 2025
b5ba992
Some fixes plus introduction goroutine for async exec
pdelewski Jul 18, 2025
08b3a14
First real primitive types ingest
pdelewski Jul 18, 2025
88d579b
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 18, 2025
5f22d83
Set ctx to background
pdelewski Jul 18, 2025
cecbf90
Refactoring
pdelewski Jul 21, 2025
514be8d
Fixes
pdelewski Jul 21, 2025
1e2b338
Omit datetime for now
pdelewski Jul 21, 2025
2fffa03
Make ingest request again in the case of no table found error
pdelewski Jul 21, 2025
64bd5aa
Removing unused code
pdelewski Jul 21, 2025
9ba3a4f
Fixing static check
pdelewski Jul 21, 2025
ebc9bdc
Array handling
pdelewski Jul 22, 2025
4369ada
Handling datetime
pdelewski Jul 22, 2025
d1417da
Handling map type
pdelewski Jul 23, 2025
26a0a29
Attributes changes
pdelewski Jul 23, 2025
90be129
Merge branch 'main' into que-312-implement-ddl
pdelewski Jul 23, 2025
4d145ba
Fixing data amount ingestion
pdelewski Jul 28, 2025
df31f65
Simplifying
pdelewski Jul 28, 2025
0ae0add
Improvements
pdelewski Jul 28, 2025
eb53768
Improvements #2
pdelewski Jul 28, 2025
66e8595
Improvements #3
pdelewski Jul 28, 2025
955dd01
Getting rid of some hardcoded values
pdelewski Jul 28, 2025
4973f5e
Improvements #4
pdelewski Jul 28, 2025
ac7e005
Returning error instead of panic
pdelewski Jul 29, 2025
a896c63
Review remarks #2
pdelewski Jul 29, 2025
8203503
Review remarks #3
pdelewski Jul 29, 2025
f0e3f88
Adding Hydrolix prefix
pdelewski Jul 29, 2025
f0a1879
Config validation
pdelewski Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion platform/backend_connectors/hydrolix_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,23 @@
package backend_connectors

import (
"bytes"
"context"
"database/sql"
"fmt"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/logger"
"io"
"net/http"

quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
)

type HydrolixBackendConnector struct {
BasicSqlBackendConnector
cfg *config.RelationalDbConfiguration
cfg *config.RelationalDbConfiguration
IngestURL string
AccessToken string
}

func (p *HydrolixBackendConnector) GetId() quesma_api.BackendConnectorType {
Expand Down Expand Up @@ -48,5 +55,34 @@ func (p *HydrolixBackendConnector) InstanceName() string {
}

func (p *HydrolixBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if p.IngestURL == "" || p.AccessToken == "" {
logger.Info().Msg("missing ingest URL or access token")
// TODO for fallback, execute the query directly on the database connection
_, err := p.connection.ExecContext(ctx, query)
return err
}

// Create HTTP request using the JSON payload from query
req, err := http.NewRequestWithContext(ctx, "POST", p.IngestURL, bytes.NewBufferString(query))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+p.AccessToken)
req.Header.Set("Content-Type", "application/json")

// Execute HTTP request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()

// Handle error response
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("ingest failed: %s — %s", resp.Status, string(body))
}

return nil
}
155 changes: 130 additions & 25 deletions platform/ingest/hydrolixlowerer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
package ingest

import (
"encoding/json"
"fmt"
chLib "github.com/QuesmaOrg/quesma/platform/clickhouse"
"github.com/QuesmaOrg/quesma/platform/persistence"
"github.com/QuesmaOrg/quesma/platform/schema"
"github.com/QuesmaOrg/quesma/platform/types"
"strings"
"sync/atomic"
)

type HydrolixLowerer struct {
virtualTableStorage persistence.JSONDatabase
ingestCounter int64
}

func NewHydrolixLowerer(virtualTableStorage persistence.JSONDatabase) *HydrolixLowerer {
Expand All @@ -19,37 +24,137 @@ func NewHydrolixLowerer(virtualTableStorage persistence.JSONDatabase) *HydrolixL
}
}

func (l *HydrolixLowerer) LowerToDDL(validatedJsons []types.JSON,
func (ip *HydrolixLowerer) GenerateIngestContent(table *chLib.Table,
data types.JSON,
inValidJson types.JSON,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]AlterStatement, types.JSON, []NonSchemaField, error) {

if len(table.Config.Attributes) == 0 {
return nil, data, nil, nil
}

mDiff := DifferenceMap(data, table) // TODO change to DifferenceMap(m, t)

if len(mDiff) == 0 && len(inValidJson) == 0 { // no need to modify, just insert 'js'
return nil, data, nil, nil
}

// check attributes precondition
if len(table.Config.Attributes) <= 0 {
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
}
attrsMap, _ := BuildAttrsMap(mDiff, table.Config)

// generateNewColumns is called on original attributes map
// before adding invalid fields to it
// otherwise it would contain invalid fields e.g. with wrong types
// we only want to add fields that are not part of the schema e.g we don't
// have columns for them
var alterStatements []AlterStatement
atomic.AddInt64(&ip.ingestCounter, 1)
//if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
// alterStatements = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
//}
// If there are some invalid fields, we need to add them to the attributes map
// to not lose them and be able to store them later by
// generating correct update query
// addInvalidJsonFieldsToAttributes returns a new map with invalid fields added
// this map is then used to generate non-schema fields string
attrsMapWithInvalidFields := addInvalidJsonFieldsToAttributes(attrsMap, inValidJson)
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)

if err != nil {
return nil, nil, nil, err
}

onlySchemaFields := RemoveNonSchemaFields(data, table)

return alterStatements, onlySchemaFields, nonSchemaFields, nil
}

func (l *HydrolixLowerer) LowerToDDL(
validatedJsons []types.JSON,
table *chLib.Table,
invalidJsons []types.JSON,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName,
createTableCmd CreateTableStatement) ([]string, error) {
createTableCmd CreateTableStatement,
) ([]string, error) {
// Construct columns array
var columnsJSON strings.Builder
columnsJSON.WriteString("[\n")

for i, col := range createTableCmd.Columns {
if i > 0 {
columnsJSON.WriteString(",\n")
}
columnsJSON.WriteString(fmt.Sprintf(` { "name": "%s", "type": "%s"`, col.ColumnName, col.ColumnType))
if col.Comment != "" {
columnsJSON.WriteString(fmt.Sprintf(`, "comment": "%s"`, col.Comment))
}
if col.AdditionalMetadata != "" {
columnsJSON.WriteString(fmt.Sprintf(`, "metadata": "%s"`, col.AdditionalMetadata))
}
columnsJSON.WriteString(" }")
}

columnsJSON.WriteString("\n]")

const timeColumnName = "ingest_time"

const (
partitioningStrategy = "strategy"
partitioningField = "field"
partitioningGranularity = "granularity"

defaultStrategy = "time"
defaultField = "ingest_time"
defaultGranularity = "day"
)
partitioningJSON := fmt.Sprintf(`"partitioning": {
"%s": "%s",
"%s": "%s",
"%s": "%s"
}`,
partitioningStrategy, defaultStrategy,
partitioningField, defaultField,
partitioningGranularity, defaultGranularity)
events := make(map[string]any)
for i, preprocessedJson := range validatedJsons {
_ = i
_ = preprocessedJson
_, onlySchemaFields, nonSchemaFields, err := l.GenerateIngestContent(table, preprocessedJson,
invalidJsons[i], encodings)
if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
}
if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
}
content := convertNonSchemaFieldsToMap(nonSchemaFields)

for k, v := range onlySchemaFields {
content[k] = v
}

for k, v := range content {
events[k] = v
}
}

eventList := []map[string]any{events}
eventBytes, err := json.MarshalIndent(eventList, " ", " ")
if err != nil {
return nil, err
}
eventJSON := string(eventBytes)

result := []string{`{
result := fmt.Sprintf(`{
"schema": {
"project": "",
"name": "test_index",
"time_column": "ingest_time",
"columns": [
{ "name": "new_field", "type": "string" },
{ "name": "ingest_time", "type": "datetime", "default": "NOW" }
],
"partitioning": {
"strategy": "time",
"field": "ingest_time",
"granularity": "day"
}
"project": "%s",
"name": "%s",
"time_column": "%s",
"columns": %s,
%s,
},
"events": [
{
"new_field": "bar"
}
]
}`}

return result, nil
"events": %s
}`, table.DatabaseName, table.Name, timeColumnName, columnsJSON.String(), partitioningJSON, eventJSON)
return []string{result}, nil
}
26 changes: 14 additions & 12 deletions platform/ingest/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,25 +480,27 @@ func TestHydrolixIngest(t *testing.T) {

expectedStatements: []string{
fmt.Sprintf(`{
"schema": {
"schema": {
"project": "%s",
"name": "test_index",
"time_column": "ingest_time",
"columns": [
{ "name": "new_field", "type": "string" },
{ "name": "ingest_time", "type": "datetime", "default": "NOW" }
],
{ "name": "@timestamp", "type": "DateTime64(3)", "metadata": "DEFAULT now64()" },
{ "name": "attributes_values", "type": "Map(String,String)" },
{ "name": "attributes_metadata", "type": "Map(String,String)" },
{ "name": "new_field", "type": "Nullable(String)", "comment": "quesmaMetadataV1:fieldName=new_field" }
],
"partitioning": {
"strategy": "time",
"field": "ingest_time",
"granularity": "day"
}
"strategy": "time",
"field": "ingest_time",
"granularity": "day"
},
},
"events": [
{
"new_field": "bar"
}
]
{
"new_field": "bar"
}
]
}`, projectName),
},
},
Expand Down
48 changes: 0 additions & 48 deletions platform/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,54 +627,6 @@ func (ip *SqlLowerer) shouldAlterColumns(table *chLib.Table, attrsMap map[string
return false, nil
}

func (ip *SqlLowerer) GenerateIngestContent(table *chLib.Table,
data types.JSON,
inValidJson types.JSON,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]AlterStatement, types.JSON, []NonSchemaField, error) {

if len(table.Config.Attributes) == 0 {
return nil, data, nil, nil
}

mDiff := DifferenceMap(data, table) // TODO change to DifferenceMap(m, t)

if len(mDiff) == 0 && len(inValidJson) == 0 { // no need to modify, just insert 'js'
return nil, data, nil, nil
}

// check attributes precondition
if len(table.Config.Attributes) <= 0 {
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
}
attrsMap, _ := BuildAttrsMap(mDiff, table.Config)

// generateNewColumns is called on original attributes map
// before adding invalid fields to it
// otherwise it would contain invalid fields e.g. with wrong types
// we only want to add fields that are not part of the schema e.g we don't
// have columns for them
var alterStatements []AlterStatement
atomic.AddInt64(&ip.ingestCounter, 1)
if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
alterStatements = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
}
// If there are some invalid fields, we need to add them to the attributes map
// to not lose them and be able to store them later by
// generating correct update query
// addInvalidJsonFieldsToAttributes returns a new map with invalid fields added
// this map is then used to generate non-schema fields string
attrsMapWithInvalidFields := addInvalidJsonFieldsToAttributes(attrsMap, inValidJson)
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)

if err != nil {
return nil, nil, nil, err
}

onlySchemaFields := RemoveNonSchemaFields(data, table)

return alterStatements, onlySchemaFields, nonSchemaFields, nil
}

func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
result := convertNonSchemaFieldsToMap(nonSchemaFields)

Expand Down
Loading
Loading