Skip to content

Commit c70ad34

Browse files
authored
Add agentGateway injection options (#12999)
Signed-off-by: David Jumani <[email protected]>
1 parent 8a06845 commit c70ad34

File tree

8 files changed

+118
-47
lines changed

8 files changed

+118
-47
lines changed

pkg/agentgateway/translator/gateway_collection.go

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -194,31 +194,62 @@ func (g ParentInfo) Equals(other ParentInfo) bool {
194194
slices.Equal(g.Hostnames, other.Hostnames)
195195
}
196196

197+
type GatewayTransformationFunction func(GatewayCollectionConfig) func(ctx krt.HandlerContext, obj *gwv1.Gateway) (*gwv1.GatewayStatus, []*GatewayListener)
198+
199+
type GatewayCollectionConfigOption func(o *GatewayCollectionConfig)
200+
201+
func WithGatewayTransformationFunc(f GatewayTransformationFunction) GatewayCollectionConfigOption {
202+
return func(o *GatewayCollectionConfig) {
203+
o.transformationFunc = f
204+
}
205+
}
206+
207+
type GatewayCollectionConfig struct {
208+
ControllerName string
209+
Gateways krt.Collection[*gwv1.Gateway]
210+
ListenerSets krt.Collection[ListenerSet]
211+
GatewayClasses krt.Collection[GatewayClass]
212+
Namespaces krt.Collection[*corev1.Namespace]
213+
Grants ReferenceGrants
214+
Secrets krt.Collection[*corev1.Secret]
215+
ConfigMaps krt.Collection[*corev1.ConfigMap]
216+
KrtOpts krtutil.KrtOptions
217+
218+
listenerIndex krt.Index[types.NamespacedName, ListenerSet]
219+
transformationFunc GatewayTransformationFunction
220+
}
221+
197222
// GatewayCollection returns a collection of the internal representations GatewayListeners for the given gateway.
198223
func GatewayCollection(
199-
controllerName string,
200-
gateways krt.Collection[*gwv1.Gateway],
201-
listenerSets krt.Collection[ListenerSet],
202-
gatewayClasses krt.Collection[GatewayClass],
203-
namespaces krt.Collection[*corev1.Namespace],
204-
grants ReferenceGrants,
205-
secrets krt.Collection[*corev1.Secret],
206-
configMaps krt.Collection[*corev1.ConfigMap],
207-
krtopts krtutil.KrtOptions,
224+
cfg GatewayCollectionConfig,
225+
opts ...GatewayCollectionConfigOption,
208226
) (
209227
krt.StatusCollection[*gwv1.Gateway, gwv1.GatewayStatus],
210228
krt.Collection[*GatewayListener],
211229
) {
212-
listenerIndex := krt.NewIndex(listenerSets, "gatewayParent", func(o ListenerSet) []types.NamespacedName {
230+
for _, fn := range opts {
231+
fn(&cfg)
232+
}
233+
cfg.listenerIndex = krt.NewIndex(cfg.ListenerSets, "gatewayParent", func(o ListenerSet) []types.NamespacedName {
213234
return []types.NamespacedName{o.GatewayParent}
214235
})
215-
statusCol, gw := krt.NewStatusManyCollection(gateways, func(ctx krt.HandlerContext, obj *gwv1.Gateway) (*gwv1.GatewayStatus, []*GatewayListener) {
216-
class := krt.FetchOne(ctx, gatewayClasses, krt.FilterKey(string(obj.Spec.GatewayClassName)))
236+
if cfg.transformationFunc == nil {
237+
cfg.transformationFunc = GatewaysTransformationFunc
238+
}
239+
240+
statusCol, gw := krt.NewStatusManyCollection(cfg.Gateways, cfg.transformationFunc(cfg), cfg.KrtOpts.ToOptions("KubernetesGateway")...)
241+
242+
return statusCol, gw
243+
}
244+
245+
func GatewaysTransformationFunc(cfg GatewayCollectionConfig) func(ctx krt.HandlerContext, obj *gwv1.Gateway) (*gwv1.GatewayStatus, []*GatewayListener) {
246+
return func(ctx krt.HandlerContext, obj *gwv1.Gateway) (*gwv1.GatewayStatus, []*GatewayListener) {
247+
class := krt.FetchOne(ctx, cfg.GatewayClasses, krt.FilterKey(string(obj.Spec.GatewayClassName)))
217248
if class == nil {
218249
logger.Debug("gateway class not found, skipping", "gw_name", obj.GetName(), "gatewayClassName", obj.Spec.GatewayClassName)
219250
return nil, nil
220251
}
221-
if string(class.Controller) != controllerName {
252+
if string(class.Controller) != cfg.ControllerName {
222253
logger.Debug("skipping gateway not managed by our controller", "gw_name", obj.GetName(), "gatewayClassName", obj.Spec.GatewayClassName, "controllerName", class.Controller)
223254
return nil, nil // ignore gateways not managed by our controller
224255
}
@@ -249,7 +280,7 @@ func GatewayCollection(
249280
// Attached Routes count starts at 0 and gets updated later in the status syncer
250281
// when the real count is available after route processing
251282

252-
hostnames, tlsInfo, updatedStatus, programmed := BuildListener(ctx, secrets, configMaps, grants, namespaces, obj, status.Listeners, kgw, l, i, nil)
283+
hostnames, tlsInfo, updatedStatus, programmed := BuildListener(ctx, cfg.Secrets, cfg.ConfigMaps, cfg.Grants, cfg.Namespaces, obj, status.Listeners, kgw, l, i, nil)
253284
status.Listeners = updatedStatus
254285

255286
lstatus := status.Listeners[i]
@@ -303,7 +334,7 @@ func GatewayCollection(
303334
})
304335
result = append(result, res)
305336
}
306-
listenersFromSets := krt.Fetch(ctx, listenerSets, krt.FilterIndex(listenerIndex, config.NamespacedName(obj)))
337+
listenersFromSets := krt.Fetch(ctx, cfg.ListenerSets, krt.FilterIndex(cfg.listenerIndex, config.NamespacedName(obj)))
307338
for _, ls := range listenersFromSets {
308339
result = append(result, &GatewayListener{
309340
Name: ls.Name,
@@ -320,9 +351,7 @@ func GatewayCollection(
320351
}
321352
gws := rm.BuildGWStatus(context.Background(), *obj, nil)
322353
return gws, result
323-
}, krtopts.ToOptions("KubernetesGateway")...)
324-
325-
return statusCol, gw
354+
}
326355
}
327356

328357
type ListenerSet struct {
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package agentgatewaysyncer
2+
3+
import "github.com/kgateway-dev/kgateway/v2/pkg/agentgateway/translator"
4+
5+
type agentgatewaySyncerConfig struct {
6+
GatewayTransformationFunc translator.GatewayTransformationFunction
7+
}
8+
9+
type AgentgatewaySyncerOption func(*agentgatewaySyncerConfig)
10+
11+
func processAgentgatewaySyncerOptions(opts ...AgentgatewaySyncerOption) *agentgatewaySyncerConfig {
12+
cfg := &agentgatewaySyncerConfig{}
13+
for _, fn := range opts {
14+
fn(cfg)
15+
}
16+
return cfg
17+
}
18+
19+
func WithGatewayTransformationFunc(f translator.GatewayTransformationFunction) AgentgatewaySyncerOption {
20+
return func(o *agentgatewaySyncerConfig) {
21+
o.GatewayTransformationFunc = f
22+
}
23+
}

pkg/kgateway/agentgatewaysyncer/syncer.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ type Syncer struct {
6868
Registrations []krtxds.Registration
6969

7070
Outputs OutputCollections
71+
72+
gatewayCollectionOptions []translator.GatewayCollectionConfigOption
7173
}
7274

7375
func NewAgwSyncer(
@@ -79,7 +81,9 @@ func NewAgwSyncer(
7981
additionalGatewayClasses map[string]*deployer.GatewayClassInfo,
8082
krtopts krtutil.KrtOptions,
8183
extraGVKs []schema.GroupVersionKind,
84+
opts ...AgentgatewaySyncerOption,
8285
) *Syncer {
86+
cfg := processAgentgatewaySyncerOptions(opts...)
8387
syncer := &Syncer{
8488
agwCollections: agwCollections,
8589
controllerName: controllerName,
@@ -89,6 +93,8 @@ func NewAgwSyncer(
8993
client: client,
9094
statusCollections: status.NewStatusCollections(extraGVKs),
9195
NackPublisher: nack.NewPublisher(client),
96+
gatewayCollectionOptions: []translator.GatewayCollectionConfigOption{
97+
translator.WithGatewayTransformationFunc(cfg.GatewayTransformationFunc)},
9298
}
9399
logger.Debug("init agentgateway Syncer", "controllername", controllerName)
94100

@@ -174,17 +180,17 @@ func (s *Syncer) buildGatewayCollection(
174180
krt.StatusCollection[*gwv1.Gateway, gwv1.GatewayStatus],
175181
krt.Collection[*translator.GatewayListener],
176182
) {
177-
return translator.GatewayCollection(
178-
s.controllerName,
179-
s.agwCollections.Gateways,
180-
listenerSets,
181-
gatewayClasses,
182-
s.agwCollections.Namespaces,
183-
refGrants,
184-
s.agwCollections.Secrets,
185-
s.agwCollections.ConfigMaps,
186-
krtopts,
187-
)
183+
return translator.GatewayCollection(translator.GatewayCollectionConfig{
184+
ControllerName: s.controllerName,
185+
Gateways: s.agwCollections.Gateways,
186+
ListenerSets: listenerSets,
187+
GatewayClasses: gatewayClasses,
188+
Namespaces: s.agwCollections.Namespaces,
189+
Grants: refGrants,
190+
Secrets: s.agwCollections.Secrets,
191+
ConfigMaps: s.agwCollections.ConfigMaps,
192+
KrtOpts: krtopts,
193+
}, s.gatewayCollectionOptions...)
188194
}
189195

190196
func (s *Syncer) buildListenerSetCollection(

pkg/kgateway/controller/start.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/ir"
4040
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/krtutil"
4141
kgtwschemes "github.com/kgateway-dev/kgateway/v2/pkg/schemes"
42-
"github.com/kgateway-dev/kgateway/v2/pkg/syncer"
4342
"github.com/kgateway-dev/kgateway/v2/pkg/utils/kubeutils"
4443
"github.com/kgateway-dev/kgateway/v2/pkg/utils/namespaces"
4544
"github.com/kgateway-dev/kgateway/v2/pkg/validator"
@@ -101,7 +100,10 @@ type StartConfig struct {
101100
GatewayControllerExtension sdk.GatewayControllerExtension
102101

103102
// StatusSyncerOptions is the list of options to be passed when creating the StatusSyncer
104-
StatusSyncerOptions []syncer.StatusSyncerOption
103+
StatusSyncerOptions []proxy_syncer.StatusSyncerOption
104+
105+
// AgentgatewaySyncerOptions is the list of options to be passed when creating the AgentGatewaySyncer
106+
AgentgatewaySyncerOptions []agentgatewaysyncer.AgentgatewaySyncerOption
105107
}
106108

107109
// Start runs the controllers responsible for processing the K8s Gateway API objects
@@ -227,6 +229,7 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
227229
cfg.AdditionalGatewayClasses,
228230
cfg.KrtOptions,
229231
gvks,
232+
cfg.AgentgatewaySyncerOptions...,
230233
)
231234

232235
if err := cfg.Manager.Add(agwSyncer); err != nil {
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
1-
package syncer
1+
package proxy_syncer
22

33
import (
44
"context"
55

66
"github.com/kgateway-dev/kgateway/v2/pkg/reports"
77
)
88

9-
type StatusSyncerConfig struct {
9+
type statusSyncerConfig struct {
1010
CustomStatusSync func(ctx context.Context, rm reports.ReportMap)
1111
}
1212

13-
type StatusSyncerOption func(*StatusSyncerConfig)
13+
type StatusSyncerOption func(*statusSyncerConfig)
1414

15-
func ProcessStatusSyncerOptions(opts ...StatusSyncerOption) *StatusSyncerConfig {
16-
cfg := &StatusSyncerConfig{}
15+
func processStatusSyncerOptions(opts ...StatusSyncerOption) *statusSyncerConfig {
16+
cfg := &statusSyncerConfig{}
1717
for _, fn := range opts {
1818
fn(cfg)
1919
}
2020
return cfg
2121
}
2222

2323
func WithCustomStatusSync(customSync func(ctx context.Context, rm reports.ReportMap)) StatusSyncerOption {
24-
return func(cfg *StatusSyncerConfig) {
24+
return func(cfg *statusSyncerConfig) {
2525
cfg.CustomStatusSync = customSync
2626
}
2727
}

pkg/kgateway/proxy_syncer/status_syncer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
plug "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk"
3131
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
3232
"github.com/kgateway-dev/kgateway/v2/pkg/reports"
33-
"github.com/kgateway-dev/kgateway/v2/pkg/syncer"
3433
"github.com/kgateway-dev/kgateway/v2/pkg/utils/stopwatch"
3534
)
3635

@@ -61,9 +60,9 @@ func NewStatusSyncer(
6160
reportQueue utils.AsyncQueue[reports.ReportMap],
6261
backendPolicyReportQueue utils.AsyncQueue[reports.ReportMap],
6362
cacheSyncs []cache.InformerSynced,
64-
opts ...syncer.StatusSyncerOption,
63+
opts ...StatusSyncerOption,
6564
) *StatusSyncer {
66-
cfg := syncer.ProcessStatusSyncerOptions(opts...)
65+
cfg := processStatusSyncerOptions(opts...)
6766
return &StatusSyncer{
6867
mgr: mgr,
6968
plugins: plugins,

pkg/kgateway/setup/setup.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/admin"
3333
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/agentgatewaysyncer"
3434
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/controller"
35+
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/proxy_syncer"
3536
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/wellknown"
3637
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/xds"
3738
"github.com/kgateway-dev/kgateway/v2/pkg/krtcollections"
@@ -41,7 +42,6 @@ import (
4142
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
4243
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/krtutil"
4344
"github.com/kgateway-dev/kgateway/v2/pkg/schemes"
44-
"github.com/kgateway-dev/kgateway/v2/pkg/syncer"
4545
"github.com/kgateway-dev/kgateway/v2/pkg/utils/envutils"
4646
"github.com/kgateway-dev/kgateway/v2/pkg/utils/namespaces"
4747
"github.com/kgateway-dev/kgateway/v2/pkg/validator"
@@ -204,12 +204,18 @@ func WithCommonCollectionsOptions(commonCollectionsOptions []collections.Option)
204204
}
205205
}
206206

207-
func WithStatusSyncerOptions(statusSyncerOptions []syncer.StatusSyncerOption) func(*setup) {
207+
func WithStatusSyncerOptions(statusSyncerOptions []proxy_syncer.StatusSyncerOption) func(*setup) {
208208
return func(s *setup) {
209209
s.statusSyncerOptions = statusSyncerOptions
210210
}
211211
}
212212

213+
func WithAgentgatewaySyncerOptions(agentgatewaySyncerOptions []agentgatewaysyncer.AgentgatewaySyncerOption) func(*setup) {
214+
return func(s *setup) {
215+
s.agentgatewaySyncerOptions = agentgatewaySyncerOptions
216+
}
217+
}
218+
213219
type setup struct {
214220
apiClient apiclient.Client
215221
extraInformerCacheSyncHandlers []cache.InformerSynced
@@ -238,8 +244,9 @@ type setup struct {
238244
validator validator.Validator
239245
extraAgwPolicyStatusHandlers map[schema.GroupVersionKind]agwplugins.AgwPolicyStatusSyncHandler
240246

241-
commonCollectionsOptions []collections.Option
242-
statusSyncerOptions []syncer.StatusSyncerOption
247+
commonCollectionsOptions []collections.Option
248+
statusSyncerOptions []proxy_syncer.StatusSyncerOption
249+
agentgatewaySyncerOptions []agentgatewaysyncer.AgentgatewaySyncerOption
243250
}
244251

245252
var _ Server = &setup{}
@@ -508,6 +515,7 @@ func (s *setup) buildKgatewayWithConfig(
508515
ExtraAgwPolicyStatusHandlers: s.extraAgwPolicyStatusHandlers,
509516
GatewayControllerExtension: s.gatewayControllerExtension,
510517
StatusSyncerOptions: s.statusSyncerOptions,
518+
AgentgatewaySyncerOptions: s.agentgatewaySyncerOptions,
511519
})
512520
if err != nil {
513521
slog.Error("failed initializing controller: ", "error", err)

pkg/setup/setup.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ import (
1414
agwplugins "github.com/kgateway-dev/kgateway/v2/pkg/agentgateway/plugins"
1515
"github.com/kgateway-dev/kgateway/v2/pkg/apiclient"
1616
"github.com/kgateway-dev/kgateway/v2/pkg/deployer"
17+
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/agentgatewaysyncer"
18+
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/proxy_syncer"
1719
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/setup"
1820
sdk "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk"
1921
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
20-
"github.com/kgateway-dev/kgateway/v2/pkg/syncer"
2122
"github.com/kgateway-dev/kgateway/v2/pkg/validator"
2223
)
2324

@@ -50,8 +51,9 @@ type Options struct {
5051
// ExtraAgwPolicyStatusHandlers maps policy kinds to their status sync handlers for AgentGateway
5152
ExtraAgwPolicyStatusHandlers map[schema.GroupVersionKind]agwplugins.AgwPolicyStatusSyncHandler
5253

53-
CommonCollectionsOptions []collections.Option
54-
StatusSyncerOptions []syncer.StatusSyncerOption
54+
CommonCollectionsOptions []collections.Option
55+
StatusSyncerOptions []proxy_syncer.StatusSyncerOption
56+
AgentGatewaySyncerOptions []agentgatewaysyncer.AgentgatewaySyncerOption
5557
}
5658

5759
func New(opts Options) (setup.Server, error) {
@@ -78,5 +80,6 @@ func New(opts Options) (setup.Server, error) {
7880
setup.WithExtraAgwPolicyStatusHandlers(opts.ExtraAgwPolicyStatusHandlers),
7981
setup.WithCommonCollectionsOptions(opts.CommonCollectionsOptions),
8082
setup.WithStatusSyncerOptions(opts.StatusSyncerOptions),
83+
setup.WithAgentgatewaySyncerOptions(opts.AgentGatewaySyncerOptions),
8184
)
8285
}

0 commit comments

Comments
 (0)