Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions enterprise/server/scheduling/task_router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"//server/environment",
"//server/interfaces",
"//server/real_environment",
"//server/util/bazel_request",
"//server/util/hash",
"//server/util/log",
"//server/util/platform",
Expand All @@ -33,10 +34,12 @@ go_test(
"//server/interfaces",
"//server/testutil/testauth",
"//server/testutil/testenv",
"//server/util/bazel_request",
"//server/util/testing/flags",
"@com_github_open_feature_go_sdk//openfeature",
"@com_github_open_feature_go_sdk//openfeature/memprovider",
"@com_github_open_feature_go_sdk//openfeature/testing",
"@com_github_open_feature_go_sdk_contrib_providers_flagd//pkg",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//slices",
],
Expand Down
76 changes: 52 additions & 24 deletions enterprise/server/scheduling/task_router/task_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/real_environment"
"github.com/buildbuddy-io/buildbuddy/server/util/bazel_request"
"github.com/buildbuddy-io/buildbuddy/server/util/hash"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/platform"
Expand Down Expand Up @@ -46,6 +47,8 @@ const (

// Preferred node limit for tasks using [persistentWorkerRouter].
persistentWorkerRouterPreferredNodeLimit = 128

affinityRouterUseTargetLabelExperiment = "remote_execution.affinity_router_use_target_label"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
affinityRouterUseTargetLabelExperiment = "remote_execution.affinity_router_use_target_label"
affinityRouterUseTargetPackageExperiment = "remote_execution.affinity_router_use_target_package"

)

type taskRouter struct {
Expand Down Expand Up @@ -315,22 +318,39 @@ func (tr *taskRouter) MarkFailed(ctx context.Context, action *repb.Action, cmd *

// Contains the parameters required to make a routing decision.
type routingParams struct {
cmd *repb.Command
platform *repb.Platform
remoteInstanceName string
groupID string
cmd *repb.Command
platform *repb.Platform
remoteInstanceName string
groupID string
// targetPackageLabel is the request's target ID with everything from the
// first ":" onward removed (see getTargetPackageLabel). It is the whole
// target ID when no ":" is present.
targetPackageLabel string
// useTargetPackageForAffinityRouting holds the value read from the
// affinityRouterUseTargetLabelExperiment flag for this request. When it is
// true and targetPackageLabel is non-empty, getAffinityRoutingHint returns
// targetPackageLabel instead of the command's first output.
useTargetPackageForAffinityRouting bool
}

// getTargetPackageLabel returns the portion of targetLabel that precedes the
// first ":" (for example "//foo/bar" for "//foo/bar:baz"). A label without a
// ":" is returned unchanged, and a label that starts with ":" yields "".
func getTargetPackageLabel(targetLabel string) string {
	colon := strings.IndexByte(targetLabel, ':')
	if colon < 0 {
		// No target-name separator: the label is already a bare package.
		return targetLabel
	}
	return targetLabel[:colon]
}

// getRoutingParams assembles the routingParams for a single request.
//
// The group ID starts as interfaces.AuthAnonymousUser and is replaced by the
// authenticated user's group ID when AuthenticatedUser succeeds. The
// target-package experiment flag is read only when the environment has an
// experiment flag provider; otherwise it stays false. The target package label
// is derived from the TargetId of the request metadata returned by
// bazel_request.GetRequestMetadata(ctx).
func getRoutingParams(ctx context.Context, env environment.Env, action *repb.Action, cmd *repb.Command, remoteInstanceName string) routingParams {
groupID := interfaces.AuthAnonymousUser
// An authentication error is not propagated: the anonymous group is kept.
if u, err := env.GetAuthenticator().AuthenticatedUser(ctx); err == nil {
groupID = u.GetGroupID()
}
useTargetPackageForAffinityRouting := false
if fp := env.GetExperimentFlagProvider(); fp != nil {
// NOTE(review): the trailing false is presumably the default returned when
// the flag cannot be resolved — confirm against the flag provider API.
useTargetPackageForAffinityRouting = fp.Boolean(ctx, affinityRouterUseTargetLabelExperiment, false)
}
rmd := bazel_request.GetRequestMetadata(ctx)
return routingParams{
cmd: cmd,
platform: platform.GetProto(action, cmd),
remoteInstanceName: remoteInstanceName,
groupID: groupID}
cmd: cmd,
platform: platform.GetProto(action, cmd),
remoteInstanceName: remoteInstanceName,
groupID: groupID,
// Only the part of the target ID before the first ":" is kept.
targetPackageLabel: getTargetPackageLabel(rmd.GetTargetId()),
useTargetPackageForAffinityRouting: useTargetPackageForAffinityRouting,
}
}

// Selects and returns a Router to use, or nil if none applies.
Expand Down Expand Up @@ -461,26 +481,34 @@ func (s *ciRunnerRouter) RoutingInfo(params routingParams) (int, []string, error
// - remoteInstanceName
// - groupID
// - platform properties
// - and the name of the first action output
// - and an affinity hint derived from either the first action output
// (default) or the request metadata target package (for experiment-enabled
// orgs)
//
// Because only a single action can generate a given output in Bazel, this key
// uniquely identifies an action and is stable even if the action's inputs
// change. The intent of using this routing key is to route successive actions
// whose inputs have changed to nodes which previously executed that action to
// increase the local-cache hitrate, as it's likely that for large actions most
// of the input tree is unchanged.
// The first-output hint is stable even if an action's inputs change, which can
// route successive executions of the same Bazel action back to a warmer
// executor. The target-package experiment deliberately broadens that affinity so
// related actions for the same package can share warmed executors and reduce
// routing spray.
type affinityRouter struct {
// rdb is the Redis client handle held by the router.
// NOTE(review): presumably used to store and look up the executors
// associated with a routing key — confirm against the methods that read it.
rdb redis.UniversalClient
}

// Applies reports whether this router can handle params, i.e. whether a
// non-empty affinity value can be derived from them. The context argument is
// unused.
func (*affinityRouter) Applies(_ context.Context, params routingParams) bool {
return getFirstOutput(params.cmd) != ""
return getAffinityRoutingHint(params) != ""
}

// preferredNodeLimit returns the preferred-node limit used by the affinity
// router. The routing parameters are ignored: the result is always
// defaultPreferredNodeLimit.
func (*affinityRouter) preferredNodeLimit(_ routingParams) int {
	var limit int = defaultPreferredNodeLimit
	return limit
}

// getAffinityRoutingHint selects the string the affinity router keys on.
//
// The target package label is returned only when the target-package flag is
// set on params and the label is non-empty. In every other case the result is
// whatever getFirstOutput reports for the command, which may be "".
func getAffinityRoutingHint(params routingParams) string {
	packageHintUsable := params.useTargetPackageForAffinityRouting && params.targetPackageLabel != ""
	if !packageHintUsable {
		return getFirstOutput(params.cmd)
	}
	return params.targetPackageLabel
}

func (*affinityRouter) routingKey(params routingParams) (string, error) {
parts := []string{"task_route", params.groupID}

Expand All @@ -494,15 +522,15 @@ func (*affinityRouter) routingKey(params routingParams) (string, error) {
}
parts = append(parts, hash.Bytes(b))

// Add the first output as the final part of the routing key. This should
// uniquely identify a bazel action and is an attempt to route actions to
// executor nodes that are warmed up (with inputs and OCI images) for this
// action.
firstOutput := getFirstOutput(params.cmd)
if firstOutput == "" {
return "", status.InternalError("routing key requested for action with no outputs")
// Add the selected affinity hint as the final part of the routing key. By
// default this is the first declared output, but a per-org experiment can
// switch this to the target package to reduce routing spray across related
// actions belonging to the same package.
hint := getAffinityRoutingHint(params)
if hint == "" {
return "", status.InternalError("routing key requested for action with no affinity hint")
}
parts = append(parts, hash.String(firstOutput))
parts = append(parts, hash.String(hint))

return strings.Join(parts, "/"), nil
}
Expand Down
127 changes: 127 additions & 0 deletions enterprise/server/scheduling/task_router/task_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"os"
"testing"

"github.com/buildbuddy-io/buildbuddy/enterprise/server/experiments"
Expand All @@ -14,13 +15,15 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/testutil/testauth"
"github.com/buildbuddy-io/buildbuddy/server/testutil/testenv"
"github.com/buildbuddy-io/buildbuddy/server/util/bazel_request"
"github.com/buildbuddy-io/buildbuddy/server/util/testing/flags"
"github.com/open-feature/go-sdk/openfeature"
"github.com/open-feature/go-sdk/openfeature/memprovider"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
openfeatureTesting "github.com/open-feature/go-sdk/openfeature/testing"
)

Expand Down Expand Up @@ -214,6 +217,103 @@ func TestTaskRouter_RankNodes_AffinityRouting(t *testing.T) {
requireNotAlwaysRanked(0, executorHostID2, t, router, ctx, thirdCmd, instanceName)
}

// TestTaskRouter_RankNodes_AffinityRouting_UsesTargetPackageForSelectedGroups
// checks that, for a group with the target-package experiment enabled, a task
// is ranked first on the executor that previously ran a different action from
// the same package, while a task from another package, or from a group without
// the experiment, is not consistently ranked there.
func TestTaskRouter_RankNodes_AffinityRouting_UsesTargetPackageForSelectedGroups(t *testing.T) {
env := newTestEnv(t)

// flagd configuration: the flag resolves to true only when the evaluation
// context's group_id equals "GR1"; every other group gets false.
const testFlags = `{
"$schema": "https://flagd.dev/schema/v0/flags.json",
"flags": {
"remote_execution.affinity_router_use_target_label": {
"state": "ENABLED",
"variants": {
"enabled": true,
"disabled": false
},
"defaultVariant": "disabled",
"targeting": {
"if": [
{ "==": [{ "var": "group_id" }, "GR1"] },
"enabled",
"disabled"
]
}
}
}
}`
// Load the flags from a temporary file through an in-process flagd provider.
// NOTE(review): the provider installed with SetProviderAndWait is not
// explicitly shut down or replaced at the end of this test — confirm it
// cannot leak into later tests.
offlineFlagPath := writeFlagConfig(t, testFlags)
provider, err := flagd.NewProvider(flagd.WithInProcessResolver(), flagd.WithOfflineFilePath(offlineFlagPath))
require.NoError(t, err)
require.NoError(t, openfeature.SetProviderAndWait(provider))
fp, err := experiments.NewFlagProvider("test")
require.NoError(t, err)
env.SetExperimentFlagProvider(fp)

router := newTaskRouter(t, env)
nodes := sequentiallyNumberedNodes(100)
instanceName := "test-instance"
// Two commands with different arguments and different first outputs, so the
// first-output hint alone would not relate them.
firstCmd := &repb.Command{
Arguments: []string{"go", "tool", "link"},
OutputPaths: []string{"/bazel-out/k8-fastbuild/bin/foo/libfoo.a"},
}
secondCmd := &repb.Command{
Arguments: []string{"test-setup.sh"},
OutputPaths: []string{"/bazel-out/k8-fastbuild/testlogs/foo/foo_test/test.outputs"},
}

// Contexts for user US1 (presumably a member of group GR1 — confirm in the
// test auth setup): two targets in //foo/bar, the bare package label
// //foo/bar, and a target in the different package //foo/baz.
selectedOrgCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar:foo_lib", "GoLink")
selectedOrgOtherTargetCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar:foo_test", "TestRunner")
selectedOrgPackageCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar", "TestRunner")
selectedOrgOtherPackageCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/baz:foo_test", "TestRunner")
// Record a successful run of firstCmd for //foo/bar:foo_lib on executorHostID1.
router.MarkSucceeded(selectedOrgCtx, nil, firstCmd, instanceName, executorHostID1)

// Another target in the same package is ranked first on that executor.
ranked := router.RankNodes(selectedOrgOtherTargetCtx, nil, secondCmd, instanceName, nodes)
require.Equal(t, executorHostID1, ranked[0].GetExecutionNode().GetExecutorHostId())
// A label with no ":" part is treated as the same package.
ranked = router.RankNodes(selectedOrgPackageCtx, nil, secondCmd, instanceName, nodes)
require.Equal(t, executorHostID1, ranked[0].GetExecutionNode().GetExecutorHostId())
// A target in a different package must not be pinned to that executor.
requireNotAlwaysRanked(0, executorHostID1, t, router, selectedOrgOtherPackageCtx, secondCmd, instanceName)

// Control: user US2 (presumably in a group other than GR1 — confirm) repeats
// the same-package scenario and must not get package-based affinity.
controlOrgCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US2"), "//foo/bar:foo_lib", "GoLink")
controlOrgOtherTargetCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US2"), "//foo/bar:foo_test", "TestRunner")
router.MarkSucceeded(controlOrgCtx, nil, firstCmd, instanceName, executorHostID1)

requireNotAlwaysRanked(0, executorHostID1, t, router, controlOrgOtherTargetCtx, secondCmd, instanceName)
}

// TestTaskRouter_RankNodes_AffinityRouting_TargetLabelExperimentFallsBackToFirstOutput
// checks that enabling the experiment does not by itself relate two commands
// with different first outputs when the context has no request metadata
// attached via withRequestMetadata.
func TestTaskRouter_RankNodes_AffinityRouting_TargetLabelExperimentFallsBackToFirstOutput(t *testing.T) {
env := newTestEnv(t)

// In-memory provider with the flag defaulting to the "enabled" variant and no
// targeting rules.
testProvider := openfeatureTesting.NewTestProvider()
testProvider.UsingFlags(t, map[string]memprovider.InMemoryFlag{
"remote_execution.affinity_router_use_target_label": {
State: memprovider.Enabled,
DefaultVariant: "enabled",
Variants: map[string]any{
"enabled": true,
"disabled": false,
},
},
})
require.NoError(t, openfeature.SetProviderAndWait(testProvider))
defer testProvider.Cleanup()
fp, err := experiments.NewFlagProvider("test")
require.NoError(t, err)
env.SetExperimentFlagProvider(fp)

router := newTaskRouter(t, env)
// The context carries only the authenticated user, so the target package
// label is presumably empty here — confirm GetRequestMetadata's behavior
// when no metadata is attached.
ctx := withAuthUser(t, context.Background(), env, "US1")
instanceName := "test-instance"
// Two commands that differ only in their first output path.
firstCmd := &repb.Command{
OutputPaths: []string{"/bazel-out/k8-fastbuild/bin/foo/libfoo.a"},
}
secondCmd := &repb.Command{
OutputPaths: []string{"/bazel-out/k8-fastbuild/testlogs/foo/foo_test/test.outputs"},
}

// Record a successful run of firstCmd on executorHostID1.
router.MarkSucceeded(ctx, nil, firstCmd, instanceName, executorHostID1)

// secondCmd has a different first output, so it must not be pinned to the
// executor that ran firstCmd.
requireNotAlwaysRanked(0, executorHostID1, t, router, ctx, secondCmd, instanceName)
}

func TestTaskRouter_RankNodes_WeightedByCPU(t *testing.T) {
env := newTestEnv(t)
router := newTaskRouter(t, env)
Expand Down Expand Up @@ -642,6 +742,33 @@ func withAuthUser(t *testing.T, ctx context.Context, env environment.Env, userID
return ctx
}

func withRequestMetadata(ctx context.Context, targetLabel, mnemonic string) context.Context {
return bazel_request.OverrideRequestMetadata(ctx, &repb.RequestMetadata{
TargetId: targetLabel,
ActionMnemonic: mnemonic,
})
}

// writeFlagConfig writes data to a new temporary file and returns its path.
//
// The file is created in the directory named by the TEST_TMPDIR environment
// variable (os.CreateTemp falls back to the default temp directory when that is
// empty) with a name matching "buildbuddy-task-router-*.flagd.json". The file
// is removed when the test finishes. Any failure to create, write or close the
// file fails the test immediately via t.Fatal.
func writeFlagConfig(t testing.TB, data string) string {
	t.Helper()
	f, err := os.CreateTemp(os.Getenv("TEST_TMPDIR"), "buildbuddy-task-router-*.flagd.json")
	if err != nil {
		t.Fatal(err)
	}
	path := f.Name()
	// Register removal as soon as the file exists so that it is cleaned up
	// even when a later step aborts the test with t.Fatal.
	t.Cleanup(func() {
		// Best effort: the file may already be gone, and a leftover temp file
		// must not turn a passing test into a failure.
		_ = os.Remove(path)
	})
	// Always close the descriptor, including when the write fails, and only
	// then report errors; this avoids leaking the handle on the failure path.
	_, writeErr := f.WriteString(data)
	closeErr := f.Close()
	if writeErr != nil {
		t.Fatal(writeErr)
	}
	if closeErr != nil {
		t.Fatal(closeErr)
	}
	return path
}

func sequentiallyNumberedNodes(n int) []interfaces.ExecutionNode {
nodes := make([]interfaces.ExecutionNode, 0, n)
for i := 0; i < n; i++ {
Expand Down
Loading