Skip to content

Commit b01b4e6

Browse files
[release-1.19] Fix MT Channel based broker when OIDC is enabled (#8777)
fix: extract channel service suffix into constant When OIDC is enabled and https is disabled (i.e. http requests are used) the event receiver is using the host name to determine the name of the channel and fails to do so. An example host name is `broker-kne-trigger-kn-channel.namespace-1.svc.cluster.local`. The channel name here is `broker-kne-trigger` without the suffix `-kn-channel` which was hardcoded in the logic which was creating the channel owned k8 service. The constant `-kn-channel` is now extracted into a common constant in the `channel` package. A conditional check in `ParseChannelFromHost` now checks for the suffix in the host name and removes it if needed. An additional test case was added and existing tests were updated. Issue #8705. Signed-off-by: Stanislav Jakuschevskij <[email protected]> Co-authored-by: Stanislav Jakuschevskij <[email protected]>
1 parent eac3a0f commit b01b4e6

File tree

9 files changed

+62
-39
lines changed

9 files changed

+62
-39
lines changed

pkg/channel/event_receiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
306306
return
307307
}
308308

309-
/// Here we do the OIDC audience verification
309+
// Here we do the OIDC audience verification
310310
features := feature.FromContext(ctx)
311311
if features.IsOIDCAuthentication() {
312312
r.logger.Debug("OIDC authentication is enabled")

pkg/channel/event_receiver_test.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
8888
},
8989
"path based channel reference": {
9090
path: "/new-namespace/new-channel",
91-
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
91+
host: host(),
9292
receiverFunc: func(ctx context.Context, r ChannelReference, m event.Event, additionalHeaders nethttp.Header) error {
9393
if r.Namespace != "new-namespace" || r.Name != "new-channel" {
9494
return fmt.Errorf("bad channel reference %v", r)
@@ -107,9 +107,9 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
107107
"x-requEst-id": {"1234"},
108108
"knatIve-will-pass-through": {"true", "always"},
109109
},
110-
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
110+
host: host(),
111111
receiverFunc: func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error {
112-
if r.Namespace != "test-namespace" || r.Name != "test-name" {
112+
if r.Namespace != "test-namespace" || r.Name != "test-channel" {
113113
return fmt.Errorf("test receiver func -- bad reference: %v", r)
114114
}
115115

@@ -138,7 +138,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
138138
},
139139
"OPTIONS okay": {
140140
method: nethttp.MethodOptions,
141-
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
141+
host: host(),
142142
expected: nethttp.StatusOK,
143143
responseValidator: func(res httptest.ResponseRecorder) error {
144144
expectedHeaders := nethttp.Header{
@@ -163,7 +163,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
163163
tc.path = "/"
164164
}
165165
if tc.host == "" {
166-
tc.host = "test-channel.test-namespace.svc." + network.GetClusterDomainName()
166+
tc.host = host()
167167
}
168168

169169
f := tc.receiverFunc
@@ -238,7 +238,7 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
238238
done := make(chan struct{}, 1)
239239

240240
receiverFunc := func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error {
241-
if r.Namespace != "test-namespace" || r.Name != "test-name" {
241+
if r.Namespace != "test-namespace" || r.Name != "test-channel" {
242242
return fmt.Errorf("test receiver func -- bad reference: %v", r)
243243
}
244244

@@ -253,8 +253,6 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
253253

254254
// Default the common things.
255255
method := nethttp.MethodPost
256-
host := "test-name.test-namespace.svc." + network.GetClusterDomainName()
257-
258256
logger, _ := zap.NewDevelopment()
259257

260258
r, err := NewEventReceiver(receiverFunc, logger)
@@ -276,7 +274,7 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
276274
),
277275
))
278276
require.NoError(t, err)
279-
p.RequestTemplate.Host = host
277+
p.RequestTemplate.Host = host()
280278

281279
c, err := cloudevents.NewClient(p)
282280
require.NoError(t, err)
@@ -291,8 +289,6 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
291289
}
292290

293291
func TestEventReceiver_WrongRequest(t *testing.T) {
294-
host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/"
295-
296292
f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error {
297293
return errors.New("test induced receiver function error")
298294
}
@@ -301,7 +297,7 @@ func TestEventReceiver_WrongRequest(t *testing.T) {
301297
t.Fatalf("Error creating new event receiver. Error:%s", err)
302298
}
303299

304-
req := httptest.NewRequest(nethttp.MethodPost, host, bytes.NewReader([]byte("{}")))
300+
req := httptest.NewRequest(nethttp.MethodPost, "http://"+host()+"/", bytes.NewReader([]byte("{}")))
305301
req.Header.Set("content-type", "application/json")
306302

307303
res := httptest.ResponseRecorder{}
@@ -313,8 +309,6 @@ func TestEventReceiver_WrongRequest(t *testing.T) {
313309
}
314310

315311
func TestEventReceiver_UnknownHost(t *testing.T) {
316-
host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/"
317-
318312
f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error {
319313
return errors.New("test induced receiver function error")
320314
}
@@ -335,7 +329,7 @@ func TestEventReceiver_UnknownHost(t *testing.T) {
335329
}
336330

337331
req := httptest.NewRequest("POST", "http://localhost:8080/", nil)
338-
req.Host = host
332+
req.Host = "http://" + host() + "/"
339333

340334
err = http.WriteRequest(context.TODO(), binding.ToMessage(&event), req)
341335
if err != nil {
@@ -349,3 +343,7 @@ func TestEventReceiver_UnknownHost(t *testing.T) {
349343
t.Fatal("Unexpected status code. Expected 404. Actual", res.Code)
350344
}
351345
}
346+
347+
func host() string {
348+
return fmt.Sprintf("test-channel%s.test-namespace.svc.%s", K8ServiceNameSuffix, network.GetClusterDomainName())
349+
}

pkg/channel/references.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ import (
2121
"strings"
2222
)
2323

24+
const (
25+
// K8ServiceNameSuffix is added to the k8 service name which is owned by the channel
26+
K8ServiceNameSuffix = "-kn-channel"
27+
)
28+
2429
// ChannelReference references a Channel within the cluster by name and
2530
// namespace.
2631
type ChannelReference struct {
@@ -38,6 +43,11 @@ func ParseChannelFromHost(host string) (ChannelReference, error) {
3843
if len(chunks) < 2 {
3944
return ChannelReference{}, BadRequestError(fmt.Sprintf("bad host format %q", host))
4045
}
46+
47+
if channelName, found := strings.CutSuffix(chunks[0], K8ServiceNameSuffix); found {
48+
chunks[0] = channelName
49+
}
50+
4151
return ChannelReference{
4252
Name: chunks[0],
4353
Namespace: chunks[1],

pkg/channel/references_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,24 @@ func TestParseChannelFromHost(t *testing.T) {
4646
wantErr bool
4747
expectedChannelRef ChannelReference
4848
}{
49+
"host based with channel k8 service suffix": {
50+
host: fmt.Sprintf("%s%s.%s.svc.cluster.local", referencesTestChannelName, K8ServiceNameSuffix, referencesTestNamespace),
51+
wantErr: false,
52+
expectedChannelRef: ChannelReference{
53+
Namespace: referencesTestNamespace,
54+
Name: referencesTestChannelName,
55+
},
56+
},
4957
"host based": {
50-
host: "test-channel.test-namespace.svc.cluster.local",
58+
host: fmt.Sprintf("%s.%s.svc.cluster.local", referencesTestChannelName, referencesTestNamespace),
5159
wantErr: false,
5260
expectedChannelRef: ChannelReference{
53-
Namespace: "test-namespace",
54-
Name: "test-channel",
61+
Namespace: referencesTestNamespace,
62+
Name: referencesTestChannelName,
5563
},
5664
},
5765
"bad host format should return error": {
58-
host: "test-channel",
66+
host: referencesTestChannelName,
5967
wantErr: true,
6068
},
6169
}
@@ -91,11 +99,11 @@ func TestParseChannelFromPath(t *testing.T) {
9199
expectedChannelRef ChannelReference
92100
}{
93101
"path based": {
94-
path: "/new-namespace/new-channel/",
102+
path: fmt.Sprintf("/%s/%s/", referencesTestNamespace, referencesTestChannelName),
95103
wantErr: false,
96104
expectedChannelRef: ChannelReference{
97-
Namespace: "new-namespace",
98-
Name: "new-channel",
105+
Namespace: referencesTestNamespace,
106+
Name: referencesTestChannelName,
99107
},
100108
},
101109

pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"knative.dev/pkg/resolver"
3434

3535
"knative.dev/eventing/pkg/apis/feature"
36+
"knative.dev/eventing/pkg/channel"
3637
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
3738
"knative.dev/eventing/pkg/eventingtls"
3839
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
@@ -70,6 +71,7 @@ const (
7071
dlsName = "test-dls"
7172
testNS = "test-namespace"
7273
imcName = "test-imc"
74+
imcSvcName = imcName + channel.K8ServiceNameSuffix
7375
imageName = "test-image"
7476
maxIdleConns = 2000
7577
maxIdleConnsPerHost = 200
@@ -83,7 +85,7 @@ const (
8385
var (
8486
channelServiceAddress = duckv1.Addressable{
8587
Name: pointer.String("http"),
86-
URL: apis.HTTP("test-imc-kn-channel.test-namespace.svc.cluster.local"),
88+
URL: apis.HTTP(fmt.Sprintf("%s.test-namespace.svc.cluster.local", imcSvcName)),
8789
}
8890

8991
channelAudience = fmt.Sprintf("messaging.knative.dev/inmemorychannel/%s/%s", testNS, imcName)
@@ -426,11 +428,11 @@ func TestAllCases(t *testing.T) {
426428
WithInMemoryChannelDeploymentReady(),
427429
WithInMemoryChannelServiceReady(),
428430
WithInMemoryChannelEndpointsReady(),
429-
WithInMemoryChannelChannelServiceNotReady("ChannelServiceFailed", `Channel Service failed: inmemorychannel: test-namespace/test-imc does not own Service: "test-imc-kn-channel"`),
431+
WithInMemoryChannelChannelServiceNotReady("ChannelServiceFailed", fmt.Sprintf(`Channel Service failed: inmemorychannel: test-namespace/test-imc does not own Service: "%s"`, imcSvcName)),
430432
),
431433
}},
432434
WantEvents: []string{
433-
Eventf(corev1.EventTypeWarning, "InternalError", `inmemorychannel: test-namespace/test-imc does not own Service: "test-imc-kn-channel"`),
435+
Eventf(corev1.EventTypeWarning, "InternalError", fmt.Sprintf(`inmemorychannel: test-namespace/test-imc does not own Service: "%s"`, imcSvcName)),
434436
},
435437
}, {
436438
Name: "Works, channel exists with subscribers",
@@ -986,7 +988,7 @@ func makeChannelService(imc *v1.InMemoryChannel) *corev1.Service {
986988
},
987989
ObjectMeta: metav1.ObjectMeta{
988990
Namespace: testNS,
989-
Name: fmt.Sprintf("%s-kn-channel", imcName),
991+
Name: imcSvcName,
990992
Labels: map[string]string{
991993
resources.MessagingRoleLabel: resources.MessagingRole,
992994
},
@@ -1016,7 +1018,7 @@ func makeChannelServiceNotOwnedByUs(imc *v1.InMemoryChannel) *corev1.Service {
10161018
},
10171019
ObjectMeta: metav1.ObjectMeta{
10181020
Namespace: testNS,
1019-
Name: fmt.Sprintf("%s-kn-channel", imcName),
1021+
Name: imcSvcName,
10201022
Labels: map[string]string{
10211023
resources.MessagingRoleLabel: resources.MessagingRole,
10221024
},

pkg/reconciler/inmemorychannel/controller/resources/service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
corev1 "k8s.io/api/core/v1"
2121
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2222
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
23+
"knative.dev/eventing/pkg/channel"
2324
"knative.dev/pkg/kmeta"
2425
"knative.dev/pkg/network"
2526
)
@@ -35,7 +36,7 @@ const (
3536
type K8sServiceOption func(*corev1.Service) error
3637

3738
func CreateChannelServiceName(name string) string {
38-
return kmeta.ChildName(name, "-kn-channel")
39+
return kmeta.ChildName(name, channel.K8ServiceNameSuffix)
3940
}
4041

4142
// ExternalService is a functional option for CreateK8sService to create a K8s service of type ExternalName

pkg/reconciler/inmemorychannel/controller/resources/service_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,21 @@ package resources
1818

1919
import (
2020
"errors"
21-
"fmt"
2221
"testing"
2322

2423
"github.com/google/go-cmp/cmp"
2524
corev1 "k8s.io/api/core/v1"
2625
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2726
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
27+
"knative.dev/eventing/pkg/channel"
2828
"knative.dev/pkg/kmeta"
2929
"knative.dev/pkg/network"
3030
)
3131

3232
const (
3333
serviceName = "my-test-service"
3434
imcName = "my-test-imc"
35+
imcSvcName = imcName + channel.K8ServiceNameSuffix
3536
testNS = "my-test-ns"
3637
dispatcherNS = "dispatcher-namespace"
3738
dispatcherName = "dispatcher-name"
@@ -44,7 +45,7 @@ func TestCreateExternalServiceAddress(t *testing.T) {
4445
}
4546

4647
func TestCreateChannelServiceAddress(t *testing.T) {
47-
if want, got := "my-test-imc-kn-channel", CreateChannelServiceName(imcName); want != got {
48+
if want, got := imcSvcName, CreateChannelServiceName(imcName); want != got {
4849
t.Errorf("Want: %q got %q", want, got)
4950
}
5051
}
@@ -62,7 +63,7 @@ func TestNewK8sService(t *testing.T) {
6263
Kind: "Service",
6364
},
6465
ObjectMeta: metav1.ObjectMeta{
65-
Name: fmt.Sprintf("%s-kn-channel", imcName),
66+
Name: CreateChannelServiceName(imcName),
6667
Namespace: testNS,
6768
Labels: map[string]string{
6869
MessagingRoleLabel: MessagingRole,
@@ -105,7 +106,7 @@ func TestNewK8sServiceWithExternal(t *testing.T) {
105106
Kind: "Service",
106107
},
107108
ObjectMeta: metav1.ObjectMeta{
108-
Name: fmt.Sprintf("%s-kn-channel", imcName),
109+
Name: CreateChannelServiceName(imcName),
109110
Namespace: testNS,
110111
Labels: map[string]string{
111112
MessagingRoleLabel: MessagingRole,

pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package dispatcher
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"net/http"
2223
"reflect"
2324
"testing"
@@ -48,6 +49,7 @@ import (
4849
"knative.dev/eventing/pkg/apis/feature"
4950
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
5051
"knative.dev/eventing/pkg/auth"
52+
"knative.dev/eventing/pkg/channel"
5153
"knative.dev/eventing/pkg/channel/fanout"
5254
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
5355
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel"
@@ -68,7 +70,7 @@ const (
6870

6971
var (
7072
channelServiceAddress = duckv1.Addressable{
71-
URL: apis.HTTP("test-imc-kn-channel.test-namespace.svc.cluster.local"),
73+
URL: apis.HTTP(fmt.Sprintf("%s%s.%s.svc.cluster.local", imcName, channel.K8ServiceNameSuffix, testNS)),
7274
}
7375

7476
linear = eventingduckv1.BackoffPolicyLinear

test/conformance/helpers/channel_tracing_test_helper.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/openzipkin/zipkin-go/model"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3030

31+
ch "knative.dev/eventing/pkg/channel"
3132
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
3233
testlib "knative.dev/eventing/test/lib"
3334
"knative.dev/eventing/test/lib/recordevents"
@@ -143,7 +144,7 @@ func setupChannelTracingWithReply(
143144
Span: tracinghelper.MatchHTTPSpanNoReply(
144145
model.Server,
145146
tracinghelper.WithHTTPHostAndPath(
146-
fmt.Sprintf("%s-kn-channel.%s.svc", channelName, client.Namespace),
147+
fmt.Sprintf("%s%s.%s.svc", channelName, ch.K8ServiceNameSuffix, client.Namespace),
147148
"/",
148149
),
149150
),
@@ -177,14 +178,14 @@ func setupChannelTracingWithReply(
177178
},
178179
{
179180
// 6. Channel Dispatcher span
180-
Span: channelSpan(eventID, fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), ""),
181+
Span: channelSpan(eventID, fmt.Sprintf("%s%s.%s.svc", replyChannelName, ch.K8ServiceNameSuffix, client.Namespace), ""),
181182
Children: []tracinghelper.TestSpanTree{
182183
{
183184
// 7. Channel sends reply from Mutator Pod to the reply Channel.
184185
Span: tracinghelper.MatchHTTPSpanNoReply(
185186
model.Client,
186187
tracinghelper.WithHTTPURL(
187-
fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace),
188+
fmt.Sprintf("%s%s.%s.svc", replyChannelName, ch.K8ServiceNameSuffix, client.Namespace),
188189
"",
189190
),
190191
),
@@ -194,7 +195,7 @@ func setupChannelTracingWithReply(
194195
Span: tracinghelper.MatchHTTPSpanNoReply(
195196
model.Server,
196197
tracinghelper.WithHTTPHostAndPath(
197-
fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace),
198+
fmt.Sprintf("%s%s.%s.svc", replyChannelName, ch.K8ServiceNameSuffix, client.Namespace),
198199
"/",
199200
),
200201
),
@@ -245,7 +246,7 @@ func setupChannelTracingWithReply(
245246
Span: tracinghelper.MatchHTTPSpanNoReply(
246247
model.Client,
247248
tracinghelper.WithHTTPURL(
248-
fmt.Sprintf("%s-kn-channel.%s.svc", channelName, client.Namespace),
249+
fmt.Sprintf("%s%s.%s.svc", channelName, ch.K8ServiceNameSuffix, client.Namespace),
249250
"",
250251
),
251252
tracinghelper.WithLocalEndpointServiceName("sender"),

0 commit comments

Comments
 (0)