Skip to content

Commit 4b16fab

Browse files
committed
Move datasource/extractor validations to datalayer runtime.
- Extractors declare their expected input type (unchanged) - DataSource declare their output type and expected Extractor type (new) - Compatibility validation utility functions and tests in plugins/datalayer/source (new, moved) - Datalayer WithConfig() validates compatbility before calling AddExtractor (new) - DataSource can optionaly implement validating interface method to perform specific compatibility checks (new) - Current DataSource checks on AddExtractor and their tests are not changed to minimize the code changes. Signed-off-by: Etai Lev Ran <elevran@gmail.com>
1 parent 0248172 commit 4b16fab

File tree

7 files changed

+264
-5
lines changed

7 files changed

+264
-5
lines changed

pkg/epp/datalayer/config.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package datalayer
1818

1919
import (
2020
"fmt"
21+
"reflect"
2122

2223
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/source"
2325
)
2426

2527
// Config defines the configuration of EPP data layer, as the set of DataSources
@@ -54,6 +56,24 @@ func WithConfig(cfg *Config, disallowedExtractorType string) error {
5456
return fmt.Errorf("disallowed Extractor %s is configured for source %s",
5557
extractor.TypedName().String(), srcCfg.Plugin.TypedName().String())
5658
}
59+
// Validate extractor input type is compatible with datasource output type
60+
if err := source.ValidateInputTypeCompatible(srcCfg.Plugin.OutputType(), extractor.ExpectedInputType()); err != nil {
61+
return fmt.Errorf("extractor %s input type incompatible with datasource %s: %w",
62+
extractor.TypedName(), srcCfg.Plugin.TypedName(), err)
63+
}
64+
// Validate extractor type is compatible with datasource expected extractor type
65+
extractorType := reflect.TypeOf(extractor)
66+
if err := source.ValidateExtractorCompatible(extractorType, srcCfg.Plugin.ExtractorType()); err != nil {
67+
return fmt.Errorf("extractor %s type incompatible with datasource %s: %w",
68+
extractor.TypedName(), srcCfg.Plugin.TypedName(), err)
69+
}
70+
// Allow datasource to perform additional custom validation
71+
if validator, ok := srcCfg.Plugin.(fwkdl.ValidatingDataSource); ok {
72+
if err := validator.ValidateExtractor(extractor); err != nil {
73+
return fmt.Errorf("extractor %s failed custom validation for datasource %s: %w",
74+
extractor.TypedName(), srcCfg.Plugin.TypedName(), err)
75+
}
76+
}
5777
if err := srcCfg.Plugin.AddExtractor(extractor); err != nil {
5878
return fmt.Errorf("failed to add Extractor %s to DataSource %s: %w", extractor.TypedName(),
5979
srcCfg.Plugin.TypedName(), err)

pkg/epp/datalayer/fake.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package datalayer
1818

1919
import (
2020
"context"
21+
"reflect"
2122
"sync/atomic"
2223

2324
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -46,6 +47,15 @@ func (fds *FakeDataSource) TypedName() plugin.TypedName {
4647
Name: fakeSource,
4748
}
4849
}
50+
51+
func (fds *FakeDataSource) OutputType() reflect.Type {
52+
return reflect.TypeOf(fwkdl.Metrics{})
53+
}
54+
55+
func (fds *FakeDataSource) ExtractorType() reflect.Type {
56+
return reflect.TypeOf((*fwkdl.Extractor)(nil)).Elem()
57+
}
58+
4959
func (fds *FakeDataSource) Extractors() []string { return []string{} }
5060
func (fds *FakeDataSource) AddExtractor(_ fwkdl.Extractor) error { return nil }
5161

@@ -69,6 +79,14 @@ func (m *FakeNotificationSource) TypedName() plugin.TypedName {
6979
return m.typedName
7080
}
7181

82+
func (m *FakeNotificationSource) OutputType() reflect.Type {
83+
return reflect.TypeOf(fwkdl.NotificationEvent{})
84+
}
85+
86+
func (m *FakeNotificationSource) ExtractorType() reflect.Type {
87+
return reflect.TypeOf((*fwkdl.NotificationExtractor)(nil)).Elem()
88+
}
89+
7290
func (m *FakeNotificationSource) GVK() schema.GroupVersionKind {
7391
return m.gvk
7492
}

pkg/epp/framework/interface/datalayer/plugin.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ type DataSource interface {
2828
plugin.Plugin
2929
// Extractors returns a list of registered Extractor names.
3030
Extractors() []string
31+
// OutputType returns the type of data this DataSource produces.
32+
// Used for validating extractor compatibility.
33+
OutputType() reflect.Type
34+
// ExtractorType returns the type of Extractor this DataSource expects.
35+
// For poll-based sources, this is the base Extractor interface.
36+
// For notification sources, this is the NotificationExtractor interface.
37+
ExtractorType() reflect.Type
3138
// AddExtractor adds an extractor to the data source. Multiple
3239
// Extractors can be registered.
3340
// The extractor will be called whenever the DataSource might
@@ -50,3 +57,11 @@ type Extractor interface {
5057
// attribute, stored on the given endpoint.
5158
Extract(ctx context.Context, data any, ep Endpoint) error
5259
}
60+
61+
// ValidatingDataSource is an optional interface that DataSources can implement
62+
// to perform additional custom validation when adding extractors.
63+
type ValidatingDataSource interface {
64+
// ValidateExtractor allows the DataSource to perform additional validation
65+
// beyond the standard type compatibility checks. Return an error if validation fails.
66+
ValidateExtractor(extractor Extractor) error
67+
}

pkg/epp/framework/plugins/datalayer/source/http/datasource.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ import (
2626
"reflect"
2727
"sync"
2828

29-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
3029
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
3130
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
31+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/source"
3232
)
3333

3434
// HTTPDataSource is a data source that receives its data using HTTP client.
@@ -77,6 +77,16 @@ func (dataSrc *HTTPDataSource) TypedName() fwkplugin.TypedName {
7777
return dataSrc.typedName
7878
}
7979

80+
// OutputType returns the type of data this DataSource produces.
81+
func (dataSrc *HTTPDataSource) OutputType() reflect.Type {
82+
return dataSrc.outputType
83+
}
84+
85+
// ExtractorType returns the type of Extractor this DataSource expects.
86+
func (dataSrc *HTTPDataSource) ExtractorType() reflect.Type {
87+
return source.ExtractorType
88+
}
89+
8090
// Extractors returns a list of registered Extractor names.
8191
func (dataSrc *HTTPDataSource) Extractors() []string {
8292
extractors := []string{}
@@ -92,7 +102,7 @@ func (dataSrc *HTTPDataSource) Extractors() []string {
92102
// AddExtractor adds an extractor to the data source, validating it can process
93103
// the data source output type.
94104
func (dataSrc *HTTPDataSource) AddExtractor(extractor fwkdl.Extractor) error {
95-
if err := datalayer.ValidateExtractorType(dataSrc.outputType, extractor.ExpectedInputType()); err != nil {
105+
if err := source.ValidateInputTypeCompatible(dataSrc.OutputType(), extractor.ExpectedInputType()); err != nil {
96106
return err
97107
}
98108
if _, loaded := dataSrc.extractors.LoadOrStore(extractor.TypedName().Name, extractor); loaded {

pkg/epp/framework/plugins/datalayer/source/notifications/k8s_datasource.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"reflect"
2324
"sync"
2425

2526
"k8s.io/apimachinery/pkg/runtime/schema"
2627
"sigs.k8s.io/controller-runtime/pkg/log"
2728

2829
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
2930
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
31+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/source"
3032
)
3133

3234
var (
@@ -61,6 +63,16 @@ func (s *K8sNotificationSource) GVK() schema.GroupVersionKind {
6163
return s.gvk
6264
}
6365

66+
// OutputType returns the type of data this DataSource produces (NotificationEvent).
67+
func (s *K8sNotificationSource) OutputType() reflect.Type {
68+
return source.NotificationEventType
69+
}
70+
71+
// ExtractorType returns the type of Extractor this DataSource expects (NotificationExtractor).
72+
func (s *K8sNotificationSource) ExtractorType() reflect.Type {
73+
return source.NotificationExtractorType
74+
}
75+
6476
// Extractors returns names of registered extractors.
6577
func (s *K8sNotificationSource) Extractors() []string {
6678
var names []string
@@ -79,10 +91,12 @@ func (s *K8sNotificationSource) AddExtractor(ext fwkdl.Extractor) error {
7991
if ext == nil {
8092
return errors.New("cannot add nil extractor")
8193
}
82-
notifyExt, ok := ext.(fwkdl.NotificationExtractor)
83-
if !ok {
84-
return fmt.Errorf("extractor %s does not implement NotificationExtractor", ext.TypedName())
94+
extractorType := reflect.TypeOf(ext)
95+
expectedType := reflect.TypeOf((*fwkdl.NotificationExtractor)(nil)).Elem()
96+
if err := source.ValidateExtractorCompatible(extractorType, expectedType); err != nil {
97+
return err
8598
}
99+
notifyExt := ext.(fwkdl.NotificationExtractor)
86100
if _, loaded := s.extractors.LoadOrStore(notifyExt.TypedName().Name, notifyExt); loaded {
87101
return fmt.Errorf("duplicate extractor %s on notification source %s",
88102
notifyExt.TypedName(), s.TypedName())
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright 2026 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package source
18+
19+
import (
20+
"errors"
21+
"fmt"
22+
"reflect"
23+
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
25+
)
26+
27+
var (
28+
ExtractorType = reflect.TypeOf((*datalayer.Extractor)(nil)).Elem()
29+
NotificationExtractorType = reflect.TypeOf((*datalayer.NotificationExtractor)(nil)).Elem()
30+
NotificationEventType = reflect.TypeOf(datalayer.NotificationEvent{})
31+
)
32+
33+
// ValidateInputTypeCompatible checks if the extractor's expected input type is
34+
// compatible with the DataSource's output type.
35+
func ValidateInputTypeCompatible(dataSourceOutput, extractorInput reflect.Type) error {
36+
if dataSourceOutput == nil || extractorInput == nil {
37+
return errors.New("data source output type or extractor input type can't be nil")
38+
}
39+
if dataSourceOutput == extractorInput ||
40+
(extractorInput.Kind() == reflect.Interface && extractorInput.NumMethod() == 0) ||
41+
(extractorInput.Kind() == reflect.Interface && dataSourceOutput.Implements(extractorInput)) {
42+
return nil
43+
}
44+
return fmt.Errorf("extractor input type %v is not compatible with data source output type %v",
45+
extractorInput, dataSourceOutput)
46+
}
47+
48+
// ValidateExtractorCompatible checks if the extractor type is compatible with the
49+
// expected extractor interface type. For notification DataSources, this validates
50+
// that the extractor implements NotificationExtractor.
51+
func ValidateExtractorCompatible(extractorType reflect.Type, expectedInterfaceType reflect.Type) error {
52+
if extractorType == nil || expectedInterfaceType == nil {
53+
return errors.New("extractor type or expected interface type can't be nil")
54+
}
55+
if expectedInterfaceType.Kind() != reflect.Interface {
56+
return fmt.Errorf("expected type must be an interface, got %v", expectedInterfaceType.Kind())
57+
}
58+
if !extractorType.Implements(expectedInterfaceType) {
59+
return fmt.Errorf("extractor type %v does not implement interface %v",
60+
extractorType, expectedInterfaceType)
61+
}
62+
return nil
63+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
Copyright 2026 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package source
18+
19+
import (
20+
"reflect"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
)
25+
26+
func TestValidateInputTypeCompatible(t *testing.T) {
27+
type rawStruct struct{}
28+
type iface interface{ Foo() }
29+
30+
tests := []struct {
31+
name string
32+
output reflect.Type
33+
input reflect.Type
34+
valid bool
35+
}{
36+
{"exact match", typeOf(rawStruct{}), typeOf(rawStruct{}), true},
37+
{"input is interface{}", typeOf(rawStruct{}), typeOf((*any)(nil)), true},
38+
{"nil types are not allowed", typeOf(rawStruct{}), typeOf(nil), false},
39+
{"output does not implement input", typeOf(rawStruct{}), typeOf((*iface)(nil)), false},
40+
}
41+
42+
for _, tt := range tests {
43+
err := ValidateInputTypeCompatible(tt.output, tt.input)
44+
if tt.valid {
45+
assert.NoError(t, err, "%s: expected valid extractor type", tt.name)
46+
} else {
47+
assert.Error(t, err, "%s: expected invalid extractor type", tt.name)
48+
}
49+
}
50+
}
51+
52+
func TestValidateExtractorCompatible(t *testing.T) {
53+
type notExtractor struct{}
54+
55+
tests := []struct {
56+
name string
57+
extType reflect.Type
58+
expectedType reflect.Type
59+
valid bool
60+
errContains string
61+
}{
62+
{
63+
name: "nil extractor type",
64+
extType: nil,
65+
expectedType: ExtractorType,
66+
valid: false,
67+
errContains: "can't be nil",
68+
},
69+
{
70+
name: "nil expected type",
71+
extType: reflect.TypeOf(&notExtractor{}),
72+
expectedType: nil,
73+
valid: false,
74+
errContains: "can't be nil",
75+
},
76+
{
77+
name: "expected type not interface",
78+
extType: reflect.TypeOf(&notExtractor{}),
79+
expectedType: reflect.TypeOf("string"),
80+
valid: false,
81+
errContains: "must be an interface",
82+
},
83+
{
84+
name: "does not implement interface",
85+
extType: reflect.TypeOf(&notExtractor{}),
86+
expectedType: ExtractorType,
87+
valid: false,
88+
errContains: "does not implement interface",
89+
},
90+
}
91+
92+
for _, tt := range tests {
93+
err := ValidateExtractorCompatible(tt.extType, tt.expectedType)
94+
if tt.valid {
95+
assert.NoError(t, err, "%s: expected valid", tt.name)
96+
} else {
97+
assert.Error(t, err, "%s: expected error", tt.name)
98+
if tt.errContains != "" {
99+
assert.Contains(t, err.Error(), tt.errContains, "%s: error should contain", tt.name)
100+
}
101+
}
102+
}
103+
}
104+
105+
func TestTypeConstants(t *testing.T) {
106+
assert.True(t, ExtractorType.Kind() == reflect.Interface, "ExtractorType should be an interface")
107+
assert.True(t, NotificationExtractorType.Kind() == reflect.Interface, "NotificationExtractorType should be an interface")
108+
}
109+
110+
func typeOf(v any) reflect.Type {
111+
t := reflect.TypeOf(v)
112+
if t == nil {
113+
return nil
114+
}
115+
if t.Kind() == reflect.Ptr {
116+
return t.Elem()
117+
}
118+
return t
119+
}

0 commit comments

Comments
 (0)