Attempting to connect to the MSK IAM [Consumer] #573

Open
@oaattia

Description

It appears that the added OAUTHBEARER support, specifically the new methods RdKafka\Conf::setOauthbearerTokenRefreshCb(), RdKafka::oauthbearerSetToken(), and RdKafka::oauthbearerSetTokenFailure(), does not work as expected with the high-level consumer.

The following code:

<?php

$this->kafkaConfig->setOauthbearerTokenRefreshCb(function ($kafka, $oauthbearer_config = null) {
    try {
        $provider = \Aws\Credentials\CredentialProvider::defaultProvider(
            [
                'region'  => env('AWS_DEFAULT_REGION'),
                'version' => 'latest'
            ]
        );

        $credentials = $provider()->wait();

        // using https://github.com/PyaeSoneAungRgn/aws-msk-iam-sasl-signer-php
        $signer = new AwsMskIamSaslSigner(
            env('AWS_DEFAULT_REGION'),
            $credentials->getAccessKeyId(),
            $credentials->getSecretKey(),
            $credentials->getSecurityToken()
        );

        $token = $signer->generateToken();

        // This is the call that fails: $kafka is an RdKafka\KafkaConsumer here
        $kafka->oauthbearerSetToken(
            $token['token'],
            $token['expiryTime'],
            'AWS MSK IAM',
            []
        );
    } catch (\Exception $e) {
        var_dump('OAuth token generation failed:', $e->getMessage());
        throw new \RuntimeException('Failed to generate OAuth token: ' . $e->getMessage());
    }
});

Resulted in this output:

[2025-01-08 17:13:31]ERROR: Call to undefined method RdKafka\KafkaConsumer::oauthbearerSetToken() {"exception":"[object] (Error(code: 0): Call to undefined method RdKafka\KafkaConsumer::oauthbearerSetToken() at src/Consumer/AwsTopicConsumer.php:123)

I expected the consumer to start consuming, but it looks to me like the high-level consumer (RdKafka\KafkaConsumer) does not provide oauthbearerSetToken().
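To narrow this down, a small check along the following lines might help confirm which OAUTHBEARER methods the handle passed to the callback actually exposes. This is only a sketch: `oauthbearerMethodSupport()` is a hypothetical helper, not part of php-rdkafka, and the `stdClass` object is a stand-in so the snippet runs without the extension. Inside the refresh callback the `$kafka` argument would be passed instead.

```php
<?php

// Hypothetical helper (not part of php-rdkafka): reports which of the
// OAUTHBEARER methods named in this issue exist on a given handle object.
function oauthbearerMethodSupport(object $handle): array
{
    $methods = ['oauthbearerSetToken', 'oauthbearerSetTokenFailure'];
    $support = [];
    foreach ($methods as $method) {
        // method_exists() is a PHP built-in; it accepts an object instance.
        $support[$method] = method_exists($handle, $method);
    }
    return $support;
}

// Stand-in handle without the methods, so the sketch runs on its own.
// In the callback above, pass $kafka here instead.
$handle = new stdClass();
var_dump(get_class($handle), oauthbearerMethodSupport($handle));
```

If both entries come back false for the RdKafka\KafkaConsumer instance, that would match the "Call to undefined method" error above.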

php-rdkafka Version

6.0.5

librdkafka Version

No response

PHP Version

PHP 8.0

Operating System

Ubuntu 20

Kafka Version

No response
