Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (b *Broker) Open(conf *Config) error {
conf.Net.SASL.Version = SASLHandshakeV1
}

useSaslV0 := conf.Net.SASL.Version == SASLHandshakeV0 || conf.Net.SASL.Mechanism == SASLTypeGSSAPI
useSaslV0 := conf.Net.SASL.Version == SASLHandshakeV0
if conf.Net.SASL.Enable && useSaslV0 {
b.connErr = b.authenticateViaSASLv0()

Expand Down Expand Up @@ -1379,6 +1379,12 @@ func (b *Broker) authenticateViaSASLv1() error {
}

switch b.conf.Net.SASL.Mechanism {
case SASLTypeGSSAPI:
b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
}
return b.kerberosAuthenticator.AuthorizeV2(b, authSendReceiver)
case SASLTypeOAuth:
provider := b.conf.Net.SASL.TokenProvider
return b.sendAndReceiveSASLOAuth(authSendReceiver, provider)
Expand Down
5 changes: 3 additions & 2 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
},
{
name: "Kerberos client creation fails",
error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
error: errors.New("configuration file could not be opened: testdata/krb5.conf open testdata/krb5.conf: no such file or directory"),
},
{
name: "Bad server response, unmarshall key error",
Expand Down Expand Up @@ -701,10 +701,11 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
broker.requestsInFlight = metrics.NilCounter{}

conf := NewTestConfig()
conf.Net.SASL.Version = SASLHandshakeV0
conf.Net.SASL.Mechanism = SASLTypeGSSAPI
conf.Net.SASL.Enable = true
conf.Net.SASL.GSSAPI.ServiceName = "kafka"
conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
conf.Net.SASL.GSSAPI.KerberosConfigPath = "testdata/krb5.conf"
conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
conf.Net.SASL.GSSAPI.Username = "kafka"
conf.Net.SASL.GSSAPI.Password = "kafka"
Expand Down
4 changes: 3 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,9 @@ func (c *Config) Validate() error {
if c.Net.SASL.Mechanism == "" {
c.Net.SASL.Mechanism = SASLTypePlaintext
}

if c.Net.SASL.Version == SASLHandshakeV0 && c.ApiVersionsRequest {
return ConfigurationError("ApiVersionsRequest must be disabled when SASL v0 is enabled")
}
switch c.Net.SASL.Mechanism {
case SASLTypePlaintext:
if c.Net.SASL.User == "" {
Expand Down
158 changes: 108 additions & 50 deletions gssapi_kerberos.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math"
"net"
"strings"
"time"

Expand Down Expand Up @@ -108,16 +109,14 @@ func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
return a
}

/*
*
* Construct Kerberos AP_REQ package, conforming to RFC-4120
* https://tools.ietf.org/html/rfc4120#page-84
*
*/
// Construct Kerberos AP_REQ package, conforming to RFC-4120
// https://tools.ietf.org/html/rfc4120#page-84
func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
domain string, cname types.PrincipalName,
domain string,
cname types.PrincipalName,
ticket messages.Ticket,
sessionKey types.EncryptionKey) ([]byte, error) {
sessionKey types.EncryptionKey,
) ([]byte, error) {
auth, err := types.NewAuthenticator(domain, cname)
if err != nil {
return nil, err
Expand All @@ -144,16 +143,12 @@ func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
return aprBytes, nil
}

/*
*
* Append the GSS-API header to the payload, conforming to RFC-2743
* Section 3.1, Mechanism-Independent Token Format
*
* https://tools.ietf.org/html/rfc2743#page-81
*
* GSSAPIHeader + <specific mechanism payload>
*
*/
// Append the GSS-API header to the payload, conforming to RFC-2743
// Section 3.1, Mechanism-Independent Token Format
//
// https://tools.ietf.org/html/rfc2743#page-81
//
// GSSAPIHeader + <specific mechanism payload>
func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
oidBytes, err := asn1.Marshal(gssapi.OIDKRB5.OID())
if err != nil {
Expand All @@ -166,12 +161,15 @@ func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, e
return GSSPackage, nil
}

func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
func (krbAuth *GSSAPIKerberosAuth) initSecContext(
client KerberosClient,
bytes []byte,
) ([]byte, error) {
switch krbAuth.step {
case GSS_API_INITIAL:
aprBytes, err := krbAuth.createKrb5Token(
kerberosClient.Domain(),
kerberosClient.CName(),
client.Domain(),
client.CName(),
krbAuth.ticket,
krbAuth.encKey)
if err != nil {
Expand Down Expand Up @@ -200,63 +198,123 @@ func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient K
return nil, nil
}

/* This does the handshake for authorization */
func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
if err != nil {
Logger.Printf("Kerberos client error: %s", err)
return err
}

err = kerberosClient.Login()
if err != nil {
Logger.Printf("Kerberos client error: %s", err)
return err
}
// Construct SPN using serviceName and host
// default SPN format: <SERVICE>/<FQDN>

host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
func (krbAuth *GSSAPIKerberosAuth) spn(broker *Broker) string {
host, _, _ := net.SplitHostPort(broker.addr)
var spn string
if krbAuth.Config.BuildSpn != nil {
spn = krbAuth.Config.BuildSpn(broker.conf.Net.SASL.GSSAPI.ServiceName, host)
} else {
spn = fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
}
return spn
}

ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
// Login will use the given KerberosClient to login and get a ticket for the given spn.
func (krbAuth *GSSAPIKerberosAuth) Login(
client KerberosClient,
spn string,
) (*messages.Ticket, error) {
if err := client.Login(); err != nil {
Logger.Printf("Kerberos client login error: %s", err)
return nil, err
}

ticket, encKey, err := client.GetServiceTicket(spn)
if err != nil {
Logger.Printf("Error getting Kerberos service ticket : %s", err)
return err
Logger.Printf("Kerberos service ticket error for %s: %s", spn, err)
return nil, err
}
krbAuth.ticket = ticket
krbAuth.encKey = encKey
krbAuth.step = GSS_API_INITIAL
var receivedBytes []byte = nil
defer kerberosClient.Destroy()

return &ticket, nil
}

// Authorize performs the kerberos auth handshake for authorization
func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
client, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
if err != nil {
Logger.Printf("Kerberos client initialization error: %s", err)
return err
}
defer client.Destroy()

ticket, err := krbAuth.Login(client, krbAuth.spn(broker))
if err != nil {
return err
}

principal := strings.Join(ticket.SName.NameString, "/") + "@" + ticket.Realm
var receivedBytes []byte

for {
packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
packBytes, err := krbAuth.initSecContext(client, receivedBytes)
if err != nil {
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
Logger.Printf("Kerberos init error as %s: %s", principal, err)
return err
}

requestTime := time.Now()
bytesWritten, err := krbAuth.writePackage(broker, packBytes)
if err != nil {
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
Logger.Printf("Kerberos write error as %s: %s", principal, err)
return err
}
broker.updateOutgoingCommunicationMetrics(bytesWritten)
if krbAuth.step == GSS_API_VERIFY {
bytesRead := 0

switch krbAuth.step {
case GSS_API_VERIFY:
var bytesRead int
receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
requestLatency := time.Since(requestTime)
broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
if err != nil {
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
Logger.Printf("Kerberos read error as %s: %s", principal, err)
return err
}
} else if krbAuth.step == GSS_API_FINISH {
case GSS_API_FINISH:
return nil
}
}
}

// AuthorizeV2 performs the SASL v2 GSSAPI authentication with the Kafka broker.
func (krbAuth *GSSAPIKerberosAuth) AuthorizeV2(
broker *Broker,
authSendReceiver func(authBytes []byte) (*SaslAuthenticateResponse, error),
) error {
client, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
if err != nil {
Logger.Printf("Kerberos client initialization error: %s", err)
return err
}
defer client.Destroy()

ticket, err := krbAuth.Login(client, krbAuth.spn(broker))
if err != nil {
return err
}

principal := strings.Join(ticket.SName.NameString, "/") + "@" + ticket.Realm
var receivedBytes []byte

for {
token, err := krbAuth.initSecContext(client, receivedBytes)
if err != nil {
Logger.Printf("SASL Kerberos init error as %s: %s", principal, err)
return err
}

authResponse, err := authSendReceiver(token)
if err != nil {
Logger.Printf("SASL Kerberos authenticate error as %s: %s", principal, err)
return err
}

receivedBytes = authResponse.SaslAuthBytes

if krbAuth.step == GSS_API_FINISH {
return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions kerberos_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
)

func TestFaildToCreateKerberosConfig(t *testing.T) {
expectedErr := errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory")
expectedErr := errors.New("configuration file could not be opened: testdata/krb5.conf open testdata/krb5.conf: no such file or directory")
clientConfig := NewTestConfig()
clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI
clientConfig.Net.SASL.Enable = true
Expand All @@ -69,7 +69,7 @@ func TestFaildToCreateKerberosConfig(t *testing.T) {
clientConfig.Net.SASL.GSSAPI.Username = "client"
clientConfig.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
clientConfig.Net.SASL.GSSAPI.Password = "qwerty"
clientConfig.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
clientConfig.Net.SASL.GSSAPI.KerberosConfigPath = "testdata/krb5.conf"
_, err := NewKerberosClient(&clientConfig.Net.SASL.GSSAPI)
// Expect to create client with password
if err.Error() != expectedErr.Error() {
Expand Down
Loading