diff --git a/pkg/worker/context.go b/pkg/worker/context.go index 6dd2361745..45c28fcc17 100644 --- a/pkg/worker/context.go +++ b/pkg/worker/context.go @@ -14,6 +14,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/client" "github.com/hatchet-dev/hatchet/pkg/client/create" "github.com/hatchet-dev/hatchet/pkg/worker/condition" + "github.com/hatchet-dev/hatchet/pkg/worker/eviction" ) type HatchetWorkerContext interface { @@ -796,9 +797,33 @@ func (d *durableHatchetContext) WaitFor(conditions condition.Condition) (*WaitRe return nil, fmt.Errorf("failed to add signal: %w", err) } - data := <-resCh + // Mark the run as waiting for eviction tracking + actionKey := d.StepRunId() + resourceID := fmt.Sprintf("%s:%s", d.StepRunId(), signalKey) + if mgr := d.evictionManager(); mgr != nil { + mgr.MarkWaiting(actionKey, "durable_event", resourceID) + defer mgr.MarkActive(actionKey) + } - return newWaitResult(data) + // Wait for either the durable event or context cancellation (e.g. eviction) + select { + case data := <-resCh: + return newWaitResult(data) + case <-d.Done(): + cause := context.Cause(d) + if cause != nil { + return nil, cause + } + return nil, d.Err() + } +} + +// evictionManager returns the eviction manager from the worker, or nil. +func (d *durableHatchetContext) evictionManager() *eviction.Manager { + if d.w == nil || d.w.worker == nil { + return nil + } + return d.w.worker.evictionManager } func (h *durableHatchetContext) saveOrLoadDurableEventListener() (*client.DurableEventsListener, error) { diff --git a/pkg/worker/eviction/cache.go b/pkg/worker/eviction/cache.go new file mode 100644 index 0000000000..4551c77769 --- /dev/null +++ b/pkg/worker/eviction/cache.go @@ -0,0 +1,205 @@ +package eviction + +import ( + "context" + "sort" + "sync" + "time" +) + +// DurableRunRecord tracks the state of a single durable task run for eviction purposes. +type DurableRunRecord struct { + Key string + StepRunId string + Ctx context.Context // used to check if already cancelled + Cancel context.CancelCauseFunc // used to cancel with a cause (e.g. ErrEvicted) + Eviction *Policy // nil = never evictable + RegisteredAt time.Time + + // Waiting state + WaitingSince *time.Time + WaitKind string + WaitResourceID string +} + +// IsWaiting returns true if the run is currently in a waiting state. +func (r *DurableRunRecord) IsWaiting() bool { + return r.WaitingSince != nil +} + +// IsCancelled returns true if the run's context has been cancelled. +func (r *DurableRunRecord) IsCancelled() bool { + return r.Ctx.Err() != nil +} + +// RegisterRunOpts holds the parameters for registering a durable run. +type RegisterRunOpts struct { + Key string + StepRunId string + Ctx context.Context + Cancel context.CancelCauseFunc + Now time.Time + Eviction *Policy +} + +// DurableEvictionCache defines the interface for tracking durable run state. +type DurableEvictionCache interface { + RegisterRun(opts RegisterRunOpts) + UnregisterRun(key string) + MarkWaiting(key string, now time.Time, waitKind, resourceID string) + MarkActive(key string) + SelectEvictionCandidate(now time.Time, durableSlots, reserveSlots int, minWaitForCapacityEviction time.Duration) string + Get(key string) *DurableRunRecord +} + +// InMemoryDurableEvictionCache is a thread-safe in-memory implementation of DurableEvictionCache. +type InMemoryDurableEvictionCache struct { + runs map[string]*DurableRunRecord + mu sync.RWMutex +} + +// NewInMemoryDurableEvictionCache creates a new in-memory eviction cache. +func NewInMemoryDurableEvictionCache() *InMemoryDurableEvictionCache { + return &InMemoryDurableEvictionCache{ + runs: make(map[string]*DurableRunRecord), + } +} + +func (c *InMemoryDurableEvictionCache) RegisterRun(key, stepRunId string, ctx context.Context, cancel context.CancelCauseFunc, now time.Time, eviction *Policy) { + c.mu.Lock() + defer c.mu.Unlock() + c.runs[key] = &DurableRunRecord{ + Key: key, + StepRunId: stepRunId, + Ctx: ctx, + Cancel: cancel, + Eviction: eviction, + RegisteredAt: now, + } +} + +func (c *InMemoryDurableEvictionCache) UnregisterRun(key string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.runs, key) +} + +func (c *InMemoryDurableEvictionCache) Get(key string) *DurableRunRecord { + c.mu.RLock() + defer c.mu.RUnlock() + return c.runs[key] +} + +func (c *InMemoryDurableEvictionCache) MarkWaiting(key string, now time.Time, waitKind, resourceID string) { + c.mu.Lock() + defer c.mu.Unlock() + rec, ok := c.runs[key] + if !ok { + return + } + if rec.IsCancelled() { + return + } + rec.WaitingSince = &now + rec.WaitKind = waitKind + rec.WaitResourceID = resourceID +} + +func (c *InMemoryDurableEvictionCache) MarkActive(key string) { + c.mu.Lock() + defer c.mu.Unlock() + rec, ok := c.runs[key] + if !ok { + return + } + rec.WaitingSince = nil + rec.WaitKind = "" + rec.WaitResourceID = "" +} + +func (c *InMemoryDurableEvictionCache) capacityPressure(durableSlots, reserveSlots, waitingCount int) bool { + if durableSlots <= 0 { + return false + } + maxWaiting := durableSlots - reserveSlots + if maxWaiting <= 0 { + return false + } + return waitingCount >= maxWaiting +} + +// SelectEvictionCandidate selects a run to evict based on TTL or capacity pressure. +// Returns the key of the selected candidate, or "" if no candidate is eligible. +func (c *InMemoryDurableEvictionCache) SelectEvictionCandidate( + now time.Time, + durableSlots, reserveSlots int, + minWaitForCapacityEviction time.Duration, +) string { + c.mu.RLock() + defer c.mu.RUnlock() + + // Collect waiting runs that are eligible for eviction (have an eviction policy). + var waiting []*DurableRunRecord + for _, r := range c.runs { + if r.IsWaiting() && !r.IsCancelled() && r.Eviction != nil { + waiting = append(waiting, r) + } + } + + if len(waiting) == 0 { + return "" + } + + // Prefer TTL-eligible candidates first. + var ttlEligible []*DurableRunRecord + for _, r := range waiting { + if r.Eviction.TTL == nil || r.WaitingSince == nil { + continue + } + if now.Sub(*r.WaitingSince) >= *r.Eviction.TTL { + ttlEligible = append(ttlEligible, r) + } + } + + if len(ttlEligible) > 0 { + sort.Slice(ttlEligible, func(i, j int) bool { + if ttlEligible[i].Eviction.Priority != ttlEligible[j].Eviction.Priority { + return ttlEligible[i].Eviction.Priority < ttlEligible[j].Eviction.Priority + } + return ttlEligible[i].WaitingSince.Before(*ttlEligible[j].WaitingSince) + }) + return ttlEligible[0].Key + } + + // Capacity eviction: only if under pressure and run allows it. + if !c.capacityPressure(durableSlots, reserveSlots, len(waiting)) { + return "" + } + + var capacityCandidates []*DurableRunRecord + for _, r := range waiting { + if r.Eviction == nil || !r.Eviction.AllowCapacityEviction { + continue + } + if r.WaitingSince == nil { + continue + } + if now.Sub(*r.WaitingSince) < minWaitForCapacityEviction { + continue + } + capacityCandidates = append(capacityCandidates, r) + } + + if len(capacityCandidates) == 0 { + return "" + } + + sort.Slice(capacityCandidates, func(i, j int) bool { + if capacityCandidates[i].Eviction.Priority != capacityCandidates[j].Eviction.Priority { + return capacityCandidates[i].Eviction.Priority < capacityCandidates[j].Eviction.Priority + } + return capacityCandidates[i].WaitingSince.Before(*capacityCandidates[j].WaitingSince) + }) + + return capacityCandidates[0].Key +} diff --git a/pkg/worker/eviction/cache_test.go b/pkg/worker/eviction/cache_test.go new file mode 100644 index 0000000000..2dc701cfe6 --- /dev/null +++ b/pkg/worker/eviction/cache_test.go @@ -0,0 +1,279 @@ +package eviction + +import ( + "context" + "testing" + "time" +) + +func newTestRecord(key string, evictionPolicy *Policy) (context.Context, context.CancelCauseFunc) { + return context.WithCancelCause(context.Background()) +} + +func TestInMemoryCache_RegisterAndGet(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ctx, cancel := newTestRecord("run-1", nil) + + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), nil) + + rec := cache.Get("run-1") + if rec == nil { + t.Fatal("expected record, got nil") + } + if rec.StepRunId != "step-1" { + t.Fatalf("expected step-1, got %s", rec.StepRunId) + } + if rec.IsWaiting() { + t.Fatal("expected not waiting") + } + if rec.IsCancelled() { + t.Fatal("expected not cancelled") + } +} + +func TestInMemoryCache_UnregisterRun(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ctx, cancel := newTestRecord("run-1", nil) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), nil) + + cache.UnregisterRun("run-1") + + if rec := cache.Get("run-1"); rec != nil { + t.Fatal("expected nil after unregister") + } +} + +func TestInMemoryCache_UnregisterNonexistent(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + // Should not panic + cache.UnregisterRun("nonexistent") +} + +func TestInMemoryCache_MarkWaitingAndActive(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ctx, cancel := newTestRecord("run-1", nil) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), nil) + + now := time.Now().UTC() + cache.MarkWaiting("run-1", now, "sleep", "signal-1") + + rec := cache.Get("run-1") + if !rec.IsWaiting() { + t.Fatal("expected waiting") + } + if rec.WaitKind != "sleep" { + t.Fatalf("expected wait kind 'sleep', got '%s'", rec.WaitKind) + } + if rec.WaitResourceID != "signal-1" { + t.Fatalf("expected resource id 'signal-1', got '%s'", rec.WaitResourceID) + } + + cache.MarkActive("run-1") + + rec = cache.Get("run-1") + if rec.IsWaiting() { + t.Fatal("expected not waiting after mark active") + } + if rec.WaitKind != "" { + t.Fatalf("expected empty wait kind, got '%s'", rec.WaitKind) + } +} + +func TestInMemoryCache_MarkWaitingSkipsCancelled(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ctx, cancel := newTestRecord("run-1", nil) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), nil) + + // Cancel the context + cancel(nil) + + cache.MarkWaiting("run-1", time.Now().UTC(), "sleep", "signal-1") + + rec := cache.Get("run-1") + if rec.IsWaiting() { + t.Fatal("expected not waiting after context cancelled") + } +} + +func TestInMemoryCache_MarkWaitingNonexistent(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + // Should not panic + cache.MarkWaiting("nonexistent", time.Now().UTC(), "sleep", "signal-1") +} + +func TestInMemoryCache_MarkActiveNonexistent(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + // Should not panic + cache.MarkActive("nonexistent") +} + +func TestInMemoryCache_SelectEvictionCandidate_NoWaiting(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1000, 0, 10*time.Second) + if key != "" { + t.Fatalf("expected empty key, got %s", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_NoEvictionPolicy(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ctx, cancel := newTestRecord("run-1", nil) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), nil) // nil policy + cache.MarkWaiting("run-1", time.Now().Add(-20*time.Minute), "sleep", "signal-1") + + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1000, 0, 10*time.Second) + if key != "" { + t.Fatalf("expected empty key for nil eviction policy, got %s", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_TTLEviction(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ttl := 5 * time.Minute + policy := &Policy{TTL: &ttl, AllowCapacityEviction: true, Priority: 0} + + ctx, cancel := newTestRecord("run-1", policy) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), policy) + + // Mark waiting 10 minutes ago (exceeds 5-minute TTL) + cache.MarkWaiting("run-1", time.Now().Add(-10*time.Minute), "sleep", "signal-1") + + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1000, 0, 10*time.Second) + if key != "run-1" { + t.Fatalf("expected 'run-1', got '%s'", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_TTLNotYetExpired(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ttl := 15 * time.Minute + policy := &Policy{TTL: &ttl, AllowCapacityEviction: false, Priority: 0} + + ctx, cancel := newTestRecord("run-1", policy) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), policy) + + // Mark waiting 5 minutes ago (less than 15-minute TTL) + cache.MarkWaiting("run-1", time.Now().Add(-5*time.Minute), "sleep", "signal-1") + + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1000, 0, 10*time.Second) + if key != "" { + t.Fatalf("expected empty key (TTL not expired), got '%s'", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_TTLPriorityOrdering(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ttl := 1 * time.Minute + + // run-1: priority 5 (higher = evicted later) + policy1 := &Policy{TTL: &ttl, Priority: 5} + ctx1, cancel1 := newTestRecord("run-1", policy1) + cache.RegisterRun("run-1", "step-1", ctx1, cancel1, time.Now().UTC(), policy1) + cache.MarkWaiting("run-1", time.Now().Add(-10*time.Minute), "sleep", "s1") + + // run-2: priority 1 (lower = evicted first) + policy2 := &Policy{TTL: &ttl, Priority: 1} + ctx2, cancel2 := newTestRecord("run-2", policy2) + cache.RegisterRun("run-2", "step-2", ctx2, cancel2, time.Now().UTC(), policy2) + cache.MarkWaiting("run-2", time.Now().Add(-10*time.Minute), "sleep", "s2") + + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1000, 0, 10*time.Second) + if key != "run-2" { + t.Fatalf("expected 'run-2' (lower priority), got '%s'", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_CapacityEviction(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + // No TTL, but allow capacity eviction + policy := &Policy{TTL: nil, AllowCapacityEviction: true, Priority: 0} + + ctx, cancel := newTestRecord("run-1", policy) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), policy) + cache.MarkWaiting("run-1", time.Now().Add(-20*time.Second), "sleep", "signal-1") + + // durableSlots=1, reserveSlots=0, 1 waiting >= (1-0)=1 -> capacity pressure + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1, 0, 10*time.Second) + if key != "run-1" { + t.Fatalf("expected 'run-1' under capacity pressure, got '%s'", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_CapacityNoPresure(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + policy := &Policy{TTL: nil, AllowCapacityEviction: true, Priority: 0} + + ctx, cancel := newTestRecord("run-1", policy) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), policy) + cache.MarkWaiting("run-1", time.Now().Add(-20*time.Second), "sleep", "signal-1") + + // durableSlots=1000, reserveSlots=0, 1 waiting < (1000-0)=1000 -> no pressure + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1000, 0, 10*time.Second) + if key != "" { + t.Fatalf("expected empty key (no capacity pressure), got '%s'", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_CapacityMinWait(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + policy := &Policy{TTL: nil, AllowCapacityEviction: true, Priority: 0} + + ctx, cancel := newTestRecord("run-1", policy) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), policy) + // Only been waiting 2 seconds, min is 10 seconds + cache.MarkWaiting("run-1", time.Now().Add(-2*time.Second), "sleep", "signal-1") + + // Under capacity pressure but hasn't waited long enough + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1, 0, 10*time.Second) + if key != "" { + t.Fatalf("expected empty key (min wait not met), got '%s'", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_CapacityNotAllowed(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + policy := &Policy{TTL: nil, AllowCapacityEviction: false, Priority: 0} + + ctx, cancel := newTestRecord("run-1", policy) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), policy) + cache.MarkWaiting("run-1", time.Now().Add(-20*time.Second), "sleep", "signal-1") + + // Under pressure but AllowCapacityEviction is false + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1, 0, 10*time.Second) + if key != "" { + t.Fatalf("expected empty key (capacity eviction not allowed), got '%s'", key) + } +} + +func TestInMemoryCache_SelectEvictionCandidate_SkipsCancelled(t *testing.T) { + cache := NewInMemoryDurableEvictionCache() + ttl := 1 * time.Minute + policy := &Policy{TTL: &ttl, AllowCapacityEviction: true, Priority: 0} + + ctx, cancel := newTestRecord("run-1", policy) + cache.RegisterRun("run-1", "step-1", ctx, cancel, time.Now().UTC(), policy) + cache.MarkWaiting("run-1", time.Now().Add(-10*time.Minute), "sleep", "signal-1") + + // Cancel the context before selection + cancel(nil) + + key := cache.SelectEvictionCandidate(time.Now().UTC(), 1000, 0, 10*time.Second) + if key != "" { + t.Fatalf("expected empty key (cancelled run), got '%s'", key) + } +} + +func TestDurableRunRecord_IsCancelled(t *testing.T) { + ctx, cancel := context.WithCancelCause(context.Background()) + rec := &DurableRunRecord{Ctx: ctx, Cancel: cancel} + + if rec.IsCancelled() { + t.Fatal("expected not cancelled") + } + + cancel(ErrEvicted) + + if !rec.IsCancelled() { + t.Fatal("expected cancelled") + } +} diff --git a/pkg/worker/eviction/manager.go b/pkg/worker/eviction/manager.go new file mode 100644 index 0000000000..f01db10394 --- /dev/null +++ b/pkg/worker/eviction/manager.go @@ -0,0 +1,233 @@ +package eviction + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/rs/zerolog" +) + +// ErrEvicted is the cause set on a run's context when it is evicted by the manager. +var ErrEvicted = errors.New("durable run evicted") + +// ManagerConfig holds per-worker eviction settings. +type ManagerConfig struct { + // CheckInterval is how often the manager checks for eviction candidates. + CheckInterval time.Duration + + // ReserveSlots is the number of durable slots to reserve from capacity-based eviction. + ReserveSlots int + + // MinWaitForCapacityEviction avoids immediately evicting runs that just entered a wait. + MinWaitForCapacityEviction time.Duration +} + +// DefaultManagerConfig returns sensible defaults. +func DefaultManagerConfig() ManagerConfig { + return ManagerConfig{ + CheckInterval: 1 * time.Second, + ReserveSlots: 0, + MinWaitForCapacityEviction: 10 * time.Second, + } +} + +// EvictionHook is an optional callback invoked during eviction. +type EvictionHook func(key string, rec *DurableRunRecord) + +// Manager orchestrates the background eviction loop. +type Manager struct { + cache DurableEvictionCache + l *zerolog.Logger + onEvictionSelected EvictionHook + onEvictionCancelled EvictionHook + cancel context.CancelFunc + config ManagerConfig + wg sync.WaitGroup + durableSlots int + mu sync.Mutex +} + +// NewManager creates a new eviction manager. +func NewManager(durableSlots int, config ManagerConfig, l *zerolog.Logger, opts ...ManagerOption) *Manager { + m := &Manager{ + durableSlots: durableSlots, + config: config, + cache: NewInMemoryDurableEvictionCache(), + l: l, + } + for _, opt := range opts { + opt(m) + } + return m +} + +// ManagerOption configures optional manager behavior. +type ManagerOption func(*Manager) + +// WithEvictionCache overrides the default in-memory cache. +func WithEvictionCache(cache DurableEvictionCache) ManagerOption { + return func(m *Manager) { + m.cache = cache + } +} + +// WithOnEvictionSelected sets a hook called when a candidate is selected. +func WithOnEvictionSelected(hook EvictionHook) ManagerOption { + return func(m *Manager) { + m.onEvictionSelected = hook + } +} + +// WithOnEvictionCancelled sets a hook called after local cancellation. +func WithOnEvictionCancelled(hook EvictionHook) ManagerOption { + return func(m *Manager) { + m.onEvictionCancelled = hook + } +} + +// Cache returns the underlying eviction cache. +func (m *Manager) Cache() DurableEvictionCache { + return m.cache +} + +// Start begins the background eviction loop. +func (m *Manager) Start() { + m.mu.Lock() + defer m.mu.Unlock() + + if m.cancel != nil { + return + } + + ctx, cancel := context.WithCancel(context.Background()) + m.cancel = cancel + + m.wg.Add(1) + go func() { + defer m.wg.Done() + m.runLoop(ctx) + }() +} + +// Stop halts the background loop and waits for it to finish. +func (m *Manager) Stop() { + m.mu.Lock() + cancel := m.cancel + m.cancel = nil + m.mu.Unlock() + + if cancel != nil { + cancel() + m.wg.Wait() + } +} + +// RegisterRun registers a durable run for eviction tracking. +func (m *Manager) RegisterRun(key, stepRunId string, ctx context.Context, cancel context.CancelCauseFunc, eviction *Policy) { + m.cache.RegisterRun(key, stepRunId, ctx, cancel, time.Now().UTC(), eviction) +} + +// UnregisterRun removes a run from eviction tracking. +func (m *Manager) UnregisterRun(key string) { + m.cache.UnregisterRun(key) +} + +// MarkWaiting marks a run as entering a wait state. +func (m *Manager) MarkWaiting(key, waitKind, resourceID string) { + m.cache.MarkWaiting(key, time.Now().UTC(), waitKind, resourceID) +} + +// MarkActive marks a run as leaving a wait state. +func (m *Manager) MarkActive(key string) { + m.cache.MarkActive(key) +} + +func (m *Manager) runLoop(ctx context.Context) { + ticker := time.NewTicker(m.config.CheckInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.tickSafe() + } + } +} + +func (m *Manager) tickSafe() { + defer func() { + if r := recover(); r != nil { + m.l.Error().Interface("panic", r).Msg("eviction manager: panic in tick") + } + }() + m.tick() +} + +func (m *Manager) tick() { + evictedThisTick := map[string]bool{} + + for { + now := time.Now().UTC() + key := m.cache.SelectEvictionCandidate( + now, + m.durableSlots, + m.config.ReserveSlots, + m.config.MinWaitForCapacityEviction, + ) + + if key == "" { + return + } + + if evictedThisTick[key] { + return + } + evictedThisTick[key] = true + + rec := m.cache.Get(key) + if rec == nil { + continue + } + + if rec.Eviction == nil { + continue + } + + m.l.Warn(). + Str("step_run_id", rec.StepRunId). + Str("wait_kind", rec.WaitKind). + Str("resource_id", rec.WaitResourceID). + Msg("eviction manager: evicting durable run") + + // Observability hook: emitted when selected from eviction cache. + if m.onEvictionSelected != nil { + func() { + defer func() { + if r := recover(); r != nil { + m.l.Error().Interface("panic", r).Msg("eviction manager: panic in onEvictionSelected hook") + } + }() + m.onEvictionSelected(key, rec) + }() + } + + // Cancel the run's context with ErrEvicted cause. + rec.Cancel(ErrEvicted) + + // Observability hook: emitted after local cancellation. + if m.onEvictionCancelled != nil { + func() { + defer func() { + if r := recover(); r != nil { + m.l.Error().Interface("panic", r).Msg("eviction manager: panic in onEvictionCancelled hook") + } + }() + m.onEvictionCancelled(key, rec) + }() + } + } +} diff --git a/pkg/worker/eviction/manager_test.go b/pkg/worker/eviction/manager_test.go new file mode 100644 index 0000000000..99bcd29fbc --- /dev/null +++ b/pkg/worker/eviction/manager_test.go @@ -0,0 +1,209 @@ +package eviction + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" +) + +func testLogger() *zerolog.Logger { + l := zerolog.Nop() + return &l +} + +func TestManager_StartStop(t *testing.T) { + m := NewManager(1000, DefaultManagerConfig(), testLogger()) + m.Start() + // Starting again should be a no-op + m.Start() + m.Stop() + // Stopping again should be safe + m.Stop() +} + +func TestManager_RegisterUnregister(t *testing.T) { + m := NewManager(1000, DefaultManagerConfig(), testLogger()) + ctx, cancel := context.WithCancelCause(context.Background()) + + m.RegisterRun("run-1", "step-1", ctx, cancel, nil) + + rec := m.Cache().Get("run-1") + if rec == nil { + t.Fatal("expected record") + } + if rec.StepRunId != "step-1" { + t.Fatalf("expected step-1, got %s", rec.StepRunId) + } + + m.UnregisterRun("run-1") + if m.Cache().Get("run-1") != nil { + t.Fatal("expected nil after unregister") + } +} + +func TestManager_MarkWaitingActive(t *testing.T) { + m := NewManager(1000, DefaultManagerConfig(), testLogger()) + ctx, cancel := context.WithCancelCause(context.Background()) + + m.RegisterRun("run-1", "step-1", ctx, cancel, nil) + + m.MarkWaiting("run-1", "durable_event", "step-1:signal-1") + rec := m.Cache().Get("run-1") + if !rec.IsWaiting() { + t.Fatal("expected waiting") + } + + m.MarkActive("run-1") + rec = m.Cache().Get("run-1") + if rec.IsWaiting() { + t.Fatal("expected active") + } +} + +func TestManager_EvictsTTLExpiredRun(t *testing.T) { + config := ManagerConfig{ + CheckInterval: 50 * time.Millisecond, + ReserveSlots: 0, + MinWaitForCapacityEviction: 10 * time.Second, + } + + var mu sync.Mutex + var evictedKeys []string + var cancelledKeys []string + + m := NewManager(1000, config, testLogger(), + WithOnEvictionSelected(func(key string, rec *DurableRunRecord) { + mu.Lock() + evictedKeys = append(evictedKeys, key) + mu.Unlock() + }), + WithOnEvictionCancelled(func(key string, rec *DurableRunRecord) { + mu.Lock() + cancelledKeys = append(cancelledKeys, key) + mu.Unlock() + }), + ) + + ttl := 100 * time.Millisecond + policy := &Policy{TTL: &ttl, AllowCapacityEviction: true, Priority: 0} + + ctx, cancel := context.WithCancelCause(context.Background()) + m.RegisterRun("run-1", "step-1", ctx, cancel, policy) + + // Mark waiting so it's eligible for TTL eviction + m.Cache().MarkWaiting("run-1", time.Now().Add(-1*time.Second), "sleep", "signal-1") + + m.Start() + defer m.Stop() + + // Wait for the eviction to happen + deadline := time.After(2 * time.Second) + for { + select { + case <-deadline: + t.Fatal("timeout waiting for eviction") + default: + } + + mu.Lock() + evicted := len(evictedKeys) > 0 + cancelled := len(cancelledKeys) > 0 + mu.Unlock() + + if evicted && cancelled { + break + } + time.Sleep(25 * time.Millisecond) + } + + // Verify context was cancelled with ErrEvicted + if ctx.Err() == nil { + t.Fatal("expected context to be cancelled") + } + if context.Cause(ctx) != ErrEvicted { + t.Fatalf("expected ErrEvicted cause, got %v", context.Cause(ctx)) + } + + mu.Lock() + defer mu.Unlock() + if len(evictedKeys) != 1 || evictedKeys[0] != "run-1" { + t.Fatalf("expected evictedKeys=['run-1'], got %v", evictedKeys) + } + if len(cancelledKeys) != 1 || cancelledKeys[0] != "run-1" { + t.Fatalf("expected cancelledKeys=['run-1'], got %v", cancelledKeys) + } +} + +func TestManager_DoesNotEvictWithoutPolicy(t *testing.T) { + config := ManagerConfig{ + CheckInterval: 50 * time.Millisecond, + ReserveSlots: 0, + MinWaitForCapacityEviction: 10 * time.Second, + } + + m := NewManager(1000, config, testLogger()) + + ctx, cancel := context.WithCancelCause(context.Background()) + m.RegisterRun("run-1", "step-1", ctx, cancel, nil) // nil policy + + m.Cache().MarkWaiting("run-1", time.Now().Add(-1*time.Hour), "sleep", "signal-1") + + m.Start() + time.Sleep(200 * time.Millisecond) + m.Stop() + + if ctx.Err() != nil { + t.Fatal("expected context NOT to be cancelled (no eviction policy)") + } +} + +func TestManager_EvictsUnderCapacityPressure(t *testing.T) { + config := ManagerConfig{ + CheckInterval: 50 * time.Millisecond, + ReserveSlots: 0, + MinWaitForCapacityEviction: 50 * time.Millisecond, + } + + m := NewManager(1, config, testLogger()) // Only 1 durable slot + + // No TTL, but allow capacity eviction + policy := &Policy{TTL: nil, AllowCapacityEviction: true, Priority: 0} + + ctx, cancel := context.WithCancelCause(context.Background()) + m.RegisterRun("run-1", "step-1", ctx, cancel, policy) + + // Mark waiting 200ms ago (exceeds min wait of 50ms) + m.Cache().MarkWaiting("run-1", time.Now().Add(-200*time.Millisecond), "sleep", "signal-1") + + m.Start() + defer m.Stop() + + deadline := time.After(2 * time.Second) + for { + select { + case <-deadline: + t.Fatal("timeout waiting for capacity eviction") + default: + } + if ctx.Err() != nil { + break + } + time.Sleep(25 * time.Millisecond) + } + + if context.Cause(ctx) != ErrEvicted { + t.Fatalf("expected ErrEvicted cause, got %v", context.Cause(ctx)) + } +} + +func TestManager_CustomCache(t *testing.T) { + customCache := NewInMemoryDurableEvictionCache() + m := NewManager(1000, DefaultManagerConfig(), testLogger(), WithEvictionCache(customCache)) + + if m.Cache() != customCache { + t.Fatal("expected custom cache to be used") + } +} diff --git a/pkg/worker/eviction/policy.go b/pkg/worker/eviction/policy.go new file mode 100644 index 0000000000..87c1b16a64 --- /dev/null +++ b/pkg/worker/eviction/policy.go @@ -0,0 +1,31 @@ +package eviction + +import "time" + +// Policy defines task-scoped eviction parameters for durable tasks. +// +// A nil Policy on a durable run means the run is never eligible for eviction. +// TTL applies to time spent in SDK-instrumented "waiting" states (e.g. WaitFor, SleepFor). +type Policy struct { + // TTL is the maximum continuous waiting duration before the run becomes + // TTL-eligible for eviction. A nil TTL disables TTL-based eviction. + TTL *time.Duration + + // AllowCapacityEviction controls whether this task may be evicted when + // the worker is under durable-slot pressure. + AllowCapacityEviction bool + + // Priority determines eviction order when multiple candidates exist. + // Lower values are evicted first. + Priority int +} + +// DefaultPolicy returns sensible defaults for durable task eviction. +func DefaultPolicy() *Policy { + ttl := 15 * time.Minute + return &Policy{ + TTL: &ttl, + AllowCapacityEviction: true, + Priority: 0, + } +} diff --git a/pkg/worker/eviction/policy_test.go b/pkg/worker/eviction/policy_test.go new file mode 100644 index 0000000000..848959f71a --- /dev/null +++ b/pkg/worker/eviction/policy_test.go @@ -0,0 +1,47 @@ +package eviction + +import ( + "testing" + "time" +) + +func TestDefaultPolicy(t *testing.T) { + p := DefaultPolicy() + if p == nil { + t.Fatal("expected non-nil policy") + } + if p.TTL == nil { + t.Fatal("expected non-nil TTL") + } + if *p.TTL != 15*time.Minute { + t.Fatalf("expected 15m TTL, got %v", *p.TTL) + } + if !p.AllowCapacityEviction { + t.Fatal("expected AllowCapacityEviction to be true") + } + if p.Priority != 0 { + t.Fatalf("expected priority 0, got %d", p.Priority) + } +} + +func TestDefaultManagerConfig(t *testing.T) { + c := DefaultManagerConfig() + if c.CheckInterval != 1*time.Second { + t.Fatalf("expected 1s check interval, got %v", c.CheckInterval) + } + if c.ReserveSlots != 0 { + t.Fatalf("expected 0 reserve slots, got %d", c.ReserveSlots) + } + if c.MinWaitForCapacityEviction != 10*time.Second { + t.Fatalf("expected 10s min wait, got %v", c.MinWaitForCapacityEviction) + } +} + +func TestErrEvicted(t *testing.T) { + if ErrEvicted == nil { + t.Fatal("expected non-nil ErrEvicted") + } + if ErrEvicted.Error() != "durable run evicted" { + t.Fatalf("unexpected error message: %s", ErrEvicted.Error()) + } +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 40cef12c06..e0018e4d68 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -19,6 +19,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/errors" "github.com/hatchet-dev/hatchet/pkg/integrations" "github.com/hatchet-dev/hatchet/pkg/logger" + "github.com/hatchet-dev/hatchet/pkg/worker/eviction" contracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1" ) @@ -50,7 +51,9 @@ type actionImpl struct { method any service string - compute *compute.Compute + compute *compute.Compute + isDurable bool + evictionPolicy *eviction.Policy } func (j *actionImpl) Name() string { @@ -111,6 +114,9 @@ type Worker struct { id *string panicHandler func(ctx HatchetContext, recovered any) + + evictionManager *eviction.Manager + evictionConfig *eviction.ManagerConfig } type WorkerOpt func(*WorkerOpts) @@ -129,6 +135,8 @@ type WorkerOpts struct { actions []string labels map[string]interface{} + + evictionConfig *eviction.ManagerConfig } func defaultWorkerOpts() *WorkerOpts { @@ -200,6 +208,13 @@ func WithLabels(labels map[string]interface{}) WorkerOpt { } } +// WithEvictionConfig sets the eviction manager configuration for the worker. +func WithEvictionConfig(config *eviction.ManagerConfig) WorkerOpt { + return func(opts *WorkerOpts) { + opts.evictionConfig = config + } +} + func WithLogger(l *zerolog.Logger) WorkerOpt { return func(opts *WorkerOpts) { if opts.l != nil { @@ -271,6 +286,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) { initActionNames: opts.actions, labels: opts.labels, registered_workflows: map[string]bool{}, + evictionConfig: opts.evictionConfig, } mws.add(w.panicMiddleware) @@ -384,6 +400,39 @@ func (w *Worker) RegisterAction(actionId string, method any) error { return w.registerAction(action.Service, action.Verb, method, nil) } +// RegisterDurableAction registers a durable action with an optional eviction policy. +func (w *Worker) RegisterDurableAction(actionId string, method any, evictionPolicy *eviction.Policy) error { + action, err := types.ParseActionID(actionId) + + if err != nil { + return fmt.Errorf("could not parse action id: %w", err) + } + + if _, ok := w.services.Load(action.Service); !ok { + w.NewService(action.Service) + } + + err = w.registerAction(action.Service, action.Verb, method, nil) + if err != nil { + return err + } + + // Mark the action as durable with its eviction policy + actionID := strings.ToLower(fmt.Sprintf("%s:%s", action.Service, action.Verb)) + if a, ok := w.actions[actionID]; ok { + impl := a.(*actionImpl) + impl.isDurable = true + impl.evictionPolicy = evictionPolicy + } + + return nil +} + +// EvictionManager returns the worker's eviction manager (may be nil if not started). +func (w *Worker) EvictionManager() *eviction.Manager { + return w.evictionManager +} + func (w *Worker) registerAction(service, verb string, method any, compute *compute.Compute) error { actionID := strings.ToLower(fmt.Sprintf("%s:%s", service, verb)) @@ -467,6 +516,7 @@ func (w *Worker) ID() *string { func (w *Worker) startBlocking(ctx context.Context) error { actionNames := []string{} + hasDurableActions := false for _, action := range w.actions { if w.client.RunnableActions() != nil { if !slices.Contains(w.client.RunnableActions(), action.Name()) { @@ -475,10 +525,36 @@ func (w *Worker) startBlocking(ctx context.Context) error { } actionNames = append(actionNames, action.Name()) + + if impl, ok := action.(*actionImpl); ok && impl.isDurable { + hasDurableActions = true + } } w.l.Debug().Msgf("worker %s is listening for actions: %v", w.name, actionNames) + // Initialize eviction manager if there are durable actions + if hasDurableActions { + evictionCfg := eviction.DefaultManagerConfig() + if w.evictionConfig != nil { + evictionCfg = *w.evictionConfig + } + + durableSlotCount := 1000 + if w.durableSlots != nil { + durableSlotCount = *w.durableSlots + } + if w.slotConfig != nil { + if ds, ok := w.slotConfig["durable"]; ok { + durableSlotCount = int(ds) + } + } + + w.evictionManager = eviction.NewManager(durableSlotCount, evictionCfg, w.l) + w.evictionManager.Start() + defer w.evictionManager.Stop() + } + _ = NewManagedCompute(&w.actions, w.client, 1) listener, id, err := w.client.Dispatcher().GetActionListener(ctx, &client.GetActionListenerRequest{ @@ -592,11 +668,19 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action return fmt.Errorf("could not decode args to interface: %w", err) } - runContext, cancel := context.WithCancel(context.Background()) + runContext, cancelCause := context.WithCancelCause(context.Background()) - w.cancelMap.Store(assignedAction.StepRunId, cancel) + w.cancelMap.Store(assignedAction.StepRunId, cancelCause) defer w.cancelMap.Delete(assignedAction.StepRunId) + // Register durable action with eviction manager + actionKey := assignedAction.StepRunId + impl, isActionImpl := action.(*actionImpl) + if isActionImpl && impl.isDurable && w.evictionManager != nil { + w.evictionManager.RegisterRun(actionKey, assignedAction.StepRunId, runContext, cancelCause, impl.evictionPolicy) + defer w.evictionManager.UnregisterRun(actionKey) + } + hCtx, err := newHatchetContext(runContext, assignedAction, w.client, w.l, w) if err != nil { @@ -616,7 +700,7 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action // the service-specific middleware return w.middlewares.runAll(hCtx, func(ctx HatchetContext) error { return svc.mws.runAll(ctx, func(ctx HatchetContext) error { - defer cancel() + defer cancelCause(nil) args := []any{ctx} @@ -756,9 +840,9 @@ func (w *Worker) cancelStepRun(ctx context.Context, assignedAction *client.Actio w.l.Debug().Msgf("cancelling step run %s", assignedAction.StepRunId) - cancelFn := cancel.(context.CancelFunc) + cancelFn := cancel.(context.CancelCauseFunc) - cancelFn() + cancelFn(nil) return nil } diff --git a/sdks/go/client.go b/sdks/go/client.go index ba19cbeb4a..f98a94c632 100644 --- a/sdks/go/client.go +++ b/sdks/go/client.go @@ -102,6 +102,10 @@ func (c *Client) NewWorker(name string, options ...WorkerOption) (*Worker, error workerOpts = append(workerOpts, worker.WithLabels(config.labels)) } + if config.evictionConfig != nil { + workerOpts = append(workerOpts, worker.WithEvictionConfig(config.evictionConfig)) + } + mainWorker, err := worker.NewWorker(workerOpts...) if err != nil { return nil, err @@ -118,7 +122,7 @@ func (c *Client) NewWorker(name string, options ...WorkerOption) (*Worker, error } for _, namedFn := range dump.durableActions { - err = mainWorker.RegisterAction(namedFn.ActionID, namedFn.Fn) + err = mainWorker.RegisterDurableAction(namedFn.ActionID, namedFn.Fn, namedFn.EvictionPolicy) if err != nil { return nil, err } diff --git a/sdks/go/hatchet.go b/sdks/go/hatchet.go index 18a33eaf6d..c40caab81d 100644 --- a/sdks/go/hatchet.go +++ b/sdks/go/hatchet.go @@ -55,6 +55,7 @@ import ( pkgWorker "github.com/hatchet-dev/hatchet/pkg/worker" "github.com/hatchet-dev/hatchet/pkg/worker/condition" + "github.com/hatchet-dev/hatchet/pkg/worker/eviction" ) // Context represents the execution context passed to task functions. @@ -91,3 +92,24 @@ func OrCondition(conditions ...condition.Condition) condition.Condition { func AndCondition(conditions ...condition.Condition) condition.Condition { return condition.Conditions(conditions...) } + +// Eviction types re-exported for convenience. + +// EvictionPolicy defines task-scoped eviction parameters for durable tasks. +type EvictionPolicy = eviction.Policy + +// EvictionManagerConfig holds per-worker eviction manager settings. +type EvictionManagerConfig = eviction.ManagerConfig + +// ErrEvicted is the error cause set on a durable run's context when it is evicted. +var ErrEvicted = eviction.ErrEvicted + +// DefaultEvictionPolicy returns sensible defaults for durable task eviction. +func DefaultEvictionPolicy() *EvictionPolicy { + return eviction.DefaultPolicy() +} + +// DefaultEvictionManagerConfig returns sensible defaults for the eviction manager. +func DefaultEvictionManagerConfig() EvictionManagerConfig { + return eviction.DefaultManagerConfig() +} diff --git a/sdks/go/internal/declaration.go b/sdks/go/internal/declaration.go index 23398865e6..9aea74f045 100644 --- a/sdks/go/internal/declaration.go +++ b/sdks/go/internal/declaration.go @@ -15,6 +15,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/client/rest" "github.com/hatchet-dev/hatchet/pkg/client/types" "github.com/hatchet-dev/hatchet/pkg/worker" + "github.com/hatchet-dev/hatchet/pkg/worker/eviction" "github.com/hatchet-dev/hatchet/sdks/go/features" "github.com/hatchet-dev/hatchet/sdks/go/internal/task" @@ -31,8 +32,9 @@ type DurableWrappedTaskFn func(ctx worker.DurableHatchetContext) (interface{}, e // NamedFunction represents a function with its associated action ID type NamedFunction struct { - ActionID string - Fn WrappedTaskFn + ActionID string + Fn WrappedTaskFn + EvictionPolicy *eviction.Policy } // WorkflowBase defines the common interface for all workflow types. diff --git a/sdks/go/internal/task/task.go b/sdks/go/internal/task/task.go index 508d130ab3..2766f3e5dc 100644 --- a/sdks/go/internal/task/task.go +++ b/sdks/go/internal/task/task.go @@ -9,6 +9,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/client/create" "github.com/hatchet-dev/hatchet/pkg/client/types" "github.com/hatchet-dev/hatchet/pkg/worker/condition" + "github.com/hatchet-dev/hatchet/pkg/worker/eviction" ) type NamedTaskImpl struct { @@ -109,6 +110,10 @@ type DurableTaskDeclaration[I any] struct { // The function to execute when the task runs // must be a function that takes an input and a DurableHatchetContext and returns an output and an error Fn interface{} + + // EvictionPolicy controls when and how this durable task can be evicted from a slot while waiting. + // A nil policy means the task is never eligible for eviction. + EvictionPolicy *eviction.Policy } // OnFailureTaskDeclaration represents a task that will be executed if diff --git a/sdks/go/worker.go b/sdks/go/worker.go index 11869a4fad..f0f062c35f 100644 --- a/sdks/go/worker.go +++ b/sdks/go/worker.go @@ -4,6 +4,7 @@ import ( "github.com/rs/zerolog" v1 "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1" + "github.com/hatchet-dev/hatchet/pkg/worker/eviction" "github.com/hatchet-dev/hatchet/sdks/go/internal" ) @@ -19,6 +20,7 @@ type workerConfig struct { labels map[string]any logger *zerolog.Logger panicHandler func(ctx Context, recovered any) + evictionConfig *eviction.ManagerConfig } type WorkflowBase interface { @@ -75,3 +77,11 @@ func WithPanicHandler(panicHandler func(ctx Context, recovered any)) WorkerOptio config.panicHandler = panicHandler } } + +// WithEvictionConfig sets the eviction manager configuration for the worker. +// This controls the background eviction loop that manages durable task slot pressure. +func WithEvictionConfig(evictionConfig *eviction.ManagerConfig) WorkerOption { + return func(config *workerConfig) { + config.evictionConfig = evictionConfig + } +} diff --git a/sdks/go/workflow.go b/sdks/go/workflow.go index e72e36e4c4..f47be8f4be 100644 --- a/sdks/go/workflow.go +++ b/sdks/go/workflow.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "reflect" + "strings" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/client/types" "github.com/hatchet-dev/hatchet/pkg/worker" "github.com/hatchet-dev/hatchet/pkg/worker/condition" + "github.com/hatchet-dev/hatchet/pkg/worker/eviction" "github.com/hatchet-dev/hatchet/sdks/go/features" "github.com/hatchet-dev/hatchet/sdks/go/internal" ) @@ -96,8 +98,9 @@ func convertInputToType(input any, expectedType reflect.Type) reflect.Value { // Workflow defines a Hatchet workflow, which can then declare tasks and be run, scheduled, and so on. type Workflow struct { - declaration internal.WorkflowDeclaration[any, any] - v0Client v0Client.Client + declaration internal.WorkflowDeclaration[any, any] + v0Client v0Client.Client + evictionPolicies map[string]*eviction.Policy // task name -> eviction policy } // GetName returns the resolved workflow name (including namespace if applicable). @@ -228,8 +231,9 @@ func newWorkflow(name string, v0Client v0Client.Client, options ...WorkflowOptio declaration := internal.NewWorkflowDeclaration[any, any](createOpts, v0Client) return &Workflow{ - declaration: declaration, - v0Client: v0Client, + declaration: declaration, + v0Client: v0Client, + evictionPolicies: make(map[string]*eviction.Policy), } } @@ -252,6 +256,7 @@ type taskConfig struct { waitFor condition.Condition skipIf condition.Condition description string + evictionPolicy *eviction.Policy } // WithRetries sets the number of retry attempts for failed tasks. @@ -360,6 +365,16 @@ func WithDescription(description string) TaskOption { } } +// WithEvictionPolicy sets the eviction policy for a durable task. +// This controls when and how the task can be evicted from a durable slot +// while it is in a waiting state (e.g. SleepFor, WaitForEvent). +// A nil policy (the default) means the task is never eligible for eviction. +func WithEvictionPolicy(policy *eviction.Policy) TaskOption { + return func(config *taskConfig) { + config.evictionPolicy = policy + } +} + // Task represents a task reference for building DAGs and conditions. type Task struct { name string @@ -473,6 +488,11 @@ func (w *Workflow) NewTask(name string, fn any, options ...TaskOption) *Task { w.declaration.Task(taskOpts, wrapper) + // Store eviction policy for durable tasks + if config.isDurable && config.evictionPolicy != nil { + w.evictionPolicies[name] = config.evictionPolicy + } + return &Task{name: name} } @@ -490,7 +510,24 @@ func (w *Workflow) NewDurableTask(name string, fn any, options ...TaskOption) *T // Dump implements the WorkflowBase interface for internal use. func (w *Workflow) Dump() (*v1.CreateWorkflowVersionRequest, []internal.NamedFunction, []internal.NamedFunction, internal.WrappedTaskFn) { - return w.declaration.Dump() + req, regularFns, durableFns, onFailureFn := w.declaration.Dump() + + // Attach eviction policies to named functions that match durable tasks. + workflowName := w.declaration.Name() + attachEvictionPolicies := func(fns []internal.NamedFunction) { + for i, fn := range fns { + for taskName, policy := range w.evictionPolicies { + expectedActionID := strings.ToLower(fmt.Sprintf("%s:%s", workflowName, taskName)) + if fn.ActionID == expectedActionID { + fns[i].EvictionPolicy = policy + } + } + } + } + attachEvictionPolicies(regularFns) + attachEvictionPolicies(durableFns) + + return req, regularFns, durableFns, onFailureFn } // OnFailure sets a failure handler for the workflow.