Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit a896c63

Browse files
committed
Review remarks #2
1 parent ac7e005 commit a896c63

File tree

2 files changed

+5
-0
lines changed

2 files changed

+5
-0
lines changed

platform/backend_connectors/hydrolix_backend_connector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ func (p *HydrolixBackendConnector) ingestFun(ctx context.Context, ingestSlice []
136136
}
137137

138138
url := fmt.Sprintf("%s/ingest/event", p.cfg.Url.String())
139+
// Sleep duration is arbitrarily chosen.
140+
// It seems that the Hydrolix API needs some time to process the table creation before ingesting data.
139141
const sleepDuration = 5 * time.Second
140142
const maxRetries = 5
141143
for retries := 0; retries < maxRetries; retries++ {

platform/ingest/hydrolixlowerer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type HydrolixLowerer struct {
2626
ingestFieldStatistics IngestFieldStatistics
2727
ingestFieldStatisticsLock sync.Mutex
2828
tableCreteStatementMapping map[*chLib.Table]CreateTableStatement // cache for table creation statements
29+
tableCreationLock sync.Mutex
2930
}
3031

3132
func NewHydrolixLowerer(virtualTableStorage persistence.JSONDatabase) *HydrolixLowerer {
@@ -451,11 +452,13 @@ func (l *HydrolixLowerer) LowerToDDL(
451452
createTableCmd CreateTableStatement,
452453
) ([]string, error) {
453454

455+
l.tableCreationLock.Lock()
454456
if _, exists := l.tableCreteStatementMapping[table]; !exists {
455457
l.tableCreteStatementMapping[table] = createTableCmd
456458
} else {
457459
createTableCmd = l.tableCreteStatementMapping[table]
458460
}
461+
l.tableCreationLock.Unlock()
459462

460463
// --- Create Table Section ---
461464
createTable := map[string]interface{}{

0 commit comments

Comments
 (0)