@@ -117,7 +117,8 @@ func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
117117func (krbAuth * GSSAPIKerberosAuth ) createKrb5Token (
118118 domain string , cname types.PrincipalName ,
119119 ticket messages.Ticket ,
120- sessionKey types.EncryptionKey ) ([]byte , error ) {
120+ sessionKey types.EncryptionKey ,
121+ ) ([]byte , error ) {
121122 auth , err := types .NewAuthenticator (domain , cname )
122123 if err != nil {
123124 return nil , err
@@ -200,63 +201,122 @@ func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient K
200201 return nil , nil
201202}
202203
203- /* This does the handshake for authorization */
204- func (krbAuth * GSSAPIKerberosAuth ) Authorize (broker * Broker ) error {
205- kerberosClient , err := krbAuth .NewKerberosClientFunc (krbAuth .Config )
206- if err != nil {
207- Logger .Printf ("Kerberos client error: %s" , err )
208- return err
209- }
210-
211- err = kerberosClient .Login ()
212- if err != nil {
213- Logger .Printf ("Kerberos client error: %s" , err )
214- return err
215- }
216- // Construct SPN using serviceName and host
217- // default SPN format: <SERVICE>/<FQDN>
218-
219- host := strings .SplitN (broker .addr , ":" , 2 )[0 ] // Strip port part
204+ func (krbAuth * GSSAPIKerberosAuth ) spn (broker * Broker ) string {
205+ host := strings .SplitN (broker .addr , ":" , 2 )[0 ]
220206 var spn string
221207 if krbAuth .Config .BuildSpn != nil {
222208 spn = krbAuth .Config .BuildSpn (broker .conf .Net .SASL .GSSAPI .ServiceName , host )
223209 } else {
224210 spn = fmt .Sprintf ("%s/%s" , broker .conf .Net .SASL .GSSAPI .ServiceName , host )
225211 }
212+ return spn
213+ }
214+
215+ // Login will use the given KerberosClient to login and get a ticket for the given spn.
216+ func (krbAuth * GSSAPIKerberosAuth ) Login (kerberosClient KerberosClient , spn string ) (* messages.Ticket , error ) {
217+ if err := kerberosClient .Login (); err != nil {
218+ Logger .Printf ("Kerberos client login error: %s" , err )
219+ return nil , err
220+ }
226221
227222 ticket , encKey , err := kerberosClient .GetServiceTicket (spn )
228223 if err != nil {
229- Logger .Printf ("Error getting Kerberos service ticket : %s" , err )
230- return err
224+ Logger .Printf ("Kerberos service ticket error for %s : %s" , spn , err )
225+ return nil , err
231226 }
232227 krbAuth .ticket = ticket
233228 krbAuth .encKey = encKey
234229 krbAuth .step = GSS_API_INITIAL
235- var receivedBytes []byte = nil
230+
231+ return & ticket , nil
232+ }
233+
234+ // Authorize performs the kerberos auth handshake for authorization
235+ func (krbAuth * GSSAPIKerberosAuth ) Authorize (broker * Broker ) error {
236+ kerberosClient , err := krbAuth .NewKerberosClientFunc (krbAuth .Config )
237+ if err != nil {
238+ Logger .Printf ("Kerberos client initialization error: %s" , err )
239+ return err
240+ }
236241 defer kerberosClient .Destroy ()
242+
243+ ticket , err := krbAuth .Login (kerberosClient , krbAuth .spn (broker ))
244+ if err != nil {
245+ return err
246+ }
247+
248+ principal := strings .Join (ticket .SName .NameString , "/" ) + "@" + ticket .Realm
249+ var receivedBytes []byte
250+
237251 for {
238252 packBytes , err := krbAuth .initSecContext (receivedBytes , kerberosClient )
239253 if err != nil {
240- Logger .Printf ("Error while performing GSSAPI Kerberos Authentication : %s\n " , err )
254+ Logger .Printf ("Kerberos init error as %s : %s" , principal , err )
241255 return err
242256 }
257+
243258 requestTime := time .Now ()
244259 bytesWritten , err := krbAuth .writePackage (broker , packBytes )
245260 if err != nil {
246- Logger .Printf ("Error while performing GSSAPI Kerberos Authentication : %s\n " , err )
261+ Logger .Printf ("Kerberos write error as %s : %s" , principal , err )
247262 return err
248263 }
249264 broker .updateOutgoingCommunicationMetrics (bytesWritten )
250- if krbAuth .step == GSS_API_VERIFY {
251- bytesRead := 0
265+
266+ switch krbAuth .step {
267+ case GSS_API_VERIFY :
268+ var bytesRead int
252269 receivedBytes , bytesRead , err = krbAuth .readPackage (broker )
253270 requestLatency := time .Since (requestTime )
254271 broker .updateIncomingCommunicationMetrics (bytesRead , requestLatency )
255272 if err != nil {
256- Logger .Printf ("Error while performing GSSAPI Kerberos Authentication : %s\n " , err )
273+ Logger .Printf ("Kerberos read error as %s : %s" , principal , err )
257274 return err
258275 }
259- } else if krbAuth .step == GSS_API_FINISH {
276+ case GSS_API_FINISH :
277+ return nil
278+ }
279+ }
280+ }
281+
282+ // AuthorizeV2 performs the SASL v2 GSSAPI authentication with the Kafka broker.
283+ func (krbAuth * GSSAPIKerberosAuth ) AuthorizeV2 (broker * Broker , authSendReceiver func (authBytes []byte ) (* SaslAuthenticateResponse , error )) error {
284+ kerberosClient , err := krbAuth .NewKerberosClientFunc (krbAuth .Config )
285+ if err != nil {
286+ Logger .Printf ("Kerberos client initialization error: %s" , err )
287+ return err
288+ }
289+ defer kerberosClient .Destroy ()
290+
291+ ticket , err := krbAuth .Login (kerberosClient , krbAuth .spn (broker ))
292+ if err != nil {
293+ return err
294+ }
295+
296+ principal := strings .Join (ticket .SName .NameString , "/" ) + "@" + ticket .Realm
297+ var receivedBytes []byte
298+
299+ for {
300+ token , err := krbAuth .initSecContext (receivedBytes , kerberosClient )
301+ if err != nil {
302+ Logger .Printf ("SASL Kerberos init error as %s: %s" , principal , err )
303+ return err
304+ }
305+
306+ authResponse , err := authSendReceiver (token )
307+ if err != nil {
308+ Logger .Printf ("SASL Kerberos authenticate error as %s: %s" , principal , err )
309+ return err
310+ }
311+
312+ if authResponse .Err != ErrNoError {
313+ Logger .Printf ("SASL Kerberos authentication failed as %s: %s" , principal , authResponse .Err )
314+ return authResponse .Err
315+ }
316+
317+ receivedBytes = authResponse .SaslAuthBytes
318+
319+ if krbAuth .step == GSS_API_FINISH {
260320 return nil
261321 }
262322 }
0 commit comments