Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 4d145ba

Browse files
committed
Fixing data amount ingestion
1 parent 90be129 commit 4d145ba

File tree

1 file changed

+53
-14
lines changed

1 file changed

+53
-14
lines changed

platform/backend_connectors/hydrolix_backend_connector.go

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ import (
99
"crypto/tls"
1010
"database/sql"
1111
"encoding/json"
12+
"errors"
1213
"fmt"
1314
"github.com/QuesmaOrg/quesma/platform/config"
1415
"github.com/QuesmaOrg/quesma/platform/logger"
1516
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
1617
"github.com/google/uuid"
1718
"io"
19+
"net"
1820
"net/http"
1921
"strings"
2022
"sync"
@@ -53,7 +55,8 @@ func NewHydrolixBackendConnector(configuration *config.RelationalDbConfiguration
5355
createTableChan: createTableChan,
5456
client: &http.Client{
5557
Transport: &http.Transport{
56-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
58+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
59+
DisableKeepAlives: true,
5760
},
5861
},
5962
}
@@ -69,7 +72,8 @@ func NewHydrolixBackendConnectorWithConnection(_ string, conn *sql.DB) *Hydrolix
6972
createTableChan: createTableChan,
7073
client: &http.Client{
7174
Transport: &http.Transport{
72-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
75+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
76+
DisableKeepAlives: true,
7377
},
7478
},
7579
}
@@ -108,14 +112,12 @@ func (p *HydrolixBackendConnector) makeRequest(ctx context.Context, method strin
108112
respBody, err := io.ReadAll(resp.Body)
109113
if resp.StatusCode >= 400 {
110114
return nil, fmt.Errorf("ingest failed: %s — %s", resp.Status, string(respBody))
111-
} else {
112-
logger.InfoWithCtx(ctx).Msgf("Ingest successful: %s %s — %s", tableName, resp.Status, string(respBody))
113115
}
114116
return respBody, err
115117
}
116118

117119
// TODO hardcoded for now
118-
const token = "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICIybDZyTk1YV2hYQTA5M2tkRHA5ZFctaEMzM2NkOEtWUFhJdURZLWlLeUFjIn0.eyJleHAiOjE3NTMzNTQ5ODQsImlhdCI6MTc1MzI2ODU4NCwianRpIjoiN2IxZWNjZGItYTAwNi00Mzk3LTg4MmYtNjBiM2MyYzdmM2JmIiwiaXNzIjoiaHR0cHM6Ly9sb2NhbGhvc3Qva2V5Y2xvYWsvcmVhbG1zL2h5ZHJvbGl4LXVzZXJzIiwiYXVkIjpbImNvbmZpZy1hcGkiLCJhY2NvdW50Il0sInN1YiI6ImRiMWM1YTJiLTdhYjMtNGNmZi04NGU4LTQ3Yzc0YjRlZjAyMSIsInR5cCI6IkJlYXJlciIsImF6cCI6ImNvbmZpZy1hcGkiLCJzZXNzaW9uX3N0YXRlIjoiOTZjNTFmMDMtMmE5MS00YmUwLTg0MTktM2U2MDA1YWIxYWJmIiwiYWNyIjoiMSIsImFsbG93ZWQtb3JpZ2lucyI6WyJodHRwOi8vbG9jYWxob3N0Il0sInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJkZWZhdWx0LXJvbGVzLWh5ZHJvbGl4LXVzZXJzIiwib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGNvbmZpZy1hcGktc2VydmljZSBlbWFpbCBwcm9maWxlIiwic2lkIjoiOTZjNTFmMDMtMmE5MS00YmUwLTg0MTktM2U2MDA1YWIxYWJmIiwiZW1haWxfdmVyaWZpZWQiOnRydWUsInByZWZlcnJlZF91c2VybmFtZSI6Im1lQGh5ZHJvbGl4LmlvIiwiZW1haWwiOiJtZUBoeWRyb2xpeC5pbyJ9.e1lWfxphcCAN3aGCBUKOA8hl4gcuw9oNI60YRqAs2azGrVOCdIIZ8ri-kGn_6QtDKFdBmlFa7kXhiB7PzVgS5N_8QM5GlXWp-8LgvF7mhpmhP84xHWLhbYsxV3xDqLEcjnwxFN3XbVvzg_AWiECNP2qtqN5yqsSUMPR99JLcPnrZA7pXWXMflVvlenvrjlXvdZt6fmfEgXPoA54OBrS6QUDUNMIk9qdsSJCM-n96k7vo3dDCGyO12EoYQB2-yq7VegSNyaKi1BW1Jl33sSF7GQapU4YJ6ixMN_PUTkL0_ZzRWrPR6ry1qtxGz6phZbbj4LmmduvJlLqjcSrcMTbLdg"
120+
const token = "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICIybDZyTk1YV2hYQTA5M2tkRHA5ZFctaEMzM2NkOEtWUFhJdURZLWlLeUFjIn0.eyJleHAiOjE3NTM3NzY2NTksImlhdCI6MTc1MzY5MDI1OSwianRpIjoiMzNmNzI2M2MtMTA2Zi00MTc1LWJhZTEtOTEzNTJkNTdmOWM0IiwiaXNzIjoiaHR0cHM6Ly9sb2NhbGhvc3Qva2V5Y2xvYWsvcmVhbG1zL2h5ZHJvbGl4LXVzZXJzIiwiYXVkIjpbImNvbmZpZy1hcGkiLCJhY2NvdW50Il0sInN1YiI6ImRiMWM1YTJiLTdhYjMtNGNmZi04NGU4LTQ3Yzc0YjRlZjAyMSIsInR5cCI6IkJlYXJlciIsImF6cCI6ImNvbmZpZy1hcGkiLCJzZXNzaW9uX3N0YXRlIjoiNGRhZWM2YzItMzA4ZC00MzFkLTg0ZWMtNGFiMjJjOTFmZjg3IiwiYWNyIjoiMSIsImFsbG93ZWQtb3JpZ2lucyI6WyJodHRwOi8vbG9jYWxob3N0Il0sInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJkZWZhdWx0LXJvbGVzLWh5ZHJvbGl4LXVzZXJzIiwib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGNvbmZpZy1hcGktc2VydmljZSBlbWFpbCBwcm9maWxlIiwic2lkIjoiNGRhZWM2YzItMzA4ZC00MzFkLTg0ZWMtNGFiMjJjOTFmZjg3IiwiZW1haWxfdmVyaWZpZWQiOnRydWUsInByZWZlcnJlZF91c2VybmFtZSI6Im1lQGh5ZHJvbGl4LmlvIiwiZW1haWwiOiJtZUBoeWRyb2xpeC5pbyJ9.Yr0hleV6sJZCmOQKXSN82HVRm4RKC7IGW7CVXHJai8vOKMW5uPIiw_1BwaHzKi8DjwftHvhWW0hmEXh492Mj_6csQgvejeCfwbKvZx9rQbBZ-4P4GboB4OgqtZ5macY6D_QQyeXol2otS80E8OTAUBM8o07v_fYd92-nz-qY7ceicT8oI7kLMgEOD6VA7Glue7hqQblofIZMoDK1Ve2WhrOhfgqVDxCloFrLs1VhXevGBkVgz7LF_XoxLyR0UPhyVj7lM3ep3M8FJbuP5afKuJUr2nb3qm5Bxs_r1uuQe7INuEH-CYCPJmsOArJ0BIULgtB3LW1zCsLl_DAMQJhwtg"
119121
const hdxHost = "3.20.203.177:8888"
120122
const orgID = "d9ce0431-f26f-44e3-b0ef-abc1653d04eb"
121123
const projectID = "27506b30-0c78-41fa-a059-048d687f1164"
@@ -134,32 +136,69 @@ func listenForCreateTable(ch <-chan string) {
134136
}
135137
}
136138

139+
func isConnectionReset(err error) bool {
140+
// Look for specific substrings or types indicating connection reset
141+
var netErr net.Error
142+
if errors.As(err, &netErr) {
143+
// You may add extra checks here
144+
}
145+
// Match known error message
146+
return strings.Contains(err.Error(), "connection reset by peer")
147+
}
148+
137149
func (p *HydrolixBackendConnector) ingestFun(ctx context.Context, ingest []map[string]interface{}, tableName string, tableId string) error {
138150
logger.InfoWithCtx(ctx).Msgf("Ingests len: %s %d", tableName, len(ingest))
151+
152+
var data []json.RawMessage
153+
139154
for _, row := range ingest {
140155
if len(row) == 0 {
141156
continue
142157
}
143158
ingestJson, err := json.Marshal(row)
144159
if err != nil {
145-
return fmt.Errorf("error marshalling ingest JSON: %v", err)
160+
logger.ErrorWithCtx(ctx).Msg("Failed to marshal row")
161+
continue
146162
}
147-
url := fmt.Sprintf("http://%s/ingest/event", hdxHost)
148-
//logger.Info().Msgf("ingest event: %s %s", createTable["name"].(string), string(ingestJson))
149-
emitRequest:
150-
respJson, err := p.makeRequest(ctx, "POST", url, ingestJson, token, tableName)
163+
data = append(data, ingestJson)
164+
}
165+
166+
// Final payload: a JSON array of the rows
167+
finalJson, err := json.Marshal(data)
168+
if err != nil {
169+
return fmt.Errorf("failed to marshal final JSON array: %w", err)
170+
}
171+
172+
url := fmt.Sprintf("http://%s/ingest/event", hdxHost)
173+
for {
174+
respJson, err := p.makeRequest(ctx, "POST", url, finalJson, token, tableName)
151175
if err != nil {
152-
logger.DebugWithCtx(ctx).Msgf("Error ingesting table %s: %v", tableName, err)
176+
logger.ErrorWithCtx(ctx).Msgf("Error ingesting table %s: %v", tableName, err)
177+
178+
// Retry on connection reset
179+
if isConnectionReset(err) {
180+
logger.WarnWithCtx(ctx).Msgf("Connection reset while ingesting table %s, retrying...", tableName)
181+
time.Sleep(5 * time.Second)
182+
continue
183+
}
184+
185+
// Try to inspect response (even if err is non-nil)
153186
var resp HydrolixResponse
154-
if err := json.Unmarshal(respJson, &resp); err != nil {
187+
if len(respJson) > 0 && json.Unmarshal(respJson, &resp) == nil {
155188
if strings.Contains(resp.Message, "no table") {
189+
logger.WarnWithCtx(ctx).Msgf("Table %s not found yet, retrying...", tableName)
156190
time.Sleep(5 * time.Second)
157-
goto emitRequest
191+
continue
158192
}
159193
}
194+
195+
// If it's another kind of error — continue to the next iteration
196+
continue
160197
}
198+
199+
logger.InfoWithCtx(ctx).Msgf("Ingests successfull: %s %d", tableName, len(ingest))
200+
return nil
161201
}
162-
return nil
163202
}
164203

165204
func (p *HydrolixBackendConnector) Exec(_ context.Context, query string, args ...interface{}) error {

0 commit comments

Comments
 (0)