Skip to content

Commit aff60be

Browse files
committed
feat: Add support for dynamic matrix strategies with job outputs
Signed-off-by: Pascal Zimmermann <pascal.zimmermann@theiotstudio.com>
1 parent 94437ea commit aff60be

File tree

11 files changed

+941
-5
lines changed

11 files changed

+941
-5
lines changed

models/actions/run_job.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ type ActionRunJob struct {
5151
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group
5252
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress
5353

54+
RawStrategy string // raw strategy from job YAML's "strategy" section (stored before matrix expansion for deferred evaluation)
55+
56+
// IsMatrixEvaluated is only valid/needed when this job's RawStrategy is not empty and contains a matrix that depends on job outputs.
57+
// If the matrix can't be evaluated yet (e.g. job hasn't completed), this field will be false.
58+
// If the matrix has been successfully evaluated with job outputs, this field will be true.
59+
IsMatrixEvaluated bool
60+
5461
Started timeutil.TimeStamp
5562
Stopped timeutil.TimeStamp
5663
Created timeutil.TimeStamp `xorm:"created"`
@@ -267,3 +274,11 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob)
267274

268275
return CancelJobs(ctx, jobsToCancel)
269276
}
277+
278+
// InsertActionRunJobs inserts multiple ActionRunJob records into the database
279+
func InsertActionRunJobs(ctx context.Context, jobs []*ActionRunJob) error {
280+
if len(jobs) == 0 {
281+
return nil
282+
}
283+
return db.Insert(ctx, jobs)
284+
}

models/migrations/migrations.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ func prepareMigrationTasks() []*migration {
400400
newMigration(323, "Add support for actions concurrency", v1_26.AddActionsConcurrency),
401401
newMigration(324, "Fix closed milestone completeness for milestones with no issues", v1_26.FixClosedMilestoneCompleteness),
402402
newMigration(325, "Fix missed repo_id when migrate attachments", v1_26.FixMissedRepoIDWhenMigrateAttachments),
403+
newMigration(326, "Add support for matrix actions evaluation", v1_26.AddMatrixEvaluationColumnsToActionRunJob),
403404
}
404405
return preparedMigrations
405406
}

models/migrations/v1_26/v326.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2026 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package v1_26
5+
6+
import (
7+
"xorm.io/xorm"
8+
)
9+
10+
func AddMatrixEvaluationColumnsToActionRunJob(x *xorm.Engine) error {
11+
return x.Sync(new(ActionRunJobWithMatrixSupport))
12+
}
13+
14+
// ActionRunJobWithMatrixSupport is a migration-only struct. It declares just
// the columns this migration introduces, not the full table definition.
type ActionRunJobWithMatrixSupport struct {
	// RawStrategy is the raw strategy from the job YAML's "strategy" section.
	RawStrategy string `xorm:"TEXT"`
	// IsMatrixEvaluated tells whether the matrix has been evaluated with job outputs.
	IsMatrixEvaluated bool
}
20+
21+
// TableName returns the table name for xorm to sync
22+
func (ActionRunJobWithMatrixSupport) TableName() string {
23+
return "action_run_job"
24+
}

routers/web/web.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"code.gitea.io/gitea/routers/web/user"
4242
user_setting "code.gitea.io/gitea/routers/web/user/setting"
4343
"code.gitea.io/gitea/routers/web/user/setting/security"
44+
actions_service "code.gitea.io/gitea/services/actions"
4445
auth_service "code.gitea.io/gitea/services/auth"
4546
"code.gitea.io/gitea/services/context"
4647
"code.gitea.io/gitea/services/forms"
@@ -250,6 +251,8 @@ func Routes() *web.Router {
250251

251252
if setting.Metrics.Enabled {
252253
prometheus.MustRegister(metrics.NewCollector())
254+
// Register matrix re-evaluation metrics
255+
prometheus.MustRegister(actions_service.NewMatrixMetricsCollector())
253256
routes.Get("/metrics", append(mid, Metrics)...)
254257
}
255258

services/actions/job_emitter.go

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"time"
1011

1112
actions_model "code.gitea.io/gitea/models/actions"
1213
"code.gitea.io/gitea/models/db"
@@ -202,6 +203,9 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
202203
if err != nil {
203204
return nil, nil, err
204205
}
206+
207+
log.Debug("Checking %d jobs for run %d (status: %s)", len(jobs), run.ID, run.Status)
208+
205209
vars, err := actions_model.GetVariablesOfRun(ctx, run)
206210
if err != nil {
207211
return nil, nil, err
@@ -213,14 +217,18 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
213217
}
214218

215219
updates := newJobStatusResolver(jobs, vars).Resolve(ctx)
220+
log.Debug("Job status resolver returned %d job status updates for run %d", len(updates), run.ID)
221+
216222
for _, job := range jobs {
217223
if status, ok := updates[job.ID]; ok {
224+
oldStatus := job.Status
218225
job.Status = status
219226
if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
220227
return err
221228
} else if n != 1 {
222229
return fmt.Errorf("no affected for updating blocked job %v", job.ID)
223230
}
231+
log.Info("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status)
224232
updatedJobs = append(updatedJobs, job)
225233
}
226234
}
@@ -229,6 +237,20 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
229237
return nil, nil, err
230238
}
231239

240+
// Reload jobs from the database to pick up any newly created matrix jobs
241+
oldJobCount := len(jobs)
242+
jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
243+
if err != nil {
244+
return nil, nil, err
245+
}
246+
247+
if len(jobs) > oldJobCount {
248+
log.Info("Matrix re-evaluation created %d new jobs for run %d (was %d, now %d)",
249+
len(jobs)-oldJobCount, run.ID, oldJobCount, len(jobs))
250+
}
251+
252+
log.Debug("Job check completed for run %d: %d jobs updated, %d total jobs", run.ID, len(updatedJobs), len(jobs))
253+
232254
return jobs, updatedJobs, nil
233255
}
234256

@@ -313,47 +335,102 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model
313335

314336
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
315337
ret := map[int64]actions_model.Status{}
338+
resolveMetrics := struct {
339+
totalBlocked int
340+
matrixReevaluated int
341+
concurrencyUpdated int
342+
jobsStarted int
343+
jobsSkipped int
344+
}{}
345+
316346
for id, status := range r.statuses {
317347
actionRunJob := r.jobMap[id]
318348
if status != actions_model.StatusBlocked {
319349
continue
320350
}
351+
352+
resolveMetrics.totalBlocked++
353+
log.Debug("Resolving blocked job %d (JobID: %s, RunID: %d)", id, actionRunJob.JobID, actionRunJob.RunID)
354+
321355
allDone, allSucceed := r.resolveCheckNeeds(id)
322356
if !allDone {
357+
log.Debug("Job %d: not all dependencies completed yet", id)
323358
continue
324359
}
325360

361+
log.Debug("Job %d: all dependencies completed (allSucceed: %v), checking matrix re-evaluation", id, allSucceed)
362+
363+
// Try to re-evaluate the matrix with job outputs if it depends on them
364+
startTime := time.Now()
365+
newMatrixJobs, err := ReEvaluateMatrixForJobWithNeeds(ctx, actionRunJob, r.vars)
366+
duration := time.Since(startTime).Milliseconds()
367+
368+
if err != nil {
369+
log.Error("Matrix re-evaluation error for job %d (JobID: %s): %v (duration: %dms)", id, actionRunJob.JobID, err, duration)
370+
continue
371+
}
372+
373+
// If new matrix jobs were created, add them to the resolver and continue
374+
if len(newMatrixJobs) > 0 {
375+
resolveMetrics.matrixReevaluated++
376+
log.Info("Matrix re-evaluation succeeded for job %d (JobID: %s): created %d new jobs (duration: %dms)",
377+
id, actionRunJob.JobID, len(newMatrixJobs), duration)
378+
// The new jobs will be picked up in the next resolution iteration
379+
continue
380+
}
381+
382+
log.Debug("Job %d: no matrix re-evaluation needed or result is empty", id)
383+
326384
// update concurrency and check whether the job can run now
327-
err := updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars)
385+
err = updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars)
328386
if err != nil {
329387
// The err can be caused by different cases: database error, or syntax error, or the needed jobs haven't completed
330388
// At the moment there is no way to distinguish them.
331389
// Actually, for most cases, the error is caused by "syntax error" / "the needed jobs haven't completed (skipped?)"
332390
// TODO: if workflow or concurrency expression has syntax error, there should be a user error message, need to show it to end users
333-
log.Debug("updateConcurrencyEvaluationForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err)
391+
log.Debug("Concurrency evaluation failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err)
334392
continue
335393
}
336394

395+
resolveMetrics.concurrencyUpdated++
396+
337397
shouldStartJob := true
338398
if !allSucceed {
339399
// Not all dependent jobs completed successfully:
340400
// * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition.
341401
// * otherwise, the job should be skipped.
342402
shouldStartJob = r.resolveJobHasIfCondition(actionRunJob)
403+
log.Debug("Job %d: not all dependencies succeeded. Has if-condition: %v, should start: %v", id, shouldStartJob, shouldStartJob)
343404
}
344405

345406
newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped)
346407
if newStatus == actions_model.StatusWaiting {
347408
newStatus, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob)
348409
if err != nil {
349-
log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err)
410+
log.Error("Concurrency check failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err)
350411
}
351412
}
352413

353414
if newStatus != actions_model.StatusBlocked {
354415
ret[id] = newStatus
416+
switch newStatus {
417+
case actions_model.StatusWaiting:
418+
resolveMetrics.jobsStarted++
419+
log.Info("Job %d (JobID: %s) transitioned to StatusWaiting", id, actionRunJob.JobID)
420+
case actions_model.StatusSkipped:
421+
resolveMetrics.jobsSkipped++
422+
log.Info("Job %d (JobID: %s) transitioned to StatusSkipped", id, actionRunJob.JobID)
423+
}
355424
}
356425
}
426+
427+
// Log resolution metrics summary
428+
if resolveMetrics.totalBlocked > 0 {
429+
log.Debug("Job resolution summary: total_blocked=%d, matrix_reevaluated=%d, concurrency_updated=%d, jobs_started=%d, jobs_skipped=%d",
430+
resolveMetrics.totalBlocked, resolveMetrics.matrixReevaluated, resolveMetrics.concurrencyUpdated,
431+
resolveMetrics.jobsStarted, resolveMetrics.jobsSkipped)
432+
}
433+
357434
return ret
358435
}
359436

0 commit comments

Comments
 (0)