Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,20 @@ func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName strin
if err := c.PostgresMetadata.SyncFlowCleanup(ctx, jobName); err != nil {
return fmt.Errorf("[clickhouse] unable to clear metadata for sync flow cleanup: %w", err)
}
c.logger.Info("successfully cleared metadata for flow " + jobName)

// delete raw table if exists
rawTableIdentifier := c.getRawTableName(jobName)
if err := c.execWithLogging(ctx, fmt.Sprintf(dropTableIfExistsSQL, rawTableIdentifier)); err != nil {
return fmt.Errorf("[clickhouse] unable to drop raw table: %w", err)
}
c.logger.Info("successfully dropped raw table " + rawTableIdentifier)

// clear avro stage for this flow
if err := DeleteAvroStageForFlow(ctx, jobName); err != nil {
return fmt.Errorf("[clickhouse] unable to clear avro stage: %w", err)
}
c.logger.Info("successfully cleared avro stage for flow " + jobName)

return nil
}
Expand Down
17 changes: 17 additions & 0 deletions flow/connectors/clickhouse/s3_stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,20 @@ func GetAvroStage(ctx context.Context, flowJobName string, syncBatchID int64) (*

return &avroFile, nil
}

func DeleteAvroStageForFlow(ctx context.Context, flowJobName string) error {
conn, err := internal.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
return fmt.Errorf("failed to get connection for clearing avro stage: %w", err)
}

if _, err := conn.Exec(ctx, `
DELETE FROM ch_s3_stage
WHERE flow_job_name = $1`,
flowJobName,
); err != nil {
return fmt.Errorf("failed to clear avro stage for flow %s: %w", flowJobName, err)
}

return nil
}
8 changes: 7 additions & 1 deletion flow/e2e/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,13 @@ func (s Suite) TestDropCompleted() {
RequestedFlowState: protos.FlowStatus_STATUS_TERMINATING,
})
require.NoError(s.t, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "drop", func() bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "wait for avro stage dropped", func() bool {
var workflowID string
return s.pg.PostgresConnector.Conn().QueryRow(
s.t.Context(), "SELECT avro_file FROM ch_s3_stage WHERE flow_job_name = $1", flowConnConfig.FlowJobName,
).Scan(&workflowID) == pgx.ErrNoRows
})
e2e.EnvWaitFor(s.t, env, time.Minute, "wait for flow dropped", func() bool {
var workflowID string
return s.pg.PostgresConnector.Conn().QueryRow(
s.t.Context(), "select workflow_id from flows where name = $1", flowConnConfig.FlowJobName,
Expand Down