Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 66e8595

Browse files
committed
Improvements #3
1 parent eb53768 commit 66e8595

File tree

1 file changed

+14
-13
lines changed

1 file changed

+14
-13
lines changed

platform/backend_connectors/hydrolix_backend_connector.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type HydrolixBackendConnector struct {
2828
AccessToken string
2929
Headers map[string]string
3030
client *http.Client
31+
tableCache map[string]uuid.UUID
32+
tableMutex sync.Mutex
3133
}
3234

3335
func (p *HydrolixBackendConnector) GetId() quesma_api.BackendConnectorType {
@@ -52,6 +54,7 @@ func NewHydrolixBackendConnector(configuration *config.RelationalDbConfiguration
5254
DisableKeepAlives: true,
5355
},
5456
},
57+
tableCache: make(map[string]uuid.UUID),
5558
}
5659
}
5760

@@ -66,6 +69,7 @@ func NewHydrolixBackendConnectorWithConnection(_ string, conn *sql.DB) *Hydrolix
6669
DisableKeepAlives: true,
6770
},
6871
},
72+
tableCache: make(map[string]uuid.UUID),
6973
}
7074
}
7175

@@ -117,9 +121,6 @@ type HydrolixResponse struct {
117121
Message string `json:"message"`
118122
}
119123

120-
var tableCache = make(map[string]uuid.UUID)
121-
var tableMutex sync.Mutex
122-
123124
func (p *HydrolixBackendConnector) ingestFun(ctx context.Context, ingestSlice []map[string]interface{}, tableName string, tableId string) error {
124125
logger.InfoWithCtx(ctx).Msgf("Ingests len: %s %d", tableName, len(ingestSlice))
125126

@@ -160,17 +161,17 @@ func (p *HydrolixBackendConnector) ingestFun(ctx context.Context, ingestSlice []
160161
return fmt.Errorf("failed to ingest after %d retries: %s", maxRetries, tableName)
161162
}
162163

163-
func getTableIdFromCache(tableName string) (uuid.UUID, bool) {
164-
tableMutex.Lock()
165-
defer tableMutex.Unlock()
166-
id, exists := tableCache[tableName]
164+
func (p *HydrolixBackendConnector) getTableIdFromCache(tableName string) (uuid.UUID, bool) {
165+
p.tableMutex.Lock()
166+
defer p.tableMutex.Unlock()
167+
id, exists := p.tableCache[tableName]
167168
return id, exists
168169
}
169170

170-
func setTableIdInCache(tableName string, tableId uuid.UUID) {
171-
tableMutex.Lock()
172-
defer tableMutex.Unlock()
173-
tableCache[tableName] = tableId
171+
func (p *HydrolixBackendConnector) setTableIdInCache(tableName string, tableId uuid.UUID) {
172+
p.tableMutex.Lock()
173+
defer p.tableMutex.Unlock()
174+
p.tableCache[tableName] = tableId
174175
}
175176

176177
func (p *HydrolixBackendConnector) createTableWithSchema(ctx context.Context,
@@ -233,7 +234,7 @@ func (p *HydrolixBackendConnector) Exec(_ context.Context, query string, args ..
233234
}
234235
tableName := createTable["name"].(string)
235236

236-
tableId, _ := getTableIdFromCache(tableName)
237+
tableId, _ := p.getTableIdFromCache(tableName)
237238
if len(createTable) > 0 && tableId == uuid.Nil {
238239
tableId = uuid.New()
239240
createTable["uuid"] = tableId.String()
@@ -242,7 +243,7 @@ func (p *HydrolixBackendConnector) Exec(_ context.Context, query string, args ..
242243
logger.ErrorWithCtx(ctx).Msgf("error creating table with schema: %v", err)
243244
return err
244245
}
245-
setTableIdInCache(tableName, tableId)
246+
p.setTableIdInCache(tableName, tableId)
246247
}
247248

248249
if len(ingestSlice) > 0 {

0 commit comments

Comments
 (0)