diff --git a/scenarios/scheduler_stress.go b/scenarios/scheduler_stress.go index 78a3a162..1585d88c 100644 --- a/scenarios/scheduler_stress.go +++ b/scenarios/scheduler_stress.go @@ -24,7 +24,7 @@ func init() { Description: fmt.Sprintf("Stress test Temporal's scheduler functionality by creating, reading, updating, and deleting multiple schedules concurrently. "+ "Available parameters: '%s' (default: %d), '%s' (default: %d), '%s' (default: %d), '%s' (default: %v), '%s' (default: %d), '%s' (default: %v), '%s' (default: %v), "+ "'%s' (default: '%s'), '%s' (default: '%s', options: skip,buffer_one,buffer_all,cancel_other,terminate_other,all), "+ - "'%s' (default: '%s', options: %s,%s), '%s' (default: %v)", + "'%s' (default: '%s', options: %s,%s), '%s' (default: %v), '%s' (default: %v), '%s' (default: %v), '%s' (default: %d)", ScheduleCreationPerIterationFlag, DefaultScheduleCreationPerIteration, ScheduleReadsPerCreationFlag, DefaultScheduleReadsPerCreation, ScheduleUpdatesPerCreationFlag, DefaultScheduleUpdatesPerCreation, @@ -35,7 +35,10 @@ func init() { CronExpressionFlag, DefaultCronExpression, OverlapPolicyFlag, DefaultOverlapPolicy, ScheduledWorkflowTypeFlag, DefaultScheduledWorkflowType, NoopScheduledWorkflowType, SleepScheduleWorkflowType, - EnableChasmSchedulerFlag, DefaultEnableChasmScheduler), + EnableChasmSchedulerFlag, DefaultEnableChasmScheduler, + SkipDeletionFlag, DefaultSkipDeletion, + SkipCreationFlag, DefaultSkipCreation, + WorkerCountFlag, DefaultWorkerCount), ExecutorFn: func() loadgen.Executor { return &loadgen.GenericExecutor{ Execute: func(ctx context.Context, run *loadgen.Run) error { @@ -64,6 +67,9 @@ type schedulerExecutorConfig struct { OverlapPolicy []enums.ScheduleOverlapPolicy ScheduledWorkflowType string EnableChasmScheduler bool + SkipDeletion bool + SkipCreation bool + WorkerCount int } var _ loadgen.Configurable = (*SchedulerExecutor)(nil) @@ -90,6 +96,9 @@ const ( OverlapPolicyFlag = "overlap-policy" ScheduledWorkflowTypeFlag = "scheduled-workflow-type" EnableChasmSchedulerFlag = "enable-chasm-scheduler" + SkipDeletionFlag = "skip-deletion" + SkipCreationFlag = "skip-creation" + WorkerCountFlag = "worker-count" ) const ( @@ -97,10 +106,13 @@ const ( DefaultScheduleReadsPerCreation = 3 DefaultScheduleUpdatesPerCreation = 3 DefaultPayloadSize = 1024 - DefaultCronExpression = "*/5 * * * * *" + DefaultCronExpression = "* * * * * *" DefaultScheduledWorkflowType = NoopScheduledWorkflowType DefaultOverlapPolicy = "all" DefaultEnableChasmScheduler = true + DefaultSkipDeletion = false + DefaultSkipCreation = false + DefaultWorkerCount = 1000 ) // Default duration constants @@ -124,6 +136,9 @@ func (s *SchedulerExecutor) Configure(info loadgen.ScenarioInfo) error { OverlapPolicy: parseOverlapPolicy(info.ScenarioOptionString(OverlapPolicyFlag, DefaultOverlapPolicy)), ScheduledWorkflowType: info.ScenarioOptionString(ScheduledWorkflowTypeFlag, DefaultScheduledWorkflowType), EnableChasmScheduler: info.ScenarioOptionBool(EnableChasmSchedulerFlag, DefaultEnableChasmScheduler), + SkipDeletion: info.ScenarioOptionBool(SkipDeletionFlag, DefaultSkipDeletion), + SkipCreation: info.ScenarioOptionBool(SkipCreationFlag, DefaultSkipCreation), + WorkerCount: info.ScenarioOptionInt(WorkerCountFlag, DefaultWorkerCount), } if config.ScheduleCreationPerIteration <= 0 { @@ -147,6 +162,9 @@ func (s *SchedulerExecutor) Configure(info loadgen.ScenarioInfo) error { if config.OperationInterval < 0 { return fmt.Errorf("operation-interval cannot be negative, got %v", config.OperationInterval) } + if config.WorkerCount <= 0 { + return fmt.Errorf("worker-count must be positive, got %d", config.WorkerCount) + } s.config = config return nil } @@ -164,53 +182,202 @@ func (s *SchedulerExecutor) Execute(ctx context.Context, run *loadgen.Run) error ctx = metadata.AppendToOutgoingContext(ctx, "temporal-experiment", "chasm-scheduler") } - var wg sync.WaitGroup - for i := range s.config.ScheduleCreationPerIteration { - wg.Go(func() { - ticker := time.NewTicker(s.config.OperationInterval) - defer ticker.Stop() - start := time.Now() + if s.config.SkipCreation { + return s.executeWithExistingSchedules(ctx, client, logger) + } + return s.executeWithNewSchedules(ctx, run, client, logger) +} + +type scheduleConfig struct { + ScheduleID string + TaskQueue string +} + +func (s *SchedulerExecutor) executeWithNewSchedules(ctx context.Context, run *loadgen.Run, client client.Client, logger *zap.SugaredLogger) error { + // Create a channel to send schedule configs + scheduleConfigChan := make(chan scheduleConfig, 1000) + + // Start a goroutine to produce schedule configs + go func() { + defer close(scheduleConfigChan) + for i := range s.config.ScheduleCreationPerIteration { sch_id := fmt.Sprintf("sched-%s-%d-%d-%s", run.RunID, run.Iteration, i, uuid.New()) - sc, err := s.createSchedule(ctx, client, sch_id, run.TaskQueue(), logger) - if err != nil { - logger.Error("Failed to create schedule", "scheduleID", sc.ScheduleID, "error", err) + select { + case scheduleConfigChan <- scheduleConfig{ + ScheduleID: sch_id, + TaskQueue: run.TaskQueue(), + }: + case <-ctx.Done(): + logger.Infow("Context canceled while producing schedule configs") return } - <-ticker.C - for range s.config.ScheduleReadsPerCreation { - if err := s.describeSchedule(ctx, client, sc.ScheduleID, logger); err != nil { - logger.Error("Failed to describe schedule", "scheduleID", sc.ScheduleID, "error", err) - } - <-ticker.C // Wait between read operations - } + } + logger.Infow("Finished producing schedule configs") + }() - for range s.config.ScheduleUpdatesPerCreation { - if err := s.updateSchedule(ctx, client, sc.ScheduleID, logger); err != nil { - logger.Error("Failed to update schedule", "scheduleID", sc.ScheduleID, "error", err) - } - <-ticker.C // Wait between update operations + // Start workers that will create and process schedules as configs arrive + var wg sync.WaitGroup + + for range s.config.WorkerCount { + wg.Go(func() { + // Each worker keeps consuming from the channel until it's closed + for config := range scheduleConfigChan { + func() { + + start := time.Now() + ticker := time.NewTicker(s.config.OperationInterval) + + sc, err := s.createSchedule(ctx, client, config.ScheduleID, config.TaskQueue, logger) + if err != nil { + logger.Errorw("Failed to create schedule", "scheduleID", config.ScheduleID, "error", err) + ticker.Stop() + return + } + + if !s.config.SkipDeletion { + defer func(id string, startTime time.Time) { + dur := time.Until(startTime.Add(s.config.WaitTimeBeforeCleanup)) + logger.Debugw("Waiting for deletion", "scheduleID", id, "duration", dur) + select { + case <-time.After(dur): + logger.Debugw("deleting", "scheduleID", id, "duration", dur) + if err := s.deleteSchedule(ctx, client, id, logger); err != nil { + logger.Errorw("Failed to delete schedule", "scheduleID", id, "error", err) + } + logger.Debugw("deleted", "scheduleID", id) + case <-ctx.Done(): + logger.Infow("Context canceled") + } + }(sc.ScheduleID, start) + } + + <-ticker.C + s.performScheduleOperations(ctx, client, sc.ScheduleID, ticker, logger) + ticker.Stop() + }() + logger.Debug("worker exited") } - dur := time.Until(start.Add(s.config.WaitTimeBeforeCleanup)) - select { - case <-time.After(dur): - if err := s.deleteSchedule(ctx, client, sc.ScheduleID, logger); err != nil { - logger.Error("Failed to delete schedule", "scheduleID", sc.ScheduleID, "error", err) - } - case <-ctx.Done(): - logger.Info("Context canceled") + }) + } + + // Wait for all workers to complete (they'll finish when channel closes) + wg.Wait() + logger.Infow("all workers closed") + + return nil +} + +func (s *SchedulerExecutor) executeWithExistingSchedules(ctx context.Context, client client.Client, logger *zap.SugaredLogger) error { + // Create a channel to receive schedule IDs + scheduleIDChan := make(chan string, 100) + listErrChan := make(chan error, 1) + + // Start a goroutine to list schedules and send them to the channel + go func() { + defer close(scheduleIDChan) + listErrChan <- s.listSchedules(ctx, client, scheduleIDChan, logger) + logger.Infow("about to close ch") + }() + + // Start workers that will process schedules as they arrive + var wg sync.WaitGroup + + for range s.config.WorkerCount { + wg.Go(func() { + // Each worker keeps consuming from the channel until it's closed + for scheduleID := range scheduleIDChan { + func() { + + start := time.Now() + ticker := time.NewTicker(s.config.OperationInterval) + + if !s.config.SkipDeletion { + defer func(id string, startTime time.Time) { + dur := time.Until(startTime.Add(s.config.WaitTimeBeforeCleanup)) + select { + case <-time.After(dur): + if err := s.deleteSchedule(ctx, client, id, logger); err != nil { + logger.Errorw("Failed to delete schedule", "scheduleID", id, "error", err) + } + case <-ctx.Done(): + logger.Infow("Context canceled") + } + }(scheduleID, start) + } + + <-ticker.C + s.performScheduleOperations(ctx, client, scheduleID, ticker, logger) + ticker.Stop() + }() } }) } + + // Wait for all workers to complete (they'll finish when channel closes) wg.Wait() + logger.Infow("all workers closed") + + // Check if there was an error during listing + listErr := <-listErrChan + if listErr != nil { + return fmt.Errorf("failed to list schedules: %w", listErr) + } + return nil } +func (s *SchedulerExecutor) performScheduleOperations(ctx context.Context, client client.Client, scheduleID string, ticker *time.Ticker, logger *zap.SugaredLogger) { + for range s.config.ScheduleReadsPerCreation { + <-ticker.C // Wait between read operations + if err := s.describeSchedule(ctx, client, scheduleID, logger); err != nil { + logger.Errorw("Failed to describe schedule", "scheduleID", scheduleID, "error", err) + } + <-ticker.C // Wait between read operations + } + + for range s.config.ScheduleUpdatesPerCreation { + if err := s.updateSchedule(ctx, client, scheduleID, logger); err != nil { + logger.Errorw("Failed to update schedule", "scheduleID", scheduleID, "error", err) + } + <-ticker.C // Wait between update operations + } +} + type ScheduleState struct { ScheduleID string DeleteAfter time.Duration } +func (s *SchedulerExecutor) listSchedules(ctx context.Context, c client.Client, scheduleIDChan chan<- string, logger *zap.SugaredLogger) error { + iter, err := c.ScheduleClient().List(ctx, client.ScheduleListOptions{}) + if err != nil { + logger.Errorw("Error creating schedule list iterator", "error", err) + return fmt.Errorf("error creating schedule list iterator: %w", err) + } + + count := 0 + for iter.HasNext() { + entry, err := iter.Next() + if err != nil { + logger.Errorw("Error iterating schedules", "error", err, "count", count) + return fmt.Errorf("error iterating schedules: %w", err) + } + select { + case scheduleIDChan <- entry.ID: + count++ + if count%100 == 0 { + logger.Debugw("Listing schedules in progress", "count", count) + } + case <-ctx.Done(): + logger.Infow("Context canceled while listing schedules", "count", count) + return ctx.Err() + } + } + logger.Infow("Finished listing schedules", "totalCount", count) + return nil +} + func (s *SchedulerExecutor) createSchedule(ctx context.Context, c client.Client, scheduleID string, taskQueue string, logger *zap.SugaredLogger) (ScheduleState, error) { sc := ScheduleState{ ScheduleID: scheduleID, @@ -237,41 +404,145 @@ func (s *SchedulerExecutor) createSchedule(ctx context.Context, c client.Client, CronExpressions: []string{s.config.CronExpression}, // defining an end at ensures all schedules will be removed // regardless of errors from this executor - EndAt: endTime, + EndAt: endTime, + StartAt: time.Now(), }, - Action: action, - Overlap: pickOverlap(s.config.OverlapPolicy, logger), + TriggerImmediately: true, + Action: action, + Overlap: pickOverlap(s.config.OverlapPolicy, logger), } - _, err := c.ScheduleClient().Create(ctx, options) + + err := retryWithBackoff(ctx, "createSchedule", logger, func() error { + // Create new context with timeout for this operation + opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + _, err := c.ScheduleClient().Create(opCtx, options) + return err + }) return sc, err } func pickOverlap(policies []enums.ScheduleOverlapPolicy, logger *zap.SugaredLogger) enums.ScheduleOverlapPolicy { n, err := rand.Int(rand.Reader, big.NewInt(int64(len(policies)))) if err != nil { - logger.Error("Failed to select overlap policy", "error", err) + logger.Errorw("Failed to select overlap policy", "error", err) return policies[0] } return policies[n.Int64()] } -func (s *SchedulerExecutor) describeSchedule(ctx context.Context, c client.Client, scheduleID string, logger *zap.SugaredLogger) error { - handle := c.ScheduleClient().GetHandle(ctx, scheduleID) - _, err := handle.Describe(ctx) - if err != nil { - var notFoundErr *serviceerror.NotFound - if errors.As(err, ¬FoundErr) { - // Return nil if schedule is not found (already deleted or never existed) - logger.Debug("Schedule not found during describe operation", "scheduleID", scheduleID) +// retryConfig defines retry behavior for schedule operations +const ( + maxRetries = 5 + baseBackoff = 100 * time.Millisecond + maxBackoff = 10 * time.Second + backoffMultiple = 2.0 +) + +// retryWithBackoff executes an operation with exponential backoff and jitter on context deadline errors +func retryWithBackoff(ctx context.Context, operation string, logger *zap.SugaredLogger, fn func() error) error { + var lastErr error + backoff := baseBackoff + + for attempt := 0; attempt < maxRetries; attempt++ { + // Check if parent context is cancelled before attempting + if ctx.Err() != nil { + logger.Debugw("Parent context cancelled, stopping retries", "operation", operation, "attempt", attempt) + return ctx.Err() + } + + // Execute the operation + lastErr = fn() + if lastErr == nil { return nil } + + // Check if this is a deadline exceeded error (from SDK or context) + var deadlineErr *serviceerror.DeadlineExceeded + isDeadlineError := errors.As(lastErr, &deadlineErr) || errors.Is(lastErr, context.DeadlineExceeded) + + // Log error details for debugging + if deadlineErr != nil { + logger.Debugw("Operation failed", + "operation", operation, + "attempt", attempt+1, + "err", lastErr.Error(), + "message", deadlineErr.Message, + "status", fmt.Sprintf("%v", deadlineErr.Status()), + "code", deadlineErr.Status().Code()) + } else { + logger.Debugw("Operation failed", + "operation", operation, + "attempt", attempt+1, + "errorType", fmt.Sprintf("%T", lastErr), + "errorMsg", lastErr.Error()) + } + + if !isDeadlineError { + logger.Debugw("Not a deadline error, not retrying") + return lastErr + } + + // Don't retry if we've exhausted attempts + if attempt == maxRetries-1 { + logger.Warnw("Max retries reached", "operation", operation, "attempts", maxRetries, "error", lastErr) + return fmt.Errorf("max retries (%d) exceeded for %s: %w", maxRetries, operation, lastErr) + } + + // Calculate backoff with jitter + jitterMax := big.NewInt(int64(backoff / 2)) + jitterVal, err := rand.Int(rand.Reader, jitterMax) + if err != nil { + jitterVal = big.NewInt(0) + } + sleepDuration := backoff + time.Duration(jitterVal.Int64()) + + logger.Debugw("Retrying after backoff", + "operation", operation, + "attempt", attempt+1, + "backoff", sleepDuration, + "error", lastErr) + + // Sleep with backoff, but respect parent context cancellation + select { + case <-time.After(sleepDuration): + // Continue to next attempt + case <-ctx.Done(): + logger.Debugw("Parent context cancelled during backoff", "operation", operation) + return ctx.Err() + } + + // Increase backoff exponentially, capped at maxBackoff + backoff = time.Duration(float64(backoff) * backoffMultiple) + if backoff > maxBackoff { + backoff = maxBackoff + } } - return err + + return lastErr } -func (s *SchedulerExecutor) updateSchedule(ctx context.Context, c client.Client, scheduleID string, logger *zap.SugaredLogger) error { - handle := c.ScheduleClient().GetHandle(ctx, scheduleID) +func (s *SchedulerExecutor) describeSchedule(ctx context.Context, c client.Client, scheduleID string, logger *zap.SugaredLogger) error { + return retryWithBackoff(ctx, "describeSchedule", logger, func() error { + // Create new context with timeout for this operation + opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + handle := c.ScheduleClient().GetHandle(opCtx, scheduleID) + _, err := handle.Describe(opCtx) + if err != nil { + var notFoundErr *serviceerror.NotFound + if errors.As(err, ¬FoundErr) { + // Return nil if schedule is not found (already deleted or never existed) + logger.Debugw("Schedule not found during describe operation", "scheduleID", scheduleID) + return nil + } + } + return err + }) +} +func (s *SchedulerExecutor) updateSchedule(ctx context.Context, c client.Client, scheduleID string, logger *zap.SugaredLogger) error { updateFn := func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { schedule := input.Description.Schedule @@ -285,32 +556,45 @@ func (s *SchedulerExecutor) updateSchedule(ctx context.Context, c client.Client, }, nil } - err := handle.Update(ctx, client.ScheduleUpdateOptions{ - DoUpdate: updateFn, - }) - if err != nil { - var notFoundErr *serviceerror.NotFound - if errors.As(err, ¬FoundErr) { - // Return nil if schedule is not found (already deleted or never existed) - logger.Debug("Schedule not found during update operation", "scheduleID", scheduleID) - return nil + return retryWithBackoff(ctx, "updateSchedule", logger, func() error { + // Create new context with timeout for this operation + opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + handle := c.ScheduleClient().GetHandle(opCtx, scheduleID) + err := handle.Update(opCtx, client.ScheduleUpdateOptions{ + DoUpdate: updateFn, + }) + if err != nil { + var notFoundErr *serviceerror.NotFound + if errors.As(err, ¬FoundErr) { + // Return nil if schedule is not found (already deleted or never existed) + logger.Debugw("Schedule not found during update operation", "scheduleID", scheduleID) + return nil + } } - } - return err + return err + }) } func (s *SchedulerExecutor) deleteSchedule(ctx context.Context, c client.Client, scheduleID string, logger *zap.SugaredLogger) error { - handle := c.ScheduleClient().GetHandle(ctx, scheduleID) - err := handle.Delete(ctx) - if err != nil { - var notFoundErr *serviceerror.NotFound - if errors.As(err, ¬FoundErr) { - // Return nil if schedule is not found (already deleted or never existed) - logger.Debug("Schedule not found during delete operation", "scheduleID", scheduleID) - return nil + return retryWithBackoff(ctx, "deleteSchedule", logger, func() error { + // Create new context with timeout for this operation + opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + handle := c.ScheduleClient().GetHandle(opCtx, scheduleID) + err := handle.Delete(opCtx) + if err != nil { + var notFoundErr *serviceerror.NotFound + if errors.As(err, ¬FoundErr) { + // Return nil if schedule is not found (already deleted or never existed) + logger.Debugw("Schedule not found during delete operation", "scheduleID", scheduleID) + return nil + } } - } - return err + return err + }) } // parseOverlapPolicy converts string overlap policy to enum