Skip to content

Conversation

@vishalm0509
Copy link
Collaborator

Description

This PR introduces the Clear Destination feature that allows users to manually or automatically clear data from a destination linked to a job when configuration or stream changes are detected.

Added two new API endpoints:

  • POST /api/v1/project/:projectid/jobs/:id/clear-destination – Triggers a clear destination workflow for the specified job.

  • GET /api/v1/project/:projectid/jobs/:id/clear-destination – Returns the current status of the clear destination workflow (running or not running).

Type of change

  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

How Has This Been Tested?

  • Run clear destination manually
  • Update job and check if configured streams are getting dropped from the destination

vishalm0509 and others added 17 commits October 27, 2025 16:28
* feat: show error logs in test connection in source and destination

* fix: fix test connection fail modal

* fix: resolve comments

* chore: add comments (#224)

* chore: add comments

* chore: 🚨 lint fix

* feat: add error logs in check connection for source and destination (#195)

* feat: add error logs i check conenction for source and destination

* fix: parse json message as string

* chore: update response key

* chore: refactor

* chore: remove logEntry struct

* fix: log parsing

* fix: log parsing

* fix: logs parsing logic

* docs: updated api contract

* test: ✅ base setup for automation testing using play… (#203)

* test: ✅ base setup for automation testing using playwright , add test cases fro major flows

* fix: integration workflow changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* test: add postgres flow

* fix: lint changes

* test: add iceberg specific test

* fix: tests

* fix: stream name

* fix: test fix

* fix: test fix

* fix: test fix

* fix: test fix

* fix: changes

* fix: ci changes

* fix: ci changes

* fix: ci changes

* test: fail test on test connection failure

* fix: ci changes

* fix: changes

* fix: changes

* fix: changes

* fix: changes

* fix: changes

* fix: changes

* fix: test fix

* fix: changes

* fix: changes

* fix: test-data fix

* fix: test-data fix

* fix: changes

* fix: changes

* fix: changes

* fix: host and port changes

* fix: changes

* fix: change

* test: login test updated

* test: refactor create source and destination page

* test: refactor tests

* fix: test fix

* fix: changes

* fix: fix changes

* fix: changes

* fix: changes

* fix: review changes

* refactor: refactor tests to use authenticated state of playwright

* fix: changes

* docs: update tests readme

* test: intercept and log destination spec response

* docs: update readme

* fix: update timeouts for playwright tests

* test: update readme

* fix: add retries in playwright config

* test: fix timeout duration

* fix: update readme

* test: add connector data test-id

* fix: resolve comments

* refactor(test): make and use enums instead of strings

* fix: fix test data builder

* fix: test data job name

* fix: add todo

---------

Co-authored-by: Duke Dhal <[email protected]>
Co-authored-by: deepanshupal09-datazip <[email protected]>

* feat: show refetch success message in job history page (#227)

* Added success message on Job History refetch

* Update ui/src/modules/jobs/pages/JobHistory.tsx

Co-authored-by: deepanshupal09-datazip <[email protected]>

---------

Co-authored-by: vikash choudhary <[email protected]>
Co-authored-by: deepanshupal09-datazip <[email protected]>

* fix: replace deprecated Phosphor icons with latest *Icon alternatives (#216)

* fix: replace deprecated Phosphor icons with latest *Icon alternatives

* fix: update Phosphor icons to latest alternatives in DestinationEdit and SourceEdit components and fix Lint issue

* refactor: remove EditSourceModal

* fix: update icon import

---------

Co-authored-by: vikash choudhary <[email protected]>

* feat: add clear destination

* fix: merge conflicts

* fix: build error fix

* feat: append only mode (#232)

* feat: add append only mode in ui

* fix: data filter bug

* fix: fix color of append only mode switch

* fix: add custom option in all streams ingestion mode change

* fix: use clsx for conditional styling

* fix: fix icons

* chore: add utils

---------

Co-authored-by: Taraka Swathi <[email protected]>
Co-authored-by: Ankit Sharma <[email protected]>
Co-authored-by: vikash choudhary <[email protected]>
Co-authored-by: Duke Dhal <[email protected]>
Co-authored-by: SARTHAK RANA <[email protected]>
Co-authored-by: vishalm0509 <[email protected]>
@vishalm0509
Copy link
Collaborator Author

Even after resuming a paused job, the options are blocked. @deepanshupal09-datazip

  1. Pause a job
  2. Resume it
image

Comment on lines 100 to 112
if clearRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.ClearDestination); clearRunning {
return fmt.Errorf("clear-destination is in progress, cannot update job")
}

// Cancel sync before updating the job
if syncRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); syncRunning {
logger.Infof("sync is running for job %d, cancelling sync workflow", jobID)
jobSlice := []*models.Job{existingJob}
if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil {
return fmt.Errorf("failed to cancel sync: %s", err)
}
logger.Infof("successfully cancelled sync for job %d", jobID)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we return type as well from isWorkflow runing, and cancel bases on that, would reduce db queries

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed that check, so extra db query won't happen

Before

	// Cancel sync before updating the job
	if syncRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); syncRunning {
		logger.Infof("sync is running for job %d, cancelling sync workflow", jobID)
		jobSlice := []*models.Job{existingJob}
		if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil {
			return fmt.Errorf("failed to cancel sync: %s", err)
		}
		logger.Infof("successfully cancelled sync for job %d", jobID)
	}

After

		if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil {
			return fmt.Errorf("failed to cancel sync: %s", err)
		}


if len(diffCatalog) > 0 {
logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID)
if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 10*time.Second); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure clear destination would only run after any of the workflow is not running

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing this...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 264 to 269
// Check if sync is running and wait for it to stop
if running, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); running {
if err := waitForSyncToStop(ctx, s.temporal, projectID, jobID, syncWaitTime); err != nil {
return nil, fmt.Errorf("sync is in progress, please cancel it before running clear-destination")
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it is part of same workflow at a time any one of them would run , why to keep this check

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we need to wait for sync to cancel after cancelling it in updateJob flow

return nil, fmt.Errorf("job not found: %s", err)
}

diffCatalog, err := s.temporal.GetDifferenceStreams(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
diffCatalog, err := s.temporal.GetDifferenceStreams(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig)
diffCatalog, err := s.temporal.GetStreamDifference(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

for _, job := range jobs {
conditions = append(conditions, fmt.Sprintf(
"(WorkflowId BETWEEN 'sync-%s-%d' AND 'sync-%s-%d-~')",
"(WorkflowId BETWEEN 'sync-%s-%d' AND 'sync-%s-%d-~' AND OperationType = 'sync')",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be backward compatible ,i f not or you can list OperationType != 'clear'

Copy link
Collaborator Author

@vishalm0509 vishalm0509 Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OperationType != 'clear' and ``OperationType = 'sync'` Both will work

Anyways if a workflow is running it will definitely have operationType since we are registering it at the start of the workflow.

doc: https://docs.temporal.io/develop/go/observability#remove-search-attribute

}

// waitForSyncToStop waits for sync workflows to stop with timeout
func waitForSyncToStop(ctx context.Context, tempClient *temporal.Temporal, projectID string, jobID int, maxWaitTime time.Duration) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should not fire db queries in loop

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked we can't use the DescribeWorkflowExecution so I have increased timeout to 1s and also removed time.After check and added a context timeout instead

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

projectID, jobID, projectID, jobID, opType,
)

resp, err := tempClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see if can use tempClient.Client.DescribeWorkflowExecution()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tempClient.Client.DescribeWorkflowExecution()

for this we need to provide workflowID, since the workflowID will change

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 83 to 102
// UpdateScheduleSpec updates an existing schedule's spec
func (t *Temporal) UpdateScheduleSpec(ctx context.Context, frequency, projectID string, jobID int) error {
cronExpression := utils.ToCron(frequency)
_, scheduleID := t.WorkflowAndScheduleID(projectID, jobID)

handle := t.Client.ScheduleClient().GetHandle(ctx, scheduleID)
return handle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
input.Description.Schedule.Spec = &client.ScheduleSpec{
CronExpressions: []string{cronExpression},
}
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
}, nil
},
})
}

// UpdateScheduleAction updates the action (workflow args) of an existing schedule
func (t *Temporal) UpdateScheduleAction(ctx context.Context, projectID string, jobID int, req interface{}) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use only one function for schedule update?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +102 to +103
case ClearDestination:
return time.Hour * 24 * 30
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clear destination wont take that much time

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up for discussion

@deepanshupal09-datazip
Copy link
Collaborator

Even after resuming a paused job, the options are blocked. @deepanshupal09-datazip

  1. Pause a job
  2. Resume it
image

not able to reproduce this

})
}

// @router /project/:projectid/jobs/:id/stream-difference [get]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// @router /project/:projectid/jobs/:id/stream-difference [get]
// @router /project/:projectid/jobs/:id/clear-destination [get]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if len(diffCatalog) > 0 {
logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID)
if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 30*time.Second); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it constant and let us discuss about max wait time

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if len(diffCatalog) > 0 {
logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID)
if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 30*time.Second); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if cleat destination failed, how would we recover and clear destination for only these streams req.DifferenceStreams??

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TODO

// TODO: handle clear-destination workflow failure


// Check if sync is running and wait for it to stop
if running, _, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); running {
if err := waitForSyncToStop(ctx, s.temporal, projectID, jobID, syncWaitTime); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if sync is running , ans user click clear destination?, why to wait in that case

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We won't wait because in that case we are passing syncWaitTIme=0

)

resp, err := tempClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
Query: query,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass pageSize:1, Avoids loading unnecessary workflow metadata

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DOne

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants