Skip to content

Commit 1be8fd7

Browse files
authored
Merge pull request #17 from nirrozenbaum/migrate/cmd-config-charts-body-based-routing-internal-pkg-bbr-pkg-common-test-integration-test-integration-util.go-test-testdata-test-utils-version
migrate: sigs.k8s.io/gateway-api-inference-extension @ dc908434
2 parents f3acc49 + 5395929 commit 1be8fd7

85 files changed

Lines changed: 11572 additions & 1 deletion

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmd/main.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"os"
21+
22+
ctrl "sigs.k8s.io/controller-runtime"
23+
24+
"github.com/llm-d/llm-d-inference-payload-processor/cmd/runner"
25+
)
26+
27+
func main() {
28+
if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
29+
os.Exit(1)
30+
}
31+
32+
}

cmd/runner/health.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package runner
18+
19+
import (
20+
"context"
21+
22+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
23+
"google.golang.org/grpc/codes"
24+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
25+
"google.golang.org/grpc/status"
26+
"sigs.k8s.io/controller-runtime/pkg/log"
27+
28+
logutil "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/logging"
29+
)
30+
31+
type healthServer struct{}
32+
33+
func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
34+
// TODO: we're accepting ANY service name for now as a temporary hack in alignment with
35+
// upstream issues. See https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/788
36+
// if in.Service != extProcPb.ExternalProcessor_ServiceDesc.ServiceName {
37+
// s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service)
38+
// return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil
39+
// }
40+
41+
log.FromContext(ctx).V(logutil.DEBUG).Info("gRPC health check serving", "service", in.Service)
42+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
43+
}
44+
45+
func (s *healthServer) List(ctx context.Context, _ *healthPb.HealthListRequest) (*healthPb.HealthListResponse, error) {
46+
// currently only the ext_proc service is provided
47+
serviceHealthResponse, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: extProcPb.ExternalProcessor_ServiceDesc.ServiceName})
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
return &healthPb.HealthListResponse{
53+
Statuses: map[string]*healthPb.HealthCheckResponse{
54+
extProcPb.ExternalProcessor_ServiceDesc.ServiceName: serviceHealthResponse,
55+
},
56+
}, nil
57+
}
58+
59+
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
60+
return status.Error(codes.Unimplemented, "Watch is not implemented")
61+
}

cmd/runner/runner.go

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package runner
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"net/http"
23+
"os"
24+
25+
"github.com/prometheus/client_golang/prometheus"
26+
"github.com/spf13/pflag"
27+
"google.golang.org/grpc"
28+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
29+
"k8s.io/client-go/rest"
30+
ctrl "sigs.k8s.io/controller-runtime"
31+
"sigs.k8s.io/controller-runtime/pkg/builder"
32+
"sigs.k8s.io/controller-runtime/pkg/cache"
33+
"sigs.k8s.io/controller-runtime/pkg/manager"
34+
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
35+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
36+
37+
"github.com/llm-d/llm-d-inference-payload-processor/internal/runnable"
38+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/bbr/framework"
39+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/bbr/metrics"
40+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/bbr/plugins/basemodelextractor"
41+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/bbr/plugins/bodyfieldtoheader"
42+
runserver "github.com/llm-d/llm-d-inference-payload-processor/pkg/bbr/server"
43+
logutil "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/logging"
44+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/profiling"
45+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/tracing"
46+
"github.com/llm-d/llm-d-inference-payload-processor/version"
47+
)
48+
49+
const modelField = "model"
50+
51+
var setupLog = ctrl.Log.WithName("setup")
52+
53+
func NewRunner() *Runner {
54+
return &Runner{
55+
bbrExecutableName: "BBR",
56+
requestPlugins: []framework.RequestProcessor{},
57+
responsePlugins: []framework.ResponseProcessor{},
58+
customCollectors: []prometheus.Collector{},
59+
}
60+
}
61+
62+
// Runner is used to run bbr with its plugins
63+
type Runner struct {
64+
bbrExecutableName string
65+
// The slice of BBR plugin instances executed by the request handler,
66+
// in the same order the plugin flags are provided.
67+
requestPlugins []framework.RequestProcessor
68+
// The slice of BBR plugin instances executed by the response handler,
69+
// in the same order the plugin flags are provided.
70+
responsePlugins []framework.ResponseProcessor
71+
72+
customCollectors []prometheus.Collector
73+
}
74+
75+
// WithExecutableName sets the name of the executable containing the runner.
76+
// The name is used in the version log upon startup and is otherwise opaque.
77+
func (r *Runner) WithExecutableName(exeName string) *Runner {
78+
r.bbrExecutableName = exeName
79+
return r
80+
}
81+
82+
func (r *Runner) WithCustomCollectors(collectors ...prometheus.Collector) *Runner {
83+
r.customCollectors = collectors
84+
return r
85+
}
86+
87+
func (r *Runner) Run(ctx context.Context) error {
88+
// Setup a basic logger in case command-line argument parsing fails.
89+
logutil.InitSetupLogging()
90+
91+
setupLog.Info(r.bbrExecutableName+" build", "commit-sha", version.CommitSHA, "build-ref", version.BuildRef)
92+
93+
opts := runserver.NewOptions()
94+
opts.AddFlags(pflag.CommandLine)
95+
pflag.Parse()
96+
97+
if err := opts.Complete(); err != nil {
98+
return err
99+
}
100+
if err := opts.Validate(); err != nil {
101+
setupLog.Error(err, "Failed to validate flags")
102+
return err
103+
}
104+
105+
// Print all flag values.
106+
flags := make(map[string]any)
107+
pflag.VisitAll(func(f *pflag.Flag) {
108+
flags[f.Name] = f.Value
109+
})
110+
111+
if opts.Tracing {
112+
err := tracing.InitTracing(ctx, setupLog, "gateway-api-inference-extension/bbr")
113+
if err != nil {
114+
setupLog.Error(err, "failed to initialize tracing")
115+
return err
116+
}
117+
}
118+
119+
setupLog.Info("Flags processed", "flags", flags)
120+
121+
logutil.InitLogging(&opts.ZapOptions)
122+
123+
// Init runtime.
124+
cfg, err := ctrl.GetConfig()
125+
if err != nil {
126+
setupLog.Error(err, "Failed to get rest config")
127+
return err
128+
}
129+
130+
// --- Setup Metrics Server ---
131+
metrics.Register(r.customCollectors...)
132+
metrics.RecordBBRInfo(version.CommitSHA, version.BuildRef)
133+
// Register metrics handler.
134+
// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
135+
// More info:
136+
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server
137+
// - https://book.kubebuilder.io/reference/metrics.html
138+
metricsServerOptions := metricsserver.Options{
139+
BindAddress: fmt.Sprintf(":%d", opts.MetricsPort),
140+
FilterProvider: func() func(c *rest.Config, httpClient *http.Client) (metricsserver.Filter, error) {
141+
if opts.MetricsEndpointAuth {
142+
return filters.WithAuthenticationAndAuthorization
143+
}
144+
145+
return nil
146+
}(),
147+
}
148+
cacheOptions := cache.Options{}
149+
// Apply namespace filtering only if env var is set.
150+
namespace := os.Getenv("NAMESPACE")
151+
if namespace != "" {
152+
cacheOptions.DefaultNamespaces = map[string]cache.Config{
153+
namespace: {},
154+
}
155+
}
156+
157+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{Cache: cacheOptions, Metrics: metricsServerOptions})
158+
if err != nil {
159+
setupLog.Error(err, "Failed to create manager", "config", cfg)
160+
return err
161+
}
162+
163+
if opts.EnablePprof {
164+
setupLog.Info("Setting pprof handlers")
165+
if err = profiling.SetupPprofHandlers(mgr); err != nil {
166+
setupLog.Error(err, "Failed to setup pprof handlers")
167+
return err
168+
}
169+
}
170+
171+
bbrHandle := framework.NewBbrHandle(ctx, mgr)
172+
173+
// Register factories for all known in-tree BBR plugins
174+
r.registerInTreePlugins()
175+
176+
// Construct BBR plugin instances for the in-tree plugins that are (1) registered and (2) requested via the --plugin flags
177+
if len(opts.PluginSpecs) == 0 {
178+
setupLog.Info("No BBR plugins are specified. Running BBR with the default behavior.")
179+
180+
modelToHeaderPlugin, err := bodyfieldtoheader.NewBodyFieldToHeaderPlugin(modelField, bodyfieldtoheader.ModelHeader)
181+
if err != nil {
182+
setupLog.Error(err, "Failed to create plugin", "pluginType", bodyfieldtoheader.BodyFieldToHeaderPluginType)
183+
return err
184+
}
185+
r.requestPlugins = append(r.requestPlugins, modelToHeaderPlugin)
186+
187+
// Create BaseModelToHeaderPlugin instance for extracting the "model" field into X-Gateway-Base-Model-Name
188+
baseModelToHeaderPlugin, err := basemodelextractor.NewBaseModelToHeaderPlugin(func() *builder.Builder {
189+
return ctrl.NewControllerManagedBy(mgr)
190+
}, mgr.GetClient())
191+
if err != nil {
192+
setupLog.Error(err, "Failed to create plugin", "pluginType", basemodelextractor.BaseModelToHeaderPluginType)
193+
return err
194+
}
195+
196+
r.requestPlugins = append(r.requestPlugins, baseModelToHeaderPlugin)
197+
} else {
198+
setupLog.Info("BBR plugins are specified. Running BBR with the specified plugins.")
199+
200+
for _, s := range opts.PluginSpecs {
201+
factory, ok := framework.Registry[s.Type]
202+
if !ok {
203+
setupLog.Error(err, fmt.Sprintf("unknown plugin type %q (no factory registered)\n", s.Type))
204+
return err
205+
}
206+
instance, err := factory(s.Name, s.JSON, bbrHandle)
207+
if err != nil {
208+
setupLog.Error(err, fmt.Sprintf("invalid %s#%s: %v\n", s.Type, s.Name, err))
209+
return err
210+
}
211+
if requestProcessor, ok := instance.(framework.RequestProcessor); ok {
212+
r.requestPlugins = append(r.requestPlugins, requestProcessor)
213+
}
214+
if responseProcessor, ok := instance.(framework.ResponseProcessor); ok {
215+
r.responsePlugins = append(r.responsePlugins, responseProcessor)
216+
}
217+
}
218+
}
219+
220+
// Setup ExtProc Server Runner.
221+
serverRunner := &runserver.ExtProcServerRunner{
222+
GrpcPort: opts.GRPCPort,
223+
SecureServing: opts.SecureServing,
224+
Streaming: opts.Streaming,
225+
RequestPlugins: r.requestPlugins,
226+
ResponsePlugins: r.responsePlugins,
227+
}
228+
229+
// Register health server.
230+
if err := registerHealthServer(mgr, opts.GRPCHealthPort); err != nil {
231+
return err
232+
}
233+
234+
// Register ext-proc server.
235+
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
236+
setupLog.Error(err, "Failed to register ext-proc gRPC server")
237+
return err
238+
}
239+
240+
// Start the manager. This blocks until a signal is received.
241+
setupLog.Info("Manager starting")
242+
if err := mgr.Start(ctx); err != nil {
243+
setupLog.Error(err, "Error starting manager")
244+
return err
245+
}
246+
setupLog.Info("Manager terminated")
247+
return nil
248+
}
249+
250+
// registerInTreePlugins registers the factory functions of all known BBR plugins
251+
func (r *Runner) registerInTreePlugins() {
252+
framework.Register(bodyfieldtoheader.BodyFieldToHeaderPluginType, bodyfieldtoheader.BodyFieldToHeaderPluginFactory)
253+
framework.Register(basemodelextractor.BaseModelToHeaderPluginType, basemodelextractor.BaseModelToHeaderPluginFactory)
254+
}
255+
256+
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
257+
func registerHealthServer(mgr manager.Manager, port int) error {
258+
srv := grpc.NewServer()
259+
healthPb.RegisterHealthServer(srv, &healthServer{})
260+
if err := mgr.Add(
261+
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
262+
setupLog.Error(err, "Failed to register health server")
263+
return err
264+
}
265+
return nil
266+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Patterns to ignore when building packages.
2+
# This supports shell glob matching, relative path matching, and
3+
# negation (prefixed with !). Only one pattern per line.
4+
.DS_Store
5+
# Common VCS dirs
6+
.git/
7+
.gitignore
8+
.bzr/
9+
.bzrignore
10+
.hg/
11+
.hgignore
12+
.svn/
13+
# Common backup files
14+
*.swp
15+
*.bak
16+
*.tmp
17+
*.orig
18+
*~
19+
# Various IDEs
20+
.project
21+
.idea/
22+
*.tmproj
23+
.vscode/
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
apiVersion: v2
2+
name: body-based-routing
3+
description: A Helm chart for the body-based routing extension
4+
5+
type: application
6+
7+
version: 0.1.0
8+
9+
appVersion: "0.2.0"

0 commit comments

Comments
 (0)