scheduler: task waits for slow input dependencies even when cff.Predicate=false #104

Description

@Saijayavinoth

Problem

When cff.Predicate returns false, the task body is skipped — but the scheduler still waits for every declared input dependency to finish before the skip happens. An unrelated slow input therefore imposes its full latency on the consumer, even though the predicate=false result is what decides the skip and the input value is never read.

Reproducer

// slowOut takes slowDelay; fastOut is immediate; predicate(fastOut)=false.
// The predicated task declares slowOut as input. A downstream consumer
// measures elapsed time from flow start.
func BlockingInputs(slowDelay time.Duration) (time.Duration, error) {
    type slowOut struct{}
    type fastOut struct{}
    type skippedOut struct{}

    var elapsed time.Duration
    start := time.Now()
    err := cff.Flow(
        context.Background(),
        cff.Results(&elapsed),
        cff.Task(func() slowOut { time.Sleep(slowDelay); return slowOut{} }),
        cff.Task(func() fastOut { return fastOut{} }),
        cff.Task(
            func(slowOut) (skippedOut, error) { return skippedOut{}, nil },
            cff.Predicate(func(fastOut) bool { return false }),
            cff.FallbackWith(skippedOut{}),
        ),
        cff.Task(func(skippedOut) time.Duration { return time.Since(start) }),
    )
    return elapsed, err
}

| main | expected |
| --- | --- |
| elapsed ≈ slowDelay | elapsed ≈ 0 |

This hits real branch-parallelization flows: a cheap config-style predicate gates an expensive subtree, while a sibling input issues a network/RPC/DB call. Even when the predicate is false and the subtree won't run, the consumer still blocks on the full I/O round-trip. The cost is silent: there is no compile-time or runtime warning.

Proposed minimal design

| # | Where | Code |
| --- | --- | --- |
| 1 | New API (scheduler/scheduler.go:308,382) | `type PredicateJob struct { Run func(context.Context) (bool, error); Dependencies []*ScheduledJob }` and `func (s *Scheduler) EnqueuePredicate(ctx context.Context, p PredicateJob) *ScheduledJob` |
| 2 | Worker arm (scheduler.go:155) | `} else if j.predicateRun != nil { res.PredicateResult, res.Err = j.predicateRun(j.ctx) }` |
| 3 | Done branch (scheduler.go:573) | `if job.predicateRun != nil && res.Err == nil && !res.PredicateResult { /* walk consumers, remaining=0, push ready */ }` |
| 4 | Late-enqueue cache (scheduler.go:533 write, :502 read) | `if job.predicateRun != nil { job.predicateResult = res.PredicateResult }`, so a consumer enqueued after the predicate completed can read `dep.predicateResult` and fast-dispatch |
| 5 | Codegen (internal/templates/flow/predicate.go.tmpl) | Emits `sched.EnqueuePredicate(ctx, cff.PredicateJob{Run: ..., Dependencies: ...})`. Consumer codegen is byte-identical: its existing `if !p<hash> { return nil }` skip gate is unchanged. |

~40 LOC scheduler change. No behavior change for predicate=true or predicate=error. Existing tests pass with no modifications. Local branch shows ~118× reduction in consumer dispatch latency (5.25ms → 44µs against a 5ms upstream stall). Opening a PR concurrently with this issue for concrete review.
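To make the done-branch and late-enqueue behavior concrete, here is a minimal single-threaded sketch of the bookkeeping. It is not the cff scheduler: the job type, enqueue, complete, and skips are hypothetical stand-ins, and there are no workers or channels. It only assumes that a consumer tracks a count of outstanding dependencies and that a finished predicate's result is cached on the job.

```go
package main

import "fmt"

// job is a toy stand-in for a scheduled job. Every name in this sketch is
// illustrative and is not one of cff's real scheduler types.
type job struct {
	name        string
	isPredicate bool
	done        bool
	predResult  bool // cached when a predicate job completes
	remaining   int  // dependencies still outstanding
	dispatched  bool
	consumers   []*job
}

// skips reports whether dep is a finished predicate that returned false.
func skips(dep *job) bool { return dep.isPredicate && dep.done && !dep.predResult }

// enqueue registers c behind deps and reports whether c is ready immediately.
func enqueue(c *job, deps ...*job) bool {
	for _, d := range deps {
		if skips(d) {
			// Late enqueue: the predicate already said false, so do not wait.
			c.remaining = 0
			c.dispatched = true
			return true
		}
		if !d.done {
			c.remaining++
		}
		d.consumers = append(d.consumers, c)
	}
	c.dispatched = c.remaining == 0
	return c.dispatched
}

// complete models the done branch: it marks j finished and returns the
// consumers that became ready as a result. predResult is ignored for
// non-predicate jobs.
func complete(j *job, predResult bool) []*job {
	j.done, j.predResult = true, predResult
	var ready []*job
	for _, c := range j.consumers {
		if c.dispatched {
			continue // already fast-dispatched; ignore late completions
		}
		if skips(j) {
			c.remaining = 0 // predicate=false: the body will be skipped
		} else {
			c.remaining--
		}
		if c.remaining == 0 {
			c.dispatched = true
			ready = append(ready, c)
		}
	}
	return ready
}

func main() {
	slow := &job{name: "slow"}
	pred := &job{name: "pred", isPredicate: true}
	task := &job{name: "task"}
	enqueue(task, slow, pred)

	for _, c := range complete(pred, false) {
		fmt.Println("ready after predicate:", c.name)
	}
	fmt.Println("ready after slow input:", len(complete(slow, false)))
}
```

In this model the task becomes ready as soon as the predicate reports false, and the later completion of the slow input is a no-op for it. A predicate that returns true only decrements the counter, so the task still waits for every input, consistent with the "no behavior change for predicate=true" claim.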

Alternatives considered

| Alternative | Why rejected |
| --- | --- |
| Shared result pointer: IsPredicate bool + Result *bool on existing Job; Run writes through the pointer. | Bool is a return value, not shared mutable state. Pointer aliases worker↔loop, needs an explicit happens-before argument. Job field meaningless unless flag is set. |
| JobKind enum on Job: single Kind field, type-switched dispatch. | Either still needs *bool (→ row 1), or changes Job.Run signature (breaks every caller). |
| Sentinel error: Run returns errPredicateFalse to signal skip. | Overloads error semantics: scheduler must inspect error values for control flow, and the user's predicate would need wrapping to translate false → sentinel. |

Out of scope (planned follow-up PR)

Fast-dispatch invalid consumers. Same mechanism, generalized. When a consumer is marked invalid (its dep errored), its body won't run — the worker's j.invalid check at scheduler.go:152 short-circuits to errJobInvalid without dereferencing any inputs. So waiting on the consumer's other deps is dead time, exactly like the predicate-false case. Only meaningful in ContinueOnError mode (default mode returns on first error at scheduler.go:544-546).

Two sites — mirror of this PR's two sites:

| Site | Location | When it fires |
| --- | --- | --- |
| Done-branch invalidation loop | scheduler.go:553-555 | Dep errors mid-flight; consumer already enqueued and waiting on this dep + others. Common case. |
| Enqueue handler | scheduler.go:499-501 | Consumer enqueued after a dep has already errored. Narrow. |

Unified fast-dispatch condition for both this PR and the follow-up — no new variable, just fastDispatch = true colocated with invalid = true at each site:

// Site 1 — enqueue handler (scheduler.go:499-504)
if dep.err != nil {
    job.invalid = true
    fastDispatch = true                  // ← follow-up
}

// Site 2 — done-branch invalidation loop (scheduler.go:553-555)
for _, consumer := range job.consumers {
    consumer.invalid = true
    if !consumer.done && consumer.remaining != 0 {
        consumer.remaining = 0           // ← follow-up
        waiting--
        ready.PushBack(consumer)
    }
}

Will be opened as a follow-up PR after this one lands, framed as "same mechanism as the predicate-early-dispatch PR, second arm of the dispatch condition." Kept out of this PR to keep the predicate fix reviewable in isolation and because the invalid case requires new tests against ContinueOnError flows with slow sibling deps.

Ask

Is the direction acceptable? Open to API alternatives (e.g. JobKind enum on Job instead of a new type). Kindly refer to the PR linked to this issue for concrete review.
