Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controller/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func hasSharedResourceCondition(app *v1alpha1.Application) (bool, string) {
// Note, this is not foolproof, since a proper fix would require the CRD record
// status.observedGeneration coupled with a health.lua that verifies
// status.observedGeneration == metadata.generation
func delayBetweenSyncWaves(_ common.SyncPhase, _ int, finalWave bool) error {
func delayBetweenSyncWaves(_ []common.SyncIdentity, finalWave bool) error {
if !finalWave {
delaySec := 2
if delaySecStr := os.Getenv(EnvVarSyncWaveDelay); delaySecStr != "" {
Expand Down
12 changes: 11 additions & 1 deletion gitops-engine/pkg/sync/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ const (
AnnotationSyncOptions = "argocd.argoproj.io/sync-options"
// AnnotationSyncWave indicates which wave of the sync the resource or hook should be in
AnnotationSyncWave = "argocd.argoproj.io/sync-wave"
// AnnotationSyncWaveGroup indicates which wave of the sync the resource or hook should be in
AnnotationSyncWaveGroup = "argocd.argoproj.io/sync-wave-group"
// AnnotationSyncWaveGroupDependencies indicates which wave of the sync the resource or hook should be in
AnnotationSyncWaveGroupDependencies = "argocd.argoproj.io/sync-wave-group-dependencies"
// AnnotationKeyHook contains the hook type of a resource
AnnotationKeyHook = "argocd.argoproj.io/hook"
// AnnotationKeyHookDeletePolicy is the policy of deleting a hook
Expand Down Expand Up @@ -59,10 +63,16 @@ type PermissionValidator func(un *unstructured.Unstructured, res *metav1.APIReso

type SyncPhase string

type SyncIdentity struct {
Phase SyncPhase
Wave int
WaveGroup int
}

// SyncWaveHook is a callback function which will be invoked after each sync wave is successfully
// applied during a sync operation. The callback indicates which phase and wave it had just
// executed, and whether or not that wave was the final one.
type SyncWaveHook func(phase SyncPhase, wave int, final bool) error
type SyncWaveHook func(t []SyncIdentity, final bool) error

const (
SyncPhasePreSync = "PreSync"
Expand Down
5 changes: 5 additions & 0 deletions gitops-engine/pkg/sync/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat
annotations:
argocd.argoproj.io/sync-wave: "5"

# Sync Groups

The wave groups allow to define independant/dependent sync processes

# Sync Options

The sync options allows customizing the synchronization of selected resources. The options are specified using the
Expand All @@ -89,6 +93,7 @@ How Does It Work Together?
Syncing process orders the resources in the following precedence:

- The phase
- The group with respect to group dependencies
- The wave they are in (lower values first)
- By kind (e.g. namespaces first)
- By name
Expand Down
95 changes: 54 additions & 41 deletions gitops-engine/pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -560,26 +561,29 @@ func (sc *syncContext) Sync() {
return
}

// remove any tasks not in this wave
// remove any tasks which have unsynced dependencies
phase := tasks.phase()
wave := tasks.wave()
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
independantSyncIdentities := tasks.independantSyncIdentities()
allSyncIdentities := tasks.syncIdentities()

// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
// EVEN if those objects subsequently degraded
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
remainingTasks := tasks.Filter(func(t *syncTask) bool {
return !slices.Contains(independantSyncIdentities, t.identity()) || t.isHook()
})

sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
sc.log.WithValues("phase", phase, "independantSyncIdentities", independantSyncIdentities, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
tasks = tasks.Filter(func(t *syncTask) bool { return slices.Contains(independantSyncIdentities, t.identity()) })

sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")

sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
runState := sc.runTasks(tasks, false)

if sc.syncWaveHook != nil && runState != failed {
err := sc.syncWaveHook(phase, wave, finalWave)
finalWave := phase == tasks.lastPhase() && len(independantSyncIdentities) == len(allSyncIdentities)
err := sc.syncWaveHook(independantSyncIdentities, finalWave)
if err != nil {
sc.deleteHooks(hooksPendingDeletionFailed)
sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err))
Expand Down Expand Up @@ -909,52 +913,61 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
}

// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
pruneTasks := make(map[int][]*syncTask)

tasksByWaveGroup := make(map[int][]*syncTask)
for _, task := range tasks {
if task.isPrune() {
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
}
tasksByWaveGroup[task.waveGroup()] = append(tasksByWaveGroup[task.waveGroup()], task)
}
for waveGroup := range tasksByWaveGroup {
pruneTasks := make(map[int][]*syncTask)
for _, task := range tasksByWaveGroup[waveGroup] {
if task.isPrune() {
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
}
}

var uniquePruneWaves []int
for k := range pruneTasks {
uniquePruneWaves = append(uniquePruneWaves, k)
}
sort.Ints(uniquePruneWaves)
var uniquePruneWaves []int
for k := range pruneTasks {
uniquePruneWaves = append(uniquePruneWaves, k)
}
sort.Ints(uniquePruneWaves)

// reorder waves for pruning tasks using symmetric swap on prune waves
n := len(uniquePruneWaves)
for i := 0; i < n/2; i++ {
// waves to swap
startWave := uniquePruneWaves[i]
endWave := uniquePruneWaves[n-1-i]
// reorder waves for pruning tasks using symmetric swap on prune waves
n := len(uniquePruneWaves)
for j := 0; j < n/2; j++ {
// waves to swap
startWave := uniquePruneWaves[j]
endWave := uniquePruneWaves[n-1-j]

for _, task := range pruneTasks[startWave] {
task.waveOverride = &endWave
}
for _, task := range pruneTasks[startWave] {
task.waveOverride = &endWave
}

for _, task := range pruneTasks[endWave] {
task.waveOverride = &startWave
for _, task := range pruneTasks[endWave] {
task.waveOverride = &startWave
}
}
}

// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
syncPhaseLastWave := 0
for _, task := range tasks {
if task.phase == common.SyncPhaseSync {
if task.wave() > syncPhaseLastWave {
syncPhaseLastWave = task.wave()
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave

syncPhaseLastWave := 0
for _, task := range tasksByWaveGroup[waveGroup] {
if task.phase == common.SyncPhaseSync {
if task.wave() > syncPhaseLastWave {
syncPhaseLastWave = task.wave()
}
}
}
}
syncPhaseLastWave = syncPhaseLastWave + 1
syncPhaseLastWave = syncPhaseLastWave + 1

for _, task := range tasks {
if task.isPrune() &&
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
task.waveOverride = &syncPhaseLastWave
for _, task := range tasksByWaveGroup[waveGroup] {
if task.isPrune() &&
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
task.waveOverride = &syncPhaseLastWave
}
}

}

tasks.Sort()
Expand Down
Loading
Loading