From 61afdd35354e908139dc60ccfcd74211394ef5bb Mon Sep 17 00:00:00 2001 From: Son Luong Ngoc Date: Wed, 22 Apr 2026 21:36:45 +0700 Subject: [PATCH] task_router: route by target package Review feedback asked the affinity experiment to group by package because the full target label still splits sibling targets in the same package. Use the Bazel package label instead, so related actions such as //foo/bar:foo_lib and //foo/bar:foo_test can share warmed executors. Keep the existing first-output fallback when request metadata is missing. Cover selected-org, control-org, same-package, and different-package behavior in task_router tests. --- .../server/scheduling/task_router/BUILD | 3 + .../scheduling/task_router/task_router.go | 76 +++++++---- .../task_router/task_router_test.go | 127 ++++++++++++++++++ 3 files changed, 182 insertions(+), 24 deletions(-) diff --git a/enterprise/server/scheduling/task_router/BUILD b/enterprise/server/scheduling/task_router/BUILD index e3bd1bea134..dda1b4ed482 100644 --- a/enterprise/server/scheduling/task_router/BUILD +++ b/enterprise/server/scheduling/task_router/BUILD @@ -11,6 +11,7 @@ go_library( "//server/environment", "//server/interfaces", "//server/real_environment", + "//server/util/bazel_request", "//server/util/hash", "//server/util/log", "//server/util/platform", @@ -33,10 +34,12 @@ go_test( "//server/interfaces", "//server/testutil/testauth", "//server/testutil/testenv", + "//server/util/bazel_request", "//server/util/testing/flags", "@com_github_open_feature_go_sdk//openfeature", "@com_github_open_feature_go_sdk//openfeature/memprovider", "@com_github_open_feature_go_sdk//openfeature/testing", + "@com_github_open_feature_go_sdk_contrib_providers_flagd//pkg", "@com_github_stretchr_testify//require", "@org_golang_x_exp//slices", ], diff --git a/enterprise/server/scheduling/task_router/task_router.go b/enterprise/server/scheduling/task_router/task_router.go index ee3d3de9055..958d1c4789b 100644 --- a/enterprise/server/scheduling/task_router/task_router.go +++ b/enterprise/server/scheduling/task_router/task_router.go @@ -12,6 +12,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/environment" "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/real_environment" + "github.com/buildbuddy-io/buildbuddy/server/util/bazel_request" "github.com/buildbuddy-io/buildbuddy/server/util/hash" "github.com/buildbuddy-io/buildbuddy/server/util/log" "github.com/buildbuddy-io/buildbuddy/server/util/platform" @@ -46,6 +47,8 @@ const ( // Preferred node limit for tasks using [persistentWorkerRouter]. persistentWorkerRouterPreferredNodeLimit = 128 + + affinityRouterUseTargetLabelExperiment = "remote_execution.affinity_router_use_target_label" ) type taskRouter struct { @@ -315,10 +318,19 @@ func (tr *taskRouter) MarkFailed(ctx context.Context, action *repb.Action, cmd * // Contains the parameters required to make a routing decision. type routingParams struct { - cmd *repb.Command - platform *repb.Platform - remoteInstanceName string - groupID string + cmd *repb.Command + platform *repb.Platform + remoteInstanceName string + groupID string + targetPackageLabel string + useTargetPackageForAffinityRouting bool +} + +func getTargetPackageLabel(targetLabel string) string { + if packageLabel, _, ok := strings.Cut(targetLabel, ":"); ok { + return packageLabel + } + return targetLabel } func getRoutingParams(ctx context.Context, env environment.Env, action *repb.Action, cmd *repb.Command, remoteInstanceName string) routingParams { @@ -326,11 +338,19 @@ func getRoutingParams(ctx context.Context, env environment.Env, action *repb.Act if u, err := env.GetAuthenticator().AuthenticatedUser(ctx); err == nil { groupID = u.GetGroupID() } + useTargetPackageForAffinityRouting := false + if fp := env.GetExperimentFlagProvider(); fp != nil { + useTargetPackageForAffinityRouting = fp.Boolean(ctx, affinityRouterUseTargetLabelExperiment, false) + } + rmd := bazel_request.GetRequestMetadata(ctx) return routingParams{ - cmd: cmd, - platform: platform.GetProto(action, cmd), - remoteInstanceName: remoteInstanceName, - groupID: groupID} + cmd: cmd, + platform: platform.GetProto(action, cmd), + remoteInstanceName: remoteInstanceName, + groupID: groupID, + targetPackageLabel: getTargetPackageLabel(rmd.GetTargetId()), + useTargetPackageForAffinityRouting: useTargetPackageForAffinityRouting, + } } // Selects and returns a Router to use, or nil if none applies. @@ -461,26 +481,34 @@ func (s *ciRunnerRouter) RoutingInfo(params routingParams) (int, []string, error // - remoteInstanceName // - groupID // - platform properties -// - and the name of the first action output +// - and an affinity hint derived from either the first action output +// (default) or the request metadata target package (for experiment-enabled +// orgs) // -// Because only a single action can generate a given output in Bazel, this key -// uniquely identifies an action and is stable even if the action's inputs -// change. The intent of using this routing key is to route successive actions -// whose inputs have changed to nodes which previously executed that action to -// increase the local-cache hitrate, as it's likely that for large actions most -// of the input tree is unchanged. +// The first-output hint is stable even if an action's inputs change, which can +// route successive executions of the same Bazel action back to a warmer +// executor. The target-package experiment deliberately broadens that affinity so +// related actions for the same package can share warmed executors and reduce +// routing spray. type affinityRouter struct { rdb redis.UniversalClient } func (*affinityRouter) Applies(_ context.Context, params routingParams) bool { - return getFirstOutput(params.cmd) != "" + return getAffinityRoutingHint(params) != "" } func (*affinityRouter) preferredNodeLimit(_ routingParams) int { return defaultPreferredNodeLimit } +func getAffinityRoutingHint(params routingParams) string { + if params.useTargetPackageForAffinityRouting && params.targetPackageLabel != "" { + return params.targetPackageLabel + } + return getFirstOutput(params.cmd) +} + func (*affinityRouter) routingKey(params routingParams) (string, error) { parts := []string{"task_route", params.groupID} @@ -494,15 +522,15 @@ func (*affinityRouter) routingKey(params routingParams) (string, error) { } parts = append(parts, hash.Bytes(b)) - // Add the first output as the final part of the routing key. This should - // uniquely identify a bazel action and is an attempt to route actions to - // executor nodes that are warmed up (with inputs and OCI images) for this - // action. - firstOutput := getFirstOutput(params.cmd) - if firstOutput == "" { - return "", status.InternalError("routing key requested for action with no outputs") + // Add the selected affinity hint as the final part of the routing key. By + // default this is the first declared output, but a per-org experiment can + // switch this to the target package to reduce routing spray across related + // actions belonging to the same package. + hint := getAffinityRoutingHint(params) + if hint == "" { + return "", status.InternalError("routing key requested for action with no affinity hint") } - parts = append(parts, hash.String(firstOutput)) + parts = append(parts, hash.String(hint)) return strings.Join(parts, "/"), nil } diff --git a/enterprise/server/scheduling/task_router/task_router_test.go b/enterprise/server/scheduling/task_router/task_router_test.go index 99928765c19..91b2f694d77 100644 --- a/enterprise/server/scheduling/task_router/task_router_test.go +++ b/enterprise/server/scheduling/task_router/task_router_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "os" "testing" "github.com/buildbuddy-io/buildbuddy/enterprise/server/experiments" @@ -14,6 +15,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/testutil/testauth" "github.com/buildbuddy-io/buildbuddy/server/testutil/testenv" + "github.com/buildbuddy-io/buildbuddy/server/util/bazel_request" "github.com/buildbuddy-io/buildbuddy/server/util/testing/flags" "github.com/open-feature/go-sdk/openfeature" "github.com/open-feature/go-sdk/openfeature/memprovider" @@ -21,6 +23,7 @@ import ( "golang.org/x/exp/slices" repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution" + flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg" openfeatureTesting "github.com/open-feature/go-sdk/openfeature/testing" ) @@ -214,6 +217,103 @@ func TestTaskRouter_RankNodes_AffinityRouting(t *testing.T) { requireNotAlwaysRanked(0, executorHostID2, t, router, ctx, thirdCmd, instanceName) } +func TestTaskRouter_RankNodes_AffinityRouting_UsesTargetPackageForSelectedGroups(t *testing.T) { + env := newTestEnv(t) + + const testFlags = `{ + "$schema": "https://flagd.dev/schema/v0/flags.json", + "flags": { + "remote_execution.affinity_router_use_target_label": { + "state": "ENABLED", + "variants": { + "enabled": true, + "disabled": false + }, + "defaultVariant": "disabled", + "targeting": { + "if": [ + { "==": [{ "var": "group_id" }, "GR1"] }, + "enabled", + "disabled" + ] + } + } + } + }` + offlineFlagPath := writeFlagConfig(t, testFlags) + provider, err := flagd.NewProvider(flagd.WithInProcessResolver(), flagd.WithOfflineFilePath(offlineFlagPath)) + require.NoError(t, err) + require.NoError(t, openfeature.SetProviderAndWait(provider)) + fp, err := experiments.NewFlagProvider("test") + require.NoError(t, err) + env.SetExperimentFlagProvider(fp) + + router := newTaskRouter(t, env) + nodes := sequentiallyNumberedNodes(100) + instanceName := "test-instance" + firstCmd := &repb.Command{ + Arguments: []string{"go", "tool", "link"}, + OutputPaths: []string{"/bazel-out/k8-fastbuild/bin/foo/libfoo.a"}, + } + secondCmd := &repb.Command{ + Arguments: []string{"test-setup.sh"}, + OutputPaths: []string{"/bazel-out/k8-fastbuild/testlogs/foo/foo_test/test.outputs"}, + } + + selectedOrgCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar:foo_lib", "GoLink") + selectedOrgOtherTargetCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar:foo_test", "TestRunner") + selectedOrgPackageCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/bar", "TestRunner") + selectedOrgOtherPackageCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US1"), "//foo/baz:foo_test", "TestRunner") + router.MarkSucceeded(selectedOrgCtx, nil, firstCmd, instanceName, executorHostID1) + + ranked := router.RankNodes(selectedOrgOtherTargetCtx, nil, secondCmd, instanceName, nodes) + require.Equal(t, executorHostID1, ranked[0].GetExecutionNode().GetExecutorHostId()) + ranked = router.RankNodes(selectedOrgPackageCtx, nil, secondCmd, instanceName, nodes) + require.Equal(t, executorHostID1, ranked[0].GetExecutionNode().GetExecutorHostId()) + requireNotAlwaysRanked(0, executorHostID1, t, router, selectedOrgOtherPackageCtx, secondCmd, instanceName) + + controlOrgCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US2"), "//foo/bar:foo_lib", "GoLink") + controlOrgOtherTargetCtx := withRequestMetadata(withAuthUser(t, context.Background(), env, "US2"), "//foo/bar:foo_test", "TestRunner") + router.MarkSucceeded(controlOrgCtx, nil, firstCmd, instanceName, executorHostID1) + + requireNotAlwaysRanked(0, executorHostID1, t, router, controlOrgOtherTargetCtx, secondCmd, instanceName) +} + +func TestTaskRouter_RankNodes_AffinityRouting_TargetLabelExperimentFallsBackToFirstOutput(t *testing.T) { + env := newTestEnv(t) + + testProvider := openfeatureTesting.NewTestProvider() + testProvider.UsingFlags(t, map[string]memprovider.InMemoryFlag{ + "remote_execution.affinity_router_use_target_label": { + State: memprovider.Enabled, + DefaultVariant: "enabled", + Variants: map[string]any{ + "enabled": true, + "disabled": false, + }, + }, + }) + require.NoError(t, openfeature.SetProviderAndWait(testProvider)) + defer testProvider.Cleanup() + fp, err := experiments.NewFlagProvider("test") + require.NoError(t, err) + env.SetExperimentFlagProvider(fp) + + router := newTaskRouter(t, env) + ctx := withAuthUser(t, context.Background(), env, "US1") + instanceName := "test-instance" + firstCmd := &repb.Command{ + OutputPaths: []string{"/bazel-out/k8-fastbuild/bin/foo/libfoo.a"}, + } + secondCmd := &repb.Command{ + OutputPaths: []string{"/bazel-out/k8-fastbuild/testlogs/foo/foo_test/test.outputs"}, + } + + router.MarkSucceeded(ctx, nil, firstCmd, instanceName, executorHostID1) + + requireNotAlwaysRanked(0, executorHostID1, t, router, ctx, secondCmd, instanceName) +} + func TestTaskRouter_RankNodes_WeightedByCPU(t *testing.T) { env := newTestEnv(t) router := newTaskRouter(t, env) @@ -642,6 +742,33 @@ func withAuthUser(t *testing.T, ctx context.Context, env environment.Env, userID return ctx } +func withRequestMetadata(ctx context.Context, targetLabel, mnemonic string) context.Context { + return bazel_request.OverrideRequestMetadata(ctx, &repb.RequestMetadata{ + TargetId: targetLabel, + ActionMnemonic: mnemonic, + }) +} + +func writeFlagConfig(t testing.TB, data string) string { + t.Helper() + f, err := os.CreateTemp(os.Getenv("TEST_TMPDIR"), "buildbuddy-task-router-*.flagd.json") + if err != nil { + t.Fatal(err) + } + path := f.Name() + if _, err := f.Write([]byte(data)); err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + _ = f.Close() + _ = os.RemoveAll(path) + }) + return path +} + func sequentiallyNumberedNodes(n int) []interfaces.ExecutionNode { nodes := make([]interfaces.ExecutionNode, 0, n) for i := 0; i < n; i++ {