Skip to content

Commit 96e3db5

Browse files
Dropping ClickHouse mirrors: Cleanup avro stage catalog table (#2762)
This PR deletes all avro file entries for a mirror from the ch_s3_stage table in the catalog when that mirror is dropped. Pending: completing the E2E test.
1 parent cad4093 commit 96e3db5

File tree

3 files changed

+32
-1
lines changed

3 files changed

+32
-1
lines changed

flow/connectors/clickhouse/cdc.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,20 @@ func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName strin
225225
if err := c.PostgresMetadata.SyncFlowCleanup(ctx, jobName); err != nil {
226226
return fmt.Errorf("[clickhouse] unable to clear metadata for sync flow cleanup: %w", err)
227227
}
228+
c.logger.Info("successfully cleared metadata for flow " + jobName)
228229

229230
// delete raw table if exists
230231
rawTableIdentifier := c.getRawTableName(jobName)
231232
if err := c.execWithLogging(ctx, fmt.Sprintf(dropTableIfExistsSQL, rawTableIdentifier)); err != nil {
232233
return fmt.Errorf("[clickhouse] unable to drop raw table: %w", err)
233234
}
235+
c.logger.Info("successfully dropped raw table " + rawTableIdentifier)
236+
237+
// clear avro stage for this flow
238+
if err := DeleteAvroStageForFlow(ctx, jobName); err != nil {
239+
return fmt.Errorf("[clickhouse] unable to clear avro stage: %w", err)
240+
}
241+
c.logger.Info("successfully cleared avro stage for flow " + jobName)
234242

235243
return nil
236244
}

flow/connectors/clickhouse/s3_stage.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,20 @@ func GetAvroStage(ctx context.Context, flowJobName string, syncBatchID int64) (*
6666

6767
return &avroFile, nil
6868
}
69+
70+
func DeleteAvroStageForFlow(ctx context.Context, flowJobName string) error {
71+
conn, err := internal.GetCatalogConnectionPoolFromEnv(ctx)
72+
if err != nil {
73+
return fmt.Errorf("failed to get connection for clearing avro stage: %w", err)
74+
}
75+
76+
if _, err := conn.Exec(ctx, `
77+
DELETE FROM ch_s3_stage
78+
WHERE flow_job_name = $1`,
79+
flowJobName,
80+
); err != nil {
81+
return fmt.Errorf("failed to clear avro stage for flow %s: %w", flowJobName, err)
82+
}
83+
84+
return nil
85+
}

flow/e2e/api/api_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,13 @@ func (s Suite) TestDropCompleted() {
431431
RequestedFlowState: protos.FlowStatus_STATUS_TERMINATING,
432432
})
433433
require.NoError(s.t, err)
434-
e2e.EnvWaitFor(s.t, env, time.Minute, "drop", func() bool {
434+
e2e.EnvWaitFor(s.t, env, time.Minute, "wait for avro stage dropped", func() bool {
435+
var workflowID string
436+
return s.pg.PostgresConnector.Conn().QueryRow(
437+
s.t.Context(), "SELECT avro_file FROM ch_s3_stage WHERE flow_job_name = $1", flowConnConfig.FlowJobName,
438+
).Scan(&workflowID) == pgx.ErrNoRows
439+
})
440+
e2e.EnvWaitFor(s.t, env, time.Minute, "wait for flow dropped", func() bool {
435441
var workflowID string
436442
return s.pg.PostgresConnector.Conn().QueryRow(
437443
s.t.Context(), "select workflow_id from flows where name = $1", flowConnConfig.FlowJobName,

0 commit comments

Comments
 (0)