diff --git a/enterprise/server/scheduling/task_router/BUILD b/enterprise/server/scheduling/task_router/BUILD index e3bd1bea134..dda1b4ed482 100644 --- a/enterprise/server/scheduling/task_router/BUILD +++ b/enterprise/server/scheduling/task_router/BUILD @@ -11,6 +11,7 @@ go_library( "//server/environment", "//server/interfaces", "//server/real_environment", + "//server/util/bazel_request", "//server/util/hash", "//server/util/log", "//server/util/platform", @@ -33,10 +34,12 @@ go_test( "//server/interfaces", "//server/testutil/testauth", "//server/testutil/testenv", + "//server/util/bazel_request", "//server/util/testing/flags", "@com_github_open_feature_go_sdk//openfeature", "@com_github_open_feature_go_sdk//openfeature/memprovider", "@com_github_open_feature_go_sdk//openfeature/testing", + "@com_github_open_feature_go_sdk_contrib_providers_flagd//pkg", "@com_github_stretchr_testify//require", "@org_golang_x_exp//slices", ], diff --git a/enterprise/server/scheduling/task_router/task_router.go b/enterprise/server/scheduling/task_router/task_router.go index ee3d3de9055..958d1c4789b 100644 --- a/enterprise/server/scheduling/task_router/task_router.go +++ b/enterprise/server/scheduling/task_router/task_router.go @@ -12,6 +12,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/environment" "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/real_environment" + "github.com/buildbuddy-io/buildbuddy/server/util/bazel_request" "github.com/buildbuddy-io/buildbuddy/server/util/hash" "github.com/buildbuddy-io/buildbuddy/server/util/log" "github.com/buildbuddy-io/buildbuddy/server/util/platform" @@ -46,6 +47,8 @@ const ( // Preferred node limit for tasks using [persistentWorkerRouter]. persistentWorkerRouterPreferredNodeLimit = 128 + + affinityRouterUseTargetLabelExperiment = "remote_execution.affinity_router_use_target_label" ) type taskRouter struct { @@ -315,10 +318,19 @@ func (tr *taskRouter) MarkFailed(ctx context.Context, action *repb.Action, cmd * // Contains the parameters required to make a routing decision. type routingParams struct { - cmd *repb.Command - platform *repb.Platform - remoteInstanceName string - groupID string + cmd *repb.Command + platform *repb.Platform + remoteInstanceName string + groupID string + targetPackageLabel string + useTargetPackageForAffinityRouting bool +} + +func getTargetPackageLabel(targetLabel string) string { + if packageLabel, _, ok := strings.Cut(targetLabel, ":"); ok { + return packageLabel + } + return targetLabel } func getRoutingParams(ctx context.Context, env environment.Env, action *repb.Action, cmd *repb.Command, remoteInstanceName string) routingParams { @@ -326,11 +338,19 @@ func getRoutingParams(ctx context.Context, env environment.Env, action *repb.Act if u, err := env.GetAuthenticator().AuthenticatedUser(ctx); err == nil { groupID = u.GetGroupID() } + useTargetPackageForAffinityRouting := false + if fp := env.GetExperimentFlagProvider(); fp != nil { + useTargetPackageForAffinityRouting = fp.Boolean(ctx, affinityRouterUseTargetLabelExperiment, false) + } + rmd := bazel_request.GetRequestMetadata(ctx) return routingParams{ - cmd: cmd, - platform: platform.GetProto(action, cmd), - remoteInstanceName: remoteInstanceName, - groupID: groupID} + cmd: cmd, + platform: platform.GetProto(action, cmd), + remoteInstanceName: remoteInstanceName, + groupID: groupID, + targetPackageLabel: getTargetPackageLabel(rmd.GetTargetId()), + useTargetPackageForAffinityRouting: useTargetPackageForAffinityRouting, + } } // Selects and returns a Router to use, or nil if none applies. @@ -461,26 +481,34 @@ func (s *ciRunnerRouter) RoutingInfo(params routingParams) (int, []string, error // - remoteInstanceName // - groupID // - platform properties -// - and the name of the first action output +// - and an affinity hint derived from either the first action output +// (default) or the request metadata target package (for experiment-enabled +// orgs) // -// Because only a single action can generate a given output in Bazel, this key -// uniquely identifies an action and is stable even if the action's inputs -// change. The intent of using this routing key is to route successive actions -// whose inputs have changed to nodes which previously executed that action to -// increase the local-cache hitrate, as it's likely that for large actions most -// of the input tree is unchanged. +// The first-output hint is stable even if an action's inputs change, which can +// route successive executions of the same Bazel action back to a warmer +// executor. The target-package experiment deliberately broadens that affinity so +// related actions for the same package can share warmed executors and reduce +// routing spray. type affinityRouter struct { rdb redis.UniversalClient } func (*affinityRouter) Applies(_ context.Context, params routingParams) bool { - return getFirstOutput(params.cmd) != "" + return getAffinityRoutingHint(params) != "" } func (*affinityRouter) preferredNodeLimit(_ routingParams) int { return defaultPreferredNodeLimit } +func getAffinityRoutingHint(params routingParams) string { + if params.useTargetPackageForAffinityRouting && params.targetPackageLabel != "" { + return params.targetPackageLabel + } + return getFirstOutput(params.cmd) +} + func (*affinityRouter) routingKey(params routingParams) (string, error) { parts := []string{"task_route", params.groupID} @@ -494,15 +522,15 @@ func (*affinityRouter) routingKey(params routingParams) (string, error) { } parts = append(parts, hash.Bytes(b)) - // Add the first output as the final part of the routing key. This should - // uniquely identify a bazel action and is an attempt to route actions to - // executor nodes that are warmed up (with inputs and OCI images) for this - // action. - firstOutput := getFirstOutput(params.cmd) - if firstOutput == "" { - return "", status.InternalError("routing key requested for action with no outputs") + // Add the selected affinity hint as the final part of the routing key. By + // default this is the first declared output, but a per-org experiment can + // switch this to the target package to reduce routing spray across related + // actions belonging to the same package. + hint := getAffinityRoutingHint(params) + if hint == "" { + return "", status.InternalError("routing key requested for action with no affinity hint") } - parts = append(parts, hash.String(firstOutput)) + parts = append(parts, hash.String(hint)) return strings.Join(parts, "/"), nil } diff --git a/enterprise/server/scheduling/task_router/task_router_test.go b/enterprise/server/scheduling/task_router/task_router_test.go index 99928765c19..91b2f694d77 100644 --- a/enterprise/server/scheduling/task_router/task_router_test.go +++ b/enterprise/server/scheduling/task_router/task_router_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "os" "testing" "github.com/buildbuddy-io/buildbuddy/enterprise/server/experiments" @@ -14,6 +15,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/testutil/testauth" "github.com/buildbuddy-io/buildbuddy/server/testutil/testenv" + "github.com/buildbuddy-io/buildbuddy/server/util/bazel_request" "github.com/buildbuddy-io/buildbuddy/server/util/testing/flags" "github.com/open-feature/go-sdk/openfeature" "github.com/open-feature/go-sdk/openfeature/memprovider" @@ -21,6 +23,7 @@ import ( "golang.org/x/exp/slices" repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution" + flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg" openfeatureTesting "github.com/open-feature/go-sdk/openfeature/testing" ) @@ -214,6 +217,103 @@ func TestTaskRouter_RankNodes_AffinityRouting(t *testing.T) { requireNotAlwaysRanked(0, executorHostID2, t, router, ctx, thirdCmd, instanceName) } +func TestTaskRouter_RankNodes_AffinityRouting_UsesTargetPackageForSelectedGroups(t *testing.T) { + env := newTestEnv(t) + + const testFlags = `{ + "$schema": "https://flagd.dev/schema/v0/flags.json", + "flags": { + "remote_execution.affinity_router_use_target_label": { + "state": "ENABLED", + "variants": { + "enabled": true, + "disabled": false + }, + "defaultVariant": "disabled", + "targeting": { + "if": [ + { "==": [{ "var": "group_id" }, "GR1"] }, + "enabled", + "disabled" + ] + } + } + } + }` + offlineFlagPath := writeFlagConfig(t, testFlags) + provider, err := flagd.NewProvider(flagd.WithInProcessResolver(), flagd.WithOfflineFilePath(offlineFlagPath)) + require.NoError(t, err) + require.NoError(t, openfeature.SetProviderAndWait(provider)) + fp, err := experiments.NewFlagProvider("test") + require.NoError(t, err) + env.SetExperimentFlagProvider(fp) + + router := newTaskRouter(t, env) + nodes := sequentiallyNumberedNodes(100) + instanceName := "test-instance" + firstCmd := &repb.Command{ + Arguments: []string{"go", "tool", "link"}, + OutputPaths: []string{"/bazel-out/k8-fastbuild/bin/foo/libfoo.a"}, + } + secondCmd := &repb.Command{ + Arguments: []string{"test-setup.sh"}, + OutputPaths: []string{"/bazel-out/k8-fastbuild/testlogs/foo/foo_test/test.outputs"}, + } + + selectedOrgCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar:foo_lib", "GoLink") + selectedOrgOtherTargetCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar:foo_test", "TestRunner") + selectedOrgPackageCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar", "TestRunner") + selectedOrgOtherPackageCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/baz:foo_test", "TestRunner") + router.MarkSucceeded(selectedOrgCtx, nil, firstCmd, instanceName, executorHostID1) + + ranked := router.RankNodes(selectedOrgOtherTargetCtx, nil, secondCmd, instanceName, nodes) + require.Equal(t, executorHostID1, ranked[0].GetExecutionNode().GetExecutorHostId()) + ranked = router.RankNodes(selectedOrgPackageCtx, nil, secondCmd, instanceName, nodes) + require.Equal(t, executorHostID1, ranked[0].GetExecutionNode().GetExecutorHostId()) + requireNotAlwaysRanked(0, executorHostID1, t, router, selectedOrgOtherPackageCtx, secondCmd, instanceName) + + controlOrgCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US2"), "//foo/bar:foo_lib", "GoLink") + controlOrgOtherTargetCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US2"), "//foo/bar:foo_test", "TestRunner") + router.MarkSucceeded(controlOrgCtx, nil, firstCmd, instanceName, executorHostID1) + + requireNotAlwaysRanked(0, executorHostID1, t, router, controlOrgOtherTargetCtx, secondCmd, instanceName) +} + +func TestTaskRouter_RankNodes_AffinityRouting_TargetLabelExperimentFallsBackToFirstOutput(t *testing.T) { + env := newTestEnv(t) + + testProvider := openfeatureTesting.NewTestProvider() + testProvider.UsingFlags(t, map[string]memprovider.InMemoryFlag{ + "remote_execution.affinity_router_use_target_label": { + State: memprovider.Enabled, + DefaultVariant: "enabled", + Variants: map[string]any{ + "enabled": true, + "disabled": false, + }, + }, + }) + require.NoError(t, openfeature.SetProviderAndWait(testProvider)) + defer testProvider.Cleanup() + fp, err := experiments.NewFlagProvider("test") + require.NoError(t, err) + env.SetExperimentFlagProvider(fp) + + router := newTaskRouter(t, env) + ctx := withAuthUser(t, context.Background(), env, "US1") + instanceName := "test-instance" + firstCmd := &repb.Command{ + OutputPaths: []string{"/bazel-out/k8-fastbuild/bin/foo/libfoo.a"}, + } + secondCmd := &repb.Command{ + OutputPaths: []string{"/bazel-out/k8-fastbuild/testlogs/foo/foo_test/test.outputs"}, + } + + router.MarkSucceeded(ctx, nil, firstCmd, instanceName, executorHostID1) + + requireNotAlwaysRanked(0, executorHostID1, t, router, ctx, secondCmd, instanceName) +} + func TestTaskRouter_RankNodes_WeightedByCPU(t *testing.T) { env := newTestEnv(t) router := newTaskRouter(t, env) @@ -642,6 +742,33 @@ func withAuthUser(t *testing.T, ctx context.Context, env environment.Env, userID return ctx } +func withRequestMetadata(ctx context.Context, targetLabel, mnemonic string) context.Context { + return bazel_request.OverrideRequestMetadata(ctx, &repb.RequestMetadata{ + TargetId: targetLabel, + ActionMnemonic: mnemonic, + }) +} + +func writeFlagConfig(t testing.TB, data string) string { + t.Helper() + f, err := os.CreateTemp(os.Getenv("TEST_TMPDIR"), "buildbuddy-task-router-*.flagd.json") + if err != nil { + t.Fatal(err) + } + path := f.Name() + if _, err := f.Write([]byte(data)); err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + _ = f.Close() + _ = os.RemoveAll(path) + }) + return path +} + func sequentiallyNumberedNodes(n int) []interfaces.ExecutionNode { nodes := make([]interfaces.ExecutionNode, 0, n) for i := 0; i < n; i++ {