Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
dd8138e
fix: add devcontainer spec
d2burkhalter Feb 3, 2026
dfd4d16
fix: update test runner to handle preemption
d2burkhalter Feb 3, 2026
f34cbaf
Merge pull request #1 from d2burkhalter/fix/preemption-tests
williamvega Feb 4, 2026
63e7956
preempt by id and ids
williamvega Feb 4, 2026
005ead5
fix: race condition for preempted jobs caused test to fail
williamvega Feb 4, 2026
469d5bd
remove devcontainer
williamvega Feb 4, 2026
426c9b7
rm dev container refs
williamvega Feb 4, 2026
079a648
only allow failed to follow preempted
williamvega Feb 4, 2026
8c74c77
correction
williamvega Feb 5, 2026
bcafae0
Merge branch 'master' into preempt-test-suite
williamvega Feb 5, 2026
2ac8e6d
failed then preempt race condition was killing the last job
williamvega Feb 5, 2026
b615e50
Merge branch 'preempt-test-suite' of github.com-personal:williamvega/…
williamvega Feb 5, 2026
0568f22
Merge branch 'master' into preempt-test-suite
williamvega Feb 6, 2026
e9b04af
Merge branch 'master' into preempt-test-suite
williamvega Feb 6, 2026
b02f3b4
Merge branch 'master' into preempt-test-suite
williamvega Feb 9, 2026
011bd58
failed then preempted
williamvega Feb 9, 2026
dc4b5d6
fix: intermittently reaches timeout
williamvega Feb 9, 2026
0c5842c
reset eventwatcher
williamvega Feb 9, 2026
7d13d46
add preempt to term event, add failed to test
williamvega Feb 9, 2026
e1446c5
update errorOnNoActiveJobs and ignore extra events from preemption
williamvega Feb 9, 2026
a1b5f96
logging
williamvega Feb 9, 2026
915d01e
fewer jobs
williamvega Feb 10, 2026
94c06ae
resource reqs in line with other tests
williamvega Feb 10, 2026
4f9a54c
intermittent oom
williamvega Feb 10, 2026
cf4856a
Revert "logging"
williamvega Feb 10, 2026
e94e191
Merge branch 'master' into preempt-test-suite
williamvega Feb 10, 2026
476896d
Merge branch 'master' into preempt-test-suite
williamvega Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion internal/testsuite/eventwatcher/eventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,20 @@ func isTerminalEvent(msg *api.EventMessage) bool {
return true
case *api.EventMessage_Cancelled:
return true
case *api.EventMessage_Preempted:
return true
}
return false
}

// ErrorOnNoActiveJobs returns an error if there are no active jobs.
// Note: The executor intentionally sends both Preempted and Failed events for preempted jobs,
// so we allow Failed to follow Preempted without erroring.
func ErrorOnNoActiveJobs(parent context.Context, C chan *api.EventMessage, jobIds map[string]bool) error {
numActive := 0
numRemaining := len(jobIds)
exitedByJobId := make(map[string]bool)
preemptedJobIds := make(map[string]bool) // Track jobs that received Preempted event
for {
select {
case <-parent.Done():
Expand All @@ -257,6 +262,18 @@ func ErrorOnNoActiveJobs(parent context.Context, C chan *api.EventMessage, jobId
}
numActive--
} else if e := msg.GetFailed(); e != nil {
// Allow Failed after Preempted (executor sends both events for preempted jobs)
if _, wasPreempted := preemptedJobIds[e.JobId]; !wasPreempted {
if _, ok := exitedByJobId[e.JobId]; ok {
return errors.Errorf("received multiple terminal events for job %s", e.JobId)
}
}
exitedByJobId[e.JobId] = true
if _, ok := jobIds[e.JobId]; ok {
numRemaining--
}
numActive--
} else if e := msg.GetCancelled(); e != nil {
if _, ok := exitedByJobId[e.JobId]; ok {
return errors.Errorf("received multiple terminal events for job %s", e.JobId)
}
Expand All @@ -265,11 +282,12 @@ func ErrorOnNoActiveJobs(parent context.Context, C chan *api.EventMessage, jobId
numRemaining--
}
numActive--
} else if e := msg.GetCancelled(); e != nil {
} else if e := msg.GetPreempted(); e != nil {
if _, ok := exitedByJobId[e.JobId]; ok {
return errors.Errorf("received multiple terminal events for job %s", e.JobId)
}
exitedByJobId[e.JobId] = true
preemptedJobIds[e.JobId] = true // Mark as preempted to allow subsequent Failed
if _, ok := jobIds[e.JobId]; ok {
numRemaining--
}
Expand Down
102 changes: 98 additions & 4 deletions internal/testsuite/testrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,31 @@ func (srv *TestRunner) Run(ctx context.Context) (err error) {
// }
// }

// Build list of event channels based on test configuration.
eventChannels := []chan *api.EventMessage{assertCh, ingressCh, noActiveCh, benchmarkCh, srv.eventLogger.In}

// Add preempt channel if preemption is configured.
var preemptCh chan *api.EventMessage
if srv.testSpec.Action == api.TestSpec_ACTION_PREEMPT {
preemptCh = make(chan *api.EventMessage)
eventChannels = append(eventChannels, preemptCh)
}

// Duplicate events across all downstream services.
splitter := eventsplitter.New(
watcher.C,
[]chan *api.EventMessage{assertCh, ingressCh, noActiveCh, benchmarkCh, srv.eventLogger.In}...,
eventChannels...,
)
g.Go(func() error { return splitter.Run(ctx) })

// If configured, preempt jobs once they are running.
// Used to test job preemption.
if srv.testSpec.Action == api.TestSpec_ACTION_PREEMPT {
g.Go(func() error {
return preemptJobsWhenRunning(ctx, preemptCh, srv.testSpec, srv.apiConnectionDetails, jobIds)
})
}

// Cancel the errgroup if there are no active jobs.
g.Go(func() error { return eventwatcher.ErrorOnNoActiveJobs(ctx, noActiveCh, maps.Clone(jobIdMap)) })

Expand Down Expand Up @@ -185,12 +203,16 @@ func (srv *TestRunner) Run(ctx context.Context) (err error) {

// tryCancelJobs cancels submitted jobs if cancellation is configured.
func tryCancelJobs(ctx context.Context, testSpec *api.TestSpec, conn *client.ApiConnectionDetails, jobIds []string) error {
if testSpec.Action != api.TestSpec_ACTION_CANCEL {
return nil
}

req := &api.JobCancelRequest{
Queue: testSpec.GetQueue(),
JobSetId: testSpec.GetJobSetId(),
}
switch {
case testSpec.Cancel == api.TestSpec_BY_ID:
case testSpec.Selection == api.TestSpec_SELECTION_BY_ID:
return client.WithSubmitClient(conn, func(sc api.SubmitClient) error {
time.Sleep(3 * time.Second)
for _, jobId := range jobIds {
Expand All @@ -202,7 +224,7 @@ func tryCancelJobs(ctx context.Context, testSpec *api.TestSpec, conn *client.Api
}
return nil
})
case testSpec.Cancel == api.TestSpec_BY_SET:
case testSpec.Selection == api.TestSpec_SELECTION_BY_SET:
return client.WithSubmitClient(conn, func(sc api.SubmitClient) error {
time.Sleep(3 * time.Second)
_, err := sc.CancelJobs(ctx, req)
Expand All @@ -211,7 +233,7 @@ func tryCancelJobs(ctx context.Context, testSpec *api.TestSpec, conn *client.Api
}
return nil
})
case testSpec.Cancel == api.TestSpec_BY_IDS:
case testSpec.Selection == api.TestSpec_SELECTION_BY_IDS:
return client.WithSubmitClient(conn, func(sc api.SubmitClient) error {
time.Sleep(3 * time.Second)
req.JobIds = jobIds
Expand All @@ -224,3 +246,75 @@ func tryCancelJobs(ctx context.Context, testSpec *api.TestSpec, conn *client.Api
}
return nil
}

// preemptJobsWhenRunning waits for jobs to be running, then preempts them.
func preemptJobsWhenRunning(ctx context.Context, eventCh chan *api.EventMessage, testSpec *api.TestSpec, conn *client.ApiConnectionDetails, jobIds []string) error {
runningJobs := make(map[string]bool)

// Wait for all jobs to be running
for {
select {
case <-ctx.Done():
return nil
case msg := <-eventCh:
if e := msg.GetRunning(); e != nil {
runningJobs[e.JobId] = true

// Once all jobs are running, preempt them
if len(runningJobs) == len(jobIds) {
time.Sleep(1 * time.Second) // Brief delay to ensure job is fully running
_ = tryPreemptJobs(ctx, testSpec, conn, jobIds)
// Continue consuming events but don't preempt again
for {
select {
case <-ctx.Done():
return nil
case <-eventCh:
// Keep consuming to avoid blocking the splitter
}
}
}
}
}
}
}

// tryPreemptJobs preempts submitted jobs if preemption is configured.
func tryPreemptJobs(ctx context.Context, testSpec *api.TestSpec, conn *client.ApiConnectionDetails, jobIds []string) error {
req := &api.JobPreemptRequest{
Queue: testSpec.GetQueue(),
JobSetId: testSpec.GetJobSetId(),
Reason: testSpec.GetPreemptReason(),
}
switch {
case testSpec.Selection == api.TestSpec_SELECTION_BY_ID:
return client.WithSubmitClient(conn, func(sc api.SubmitClient) error {
for _, jobId := range jobIds {
req.JobIds = []string{jobId}
_, err := sc.PreemptJobs(ctx, req)
if err != nil {
return errors.WithStack(err)
}
}
return nil
})
case testSpec.Selection == api.TestSpec_SELECTION_BY_SET:
return client.WithSubmitClient(conn, func(sc api.SubmitClient) error {
_, err := sc.PreemptJobs(ctx, req)
if err != nil {
return errors.WithStack(err)
}
return nil
})
case testSpec.Selection == api.TestSpec_SELECTION_BY_IDS:
return client.WithSubmitClient(conn, func(sc api.SubmitClient) error {
req.JobIds = jobIds
_, err := sc.PreemptJobs(ctx, req)
if err != nil {
return errors.WithStack(err)
}
return nil
})
}
return nil
}
Loading