Skip to content

Commit de4cb72

Browse files
matzew and pierDipi authored
[release-v1.16] Fix KEDA scaling with SASL auth for brokers (knative-extensions#4253) (#1802)
* Fix KEDA scaling with SASL auth for brokers (knative-extensions#4253) * Fix KEDA scaling with SASL auth for brokers TriggerAuthentication is misconfigured when Broker is configured to connect to Kafka with SASL since it was using only the legacy secret format. Signed-off-by: Pierangelo Di Pilato <[email protected]> * Adding nil check secret's data Signed-off-by: Matthias Wessendorf <[email protected]> --------- Signed-off-by: Pierangelo Di Pilato <[email protected]> Signed-off-by: Matthias Wessendorf <[email protected]> Co-authored-by: Matthias Wessendorf <[email protected]> * fix import on backport... Signed-off-by: Matthias Wessendorf <[email protected]> --------- Signed-off-by: Pierangelo Di Pilato <[email protected]> Signed-off-by: Matthias Wessendorf <[email protected]> Co-authored-by: Pierangelo Di Pilato <[email protected]>
1 parent 6c1e272 commit de4cb72

File tree

3 files changed

+173
-39
lines changed

3 files changed

+173
-39
lines changed

control-plane/pkg/autoscaler/keda/keda.go

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ import (
2121
"fmt"
2222
"strconv"
2323

24+
"github.com/IBM/sarama"
2425
corev1 "k8s.io/api/core/v1"
2526
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26-
"k8s.io/utils/pointer"
2727
"knative.dev/pkg/kmeta"
2828
"knative.dev/pkg/logging"
2929

30-
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
30+
bindings "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
31+
3132
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
33+
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internalskafkaeventing"
3234
"knative.dev/eventing-kafka-broker/third_party/pkg/client/clientset/versioned"
3335

3436
"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler"
@@ -42,6 +44,9 @@ import (
4244
const (
4345
// AutoscalerClass is the KEDA autoscaler class.
4446
AutoscalerClass = "keda.autoscaling.knative.dev"
47+
48+
KedaResourceLabel = internalskafkaeventing.GroupName + "/resource"
49+
KedaResourceLabelValue = "true"
4550
)
4651

4752
func GenerateScaleTarget(cg *kafkainternals.ConsumerGroup) *kedav1alpha1.ScaleTarget {
@@ -94,8 +99,10 @@ func GenerateScaleTriggers(cg *kafkainternals.ConsumerGroup, triggerAuthenticati
9499
}
95100

96101
func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData map[string][]byte) (*kedav1alpha1.TriggerAuthentication, *corev1.Secret, error) {
97-
98-
secretTargetRefs := make([]kedav1alpha1.AuthSecretTargetRef, 0, 8)
102+
// Make sure secretData is never nil
103+
if secretData == nil {
104+
secretData = make(map[string][]byte)
105+
}
99106

100107
secret := corev1.Secret{
101108
ObjectMeta: metav1.ObjectMeta{
@@ -104,28 +111,38 @@ func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData
104111
OwnerReferences: []metav1.OwnerReference{
105112
*kmeta.NewControllerRef(cg),
106113
},
114+
Labels: map[string]string{
115+
KedaResourceLabel: KedaResourceLabelValue,
116+
},
107117
},
108118
Data: secretData,
109119
StringData: make(map[string]string),
110120
}
111121

112-
saslType := retrieveSaslTypeIfPresent(cg, secret)
122+
opt, err := security.NewSaramaSecurityOptionFromSecret(&secret)
123+
if err != nil {
124+
return nil, nil, fmt.Errorf("failed to get security option from secret: %w", err)
125+
}
126+
127+
cfg := &sarama.Config{}
128+
if err := opt(cfg); err != nil {
129+
return nil, nil, fmt.Errorf("failed to get SASL config from secret: %w", err)
130+
}
113131

114-
if saslType != nil {
115-
switch *saslType {
116-
case "SCRAM-SHA-256":
132+
if cfg.Net.SASL.Enable {
133+
switch cfg.Net.SASL.Mechanism {
134+
case sarama.SASLTypePlaintext:
135+
secret.StringData["sasl"] = "plaintext"
136+
case sarama.SASLTypeSCRAMSHA256:
117137
secret.StringData["sasl"] = "scram_sha256"
118-
case "SCRAM-SHA-512":
138+
case sarama.SASLTypeSCRAMSHA512:
119139
secret.StringData["sasl"] = "scram_sha512"
120-
case "PLAIN":
121-
secret.StringData["sasl"] = "plaintext"
122140
default:
123-
return nil, nil, fmt.Errorf("SASL type value %q is not supported", *saslType)
141+
return nil, nil, fmt.Errorf("SASL type value %q is not supported", cfg.Net.SASL.Mechanism)
124142
}
125-
} else {
126-
secret.StringData["sasl"] = "plaintext" //default
127143
}
128144

145+
secretTargetRefs := make([]kedav1alpha1.AuthSecretTargetRef, 0, 8)
129146
triggerAuth := &kedav1alpha1.TriggerAuthentication{
130147
ObjectMeta: metav1.ObjectMeta{
131148
Name: cg.Name,
@@ -134,7 +151,7 @@ func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData
134151
*kmeta.NewControllerRef(cg),
135152
},
136153
Labels: map[string]string{
137-
//TODO: may need to add labels like eventing-autoscaler-keda/pkg/reconciler/broker/resources/triggerauthentication.go#L39-L40
154+
KedaResourceLabel: KedaResourceLabelValue,
138155
},
139156
},
140157
Spec: kedav1alpha1.TriggerAuthenticationSpec{
@@ -169,7 +186,7 @@ func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData
169186

170187
if cg.Spec.Template.Spec.Auth.SecretSpec != nil && cg.Spec.Template.Spec.Auth.SecretSpec.Ref.Name != "" {
171188

172-
if saslType != nil { //SASL enabled
189+
if cfg.Net.SASL.Enable {
173190
sasl := kedav1alpha1.AuthSecretTargetRef{Parameter: "sasl", Name: secret.Name, Key: "sasl"}
174191
secretTargetRefs = append(secretTargetRefs, sasl)
175192

@@ -206,23 +223,7 @@ func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData
206223
return triggerAuth, &secret, nil
207224
}
208225

209-
func retrieveSaslTypeIfPresent(cg *kafkainternals.ConsumerGroup, secret corev1.Secret) *string {
210-
if cg.Spec.Template.Spec.Auth.NetSpec != nil && cg.Spec.Template.Spec.Auth.NetSpec.SASL.Enable && cg.Spec.Template.Spec.Auth.NetSpec.SASL.Type.SecretKeyRef != nil {
211-
secretKeyRefKey := cg.Spec.Template.Spec.Auth.NetSpec.SASL.Type.SecretKeyRef.Key
212-
if saslTypeValue, ok := secret.Data[secretKeyRefKey]; ok {
213-
return pointer.String(string(saslTypeValue))
214-
}
215-
}
216-
217-
if cg.Spec.Template.Spec.Auth.SecretSpec != nil && cg.Spec.Template.Spec.Auth.SecretSpec.Ref != nil {
218-
if saslTypeValue, ok := secret.Data[security.SaslTypeLegacy]; ok {
219-
return pointer.String(string(saslTypeValue))
220-
}
221-
}
222-
return nil
223-
}
224-
225-
func addAuthSecretTargetRef(parameter string, secretKeyRef v1beta1.SecretValueFromSource, secretTargetRefs []kedav1alpha1.AuthSecretTargetRef) []kedav1alpha1.AuthSecretTargetRef {
226+
func addAuthSecretTargetRef(parameter string, secretKeyRef bindings.SecretValueFromSource, secretTargetRefs []kedav1alpha1.AuthSecretTargetRef) []kedav1alpha1.AuthSecretTargetRef {
226227
if secretKeyRef.SecretKeyRef == nil || secretKeyRef.SecretKeyRef.Name == "" || secretKeyRef.SecretKeyRef.Key == "" {
227228
return secretTargetRefs
228229
}

control-plane/pkg/security/secrets_provider_legacy_channel_secret.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,18 @@ func ResolveAuthContextFromLegacySecret(s *corev1.Secret) (*NetSpecAuthContext,
3838
protocolStr, protocolContract := getProtocolFromLegacyChannelSecret(s)
3939

4040
virtualSecret := s.DeepCopy()
41+
4142
virtualSecret.Data[ProtocolKey] = []byte(protocolStr)
42-
if v, ok := virtualSecret.Data["sasltype"]; ok {
43+
if v, ok := virtualSecret.Data[SaslType]; ok {
4344
virtualSecret.Data[SaslMechanismKey] = v
4445
}
45-
if v, ok := virtualSecret.Data["saslType"]; ok {
46+
if v, ok := virtualSecret.Data[SaslTypeLegacy]; ok {
4647
virtualSecret.Data[SaslMechanismKey] = v
4748
}
48-
if v, ok := virtualSecret.Data["username"]; ok {
49+
if v, ok := virtualSecret.Data[SaslUsernameKey]; ok {
4950
virtualSecret.Data[SaslUserKey] = v
5051
}
51-
if v, ok := virtualSecret.Data["user"]; ok {
52+
if v, ok := virtualSecret.Data[SaslUserKey]; ok {
5253
virtualSecret.Data[SaslUserKey] = v
5354
}
5455

test/rekt/features/keda_scaling.go

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222

2323
"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler/keda"
2424
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
25+
brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
26+
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkaauthsecret"
2527

2628
"knative.dev/eventing/test/rekt/resources/trigger"
2729

@@ -55,7 +57,7 @@ import (
5557
)
5658

5759
func KafkaSourceScaledObjectHasNoEmptyAuthRef() *feature.Feature {
58-
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")
60+
f := feature.NewFeature()
5961

6062
// we need to ensure that autoscaling is enabled for the rest of the feature to work
6163
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
@@ -93,7 +95,7 @@ func KafkaSourceScaledObjectHasNoEmptyAuthRef() *feature.Feature {
9395
}
9496

9597
func KafkaSourceScalesToZeroWithKeda() *feature.Feature {
96-
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")
98+
f := feature.NewFeature()
9799

98100
// we need to ensure that autoscaling is enabled for the rest of the feature to work
99101
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
@@ -142,6 +144,44 @@ func KafkaSourceScalesToZeroWithKeda() *feature.Feature {
142144
return f
143145
}
144146

147+
func KafkaSourceSASLScalesToZeroWithKeda() *feature.Feature {
148+
f := feature.NewFeature()
149+
150+
// we need to ensure that autoscaling is enabled for the rest of the feature to work
151+
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
152+
153+
sourceCfg := kafkaSourceConfig{
154+
sourceName: feature.MakeRandomK8sName("kafka-source"),
155+
authMech: SASLMech,
156+
topic: feature.MakeRandomK8sName("kafka-source-keda-sasl"),
157+
}
158+
sinkCfg := kafkaSinkConfig{
159+
sinkName: feature.MakeRandomK8sName("kafka-sink"),
160+
}
161+
sinkName, receiver := KafkaSourceFeatureSetup(f, sourceCfg, sinkCfg)
162+
163+
sender := feature.MakeRandomK8sName("eventshub-sender")
164+
165+
event := cetest.FullEvent()
166+
event.SetID(uuid.New().String())
167+
168+
// check that the source initially has replicas = 0
169+
f.Setup("Source should start with replicas = 0", verifyConsumerGroupReplicas(getKafkaSourceCg(sourceCfg.sourceName), 0, true))
170+
171+
options := []eventshub.EventsHubOption{
172+
eventshub.StartSenderToResource(kafkasink.GVR(), sinkName),
173+
eventshub.InputEvent(event),
174+
}
175+
f.Requirement("install eventshub sender", eventshub.Install(sender, options...))
176+
177+
f.Requirement("eventshub receiver gets event", assert.OnStore(receiver).MatchEvent(test.HasId(event.ID())).Exact(1))
178+
179+
// after the event is sent, the source should scale down to zero replicas
180+
f.Alpha("KafkaSource").Must("Scale down to zero", verifyConsumerGroupReplicas(getKafkaSourceCg(sourceCfg.sourceName), 0, false))
181+
182+
return f
183+
}
184+
145185
func TriggerScalesToZeroWithKeda() *feature.Feature {
146186
f := feature.NewFeature()
147187

@@ -171,6 +211,98 @@ func TriggerScalesToZeroWithKeda() *feature.Feature {
171211
return f
172212
}
173213

214+
func TriggerSASLScalesToZeroWithKeda() *feature.Feature {
215+
f := feature.NewFeature()
216+
217+
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
218+
219+
event := cetest.FullEvent()
220+
221+
brokerName := feature.MakeRandomK8sName("broker")
222+
triggerName := feature.MakeRandomK8sName("trigger")
223+
sourceName := feature.MakeRandomK8sName("source")
224+
sinkName := feature.MakeRandomK8sName("sink")
225+
brokerConfigName := feature.MakeRandomK8sName("brokercfg")
226+
authSecretName := feature.MakeRandomK8sName("kafkaauth")
227+
228+
// check that the trigger initially has replicas = 0
229+
f.Setup("Trigger should start with replicas = 0", verifyConsumerGroupReplicas(getTriggerCg(triggerName), 0, true))
230+
231+
f.Setup("Create auth secret", func(ctx context.Context, t feature.T) {
232+
kafkaauthsecret.Install(authSecretName, kafkaauthsecret.WithSslSaslScram512Data(ctx))(ctx, t)
233+
})
234+
235+
f.Setup("Create broker config", brokerconfigmap.Install(brokerConfigName,
236+
brokerconfigmap.WithNumPartitions(3),
237+
brokerconfigmap.WithReplicationFactor(3),
238+
brokerconfigmap.WithBootstrapServer(testpkg.BootstrapServersSslSaslScram),
239+
brokerconfigmap.WithAuthSecret(authSecretName)))
240+
241+
f.Setup("Install broker", broker.Install(brokerName, append(
242+
broker.WithEnvConfig(),
243+
broker.WithConfig(brokerConfigName))...,
244+
))
245+
246+
f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver))
247+
f.Setup("install broker", broker.Install(brokerName))
248+
f.Setup("install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sinkName), "")))
249+
250+
f.Requirement("install source", eventshub.Install(sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEvent(event)))
251+
252+
f.Requirement("sink receives event", assert.OnStore(sinkName).MatchEvent(test.HasId(event.ID())).Exact(1))
253+
254+
//after the event is sent, the trigger should scale down to zero replicas
255+
f.Alpha("Trigger").Must("Scale down to zero", verifyConsumerGroupReplicas(getTriggerCg(triggerName), 0, false))
256+
257+
return f
258+
}
259+
260+
func TriggerSSLScalesToZeroWithKeda() *feature.Feature {
261+
f := feature.NewFeature()
262+
263+
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
264+
265+
event := cetest.FullEvent()
266+
267+
brokerName := feature.MakeRandomK8sName("broker")
268+
triggerName := feature.MakeRandomK8sName("trigger")
269+
sourceName := feature.MakeRandomK8sName("source")
270+
sinkName := feature.MakeRandomK8sName("sink")
271+
brokerConfigName := feature.MakeRandomK8sName("brokercfg")
272+
authSecretName := feature.MakeRandomK8sName("kafkaauth")
273+
274+
// check that the trigger initially has replicas = 0
275+
f.Setup("Trigger should start with replicas = 0", verifyConsumerGroupReplicas(getTriggerCg(triggerName), 0, true))
276+
277+
f.Setup("Create auth secret", func(ctx context.Context, t feature.T) {
278+
kafkaauthsecret.Install(authSecretName, kafkaauthsecret.WithSslData(ctx))(ctx, t)
279+
})
280+
281+
f.Setup("Create broker config", brokerconfigmap.Install(brokerConfigName,
282+
brokerconfigmap.WithNumPartitions(3),
283+
brokerconfigmap.WithReplicationFactor(3),
284+
brokerconfigmap.WithBootstrapServer(testpkg.BootstrapServersSsl),
285+
brokerconfigmap.WithAuthSecret(authSecretName)))
286+
287+
f.Setup("Install broker", broker.Install(brokerName, append(
288+
broker.WithEnvConfig(),
289+
broker.WithConfig(brokerConfigName))...,
290+
))
291+
292+
f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver))
293+
f.Setup("install broker", broker.Install(brokerName))
294+
f.Setup("install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sinkName), "")))
295+
296+
f.Requirement("install source", eventshub.Install(sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEvent(event)))
297+
298+
f.Requirement("sink receives event", assert.OnStore(sinkName).MatchEvent(test.HasId(event.ID())).Exact(1))
299+
300+
//after the event is sent, the trigger should scale down to zero replicas
301+
f.Alpha("Trigger").Must("Scale down to zero", verifyConsumerGroupReplicas(getTriggerCg(triggerName), 0, false))
302+
303+
return f
304+
}
305+
174306
func ChannelScalesToZeroWithKeda() *feature.Feature {
175307
f := feature.NewFeature()
176308

0 commit comments

Comments (0)