@@ -9,12 +9,14 @@ import (
99 "crypto/tls"
1010 "database/sql"
1111 "encoding/json"
12+ "errors"
1213 "fmt"
1314 "github.com/QuesmaOrg/quesma/platform/config"
1415 "github.com/QuesmaOrg/quesma/platform/logger"
1516 quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
1617 "github.com/google/uuid"
1718 "io"
19+ "net"
1820 "net/http"
1921 "strings"
2022 "sync"
@@ -53,7 +55,8 @@ func NewHydrolixBackendConnector(configuration *config.RelationalDbConfiguration
5355 createTableChan : createTableChan ,
5456 client : & http.Client {
5557 Transport : & http.Transport {
56- TLSClientConfig : & tls.Config {InsecureSkipVerify : true },
58+ TLSClientConfig : & tls.Config {InsecureSkipVerify : true },
59+ DisableKeepAlives : true ,
5760 },
5861 },
5962 }
@@ -69,7 +72,8 @@ func NewHydrolixBackendConnectorWithConnection(_ string, conn *sql.DB) *Hydrolix
6972 createTableChan : createTableChan ,
7073 client : & http.Client {
7174 Transport : & http.Transport {
72- TLSClientConfig : & tls.Config {InsecureSkipVerify : true },
75+ TLSClientConfig : & tls.Config {InsecureSkipVerify : true },
76+ DisableKeepAlives : true ,
7377 },
7478 },
7579 }
@@ -108,14 +112,12 @@ func (p *HydrolixBackendConnector) makeRequest(ctx context.Context, method strin
108112 respBody , err := io .ReadAll (resp .Body )
109113 if resp .StatusCode >= 400 {
110114 return nil , fmt .Errorf ("ingest failed: %s — %s" , resp .Status , string (respBody ))
111- } else {
112- logger .InfoWithCtx (ctx ).Msgf ("Ingest successful: %s %s — %s" , tableName , resp .Status , string (respBody ))
113115 }
114116 return respBody , err
115117}
116118
117119// TODO hardcoded for now
118- const token = "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICIybDZyTk1YV2hYQTA5M2tkRHA5ZFctaEMzM2NkOEtWUFhJdURZLWlLeUFjIn0.eyJleHAiOjE3NTMzNTQ5ODQsImlhdCI6MTc1MzI2ODU4NCwianRpIjoiN2IxZWNjZGItYTAwNi00Mzk3LTg4MmYtNjBiM2MyYzdmM2JmIiwiaXNzIjoiaHR0cHM6Ly9sb2NhbGhvc3Qva2V5Y2xvYWsvcmVhbG1zL2h5ZHJvbGl4LXVzZXJzIiwiYXVkIjpbImNvbmZpZy1hcGkiLCJhY2NvdW50Il0sInN1YiI6ImRiMWM1YTJiLTdhYjMtNGNmZi04NGU4LTQ3Yzc0YjRlZjAyMSIsInR5cCI6IkJlYXJlciIsImF6cCI6ImNvbmZpZy1hcGkiLCJzZXNzaW9uX3N0YXRlIjoiOTZjNTFmMDMtMmE5MS00YmUwLTg0MTktM2U2MDA1YWIxYWJmIiwiYWNyIjoiMSIsImFsbG93ZWQtb3JpZ2lucyI6WyJodHRwOi8vbG9jYWxob3N0Il0sInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJkZWZhdWx0LXJvbGVzLWh5ZHJvbGl4LXVzZXJzIiwib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGNvbmZpZy1hcGktc2VydmljZSBlbWFpbCBwcm9maWxlIiwic2lkIjoiOTZjNTFmMDMtMmE5MS00YmUwLTg0MTktM2U2MDA1YWIxYWJmIiwiZW1haWxfdmVyaWZpZWQiOnRydWUsInByZWZlcnJlZF91c2VybmFtZSI6Im1lQGh5ZHJvbGl4LmlvIiwiZW1haWwiOiJtZUBoeWRyb2xpeC5pbyJ9.e1lWfxphcCAN3aGCBUKOA8hl4gcuw9oNI60YRqAs2azGrVOCdIIZ8ri-kGn_6QtDKFdBmlFa7kXhiB7PzVgS5N_8QM5GlXWp-8LgvF7mhpmhP84xHWLhbYsxV3xDqLEcjnwxFN3XbVvzg_AWiECNP2qtqN5yqsSUMPR99JLcPnrZA7pXWXMflVvlenvrjlXvdZt6fmfEgXPoA54OBrS6QUDUNMIk9qdsSJCM-n96k7vo3dDCGyO12EoYQB2-yq7VegSNyaKi1BW1Jl33sSF7GQapU4YJ6ixMN_PUTkL0_ZzRWrPR6ry1qtxGz6phZbbj4LmmduvJlLqjcSrcMTbLdg"
120+ const token = "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICIybDZyTk1YV2hYQTA5M2tkRHA5ZFctaEMzM2NkOEtWUFhJdURZLWlLeUFjIn0.eyJleHAiOjE3NTM3NzY2NTksImlhdCI6MTc1MzY5MDI1OSwianRpIjoiMzNmNzI2M2MtMTA2Zi00MTc1LWJhZTEtOTEzNTJkNTdmOWM0IiwiaXNzIjoiaHR0cHM6Ly9sb2NhbGhvc3Qva2V5Y2xvYWsvcmVhbG1zL2h5ZHJvbGl4LXVzZXJzIiwiYXVkIjpbImNvbmZpZy1hcGkiLCJhY2NvdW50Il0sInN1YiI6ImRiMWM1YTJiLTdhYjMtNGNmZi04NGU4LTQ3Yzc0YjRlZjAyMSIsInR5cCI6IkJlYXJlciIsImF6cCI6ImNvbmZpZy1hcGkiLCJzZXNzaW9uX3N0YXRlIjoiNGRhZWM2YzItMzA4ZC00MzFkLTg0ZWMtNGFiMjJjOTFmZjg3IiwiYWNyIjoiMSIsImFsbG93ZWQtb3JpZ2lucyI6WyJodHRwOi8vbG9jYWxob3N0Il0sInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJkZWZhdWx0LXJvbGVzLWh5ZHJvbGl4LXVzZXJzIiwib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGNvbmZpZy1hcGktc2VydmljZSBlbWFpbCBwcm9maWxlIiwic2lkIjoiNGRhZWM2YzItMzA4ZC00MzFkLTg0ZWMtNGFiMjJjOTFmZjg3IiwiZW1haWxfdmVyaWZpZWQiOnRydWUsInByZWZlcnJlZF91c2VybmFtZSI6Im1lQGh5ZHJvbGl4LmlvIiwiZW1haWwiOiJtZUBoeWRyb2xpeC5pbyJ9.Yr0hleV6sJZCmOQKXSN82HVRm4RKC7IGW7CVXHJai8vOKMW5uPIiw_1BwaHzKi8DjwftHvhWW0hmEXh492Mj_6csQgvejeCfwbKvZx9rQbBZ-4P4GboB4OgqtZ5macY6D_QQyeXol2otS80E8OTAUBM8o07v_fYd92-nz-qY7ceicT8oI7kLMgEOD6VA7Glue7hqQblofIZMoDK1Ve2WhrOhfgqVDxCloFrLs1VhXevGBkVgz7LF_XoxLyR0UPhyVj7lM3ep3M8FJbuP5afKuJUr2nb3qm5Bxs_r1uuQe7INuEH-CYCPJmsOArJ0BIULgtB3LW1zCsLl_DAMQJhwtg"
119121const hdxHost = "3.20.203.177:8888"
120122const orgID = "d9ce0431-f26f-44e3-b0ef-abc1653d04eb"
121123const projectID = "27506b30-0c78-41fa-a059-048d687f1164"
@@ -134,32 +136,69 @@ func listenForCreateTable(ch <-chan string) {
134136 }
135137}
136138
139+ func isConnectionReset (err error ) bool {
140+ // Look for specific substrings or types indicating connection reset
141+ var netErr net.Error
142+ if errors .As (err , & netErr ) {
143+ // You may add extra checks here
144+ }
145+ // Match known error message
146+ return strings .Contains (err .Error (), "connection reset by peer" )
147+ }
148+
137149func (p * HydrolixBackendConnector ) ingestFun (ctx context.Context , ingest []map [string ]interface {}, tableName string , tableId string ) error {
138150 logger .InfoWithCtx (ctx ).Msgf ("Ingests len: %s %d" , tableName , len (ingest ))
151+
152+ var data []json.RawMessage
153+
139154 for _ , row := range ingest {
140155 if len (row ) == 0 {
141156 continue
142157 }
143158 ingestJson , err := json .Marshal (row )
144159 if err != nil {
145- return fmt .Errorf ("error marshalling ingest JSON: %v" , err )
160+ logger .ErrorWithCtx (ctx ).Msg ("Failed to marshal row" )
161+ continue
146162 }
147- url := fmt .Sprintf ("http://%s/ingest/event" , hdxHost )
148- //logger.Info().Msgf("ingest event: %s %s", createTable["name"].(string), string(ingestJson))
149- emitRequest:
150- respJson , err := p .makeRequest (ctx , "POST" , url , ingestJson , token , tableName )
163+ data = append (data , ingestJson )
164+ }
165+
166+ // Final payload: a JSON array of the rows
167+ finalJson , err := json .Marshal (data )
168+ if err != nil {
169+ return fmt .Errorf ("failed to marshal final JSON array: %w" , err )
170+ }
171+
172+ url := fmt .Sprintf ("http://%s/ingest/event" , hdxHost )
173+ for {
174+ respJson , err := p .makeRequest (ctx , "POST" , url , finalJson , token , tableName )
151175 if err != nil {
152- logger .DebugWithCtx (ctx ).Msgf ("Error ingesting table %s: %v" , tableName , err )
176+ logger .ErrorWithCtx (ctx ).Msgf ("Error ingesting table %s: %v" , tableName , err )
177+
178+ // Retry on connection reset
179+ if isConnectionReset (err ) {
180+ logger .WarnWithCtx (ctx ).Msgf ("Connection reset while ingesting table %s, retrying..." , tableName )
181+ time .Sleep (5 * time .Second )
182+ continue
183+ }
184+
185+ // Try to inspect response (even if err is non-nil)
153186 var resp HydrolixResponse
154- if err := json .Unmarshal (respJson , & resp ); err ! = nil {
187+ if len ( respJson ) > 0 && json .Unmarshal (respJson , & resp ) = = nil {
155188 if strings .Contains (resp .Message , "no table" ) {
189+ logger .WarnWithCtx (ctx ).Msgf ("Table %s not found yet, retrying..." , tableName )
156190 time .Sleep (5 * time .Second )
157- goto emitRequest
191+ continue
158192 }
159193 }
194+
195+ // If it's another kind of error — fail
196+ continue
160197 }
198+
199+ logger .InfoWithCtx (ctx ).Msgf ("Ingests successfull: %s %d" , tableName , len (ingest ))
200+ return nil
161201 }
162- return nil
163202}
164203
165204func (p * HydrolixBackendConnector ) Exec (_ context.Context , query string , args ... interface {}) error {
0 commit comments