Skip to content

Commit 65e2411

Browse files
Feature: Ephemeral action runners (#33570)
* This includes a runner mock test for hardend PickTask behavior like described in my proposal * Runner register ephemeral flag introduced in https://gitea.com/gitea/act_runner/pulls/649 Closes #32461
1 parent 55cc649 commit 65e2411

File tree

10 files changed

+238
-19
lines changed

10 files changed

+238
-19
lines changed

models/actions/runner.go

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ type ActionRunner struct {
5757

5858
// Store labels defined in state file (default: .runner file) of `act_runner`
5959
AgentLabels []string `xorm:"TEXT"`
60+
// Store if this is a runner that only ever get one single job assigned
61+
Ephemeral bool `xorm:"ephemeral NOT NULL DEFAULT false"`
6062

6163
Created timeutil.TimeStamp `xorm:"created"`
6264
Updated timeutil.TimeStamp `xorm:"updated"`

models/migrations/migrations.go

+1
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ func prepareMigrationTasks() []*migration {
375375
newMigration(312, "Add DeleteBranchAfterMerge to AutoMerge", v1_24.AddDeleteBranchAfterMergeForAutoMerge),
376376
newMigration(313, "Move PinOrder from issue table to a new table issue_pin", v1_24.MovePinOrderToTableIssuePin),
377377
newMigration(314, "Update OwnerID as zero for repository level action tables", v1_24.UpdateOwnerIDOfRepoLevelActionsTables),
378+
newMigration(315, "Add Ephemeral to ActionRunner", v1_24.AddEphemeralToActionRunner),
378379
}
379380
return preparedMigrations
380381
}

models/migrations/v1_24/v315.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2025 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package v1_24 //nolint
5+
6+
import (
7+
"xorm.io/xorm"
8+
)
9+
10+
func AddEphemeralToActionRunner(x *xorm.Engine) error {
11+
type ActionRunner struct {
12+
Ephemeral bool `xorm:"ephemeral NOT NULL DEFAULT false"`
13+
}
14+
15+
return x.Sync(new(ActionRunner))
16+
}

routers/api/actions/runner/runner.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func (s *Service) Register(
7878
RepoID: runnerToken.RepoID,
7979
Version: req.Msg.Version,
8080
AgentLabels: labels,
81+
Ephemeral: req.Msg.Ephemeral,
8182
}
8283
if err := runner.GenerateToken(); err != nil {
8384
return nil, errors.New("can't generate token")
@@ -96,12 +97,13 @@ func (s *Service) Register(
9697

9798
res := connect.NewResponse(&runnerv1.RegisterResponse{
9899
Runner: &runnerv1.Runner{
99-
Id: runner.ID,
100-
Uuid: runner.UUID,
101-
Token: runner.Token,
102-
Name: runner.Name,
103-
Version: runner.Version,
104-
Labels: runner.AgentLabels,
100+
Id: runner.ID,
101+
Uuid: runner.UUID,
102+
Token: runner.Token,
103+
Name: runner.Name,
104+
Version: runner.Version,
105+
Labels: runner.AgentLabels,
106+
Ephemeral: runner.Ephemeral,
105107
},
106108
})
107109

services/actions/cleanup.go

+26-1
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@ import (
99
"time"
1010

1111
actions_model "code.gitea.io/gitea/models/actions"
12+
"code.gitea.io/gitea/models/db"
1213
actions_module "code.gitea.io/gitea/modules/actions"
1314
"code.gitea.io/gitea/modules/log"
1415
"code.gitea.io/gitea/modules/setting"
1516
"code.gitea.io/gitea/modules/storage"
1617
"code.gitea.io/gitea/modules/timeutil"
18+
19+
"xorm.io/builder"
1720
)
1821

19-
// Cleanup removes expired actions logs, data and artifacts
22+
// Cleanup removes expired actions logs, data, artifacts and used ephemeral runners
2023
func Cleanup(ctx context.Context) error {
2124
// clean up expired artifacts
2225
if err := CleanupArtifacts(ctx); err != nil {
@@ -28,6 +31,11 @@ func Cleanup(ctx context.Context) error {
2831
return fmt.Errorf("cleanup logs: %w", err)
2932
}
3033

34+
// clean up old ephemeral runners
35+
if err := CleanupEphemeralRunners(ctx); err != nil {
36+
return fmt.Errorf("cleanup old ephemeral runners: %w", err)
37+
}
38+
3139
return nil
3240
}
3341

@@ -123,3 +131,20 @@ func CleanupLogs(ctx context.Context) error {
123131
log.Info("Removed %d logs", count)
124132
return nil
125133
}
134+
135+
// CleanupEphemeralRunners removes used ephemeral runners which are no longer able to process jobs
136+
func CleanupEphemeralRunners(ctx context.Context) error {
137+
subQuery := builder.Select("`action_runner`.id").
138+
From(builder.Select("*").From("`action_runner`"), "`action_runner`"). // mysql needs this redundant subquery
139+
Join("INNER", "`action_task`", "`action_task`.`runner_id` = `action_runner`.`id`").
140+
Where(builder.Eq{"`action_runner`.`ephemeral`": true}).
141+
And(builder.NotIn("`action_task`.`status`", actions_model.StatusWaiting, actions_model.StatusRunning, actions_model.StatusBlocked))
142+
b := builder.Delete(builder.In("id", subQuery)).From("`action_runner`")
143+
res, err := db.GetEngine(ctx).Exec(b)
144+
if err != nil {
145+
return fmt.Errorf("find runners: %w", err)
146+
}
147+
affected, _ := res.RowsAffected()
148+
log.Info("Removed %d runners", affected)
149+
return nil
150+
}

services/actions/task.go

+20
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,26 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
2323
actionTask *actions_model.ActionTask
2424
)
2525

26+
if runner.Ephemeral {
27+
var task actions_model.ActionTask
28+
has, err := db.GetEngine(ctx).Where("runner_id = ?", runner.ID).Get(&task)
29+
// Let the runner retry the request, do not allow to proceed
30+
if err != nil {
31+
return nil, false, err
32+
}
33+
if has {
34+
if task.Status == actions_model.StatusWaiting || task.Status == actions_model.StatusRunning || task.Status == actions_model.StatusBlocked {
35+
return nil, false, nil
36+
}
37+
// task has been finished, remove it
38+
_, err = db.DeleteByID[actions_model.ActionRunner](ctx, runner.ID)
39+
if err != nil {
40+
return nil, false, err
41+
}
42+
return nil, false, fmt.Errorf("runner has been removed")
43+
}
44+
}
45+
2646
if err := db.WithTx(ctx, func(ctx context.Context) error {
2747
t, ok, err := actions_model.CreateTaskForRunner(ctx, runner)
2848
if err != nil {

tests/integration/actions_job_test.go

+155-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"code.gitea.io/gitea/modules/json"
2222
"code.gitea.io/gitea/modules/setting"
2323
api "code.gitea.io/gitea/modules/structs"
24+
actions_service "code.gitea.io/gitea/services/actions"
2425

2526
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
27+
"connectrpc.com/connect"
2628
"github.com/stretchr/testify/assert"
2729
)
2830

@@ -132,7 +134,7 @@ jobs:
132134

133135
apiRepo := createActionsTestRepo(t, token, "actions-jobs-with-needs", false)
134136
runner := newMockRunner()
135-
runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"})
137+
runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
136138

137139
for _, tc := range testCases {
138140
t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) {
@@ -318,7 +320,7 @@ jobs:
318320

319321
apiRepo := createActionsTestRepo(t, token, "actions-jobs-outputs-with-matrix", false)
320322
runner := newMockRunner()
321-
runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"})
323+
runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
322324

323325
for _, tc := range testCases {
324326
t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) {
@@ -363,7 +365,7 @@ func TestActionsGiteaContext(t *testing.T) {
363365
user2APICtx := NewAPITestContext(t, baseRepo.OwnerName, baseRepo.Name, auth_model.AccessTokenScopeWriteRepository)
364366

365367
runner := newMockRunner()
366-
runner.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"})
368+
runner.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
367369

368370
// init the workflow
369371
wfTreePath := ".gitea/workflows/pull.yml"
@@ -437,6 +439,156 @@ jobs:
437439
})
438440
}
439441

442+
// Ephemeral
443+
func TestActionsGiteaContextEphemeral(t *testing.T) {
444+
onGiteaRun(t, func(t *testing.T, u *url.URL) {
445+
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
446+
user2Session := loginUser(t, user2.Name)
447+
user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
448+
449+
apiBaseRepo := createActionsTestRepo(t, user2Token, "actions-gitea-context", false)
450+
baseRepo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiBaseRepo.ID})
451+
user2APICtx := NewAPITestContext(t, baseRepo.OwnerName, baseRepo.Name, auth_model.AccessTokenScopeWriteRepository)
452+
453+
runner := newMockRunner()
454+
runner.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"}, true)
455+
456+
// verify CleanupEphemeralRunners does not remove this runner
457+
err := actions_service.CleanupEphemeralRunners(t.Context())
458+
assert.NoError(t, err)
459+
460+
// init the workflow
461+
wfTreePath := ".gitea/workflows/pull.yml"
462+
wfFileContent := `name: Pull Request
463+
on: pull_request
464+
jobs:
465+
wf1-job:
466+
runs-on: ubuntu-latest
467+
steps:
468+
- run: echo 'test the pull'
469+
wf2-job:
470+
runs-on: ubuntu-latest
471+
steps:
472+
- run: echo 'test the pull'
473+
`
474+
opts := getWorkflowCreateFileOptions(user2, baseRepo.DefaultBranch, fmt.Sprintf("create %s", wfTreePath), wfFileContent)
475+
createWorkflowFile(t, user2Token, baseRepo.OwnerName, baseRepo.Name, wfTreePath, opts)
476+
// user2 creates a pull request
477+
doAPICreateFile(user2APICtx, "user2-patch.txt", &api.CreateFileOptions{
478+
FileOptions: api.FileOptions{
479+
NewBranchName: "user2/patch-1",
480+
Message: "create user2-patch.txt",
481+
Author: api.Identity{
482+
Name: user2.Name,
483+
Email: user2.Email,
484+
},
485+
Committer: api.Identity{
486+
Name: user2.Name,
487+
Email: user2.Email,
488+
},
489+
Dates: api.CommitDateOptions{
490+
Author: time.Now(),
491+
Committer: time.Now(),
492+
},
493+
},
494+
ContentBase64: base64.StdEncoding.EncodeToString([]byte("user2-fix")),
495+
})(t)
496+
apiPull, err := doAPICreatePullRequest(user2APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, "user2/patch-1")(t)
497+
assert.NoError(t, err)
498+
task := runner.fetchTask(t)
499+
gtCtx := task.Context.GetFields()
500+
actionTask := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: task.Id})
501+
actionRunJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: actionTask.JobID})
502+
actionRun := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: actionRunJob.RunID})
503+
assert.NoError(t, actionRun.LoadAttributes(t.Context()))
504+
505+
assert.Equal(t, user2.Name, gtCtx["actor"].GetStringValue())
506+
assert.Equal(t, setting.AppURL+"api/v1", gtCtx["api_url"].GetStringValue())
507+
assert.Equal(t, apiPull.Base.Ref, gtCtx["base_ref"].GetStringValue())
508+
runEvent := map[string]any{}
509+
assert.NoError(t, json.Unmarshal([]byte(actionRun.EventPayload), &runEvent))
510+
assert.True(t, reflect.DeepEqual(gtCtx["event"].GetStructValue().AsMap(), runEvent))
511+
assert.Equal(t, actionRun.TriggerEvent, gtCtx["event_name"].GetStringValue())
512+
assert.Equal(t, apiPull.Head.Ref, gtCtx["head_ref"].GetStringValue())
513+
assert.Equal(t, actionRunJob.JobID, gtCtx["job"].GetStringValue())
514+
assert.Equal(t, actionRun.Ref, gtCtx["ref"].GetStringValue())
515+
assert.Equal(t, (git.RefName(actionRun.Ref)).ShortName(), gtCtx["ref_name"].GetStringValue())
516+
assert.False(t, gtCtx["ref_protected"].GetBoolValue())
517+
assert.Equal(t, string((git.RefName(actionRun.Ref)).RefType()), gtCtx["ref_type"].GetStringValue())
518+
assert.Equal(t, actionRun.Repo.OwnerName+"/"+actionRun.Repo.Name, gtCtx["repository"].GetStringValue())
519+
assert.Equal(t, actionRun.Repo.OwnerName, gtCtx["repository_owner"].GetStringValue())
520+
assert.Equal(t, actionRun.Repo.HTMLURL(), gtCtx["repositoryUrl"].GetStringValue())
521+
assert.Equal(t, fmt.Sprint(actionRunJob.RunID), gtCtx["run_id"].GetStringValue())
522+
assert.Equal(t, fmt.Sprint(actionRun.Index), gtCtx["run_number"].GetStringValue())
523+
assert.Equal(t, fmt.Sprint(actionRunJob.Attempt), gtCtx["run_attempt"].GetStringValue())
524+
assert.Equal(t, "Actions", gtCtx["secret_source"].GetStringValue())
525+
assert.Equal(t, setting.AppURL, gtCtx["server_url"].GetStringValue())
526+
assert.Equal(t, actionRun.CommitSHA, gtCtx["sha"].GetStringValue())
527+
assert.Equal(t, actionRun.WorkflowID, gtCtx["workflow"].GetStringValue())
528+
assert.Equal(t, setting.Actions.DefaultActionsURL.URL(), gtCtx["gitea_default_actions_url"].GetStringValue())
529+
token := gtCtx["token"].GetStringValue()
530+
assert.Equal(t, actionTask.TokenLastEight, token[len(token)-8:])
531+
532+
// verify CleanupEphemeralRunners does not remove this runner
533+
err = actions_service.CleanupEphemeralRunners(t.Context())
534+
assert.NoError(t, err)
535+
536+
resp, err := runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
537+
TasksVersion: 0,
538+
}))
539+
assert.NoError(t, err)
540+
assert.Nil(t, resp.Msg.Task)
541+
542+
// verify CleanupEphemeralRunners does not remove this runner
543+
err = actions_service.CleanupEphemeralRunners(t.Context())
544+
assert.NoError(t, err)
545+
546+
runner.client.runnerServiceClient.UpdateTask(t.Context(), connect.NewRequest(&runnerv1.UpdateTaskRequest{
547+
State: &runnerv1.TaskState{
548+
Id: actionTask.ID,
549+
Result: runnerv1.Result_RESULT_SUCCESS,
550+
},
551+
}))
552+
resp, err = runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
553+
TasksVersion: 0,
554+
}))
555+
assert.Error(t, err)
556+
assert.Nil(t, resp)
557+
558+
resp, err = runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
559+
TasksVersion: 0,
560+
}))
561+
assert.Error(t, err)
562+
assert.Nil(t, resp)
563+
564+
// create an runner that picks a job and get force cancelled
565+
runnerToBeRemoved := newMockRunner()
566+
runnerToBeRemoved.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner-to-be-removed", []string{"ubuntu-latest"}, true)
567+
568+
taskToStopAPIObj := runnerToBeRemoved.fetchTask(t)
569+
570+
taskToStop := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: taskToStopAPIObj.Id})
571+
572+
// verify CleanupEphemeralRunners does not remove the custom crafted runner
573+
err = actions_service.CleanupEphemeralRunners(t.Context())
574+
assert.NoError(t, err)
575+
576+
runnerToRemove := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunner{ID: taskToStop.RunnerID})
577+
578+
err = actions_model.StopTask(t.Context(), taskToStop.ID, actions_model.StatusFailure)
579+
assert.NoError(t, err)
580+
581+
// verify CleanupEphemeralRunners does remove the custom crafted runner
582+
err = actions_service.CleanupEphemeralRunners(t.Context())
583+
assert.NoError(t, err)
584+
585+
unittest.AssertNotExistsBean(t, &actions_model.ActionRunner{ID: runnerToRemove.ID})
586+
587+
// this cleanup is required to allow further tests to pass
588+
doAPIDeleteRepository(user2APICtx)(t)
589+
})
590+
}
591+
440592
func createActionsTestRepo(t *testing.T, authToken, repoName string, isPrivate bool) *api.Repository {
441593
req := NewRequestWithJSON(t, "POST", "/api/v1/user/repos", &api.CreateRepoOption{
442594
Name: repoName,

tests/integration/actions_log_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ jobs:
105105
apiRepo := createActionsTestRepo(t, token, "actions-download-task-logs", false)
106106
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
107107
runner := newMockRunner()
108-
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"})
108+
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
109109

110110
for _, tc := range testCases {
111111
t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) {

tests/integration/actions_runner_test.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,20 @@ func (r *mockRunner) doPing(t *testing.T) {
6767
assert.Equal(t, "Hello, mock-runner!", resp.Msg.Data)
6868
}
6969

70-
func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []string) {
70+
func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []string, ephemeral bool) {
7171
r.doPing(t)
7272
resp, err := r.client.runnerServiceClient.Register(t.Context(), connect.NewRequest(&runnerv1.RegisterRequest{
73-
Name: name,
74-
Token: token,
75-
Version: "mock-runner-version",
76-
Labels: labels,
73+
Name: name,
74+
Token: token,
75+
Version: "mock-runner-version",
76+
Labels: labels,
77+
Ephemeral: ephemeral,
7778
}))
7879
assert.NoError(t, err)
7980
r.client = newMockRunnerClient(resp.Msg.Runner.Uuid, resp.Msg.Runner.Token)
8081
}
8182

82-
func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string) {
83+
func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string, ephemeral bool) {
8384
session := loginUser(t, ownerName)
8485
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository)
8586
req := NewRequest(t, "GET", fmt.Sprintf("/api/v1/repos/%s/%s/actions/runners/registration-token", ownerName, repoName)).AddTokenAuth(token)
@@ -88,7 +89,7 @@ func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, run
8889
Token string `json:"token"`
8990
}
9091
DecodeJSON(t, resp, &registrationToken)
91-
r.doRegister(t, runnerName, registrationToken.Token, labels)
92+
r.doRegister(t, runnerName, registrationToken.Token, labels, ephemeral)
9293
}
9394

9495
func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task {

tests/integration/repo_webhook_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ func Test_WebhookWorkflowJob(t *testing.T) {
639639
assert.NoError(t, err)
640640

641641
runner := newMockRunner()
642-
runner.registerAsRepoRunner(t, "user2", "repo1", "mock-runner", []string{"ubuntu-latest"})
642+
runner.registerAsRepoRunner(t, "user2", "repo1", "mock-runner", []string{"ubuntu-latest"}, false)
643643

644644
// 2. trigger the webhooks
645645

0 commit comments

Comments
 (0)