Skip to content

Commit dfab8a6

Browse files
committed
Handle unauth scheduler ctx
1 parent 5da73c3 commit dfab8a6

10 files changed

Lines changed: 125 additions & 42 deletions

File tree

deps/go_deps.MODULE.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ use_repo(
255255
"com_github_prometheus_client_model",
256256
"com_github_prometheus_common",
257257
"com_github_rantav_go_grpc_channelz",
258-
"com_github_robfig_cron_v3",
258+
"com_github_robfig_cron_v3",
259259
"com_github_roaringbitmap_roaring",
260260
"com_github_rs_zerolog",
261261
"com_github_shirou_gopsutil_v3",

enterprise/server/githubapp/githubapp.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,22 @@ func (a *GitHubApp) GetInstallationTokenForStatusReportingOnly(ctx context.Conte
619619
return tok, nil
620620
}
621621

622+
func (a *GitHubApp) GetDefaultBranch(ctx context.Context, repoURL string, accessToken string) (string, error) {
623+
parsedURL, err := gitutil.ParseGitHubRepoURL(repoURL)
624+
if err != nil {
625+
return "", status.InvalidArgumentErrorf("invalid repo URL %q: %s", repoURL, err)
626+
}
627+
client, err := a.newAuthenticatedClient(ctx, accessToken)
628+
if err != nil {
629+
return "", status.WrapError(err, "create GitHub client")
630+
}
631+
repo, _, err := client.Repositories.Get(ctx, parsedURL.Owner, parsedURL.Repo)
632+
if err != nil {
633+
return "", err
634+
}
635+
return repo.GetDefaultBranch(), nil
636+
}
637+
622638
func (a *GitHubApp) GetRepositoryInstallationToken(ctx context.Context, groupID, repoURL string) (string, error) {
623639
if err := authutil.AuthorizeGroupAccess(ctx, a.env, groupID); err != nil {
624640
return "", err

enterprise/server/webhooks/webhook_data/webhook_data.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,18 @@ import (
99
var (
1010
// EventName holds canonical webhook event name constants.
1111
EventName struct {
12-
Push string
13-
PullRequest string
14-
ManualDispatch string
12+
Push string
13+
PullRequest string
14+
ManualDispatch string
15+
ScheduledDispatch string
1516
}
1617
)
1718

1819
func init() {
1920
EventName.Push = "push"
2021
EventName.PullRequest = "pull_request"
2122
EventName.ManualDispatch = "manual_dispatch"
23+
EventName.ScheduledDispatch = "scheduled"
2224
}
2325

2426
func DebugString(wd *interfaces.WebhookData) string {

enterprise/server/workflow/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ func GetDefault(targetRepoDefaultBranch string) *BuildBuddyConfig {
435435
// MatchesAnyTrigger returns whether the action is triggered by the event
436436
// published to the given branch or tag.
437437
func MatchesAnyTrigger(action *Action, event, branch, tag string) bool {
438-
// If user has manually requested action dispatch, always run it
439-
if event == webhook_data.EventName.ManualDispatch {
438+
// If user has manually or scheduled action dispatch, always run it
439+
if event == webhook_data.EventName.ManualDispatch || event == webhook_data.EventName.ScheduledDispatch {
440440
return true
441441
}
442442

enterprise/server/workflow/service/service.go

Lines changed: 78 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,6 +1859,9 @@ func (ws *workflowService) RunScheduledWorkflows(ctx context.Context) error {
18591859
}
18601860
if err := ws.dispatchScheduledWorkflow(ctx, scheduled); err != nil {
18611861
log.CtxErrorf(ctx, "Failed to dispatch scheduled workflow %s: %s", scheduled.ScheduleID, err)
1862+
if err := ws.unclaimScheduledWorkflow(ctx, scheduled.ScheduleID); err != nil {
1863+
log.CtxWarningf(ctx, "Failed to unclaim scheduled workflow %s: %s", scheduled.ScheduleID, err)
1864+
}
18621865
continue
18631866
}
18641867
}
@@ -1877,7 +1880,7 @@ func (ws *workflowService) claimScheduledWorkflow(ctx context.Context) (*tables.
18771880
WHERE next_run_usec <= ?
18781881
AND (lease_expires_usec = 0 OR lease_expires_usec <= ?)
18791882
ORDER BY next_run_usec ASC
1880-
LIMIT 1`+dbh.SelectForUpdateModifier(), nowUsec, nowUsec).Take(scheduled)
1883+
LIMIT 1 `+dbh.SelectForUpdateModifier(), nowUsec, nowUsec).Take(scheduled)
18811884
if err != nil {
18821885
if db.IsRecordNotFound(err) {
18831886
scheduled = nil
@@ -1925,30 +1928,68 @@ func (ws *workflowService) unclaimScheduledWorkflow(ctx context.Context, schedul
19251928
return nil
19261929
}
19271930

1931+
func (ws *workflowService) getWorkflowForScheduledDispatch(ctx context.Context, groupID, repoURL string) (*tables.Workflow, error) {
1932+
gitRepository := &tables.GitRepository{}
1933+
err := ws.env.GetDBHandle().NewQuery(ctx, "workflow_service_get_for_scheduled_dispatch").Raw(`
1934+
SELECT * FROM "GitRepositories"
1935+
WHERE group_id = ?
1936+
AND repo_url = ?
1937+
`, groupID, repoURL).Take(gitRepository)
1938+
if err != nil {
1939+
return nil, status.WrapErrorf(err, "fetch repo %q", repoURL)
1940+
}
1941+
parsedURL, err := gitutil.ParseGitHubRepoURL(repoURL)
1942+
if err != nil {
1943+
return nil, status.WrapErrorf(err, "invalid repo URL %q", repoURL)
1944+
}
1945+
app, err := ws.env.GetGitHubAppService().GetGitHubAppForOwner(ctx, parsedURL.Owner)
1946+
if err != nil {
1947+
return nil, status.WrapErrorf(err, "get GitHub app for owner %q", parsedURL.Owner)
1948+
}
1949+
// The cron scheduler does not use an authenticated context, so we use this method.
1950+
accessToken, err := app.GetInstallationTokenForStatusReportingOnly(ctx, parsedURL.Owner)
1951+
if err != nil {
1952+
return nil, status.WrapErrorf(err, "get installation token for owner %q", parsedURL.Owner)
1953+
}
1954+
return ws.gitRepositoryWorkflow(gitRepository, accessToken.GetToken()).Workflow, nil
1955+
}
1956+
19281957
func (ws *workflowService) dispatchScheduledWorkflow(ctx context.Context, scheduled *tables.ScheduledRun) error {
1929-
wfID := ws.GetLegacyWorkflowIDForGitRepository(scheduled.GroupID, scheduled.RepoURL)
1930-
rsp, err := ws.ExecuteWorkflow(ctx, &wfpb.ExecuteWorkflowRequest{
1931-
WorkflowId: wfID,
1932-
PushedRepoUrl: scheduled.RepoURL,
1933-
PushedBranch: scheduled.Branch,
1934-
Async: true,
1935-
ActionNames: []string{scheduled.ActionName},
1936-
})
1958+
wf, err := ws.getWorkflowForScheduledDispatch(ctx, scheduled.GroupID, scheduled.RepoURL)
19371959
if err != nil {
1938-
if err := ws.unclaimScheduledWorkflow(ctx, scheduled.ScheduleID); err != nil {
1939-
log.CtxWarningf(ctx, "Failed to unclaim scheduled workflow %s: %s", scheduled.ScheduleID, err)
1940-
}
19411960
return err
19421961
}
1943-
for _, actionStatus := range rsp.GetActionStatuses() {
1944-
if actionErr := gstatus.FromProto(actionStatus.GetStatus()).Err(); actionErr != nil {
1945-
if err := ws.unclaimScheduledWorkflow(ctx, scheduled.ScheduleID); err != nil {
1946-
log.CtxWarningf(ctx, "Failed to unclaim scheduled workflow %s: %s", scheduled.ScheduleID, err)
1947-
}
1948-
return status.WrapErrorf(actionErr, "failed to start scheduled workflow action %q", scheduled.ActionName)
1949-
}
1962+
defaultBranch, err := ws.getRepoDefaultBranch(ctx, scheduled.RepoURL, wf.AccessToken)
1963+
if err != nil {
1964+
return err
1965+
}
1966+
wd := &interfaces.WebhookData{
1967+
EventName: webhook_data.EventName.ScheduledDispatch,
1968+
PushedRepoURL: scheduled.RepoURL,
1969+
PushedBranch: defaultBranch,
1970+
TargetRepoURL: scheduled.RepoURL,
1971+
TargetBranch: defaultBranch,
1972+
}
1973+
apiKey, err := ws.apiKeyForWorkflow(ctx, wf)
1974+
if err != nil {
1975+
return err
1976+
}
1977+
actions, err := ws.getActions(ctx, wf, wd, []string{scheduled.ActionName})
1978+
if err != nil {
1979+
return err
19501980
}
1951-
nextRunUsec, err := ws.calculateNextRunTimeUsec(scheduled.CronExpr, scheduled.NextRunUsec)
1981+
if len(actions) != 1 {
1982+
return status.InvalidArgumentErrorf("multiple actions named %q found", scheduled.ActionName)
1983+
}
1984+
action := actions[0]
1985+
invocationUUID, err := guuid.NewRandom()
1986+
if err != nil {
1987+
return err
1988+
}
1989+
if _, err := ws.executeWorkflowAction(ctx, apiKey, wf, wd, true /*isTrusted*/, action, invocationUUID.String(), nil /*extraCIRunnerArgs*/, nil /*env*/, true /*shouldRetry*/); err != nil {
1990+
return status.WrapErrorf(err, "failed to start scheduled workflow action %q", scheduled.ActionName)
1991+
}
1992+
nextRunUsec, err := ws.calculateNextRunTimeUsec(scheduled.CronExpr)
19521993
if err != nil {
19531994
alert.CtxUnexpectedEvent(ctx, "Failed to calculate next run time for scheduled workflow %s: %s", scheduled.ScheduleID, err)
19541995
return err
@@ -1960,12 +2001,27 @@ func (ws *workflowService) dispatchScheduledWorkflow(ctx context.Context, schedu
19602001
return nil
19612002
}
19622003

1963-
func (ws *workflowService) calculateNextRunTimeUsec(cronExpr string, lastRunTimeUsec int64) (int64, error) {
2004+
func (ws *workflowService) getRepoDefaultBranch(ctx context.Context, repoURL string, accessToken string) (string, error) {
2005+
parsedURL, err := gitutil.ParseGitHubRepoURL(repoURL)
2006+
if err != nil {
2007+
return "", status.InvalidArgumentErrorf("invalid repo URL %q: %s", repoURL, err)
2008+
}
2009+
app, err := ws.env.GetGitHubAppService().GetGitHubAppForOwner(ctx, parsedURL.Owner)
2010+
if err != nil {
2011+
return "", status.WrapErrorf(err, "get GitHub app for owner %q", parsedURL.Owner)
2012+
}
2013+
return app.GetDefaultBranch(ctx, repoURL, accessToken)
2014+
}
2015+
2016+
// calculateNextRunTimeUsec uses the given cron expression to return the next
2017+
// scheduled time, using the current time as the minimum.
2018+
func (ws *workflowService) calculateNextRunTimeUsec(cronExpr string) (int64, error) {
19642019
sched, err := cronParser.Parse(cronExpr)
19652020
if err != nil {
19662021
return 0, err
19672022
}
1968-
return sched.Next(time.UnixMicro(lastRunTimeUsec)).UnixMicro(), nil
2023+
now := ws.env.GetClock().Now()
2024+
return sched.Next(now).UnixMicro(), nil
19692025
}
19702026

19712027
func (ws *workflowService) advanceWorkflowSchedule(ctx context.Context, scheduleID string, nextRunUsec int64) error {

enterprise/server/workflow/service/service_test.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,12 +1090,12 @@ func TestAPIDispatch_ActionFiltering(t *testing.T) {
10901090
func TestScheduledWorkflow(t *testing.T) {
10911091
ctx := context.Background()
10921092
te := newTestEnv(t)
1093-
ctx, _, gid := authenticate(t, ctx, te)
10941093
execClient := te.GetRemoteExecutionClient().(*fakeExecutionClient)
10951094
te.SetRemoteExecutionClient(execClient)
10961095
provider := setupFakeGitProvider(t, te)
10971096
repoURL := makeTempRepo(t)
10981097
_ = runBBServer(ctx, t, te)
1098+
authCtx, _, gid := authenticate(t, ctx, te)
10991099
createWorkflow(t, te, repoURL, gid, false)
11001100
provider.FileContents = map[string]string{
11011101
config.FilePath: `
@@ -1119,7 +1119,6 @@ actions:
11191119
GroupID: gid,
11201120
RepoURL: repoURL,
11211121
ActionName: "Should Run",
1122-
Branch: "main",
11231122
// Run every hour, on the hour.
11241123
CronExpr: "0 * * * *",
11251124
NextRunUsec: now.UnixMicro(),
@@ -1130,7 +1129,6 @@ actions:
11301129
GroupID: gid,
11311130
RepoURL: repoURL,
11321131
ActionName: "Should Run 2",
1133-
Branch: "main",
11341132
// Run every hour, 5 minutes before the hour.
11351133
CronExpr: "55 * * * *",
11361134
NextRunUsec: fiveMinutesAgo.UnixMicro(),
@@ -1141,7 +1139,6 @@ actions:
11411139
GroupID: gid,
11421140
RepoURL: repoURL,
11431141
ActionName: "Not Time Yet",
1144-
Branch: "main",
11451142
// Run every day at 7PM UTC.
11461143
CronExpr: "0 19 * * *",
11471144
NextRunUsec: sevenPM.UnixMicro(),
@@ -1152,7 +1149,6 @@ actions:
11521149
GroupID: gid,
11531150
RepoURL: repoURL,
11541151
ActionName: "Already Leased",
1155-
Branch: "main",
11561152
// Run every hour, on the hour.
11571153
CronExpr: "0 * * * *",
11581154
NextRunUsec: now.UnixMicro(),
@@ -1164,22 +1160,23 @@ actions:
11641160
GroupID: gid,
11651161
RepoURL: repoURL,
11661162
ActionName: "Expired Lease",
1167-
Branch: "main",
11681163
// Run every hour, on the hour.
11691164
CronExpr: "0 * * * *",
11701165
NextRunUsec: now.UnixMicro(),
11711166
LeaseExpiresUsec: now.Add(-1 * time.Minute).UnixMicro(),
11721167
})
11731168

1174-
err := te.GetWorkflowService().RunScheduledWorkflows(ctx)
1169+
// Intentionally use a non-authenticated context, like the cron scheduler would.
1170+
err := te.GetWorkflowService().RunScheduledWorkflows(t.Context())
11751171
require.NoError(t, err)
11761172

11771173
expectedActionNames := []string{"Should Run", "Should Run 2", "Expired Lease"}
11781174
executedActionNames := make([]string, 0, len(expectedActionNames))
11791175
for range expectedActionNames {
11801176
select {
11811177
case execReq := <-execClient.executeRequests:
1182-
actionName := getExecutedActionName(t, ctx, te, execReq.Payload)
1178+
// When fetching the executions, make sure to use the authenticated context.
1179+
actionName := getExecutedActionName(t, authCtx, te, execReq.Payload)
11831180
executedActionNames = append(executedActionNames, actionName)
11841181
case <-time.After(5 * time.Second):
11851182
t.Fatal("timed out waiting for scheduled workflow execution")
@@ -1214,7 +1211,7 @@ actions:
12141211
func TestScheduledWorkflow_ConcurrentServers(t *testing.T) {
12151212
ctx := context.Background()
12161213
te := newTestEnv(t)
1217-
ctx, _, gid := authenticate(t, ctx, te)
1214+
authCtx, _, gid := authenticate(t, ctx, te)
12181215
execClient := te.GetRemoteExecutionClient().(*fakeExecutionClient)
12191216
te.SetRemoteExecutionClient(execClient)
12201217
provider := setupFakeGitProvider(t, te)
@@ -1239,7 +1236,6 @@ actions:
12391236
GroupID: gid,
12401237
RepoURL: repoURL,
12411238
ActionName: "Test",
1242-
Branch: "main",
12431239
// Run every hour, on the hour.
12441240
CronExpr: "0 * * * *",
12451241
NextRunUsec: now.UnixMicro(),
@@ -1250,7 +1246,6 @@ actions:
12501246
GroupID: gid,
12511247
RepoURL: repoURL,
12521248
ActionName: "Test 2",
1253-
Branch: "main",
12541249
// Run every hour, on the hour.
12551250
CronExpr: "0 * * * *",
12561251
NextRunUsec: now.UnixMicro(),
@@ -1263,7 +1258,8 @@ actions:
12631258
wg.Add(1)
12641259
go func() {
12651260
defer wg.Done()
1266-
errCh <- te.GetWorkflowService().RunScheduledWorkflows(ctx)
1261+
// Intentionally use a non-authenticated context, like the cron scheduler would.
1262+
errCh <- te.GetWorkflowService().RunScheduledWorkflows(t.Context())
12671263
}()
12681264
}
12691265
wg.Wait()
@@ -1277,7 +1273,8 @@ actions:
12771273
for range 2 {
12781274
select {
12791275
case execReq := <-execClient.executeRequests:
1280-
actionName := getExecutedActionName(t, ctx, te, execReq.Payload)
1276+
// When fetching the executions, make sure to use the authenticated context.
1277+
actionName := getExecutedActionName(t, authCtx, te, execReq.Payload)
12811278
executedActionNames = append(executedActionNames, actionName)
12821279
case <-time.After(5 * time.Second):
12831280
t.Fatal("timed out waiting for scheduled workflow execution")

server/interfaces/interfaces.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,9 @@ type GitHubApp interface {
724724
// so should be used for status reporting only.
725725
GetInstallationTokenForStatusReportingOnly(ctx context.Context, owner string) (*github.InstallationToken, error)
726726

727+
// GetDefaultBranch returns the default branch for the given repo URL.
728+
GetDefaultBranch(ctx context.Context, repoURL string, accessToken string) (string, error)
729+
727730
// GetRepositoryInstallationToken returns an installation token for the given repo.
728731
// The repo must've been imported to BuildBuddy (i.e. a GitRepository row was created).
729732
GetRepositoryInstallationToken(ctx context.Context, groupID, repoURL string) (string, error)

server/tables/tables.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1469,5 +1469,4 @@ func RegisterTables() {
14691469
registerTable("UU", &UserUserList{})
14701470
registerTable("UM", &UserListGroup{})
14711471
registerTable("WF", &Workflow{})
1472-
registerTable("SR", &ScheduledRun{})
14731472
}

server/testutil/testgit/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ go_library(
1212
"//server/testutil/testfs",
1313
"//server/testutil/testshell",
1414
"//server/util/status",
15+
"@com_github_google_go_github_v59//github",
1516
],
1617
)

server/testutil/testgit/testgit.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/buildbuddy-io/buildbuddy/server/testutil/testfs"
1515
"github.com/buildbuddy-io/buildbuddy/server/testutil/testshell"
1616
"github.com/buildbuddy-io/buildbuddy/server/util/status"
17+
"github.com/google/go-github/v59/github"
1718
)
1819

1920
const (
@@ -185,6 +186,14 @@ func (a *FakeGitHubApp) GetRepositoryInstallationToken(ctx context.Context, grou
185186
return a.Token, nil
186187
}
187188

189+
func (a *FakeGitHubApp) GetInstallationTokenForStatusReportingOnly(ctx context.Context, owner string) (*github.InstallationToken, error) {
190+
return &github.InstallationToken{Token: &a.Token}, nil
191+
}
192+
193+
func (a *FakeGitHubApp) GetDefaultBranch(ctx context.Context, repoURL string, token string) (string, error) {
194+
return "main", nil
195+
}
196+
188197
func (a *FakeGitHubApp) AppID() int64 {
189198
return a.MockAppID
190199
}

0 commit comments

Comments
 (0)