Skip to content

Commit ebe1f6c

Browse files
committed
filebeat: add testcase for SASL PLAIN and SCRAM-SHA-256 in kafka input
1 parent 5d9efc3 commit ebe1f6c

File tree

2 files changed

+106
-74
lines changed

2 files changed

+106
-74
lines changed

filebeat/input/kafka/kafka_integration_test.go

Lines changed: 96 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -337,89 +337,113 @@ func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) {
337337

338338
func TestSASLAuthentication(t *testing.T) {
339339
testutils.SkipIfFIPSOnly(t, "SASL disabled when in fips140=only mode.")
340-
testTopic := createTestTopicName()
341-
groupID := "filebeat"
342-
343-
// Send test messages to the topic for the input to read.
344-
messages := []testMessage{
345-
{message: "testing"},
346-
{message: "sasl and stuff"},
347-
}
348-
for _, m := range messages {
349-
writeToKafkaTopic(t, testTopic, m.message, m.headers)
350-
}
351340

352-
// Setup the input config
353-
config := conf.MustNewConfigFrom(mapstr.M{
354-
"hosts": []string{getTestSASLKafkaHost()},
355-
"protocol": "https",
356-
"sasl.mechanism": "SCRAM-SHA-512",
357-
// Disable hostname verification since we are likely writing to localhost.
358-
"ssl.verification_mode": "certificate",
359-
"ssl.certificate_authorities": []string{
360-
"../../../testing/environments/docker/kafka/certs/ca-cert",
341+
testCases := []struct {
342+
name string
343+
mechanism string
344+
message string
345+
}{
346+
{
347+
name: "SCRAM-SHA-256",
348+
mechanism: sarama.SASLTypeSCRAMSHA256,
361349
},
362-
"username": "beats",
363-
"password": "KafkaTest",
364-
365-
"topics": []string{testTopic},
366-
"group_id": groupID,
367-
"wait_close": 0,
368-
})
350+
{
351+
name: "SCRAM-SHA-512",
352+
mechanism: sarama.SASLTypeSCRAMSHA512,
353+
},
354+
{
355+
name: "PLAIN",
356+
mechanism: sarama.SASLTypePlaintext,
357+
},
358+
}
369359

370-
client := beattest.NewChanClient(100)
371-
defer client.Close()
372-
events := client.Channel
373-
input, cancel := run(t, config, client)
360+
for _, tc := range testCases {
361+
t.Run(tc.name, func(t *testing.T) {
362+
testTopic := createTestTopicName()
363+
groupID := "filebeat"
374364

375-
timeout := time.After(30 * time.Second)
376-
for range messages {
377-
select {
378-
case event := <-events:
379-
v, err := event.Fields.GetValue("message")
380-
if err != nil {
381-
t.Fatal(err)
365+
// Send test messages to the topic for the input to read.
366+
messages := []testMessage{
367+
{message: "testing"},
368+
{message: "testing" + tc.mechanism},
382369
}
383-
text, ok := v.(string)
384-
if !ok {
385-
t.Fatal("could not get message text from event")
370+
for _, m := range messages {
371+
writeToKafkaTopic(t, testTopic, m.message, m.headers)
386372
}
387-
msg := findMessage(t, text, messages)
388-
assert.Equal(t, text, msg.message)
389373

390-
checkMatchingHeaders(t, event, msg.headers)
391-
392-
// emulating the pipeline (kafkaInput.Run)
393-
meta, ok := event.Private.(eventMeta)
394-
if !ok {
395-
t.Fatal("could not get eventMeta and ack the message")
374+
// Setup the input config
375+
config := conf.MustNewConfigFrom(mapstr.M{
376+
"hosts": []string{getTestSASLKafkaHost()},
377+
"protocol": "https",
378+
"sasl.mechanism": tc.mechanism,
379+
// Disable hostname verification since we are likely writing to localhost.
380+
"ssl.verification_mode": "certificate",
381+
"ssl.certificate_authorities": []string{
382+
"../../../testing/environments/docker/kafka/certs/ca-cert",
383+
},
384+
"username": "beats",
385+
"password": "KafkaTest",
386+
387+
"topics": []string{testTopic},
388+
"group_id": groupID,
389+
"wait_close": 0,
390+
})
391+
392+
client := beattest.NewChanClient(100)
393+
defer client.Close()
394+
events := client.Channel
395+
input, cancel := run(t, config, client)
396+
397+
timeout := time.After(30 * time.Second)
398+
for range messages {
399+
select {
400+
case event := <-events:
401+
v, err := event.Fields.GetValue("message")
402+
if err != nil {
403+
t.Fatal(err)
404+
}
405+
text, ok := v.(string)
406+
if !ok {
407+
t.Fatal("could not get message text from event")
408+
}
409+
msg := findMessage(t, text, messages)
410+
assert.Equal(t, text, msg.message)
411+
412+
checkMatchingHeaders(t, event, msg.headers)
413+
414+
// emulating the pipeline (kafkaInput.Run)
415+
meta, ok := event.Private.(eventMeta)
416+
if !ok {
417+
t.Fatal("could not get eventMeta and ack the message")
418+
}
419+
meta.ackHandler()
420+
case <-timeout:
421+
t.Fatal("timeout waiting for incoming events")
422+
}
396423
}
397-
meta.ackHandler()
398-
case <-timeout:
399-
t.Fatal("timeout waiting for incoming events")
400-
}
401-
}
402424

403-
// sarama commits every second, we need to make sure
404-
// all message acks are committed before the rest of the checks
405-
<-time.After(2 * time.Second)
406-
407-
// Close the done channel and make sure the beat shuts down in a reasonable
408-
// amount of time.
409-
cancel()
410-
didClose := make(chan struct{})
411-
go func() {
412-
input.Wait()
413-
close(didClose)
414-
}()
425+
// sarama commits every second, we need to make sure
426+
// all message acks are committed before the rest of the checks
427+
<-time.After(2 * time.Second)
428+
429+
// Close the done channel and make sure the beat shuts down in a reasonable
430+
// amount of time.
431+
cancel()
432+
didClose := make(chan struct{})
433+
go func() {
434+
input.Wait()
435+
close(didClose)
436+
}()
437+
438+
select {
439+
case <-time.After(30 * time.Second):
440+
t.Fatal("timeout waiting for beat to shut down")
441+
case <-didClose:
442+
}
415443

416-
select {
417-
case <-time.After(30 * time.Second):
418-
t.Fatal("timeout waiting for beat to shut down")
419-
case <-didClose:
444+
assertOffset(t, groupID, testTopic, int64(len(messages)))
445+
})
420446
}
421-
422-
assertOffset(t, groupID, testTopic, int64(len(messages)))
423447
}
424448

425449
func TestTest(t *testing.T) {

testing/environments/docker/kafka/run.sh

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \
2727
--override listeners=INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094,SASL_SSL://0.0.0.0:9093 \
2828
--override advertised.listeners=INSIDE://${KAFKA_ADVERTISED_HOST}:9092,OUTSIDE://localhost:9094,SASL_SSL://localhost:9093 \
2929
--override inter.broker.listener.name=INSIDE \
30-
--override sasl.enabled.mechanisms=SCRAM-SHA-512 \
30+
--override sasl.enabled.mechanisms=SCRAM-SHA-512,SCRAM-SHA-256,PLAIN \
3131
--override listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required;" \
32+
--override listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required;" \
33+
--override listener.name.sasl_ssl.plain.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required user_beats=\"KafkaTest\";" \
3234
--override logs.dir=${KAFKA_LOGS_DIR} \
3335
--override log4j.logger.kafka=DEBUG,kafkaAppender \
3436
--override log.flush.interval.ms=200 \
@@ -42,12 +44,18 @@ wait_for_port 9092
4244

4345
echo "Kafka load status code $?"
4446

45-
# create a user beats with password KafkaTest, for use in client SASL authentication
47+
# create users beats with password KafkaTest, for use in client SASL authentication
4648
/kafka/bin/kafka-configs.sh \
4749
--bootstrap-server localhost:9092 \
4850
--alter --add-config 'SCRAM-SHA-512=[password=KafkaTest]' \
4951
--entity-type users \
5052
--entity-name beats
5153

54+
/kafka/bin/kafka-configs.sh \
55+
--bootstrap-server localhost:9092 \
56+
--alter --add-config 'SCRAM-SHA-256=[password=KafkaTest]' \
57+
--entity-type users \
58+
--entity-name beats
59+
5260
# Make sure the container keeps running
5361
tail -f /dev/null

0 commit comments

Comments
 (0)