Skip to content

Commit a889c27

Browse files
committed
cleanup
Signed-off-by: Alexandre Gaudreault <alexandre_gaudreault@intuit.com>
1 parent c122e47 commit a889c27

1 file changed

Lines changed: 85 additions & 81 deletions

File tree

gitops-engine/pkg/sync/sync_context.go

Lines changed: 85 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -552,9 +552,10 @@ func (sc *syncContext) Sync() {
552552
// syncFailTasks only run during failure, so separate them from regular tasks
553553
syncFailTasks, tasks := tasks.Split(func(t *syncTask) bool { return t.phase == common.SyncPhaseSyncFail })
554554

555-
syncFailedTasks, _ := tasks.Split(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
555+
syncFailedTasks := tasks.Filter(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
556556

557557
// if there are any completed but unsuccessful tasks, sync is a failure.
558+
// we already know tasks do not contain running tasks
558559
if tasks.Any(func(t *syncTask) bool { return t.completed() && !t.successful() }) {
559560
sc.deleteHooks(hooksPendingDeletionFailed)
560561
sc.executeSyncFailPhase(syncFailTasks, syncFailedTasks, "one or more synchronization tasks completed unsuccessfully")
@@ -578,18 +579,12 @@ func (sc *syncContext) Sync() {
578579
return
579580
}
580581

581-
// remove any tasks not in this wave
582582
phase := tasks.phase()
583583
wave := tasks.wave()
584584
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
585585

586-
// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
587-
// EVEN if those objects subsequently degraded
588-
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
589-
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
590-
591586
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
592-
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
587+
tasks, remainingTasks := tasks.Split(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
593588

594589
sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
595590

@@ -610,11 +605,31 @@ func (sc *syncContext) Sync() {
610605

611606
switch runState {
612607
case failed:
613-
syncFailedTasks, _ := tasks.Split(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
614-
sc.deleteHooks(hooksPendingDeletionFailed)
615-
sc.executeSyncFailPhase(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
608+
// If we failed to apply at least one resource, we need to start the syncFailTasks and wait
609+
// for the completion of any running hooks. In this case, the operation should be running.
610+
syncFailedTasks := tasks.Filter(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
611+
runningHooks := tasks.Filter(func(t *syncTask) bool { return t.running() })
612+
if len(runningHooks) > 0 {
613+
if len(syncFailTasks) > 0 {
614+
completed := sc.executeSyncFailPhase(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
615+
if !completed {
616+
runningHooks = append(runningHooks, syncFailTasks...)
617+
}
618+
}
619+
sc.setRunningPhase(runningHooks, false)
620+
} else {
621+
completed := sc.executeSyncFailPhase(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
622+
if completed {
623+
sc.deleteHooks(hooksPendingDeletionFailed)
624+
}
625+
}
616626
case successful:
617-
if remainingTasks.Len() == 0 {
627+
if remainingTasks.Len() == 0 && !tasks.Any(func(task *syncTask) bool { return task.isHook() }) {
628+
// if it is the last phase/wave and the only running tasks are non-hooks, then we are successful
629+
// EVEN if those objects subsequently degrades
630+
// This handles the common case where neither hooks or waves are used and a sync equates to simply
631+
// an (asynchronous) kubectl apply of manifests, which succeeds immediately.
632+
618633
// delete all completed hooks which have appropriate delete policy
619634
sc.deleteHooks(hooksPendingDeletionSuccessful)
620635
sc.setOperationPhase(common.OperationSucceeded, "successfully synced (all tasks run)")
@@ -628,6 +643,41 @@ func (sc *syncContext) Sync() {
628643
}
629644
}
630645

646+
// terminate looks for any running jobs/workflow hooks and deletes the resource
647+
func (sc *syncContext) Terminate() {
648+
sc.log.V(1).Info("terminating")
649+
tasks, _ := sc.getSyncTasks()
650+
651+
// Remove completed hook finalizers
652+
hooksCompleted := tasks.Filter(func(task *syncTask) bool {
653+
return task.isHook() && task.completed()
654+
})
655+
for _, task := range hooksCompleted {
656+
if err := sc.removeHookFinalizer(task); err != nil {
657+
sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err))
658+
}
659+
}
660+
661+
// Terminate running hooks
662+
terminateSuccessful := sc.terminateHooksPreemptively(tasks.Filter(func(task *syncTask) bool { return task.isHook() }))
663+
if terminateSuccessful {
664+
sc.setOperationPhase(common.OperationFailed, "Operation terminated")
665+
} else {
666+
sc.setOperationPhase(common.OperationError, "Operation termination had errors")
667+
}
668+
}
669+
670+
func (sc *syncContext) GetState() (common.OperationPhase, string, []common.ResourceSyncResult) {
671+
var resourceRes []common.ResourceSyncResult
672+
for _, v := range sc.syncRes {
673+
resourceRes = append(resourceRes, v)
674+
}
675+
sort.Slice(resourceRes, func(i, j int) bool {
676+
return resourceRes[i].Order < resourceRes[j].Order
677+
})
678+
return sc.phase, sc.message, resourceRes
679+
}
680+
631681
// filter out out-of-sync tasks
632682
func (sc *syncContext) filterOutOfSyncTasks(tasks syncTasks) syncTasks {
633683
return tasks.Filter(func(t *syncTask) bool {
@@ -762,32 +812,6 @@ func (sc *syncContext) removeHookFinalizer(task *syncTask) error {
762812
})
763813
}
764814

765-
func (sc *syncContext) getResource(task *syncTask) (*unstructured.Unstructured, error) {
766-
sc.log.WithValues("task", task).V(1).Info("Getting resource")
767-
resIf, err := sc.getResourceIf(task, "get")
768-
if err != nil {
769-
return nil, err
770-
}
771-
liveObj, err := resIf.Get(context.TODO(), task.name(), metav1.GetOptions{})
772-
if err != nil {
773-
return nil, fmt.Errorf("failed to get resource: %w", err)
774-
}
775-
return liveObj, nil
776-
}
777-
778-
func (sc *syncContext) updateResource(task *syncTask) error {
779-
sc.log.WithValues("task", task).V(1).Info("Updating resource")
780-
resIf, err := sc.getResourceIf(task, "update")
781-
if err != nil {
782-
return err
783-
}
784-
_, err = resIf.Update(context.TODO(), task.liveObj, metav1.UpdateOptions{})
785-
if err != nil {
786-
return fmt.Errorf("failed to update resource: %w", err)
787-
}
788-
return nil
789-
}
790-
791815
func (sc *syncContext) deleteHooks(hooksPendingDeletion syncTasks) {
792816
for _, task := range hooksPendingDeletion {
793817
err := sc.deleteResource(task)
@@ -797,17 +821,6 @@ func (sc *syncContext) deleteHooks(hooksPendingDeletion syncTasks) {
797821
}
798822
}
799823

800-
func (sc *syncContext) GetState() (common.OperationPhase, string, []common.ResourceSyncResult) {
801-
var resourceRes []common.ResourceSyncResult
802-
for _, v := range sc.syncRes {
803-
resourceRes = append(resourceRes, v)
804-
}
805-
sort.Slice(resourceRes, func(i, j int) bool {
806-
return resourceRes[i].Order < resourceRes[j].Order
807-
})
808-
return sc.phase, sc.message, resourceRes
809-
}
810-
811824
func (sc *syncContext) executeSyncFailPhase(syncFailTasks, syncFailedTasks syncTasks, message string) (completed bool) {
812825
errorMessageFactory := func(tasks syncTasks, message string) string {
813826
messages := tasks.Map(func(task *syncTask) string {
@@ -1369,28 +1382,30 @@ func (sc *syncContext) hasCRDOfGroupKind(group string, kind string) bool {
13691382
return false
13701383
}
13711384

1372-
// terminate looks for any running jobs/workflow hooks and deletes the resource
1373-
func (sc *syncContext) Terminate() {
1374-
sc.log.V(1).Info("terminating")
1375-
tasks, _ := sc.getSyncTasks()
1376-
1377-
// Remove completed hook finalizers
1378-
hooksCompleted := tasks.Filter(func(task *syncTask) bool {
1379-
return task.isHook() && task.completed()
1380-
})
1381-
for _, task := range hooksCompleted {
1382-
if err := sc.removeHookFinalizer(task); err != nil {
1383-
sc.setResourceResult(task, task.syncStatus, common.OperationError, fmt.Sprintf("Failed to remove hook finalizer: %v", err))
1384-
}
1385+
func (sc *syncContext) getResource(task *syncTask) (*unstructured.Unstructured, error) {
1386+
sc.log.WithValues("task", task).V(1).Info("Getting resource")
1387+
resIf, err := sc.getResourceIf(task, "get")
1388+
if err != nil {
1389+
return nil, err
1390+
}
1391+
liveObj, err := resIf.Get(context.TODO(), task.name(), metav1.GetOptions{})
1392+
if err != nil {
1393+
return nil, fmt.Errorf("failed to get resource: %w", err)
13851394
}
1395+
return liveObj, nil
1396+
}
13861397

1387-
// Terminate running hooks
1388-
terminateSuccessful := sc.terminateHooksPreemptively(tasks.Filter(func(task *syncTask) bool { return task.isHook() }))
1389-
if terminateSuccessful {
1390-
sc.setOperationPhase(common.OperationFailed, "Operation terminated")
1391-
} else {
1392-
sc.setOperationPhase(common.OperationError, "Operation termination had errors")
1398+
func (sc *syncContext) updateResource(task *syncTask) error {
1399+
sc.log.WithValues("task", task).V(1).Info("Updating resource")
1400+
resIf, err := sc.getResourceIf(task, "update")
1401+
if err != nil {
1402+
return err
1403+
}
1404+
_, err = resIf.Update(context.TODO(), task.liveObj, metav1.UpdateOptions{})
1405+
if err != nil {
1406+
return fmt.Errorf("failed to update resource: %w", err)
13931407
}
1408+
return nil
13941409
}
13951410

13961411
func (sc *syncContext) deleteResource(task *syncTask) error {
@@ -1438,16 +1453,7 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
14381453
sc.log.WithValues("numTasks", len(tasks), "dryRun", dryRun).V(1).Info("Running tasks")
14391454

14401455
state := successful
1441-
var createTasks syncTasks
1442-
var pruneTasks syncTasks
1443-
1444-
for _, task := range tasks {
1445-
if task.isPrune() {
1446-
pruneTasks = append(pruneTasks, task)
1447-
} else {
1448-
createTasks = append(createTasks, task)
1449-
}
1450-
}
1456+
pruneTasks, createTasks := tasks.Split(func(task *syncTask) bool { return task.isPrune() })
14511457

14521458
// remove finalizers from previous sync on existing hooks to make sure the operation is idempotent
14531459
{
@@ -1512,7 +1518,6 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
15121518
}
15131519
state = ss.Wait()
15141520
}
1515-
15161521
if state != successful {
15171522
return state
15181523
}
@@ -1530,7 +1535,7 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
15301535
err := sc.deleteResource(t)
15311536
if err != nil {
15321537
// it is possible to get a race condition here, such that the resource does not exist when
1533-
// delete is requested, we treat this as a nopand remove the liveObj
1538+
// delete is requested, we treat this as a no-op and remove the liveObj
15341539
if !apierrors.IsNotFound(err) {
15351540
state = failed
15361541
sc.setResourceResult(t, t.syncStatus, common.OperationError, fmt.Sprintf("failed to delete resource: %v", err))
@@ -1549,7 +1554,6 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
15491554
}
15501555
state = ss.Wait()
15511556
}
1552-
15531557
if state != successful {
15541558
return state
15551559
}

0 commit comments

Comments
 (0)