Skip to content

Commit 90fc51d

Browse files
authored
document internal/utils (#211)
Signed-off-by: Geoff Flarity <gflarity@nvidia.com>
1 parent c2171c7 commit 90fc51d

File tree

2 files changed

+18
-23
lines changed

2 files changed

+18
-23
lines changed

operator/internal/utils/componentname.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,9 @@ func GetPodCliqueSetReplicaIndexFromPodCliqueFQN(pcsName, pclqFQNName string) (i
3838
return strconv.Atoi(pclqFQNName[replicaStartIndex:replicaEndIndex])
3939
}
4040

41-
// GetPodCliqueNameFromPodCliqueFQN get unqualified PodClique name from FQN.
41+
// GetPodCliqueNameFromPodCliqueFQN extracts the unqualified PodClique name from a fully qualified name.
4242
func GetPodCliqueNameFromPodCliqueFQN(pclqObjectMeta metav1.ObjectMeta) (string, error) {
4343
pclqObjectKey := client.ObjectKey{Name: pclqObjectMeta.Name, Namespace: pclqObjectMeta.Namespace}
44-
45-
// Check if the PodClique is part of a PodCliqueScalingGroup
4644
pcsgName, ok := pclqObjectMeta.Labels[apicommon.LabelPodCliqueScalingGroup]
4745
if ok {
4846
// get the pcsg replica index
@@ -54,12 +52,10 @@ func GetPodCliqueNameFromPodCliqueFQN(pclqObjectMeta metav1.ObjectMeta) (string,
5452
return pclqObjectMeta.Name[len(pclqNamePrefix):], nil
5553
}
5654

57-
// If it is not part of PCSG then the PCLQ is a standalone PCLQ which is part of PCS
5855
pcsName, ok := pclqObjectMeta.Labels[apicommon.LabelPartOfKey]
5956
if !ok {
6057
return "", fmt.Errorf("missing label %s on PodClique: %v", apicommon.LabelPartOfKey, pclqObjectKey)
6158
}
62-
// Get the PCS replica index
6359
pcsReplicaIndex, ok := pclqObjectMeta.Labels[apicommon.LabelPodCliqueSetReplicaIndex]
6460
if !ok {
6561
return "", fmt.Errorf("missing label %s on PodClique: %v", apicommon.LabelPodCliqueSetReplicaIndex, pclqObjectKey)

operator/internal/utils/concurrent.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,42 +35,40 @@ type Task struct {
3535
Fn func(ctx context.Context) error
3636
}
3737

38-
// RunResult is a structure that holds the results of running taskConfigs concurrently.
38+
// RunResult holds the results of running tasks concurrently.
3939
type RunResult struct {
40-
// SuccessfulTasks holds the names of taskConfigs that were executed successfully.
40+
// SuccessfulTasks are the names of tasks that executed successfully.
4141
SuccessfulTasks []string
42-
// FailedTasks holds the names of taskConfigs that failed during execution.
42+
// FailedTasks are the names of tasks that failed during execution.
4343
FailedTasks []string
44-
// SkippedTasks holds the names of taskConfigs that were skipped (not executed).
44+
// SkippedTasks are the names of tasks that were skipped.
4545
SkippedTasks []string
46-
// Errors holds the errors encountered during the execution of taskConfigs.
46+
// Errors contains all errors encountered during task execution.
4747
Errors []error
4848
}
4949

50-
// HasErrors checks if there are any taskConfigs that errored out during execution.
50+
// HasErrors returns true if any tasks encountered errors.
5151
func (r *RunResult) HasErrors() bool {
5252
return len(r.Errors) > 0
5353
}
5454

55-
// GetAggregatedError aggregates all errors encountered during the execution of taskConfigs.
55+
// GetAggregatedError returns all task errors joined into a single error.
5656
func (r *RunResult) GetAggregatedError() error {
5757
if !r.HasErrors() {
5858
return nil
5959
}
6060
return errors.Join(r.Errors...)
6161
}
6262

63-
// GetSummary returns a summary of the RunResult, including the number of successful, failed, and skipped taskConfigs.
63+
// GetSummary returns a summary of successful, failed, and skipped tasks.
6464
func (r *RunResult) GetSummary() string {
6565
return fmt.Sprintf("RunResult{SuccessfulTasks: %v, FailedTasks: %v, SkippedTasks: %v}",
6666
r.SuccessfulTasks, r.FailedTasks, r.SkippedTasks)
6767
}
6868

69-
// RunConcurrentlyWithSlowStart executes a slice of Tasks by grouping them into batches that double in size each time, starting with `initialBatchSize`.
70-
// If the whole batch succeeds, the next batch is doubled in size and executed.
71-
// If there are any errors in the batch, then it fails fast and returns the errors immediately, thus halting the execution of further taskConfigs.
72-
// kube-apiserver does not provide batching of requests, therefore if there are many resources for which calls are made to kube-apiserver,
73-
// then this function prevents overwhelming the kube-apiserver with too many requests at once.
69+
// RunConcurrentlyWithSlowStart executes tasks in exponentially growing batches starting at initialBatchSize.
70+
// Each successful batch doubles the size of the next batch. On any batch failure, execution halts immediately
71+
// and remaining tasks are marked as skipped. This prevents overwhelming kube-apiserver with concurrent requests.
7472
func RunConcurrentlyWithSlowStart(ctx context.Context, logger logr.Logger, initialBatchSize int, tasks []Task) RunResult {
7573
remaining := len(tasks)
7674
aggregatedRunResult := RunResult{}
@@ -106,8 +104,7 @@ func RunConcurrentlyWithBounds(ctx context.Context, logger logr.Logger, tasks []
106104
return createRunResult(tasks, tasksInError)
107105
}
108106

109-
// Functions to create and update RunResult
110-
// -------------------------------------------------------------------------------------------
107+
// createRunResult builds a RunResult from all tasks, separating successful and failed tasks.
111108
func createRunResult(allTasks []Task, tasksInError []lo.Tuple2[string, error]) RunResult {
112109
result := RunResult{
113110
SuccessfulTasks: make([]string, 0, len(allTasks)),
@@ -129,12 +126,14 @@ func createRunResult(allTasks []Task, tasksInError []lo.Tuple2[string, error]) R
129126
return result
130127
}
131128

129+
// updateWithBatchRunResult merges a batch result into the aggregated result.
132130
func updateWithBatchRunResult(aggregatedRunResult *RunResult, batchRunResult RunResult) {
133131
aggregatedRunResult.SuccessfulTasks = append(aggregatedRunResult.SuccessfulTasks, batchRunResult.SuccessfulTasks...)
134132
aggregatedRunResult.FailedTasks = append(aggregatedRunResult.FailedTasks, batchRunResult.FailedTasks...)
135133
aggregatedRunResult.Errors = append(aggregatedRunResult.Errors, batchRunResult.Errors...)
136134
}
137135

136+
// computeAndUpdateSkippedTasks identifies tasks that were neither successful nor failed and marks them as skipped.
138137
func computeAndUpdateSkippedTasks(result *RunResult, allTasks []Task) {
139138
allTaskNames := lo.Map(allTasks, func(task Task, _ int) string {
140139
return task.Name
@@ -145,15 +144,13 @@ func computeAndUpdateSkippedTasks(result *RunResult, allTasks []Task) {
145144
result.SkippedTasks = append(result.SkippedTasks, skippedTaskNames...)
146145
}
147146

148-
// Types and functions/methods to manage concurrent execution of taskConfigs
149-
// -------------------------------------------------------------------------------------------
150-
151147
type runGroup struct {
152148
logger logr.Logger
153149
wg sync.WaitGroup
154150
errTaskCh chan lo.Tuple2[string, error]
155151
}
156152

153+
// newRunGroup creates a runGroup for managing concurrent task execution.
157154
func newRunGroup(numTasks int, logger logr.Logger) *runGroup {
158155
return &runGroup{
159156
logger: logger,
@@ -162,6 +159,7 @@ func newRunGroup(numTasks int, logger logr.Logger) *runGroup {
162159
}
163160
}
164161

162+
// trigger starts a task in a new goroutine, capturing panics and errors.
165163
func (rg *runGroup) trigger(ctx context.Context, task Task) {
166164
rg.wg.Add(1)
167165
rg.logger.V(4).Info("triggering concurrent execution of task", "taskName", task.Name)
@@ -181,6 +179,7 @@ func (rg *runGroup) trigger(ctx context.Context, task Task) {
181179
}(task)
182180
}
183181

182+
// waitAndCollectErroneousTasks waits for all tasks to complete and returns tasks that encountered errors.
184183
func (rg *runGroup) waitAndCollectErroneousTasks() []lo.Tuple2[string, error] {
185184
rg.wg.Wait()
186185
close(rg.errTaskCh)

0 commit comments

Comments (0)