Skip to content

Commit e3d0935

Browse files
committed
Handle unauth scheduler ctx
1 parent 5da73c3 commit e3d0935

10 files changed

Lines changed: 160 additions & 42 deletions

File tree

deps/go_deps.MODULE.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ use_repo(
255255
"com_github_prometheus_client_model",
256256
"com_github_prometheus_common",
257257
"com_github_rantav_go_grpc_channelz",
258-
"com_github_robfig_cron_v3",
258+
"com_github_robfig_cron_v3",
259259
"com_github_roaringbitmap_roaring",
260260
"com_github_rs_zerolog",
261261
"com_github_shirou_gopsutil_v3",

enterprise/server/githubapp/githubapp.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,22 @@ func (a *GitHubApp) GetInstallationTokenForStatusReportingOnly(ctx context.Conte
619619
return tok, nil
620620
}
621621

622+
func (a *GitHubApp) GetDefaultBranch(ctx context.Context, repoURL string, accessToken string) (string, error) {
623+
parsedURL, err := gitutil.ParseGitHubRepoURL(repoURL)
624+
if err != nil {
625+
return "", status.InvalidArgumentErrorf("invalid repo URL %q: %s", repoURL, err)
626+
}
627+
client, err := a.newAuthenticatedClient(ctx, accessToken)
628+
if err != nil {
629+
return "", status.WrapError(err, "create GitHub client")
630+
}
631+
repo, _, err := client.Repositories.Get(ctx, parsedURL.Owner, parsedURL.Repo)
632+
if err != nil {
633+
return "", err
634+
}
635+
return repo.GetDefaultBranch(), nil
636+
}
637+
622638
func (a *GitHubApp) GetRepositoryInstallationToken(ctx context.Context, groupID, repoURL string) (string, error) {
623639
if err := authutil.AuthorizeGroupAccess(ctx, a.env, groupID); err != nil {
624640
return "", err

enterprise/server/webhooks/webhook_data/webhook_data.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,18 @@ import (
99
var (
1010
// EventName holds canonical webhook event name constants.
1111
EventName struct {
12-
Push string
13-
PullRequest string
14-
ManualDispatch string
12+
Push string
13+
PullRequest string
14+
ManualDispatch string
15+
ScheduledDispatch string
1516
}
1617
)
1718

1819
func init() {
1920
EventName.Push = "push"
2021
EventName.PullRequest = "pull_request"
2122
EventName.ManualDispatch = "manual_dispatch"
23+
EventName.ScheduledDispatch = "scheduled"
2224
}
2325

2426
func DebugString(wd *interfaces.WebhookData) string {

enterprise/server/workflow/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ func GetDefault(targetRepoDefaultBranch string) *BuildBuddyConfig {
435435
// MatchesAnyTrigger returns whether the action is triggered by the event
436436
// published to the given branch or tag.
437437
func MatchesAnyTrigger(action *Action, event, branch, tag string) bool {
438-
// If user has manually requested action dispatch, always run it
439-
if event == webhook_data.EventName.ManualDispatch {
438+
// If user has manually or scheduled action dispatch, always run it
439+
if event == webhook_data.EventName.ManualDispatch || event == webhook_data.EventName.ScheduledDispatch {
440440
return true
441441
}
442442

enterprise/server/workflow/service/service.go

Lines changed: 79 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1854,11 +1854,15 @@ func (ws *workflowService) RunScheduledWorkflows(ctx context.Context) error {
18541854
log.CtxWarningf(ctx, "Failed to claim scheduled workflow: %s", err)
18551855
continue
18561856
}
1857+
// If there are no eligible scheduled workflows, stop polling until the next time the scheduler runs.
18571858
if scheduled == nil {
18581859
break
18591860
}
18601861
if err := ws.dispatchScheduledWorkflow(ctx, scheduled); err != nil {
18611862
log.CtxErrorf(ctx, "Failed to dispatch scheduled workflow %s: %s", scheduled.ScheduleID, err)
1863+
if err := ws.unclaimScheduledWorkflow(ctx, scheduled.ScheduleID); err != nil {
1864+
log.CtxWarningf(ctx, "Failed to unclaim scheduled workflow %s: %s", scheduled.ScheduleID, err)
1865+
}
18621866
continue
18631867
}
18641868
}
@@ -1877,7 +1881,7 @@ func (ws *workflowService) claimScheduledWorkflow(ctx context.Context) (*tables.
18771881
WHERE next_run_usec <= ?
18781882
AND (lease_expires_usec = 0 OR lease_expires_usec <= ?)
18791883
ORDER BY next_run_usec ASC
1880-
LIMIT 1`+dbh.SelectForUpdateModifier(), nowUsec, nowUsec).Take(scheduled)
1884+
LIMIT 1 `+dbh.SelectForUpdateModifier(), nowUsec, nowUsec).Take(scheduled)
18811885
if err != nil {
18821886
if db.IsRecordNotFound(err) {
18831887
scheduled = nil
@@ -1925,30 +1929,68 @@ func (ws *workflowService) unclaimScheduledWorkflow(ctx context.Context, schedul
19251929
return nil
19261930
}
19271931

1932+
func (ws *workflowService) getWorkflowForScheduledDispatch(ctx context.Context, groupID, repoURL string) (*tables.Workflow, error) {
1933+
gitRepository := &tables.GitRepository{}
1934+
err := ws.env.GetDBHandle().NewQuery(ctx, "workflow_service_get_for_scheduled_dispatch").Raw(`
1935+
SELECT * FROM "GitRepositories"
1936+
WHERE group_id = ?
1937+
AND repo_url = ?
1938+
`, groupID, repoURL).Take(gitRepository)
1939+
if err != nil {
1940+
return nil, status.WrapErrorf(err, "fetch repo %q", repoURL)
1941+
}
1942+
parsedURL, err := gitutil.ParseGitHubRepoURL(repoURL)
1943+
if err != nil {
1944+
return nil, status.WrapErrorf(err, "invalid repo URL %q", repoURL)
1945+
}
1946+
app, err := ws.env.GetGitHubAppService().GetGitHubAppForOwner(ctx, parsedURL.Owner)
1947+
if err != nil {
1948+
return nil, status.WrapErrorf(err, "get GitHub app for owner %q", parsedURL.Owner)
1949+
}
1950+
// The cron scheduler does not use an authenticated context, so we use this method.
1951+
accessToken, err := app.GetInstallationTokenForStatusReportingOnly(ctx, parsedURL.Owner)
1952+
if err != nil {
1953+
return nil, status.WrapErrorf(err, "get installation token for owner %q", parsedURL.Owner)
1954+
}
1955+
return ws.gitRepositoryWorkflow(gitRepository, accessToken.GetToken()).Workflow, nil
1956+
}
1957+
19281958
func (ws *workflowService) dispatchScheduledWorkflow(ctx context.Context, scheduled *tables.ScheduledRun) error {
1929-
wfID := ws.GetLegacyWorkflowIDForGitRepository(scheduled.GroupID, scheduled.RepoURL)
1930-
rsp, err := ws.ExecuteWorkflow(ctx, &wfpb.ExecuteWorkflowRequest{
1931-
WorkflowId: wfID,
1932-
PushedRepoUrl: scheduled.RepoURL,
1933-
PushedBranch: scheduled.Branch,
1934-
Async: true,
1935-
ActionNames: []string{scheduled.ActionName},
1936-
})
1959+
wf, err := ws.getWorkflowForScheduledDispatch(ctx, scheduled.GroupID, scheduled.RepoURL)
19371960
if err != nil {
1938-
if err := ws.unclaimScheduledWorkflow(ctx, scheduled.ScheduleID); err != nil {
1939-
log.CtxWarningf(ctx, "Failed to unclaim scheduled workflow %s: %s", scheduled.ScheduleID, err)
1940-
}
19411961
return err
19421962
}
1943-
for _, actionStatus := range rsp.GetActionStatuses() {
1944-
if actionErr := gstatus.FromProto(actionStatus.GetStatus()).Err(); actionErr != nil {
1945-
if err := ws.unclaimScheduledWorkflow(ctx, scheduled.ScheduleID); err != nil {
1946-
log.CtxWarningf(ctx, "Failed to unclaim scheduled workflow %s: %s", scheduled.ScheduleID, err)
1947-
}
1948-
return status.WrapErrorf(actionErr, "failed to start scheduled workflow action %q", scheduled.ActionName)
1949-
}
1963+
defaultBranch, err := ws.getRepoDefaultBranch(ctx, scheduled.RepoURL, wf.AccessToken)
1964+
if err != nil {
1965+
return err
1966+
}
1967+
wd := &interfaces.WebhookData{
1968+
EventName: webhook_data.EventName.ScheduledDispatch,
1969+
PushedRepoURL: scheduled.RepoURL,
1970+
PushedBranch: defaultBranch,
1971+
TargetRepoURL: scheduled.RepoURL,
1972+
TargetBranch: defaultBranch,
1973+
}
1974+
apiKey, err := ws.apiKeyForWorkflow(ctx, wf)
1975+
if err != nil {
1976+
return err
1977+
}
1978+
actions, err := ws.getActions(ctx, wf, wd, []string{scheduled.ActionName})
1979+
if err != nil {
1980+
return err
19501981
}
1951-
nextRunUsec, err := ws.calculateNextRunTimeUsec(scheduled.CronExpr, scheduled.NextRunUsec)
1982+
if len(actions) != 1 {
1983+
return status.InvalidArgumentErrorf("multiple actions named %q found", scheduled.ActionName)
1984+
}
1985+
action := actions[0]
1986+
invocationUUID, err := guuid.NewRandom()
1987+
if err != nil {
1988+
return err
1989+
}
1990+
if _, err := ws.executeWorkflowAction(ctx, apiKey, wf, wd, true /*isTrusted*/, action, invocationUUID.String(), nil /*extraCIRunnerArgs*/, nil /*env*/, true /*shouldRetry*/); err != nil {
1991+
return status.WrapErrorf(err, "failed to start scheduled workflow action %q", scheduled.ActionName)
1992+
}
1993+
nextRunUsec, err := ws.calculateNextRunTimeUsec(scheduled.CronExpr)
19521994
if err != nil {
19531995
alert.CtxUnexpectedEvent(ctx, "Failed to calculate next run time for scheduled workflow %s: %s", scheduled.ScheduleID, err)
19541996
return err
@@ -1960,12 +2002,27 @@ func (ws *workflowService) dispatchScheduledWorkflow(ctx context.Context, schedu
19602002
return nil
19612003
}
19622004

1963-
func (ws *workflowService) calculateNextRunTimeUsec(cronExpr string, lastRunTimeUsec int64) (int64, error) {
2005+
func (ws *workflowService) getRepoDefaultBranch(ctx context.Context, repoURL string, accessToken string) (string, error) {
2006+
parsedURL, err := gitutil.ParseGitHubRepoURL(repoURL)
2007+
if err != nil {
2008+
return "", status.InvalidArgumentErrorf("invalid repo URL %q: %s", repoURL, err)
2009+
}
2010+
app, err := ws.env.GetGitHubAppService().GetGitHubAppForOwner(ctx, parsedURL.Owner)
2011+
if err != nil {
2012+
return "", status.WrapErrorf(err, "get GitHub app for owner %q", parsedURL.Owner)
2013+
}
2014+
return app.GetDefaultBranch(ctx, repoURL, accessToken)
2015+
}
2016+
2017+
// calculateNextRunTimeUsec uses the given cron expression to return the next
2018+
// scheduled time, using the current time as the minimum.
2019+
func (ws *workflowService) calculateNextRunTimeUsec(cronExpr string) (int64, error) {
19642020
sched, err := cronParser.Parse(cronExpr)
19652021
if err != nil {
19662022
return 0, err
19672023
}
1968-
return sched.Next(time.UnixMicro(lastRunTimeUsec)).UnixMicro(), nil
2024+
now := ws.env.GetClock().Now()
2025+
return sched.Next(now).UnixMicro(), nil
19692026
}
19702027

19712028
func (ws *workflowService) advanceWorkflowSchedule(ctx context.Context, scheduleID string, nextRunUsec int64) error {

enterprise/server/workflow/service/service_test.go

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,12 +1090,12 @@ func TestAPIDispatch_ActionFiltering(t *testing.T) {
10901090
func TestScheduledWorkflow(t *testing.T) {
10911091
ctx := context.Background()
10921092
te := newTestEnv(t)
1093-
ctx, _, gid := authenticate(t, ctx, te)
10941093
execClient := te.GetRemoteExecutionClient().(*fakeExecutionClient)
10951094
te.SetRemoteExecutionClient(execClient)
10961095
provider := setupFakeGitProvider(t, te)
10971096
repoURL := makeTempRepo(t)
10981097
_ = runBBServer(ctx, t, te)
1098+
authCtx, _, gid := authenticate(t, ctx, te)
10991099
createWorkflow(t, te, repoURL, gid, false)
11001100
provider.FileContents = map[string]string{
11011101
config.FilePath: `
@@ -1119,7 +1119,6 @@ actions:
11191119
GroupID: gid,
11201120
RepoURL: repoURL,
11211121
ActionName: "Should Run",
1122-
Branch: "main",
11231122
// Run every hour, on the hour.
11241123
CronExpr: "0 * * * *",
11251124
NextRunUsec: now.UnixMicro(),
@@ -1130,7 +1129,6 @@ actions:
11301129
GroupID: gid,
11311130
RepoURL: repoURL,
11321131
ActionName: "Should Run 2",
1133-
Branch: "main",
11341132
// Run every hour, 5 minutes before the hour.
11351133
CronExpr: "55 * * * *",
11361134
NextRunUsec: fiveMinutesAgo.UnixMicro(),
@@ -1141,7 +1139,6 @@ actions:
11411139
GroupID: gid,
11421140
RepoURL: repoURL,
11431141
ActionName: "Not Time Yet",
1144-
Branch: "main",
11451142
// Run every day at 7PM UTC.
11461143
CronExpr: "0 19 * * *",
11471144
NextRunUsec: sevenPM.UnixMicro(),
@@ -1152,7 +1149,6 @@ actions:
11521149
GroupID: gid,
11531150
RepoURL: repoURL,
11541151
ActionName: "Already Leased",
1155-
Branch: "main",
11561152
// Run every hour, on the hour.
11571153
CronExpr: "0 * * * *",
11581154
NextRunUsec: now.UnixMicro(),
@@ -1164,22 +1160,23 @@ actions:
11641160
GroupID: gid,
11651161
RepoURL: repoURL,
11661162
ActionName: "Expired Lease",
1167-
Branch: "main",
11681163
// Run every hour, on the hour.
11691164
CronExpr: "0 * * * *",
11701165
NextRunUsec: now.UnixMicro(),
11711166
LeaseExpiresUsec: now.Add(-1 * time.Minute).UnixMicro(),
11721167
})
11731168

1174-
err := te.GetWorkflowService().RunScheduledWorkflows(ctx)
1169+
// Intentionally use a non-authenticated context, like the cron scheduler would.
1170+
err := te.GetWorkflowService().RunScheduledWorkflows(t.Context())
11751171
require.NoError(t, err)
11761172

11771173
expectedActionNames := []string{"Should Run", "Should Run 2", "Expired Lease"}
11781174
executedActionNames := make([]string, 0, len(expectedActionNames))
11791175
for range expectedActionNames {
11801176
select {
11811177
case execReq := <-execClient.executeRequests:
1182-
actionName := getExecutedActionName(t, ctx, te, execReq.Payload)
1178+
// When fetching the executions, make sure to use the authenticated context.
1179+
actionName := getExecutedActionName(t, authCtx, te, execReq.Payload)
11831180
executedActionNames = append(executedActionNames, actionName)
11841181
case <-time.After(5 * time.Second):
11851182
t.Fatal("timed out waiting for scheduled workflow execution")
@@ -1211,10 +1208,44 @@ actions:
12111208
require.Equal(t, int64(0), notTimeYet.LeaseExpiresUsec)
12121209
}
12131210

1211+
func TestScheduledWorkflow_NoEligibleSchedules(t *testing.T) {
1212+
ctx := context.Background()
1213+
te := newTestEnv(t)
1214+
_, _, gid := authenticate(t, ctx, te)
1215+
repoURL := makeTempRepo(t)
1216+
_ = runBBServer(ctx, t, te)
1217+
createWorkflow(t, te, repoURL, gid, false)
1218+
1219+
now := time.Date(2026, 1, 2, 15, 0, 0, 0, time.UTC)
1220+
te.SetClock(clockwork.NewFakeClockAt(now))
1221+
1222+
// All next run times are in the future.
1223+
insertScheduledRun(t, te, &tables.ScheduledRun{
1224+
ScheduleID: "future-1",
1225+
GroupID: gid,
1226+
RepoURL: repoURL,
1227+
ActionName: "Some Action",
1228+
CronExpr: "0 19 * * *",
1229+
NextRunUsec: now.Add(1 * time.Hour).UnixMicro(),
1230+
})
1231+
1232+
// Intentionally use a non-authenticated context, like the cron scheduler would.
1233+
err := te.GetWorkflowService().RunScheduledWorkflows(t.Context())
1234+
require.NoError(t, err)
1235+
1236+
// No executions should have been started.
1237+
require.Zero(t, len(te.GetRemoteExecutionClient().(*fakeExecutionClient).executeRequests))
1238+
1239+
// Schedule rows should be untouched.
1240+
future1 := getScheduledRun(t, te, "future-1")
1241+
require.Equal(t, now.Add(1*time.Hour).UnixMicro(), future1.NextRunUsec)
1242+
require.Equal(t, int64(0), future1.LeaseExpiresUsec)
1243+
}
1244+
12141245
func TestScheduledWorkflow_ConcurrentServers(t *testing.T) {
12151246
ctx := context.Background()
12161247
te := newTestEnv(t)
1217-
ctx, _, gid := authenticate(t, ctx, te)
1248+
authCtx, _, gid := authenticate(t, ctx, te)
12181249
execClient := te.GetRemoteExecutionClient().(*fakeExecutionClient)
12191250
te.SetRemoteExecutionClient(execClient)
12201251
provider := setupFakeGitProvider(t, te)
@@ -1239,7 +1270,6 @@ actions:
12391270
GroupID: gid,
12401271
RepoURL: repoURL,
12411272
ActionName: "Test",
1242-
Branch: "main",
12431273
// Run every hour, on the hour.
12441274
CronExpr: "0 * * * *",
12451275
NextRunUsec: now.UnixMicro(),
@@ -1250,7 +1280,6 @@ actions:
12501280
GroupID: gid,
12511281
RepoURL: repoURL,
12521282
ActionName: "Test 2",
1253-
Branch: "main",
12541283
// Run every hour, on the hour.
12551284
CronExpr: "0 * * * *",
12561285
NextRunUsec: now.UnixMicro(),
@@ -1263,7 +1292,8 @@ actions:
12631292
wg.Add(1)
12641293
go func() {
12651294
defer wg.Done()
1266-
errCh <- te.GetWorkflowService().RunScheduledWorkflows(ctx)
1295+
// Intentionally use a non-authenticated context, like the cron scheduler would.
1296+
errCh <- te.GetWorkflowService().RunScheduledWorkflows(t.Context())
12671297
}()
12681298
}
12691299
wg.Wait()
@@ -1277,7 +1307,8 @@ actions:
12771307
for range 2 {
12781308
select {
12791309
case execReq := <-execClient.executeRequests:
1280-
actionName := getExecutedActionName(t, ctx, te, execReq.Payload)
1310+
// When fetching the executions, make sure to use the authenticated context.
1311+
actionName := getExecutedActionName(t, authCtx, te, execReq.Payload)
12811312
executedActionNames = append(executedActionNames, actionName)
12821313
case <-time.After(5 * time.Second):
12831314
t.Fatal("timed out waiting for scheduled workflow execution")

server/interfaces/interfaces.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,9 @@ type GitHubApp interface {
724724
// so should be used for status reporting only.
725725
GetInstallationTokenForStatusReportingOnly(ctx context.Context, owner string) (*github.InstallationToken, error)
726726

727+
// GetDefaultBranch returns the default branch for the given repo URL.
728+
GetDefaultBranch(ctx context.Context, repoURL string, accessToken string) (string, error)
729+
727730
// GetRepositoryInstallationToken returns an installation token for the given repo.
728731
// The repo must've been imported to BuildBuddy (i.e. a GitRepository row was created).
729732
GetRepositoryInstallationToken(ctx context.Context, groupID, repoURL string) (string, error)

server/tables/tables.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1469,5 +1469,4 @@ func RegisterTables() {
14691469
registerTable("UU", &UserUserList{})
14701470
registerTable("UM", &UserListGroup{})
14711471
registerTable("WF", &Workflow{})
1472-
registerTable("SR", &ScheduledRun{})
14731472
}

server/testutil/testgit/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ go_library(
1212
"//server/testutil/testfs",
1313
"//server/testutil/testshell",
1414
"//server/util/status",
15+
"@com_github_google_go_github_v59//github",
1516
],
1617
)

0 commit comments

Comments
 (0)