feat(kafka sink/source): support SASL OAUTHBEARER with method=default and external token endpoint URL #25328

@dvilaverde

Description

sasl.oauthbearer.method=oidc requires the access_token returned by the token endpoint to be a JWT containing a standard exp claim. librdkafka parses the JWT payload to extract exp (for scheduling proactive token
refresh) and fails with:

Failed to acquire SASL OAUTHBEARER token: Expected JSON JWT response with "exp" field

This makes method=oidc unusable with any OAuth2 provider that:

  • Returns opaque (non-JWT) access tokens, or
  • Returns JWTs that use non-standard expiry fields (e.g., created_at + expires_in in milliseconds rather than a standard exp epoch-seconds claim)

These are valid, real-world OAuth2 providers. The expires_in field in the token endpoint HTTP response is the RFC 6749-specified mechanism for communicating token lifetime — the JWT exp claim is not guaranteed by
the OAuth2 spec.
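To make the failure mode concrete, here is a small Python sketch of the check librdkafka's OIDC handler effectively performs: decode the JWT payload segment and require a standard `exp` claim. This is illustrative only (the real check is C code inside librdkafka); `make_jwt` is a demo helper, not part of any real API.

```python
import base64
import json

def _b64url(data: dict) -> str:
    """Unpadded base64url encoding of a JSON object (JWT segment format)."""
    raw = base64.urlsafe_b64encode(json.dumps(data).encode())
    return raw.rstrip(b"=").decode()

def make_jwt(claims: dict) -> str:
    """Build an unsigned JWT-shaped token for demonstration purposes."""
    return f"{_b64url({'alg': 'none'})}.{_b64url(claims)}.sig"

def extract_exp(token: str) -> int:
    """Mimic librdkafka's requirement: the payload must carry an epoch-seconds
    `exp` claim. Opaque tokens fail at the split; JWTs with non-standard
    expiry fields fail at the claim lookup."""
    try:
        _header, payload, _sig = token.split(".")
    except ValueError:
        raise ValueError("not a JWT: expected three dot-separated segments")
    # JWT segments use unpadded base64url; restore padding before decoding.
    padded = payload + "=" * (-len(payload) % 4)
    claims = json.loads(base64.urlsafe_b64decode(padded))
    if "exp" not in claims:
        raise ValueError('Expected JSON JWT response with "exp" field')
    return int(claims["exp"])
```

A token whose payload carries only `created_at` + `expires_in`, or an opaque token with no dot-separated structure at all, both raise here, which is exactly where method=oidc gives up.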

Current Workaround

None. method=oidc is the only supported path for external token endpoints in Vector today, and it cannot be made to work with providers that don't include exp in their JWT payloads.

Implementation Notes

The change is confined to Vector's Kafka sink context (src/sinks/kafka/). A custom ClientContext implementation that overrides oauthbearer_token_refresh would:

  • Detect when sasl.oauthbearer.method=default + sasl.oauthbearer.token.endpoint.url are both set
  • Fire an async HTTP POST to the endpoint on each refresh callback
  • Parse the standard OAuth2 response (access_token, expires_in, optional token_type)
  • Call set_oauthbearer_token with the result

The change is additive and does not affect existing method=oidc behavior. Estimated diff size: ~100 lines.
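The detection step can be illustrated in isolation. This is a Python sketch of the gating logic only; Vector's actual implementation would be Rust inside src/sinks/kafka/, and the option keys are the librdkafka ones quoted elsewhere in this issue.

```python
def needs_custom_refresh(librdkafka_options: dict) -> bool:
    """Decide whether Vector should install its own oauthbearer token-refresh
    callback rather than delegate to librdkafka's built-in OIDC handler.
    sasl.oauthbearer.method defaults to "default" in librdkafka when unset."""
    opts = librdkafka_options
    return (
        opts.get("sasl.mechanism") == "OAUTHBEARER"
        and opts.get("sasl.oauthbearer.method", "default") == "default"
        and "sasl.oauthbearer.token.endpoint.url" in opts
    )
```

With method=oidc (or no endpoint URL), the function returns False and existing behavior is untouched, which keeps the change additive.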

Use Cases

  1. Enterprise identity providers with non-standard JWT payloads. Several large-scale internal OAuth2/OIDC systems (some Azure AD configurations, legacy on-premise IdPs) issue access tokens whose JWT
    payloads use proprietary expiry fields (created_at + expires_in in milliseconds, valid_until, etc.) instead of the standard exp epoch-seconds claim. These providers are widely used in enterprise Kafka deployments;
    method=oidc is incompatible with all of them.
  2. OAuth2 providers that return opaque (non-JWT) access tokens. RFC 6749 does not require access tokens to be JWTs — they can be any opaque string. Many OAuth2 authorization servers (including some versions of
    Keycloak in opaque-token mode, Hydra, and custom internal auth services) issue opaque tokens. method=oidc fails immediately on these since the token cannot be base64-decoded as a JWT payload.
  3. Token proxy/sidecar patterns. A common Kubernetes deployment pattern is a sidecar that handles credential fetching and caching, exposing a simple POST /oauth2/token endpoint on localhost that returns
    {"access_token": "...", "expires_in": N}. This sidecar may return the raw token from an upstream provider without wrapping it in a new JWT. method=default with a callback is the natural fit for this architecture —
    librdkafka calls the sidecar, gets the token + lifetime, and handles refresh scheduling itself.
  4. Short-lived or dynamically-scoped tokens where JWT re-signing is not feasible. Some auth systems issue tokens where the JWT payload cannot be modified without invalidating the broker-side signature check, yet the
    exp field is absent or in a non-standard format. Operators currently have no path to make Vector work with these providers.

Attempted Solutions

Attempt 1: method=oidc with a token proxy sidecar

The natural starting point. Configured a local HTTP sidecar that fetches and caches tokens from the upstream OAuth2 provider, exposing them on localhost:9898/oauth2/token. Pointed sasl.oauthbearer.token.endpoint.url
at the sidecar.

librdkafka_options:
"security.protocol": "SASL_SSL"
"sasl.mechanism": "OAUTHBEARER"
"sasl.oauthbearer.method": "oidc"
"sasl.oauthbearer.client.id": "my-client-id"
"sasl.oauthbearer.client.secret": "my-client-secret"
"sasl.oauthbearer.token.endpoint.url": "http://localhost:9898/oauth2/token"

Result: librdkafka successfully calls the endpoint, retrieves the token, then immediately fails:
Failed to acquire SASL OAUTHBEARER token: Expected JSON JWT response with "exp" field
The upstream provider returns a JWT whose payload uses created_at (epoch ms) + expires_in (ms, as a string) rather than the standard exp claim. librdkafka's OIDC handler has no fallback — it parses only the JWT
payload, not the HTTP response JSON.
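To spell out what a consumer of this provider's tokens would have to do instead, the non-standard fields can only be combined with provider-specific logic. The field names below mirror the description above; the conversion helper itself is hypothetical.

```python
def expiry_from_nonstandard_claims(claims: dict) -> int:
    """Derive the epoch-seconds expiry a standard `exp` claim would carry
    from this provider's created_at (epoch ms) + expires_in (ms, possibly a
    string) fields. librdkafka's OIDC handler has no hook for logic like this."""
    created_at_ms = int(claims["created_at"])
    expires_in_ms = int(claims["expires_in"])
    return (created_at_ms + expires_in_ms) // 1000
```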


Attempt 2: Add exp as a top-level field in the token endpoint response

Hypothesis: librdkafka might fall back to reading exp from the HTTP response JSON if it cannot find it in the JWT payload.

Modified the proxy to compute and return an absolute exp Unix timestamp alongside the standard fields:
{
  "access_token": "",
  "token_type": "bearer",
  "expires_in": 86399,
  "exp": 1714506000
}

Result: No change. Confirmed by reading the librdkafka source (rdkafka_sasl_oauthbearer_oidc.c, rd_kafka_oidc_token_try_validate()) — exp is read exclusively from the decoded JWT payload. There is no fallback to the
response JSON.

Proposal

When sasl.oauthbearer.method=default is configured alongside sasl.oauthbearer.token.endpoint.url, Vector should implement librdkafka's oauthbearer_token_refresh_cb callback. The callback should:

  1. POST to the configured sasl.oauthbearer.token.endpoint.url
  2. Parse access_token and expires_in from the standard OAuth2 JSON response
  3. Call rd_kafka_oauthbearer_set_token() with the token and lifetime derived from expires_in — no JWT parsing required

This is the pattern librdkafka's own documentation recommends for method=default: the application implements the refresh callback and sets the token lifetime from the HTTP response rather than from JWT
introspection.
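The core of the proposed callback is a small, JWT-free mapping from the RFC 6749 response to the (token, absolute lifetime) pair that rd_kafka_oauthbearer_set_token() expects. Sketched here in Python with a hypothetical helper name; the real code would be Rust.

```python
import time

def token_from_oauth2_response(resp, now_ms=None):
    """Map a standard OAuth2 token response onto a (token, lifetime_ms) pair.
    The lifetime is an absolute epoch timestamp in milliseconds computed from
    expires_in (seconds, per RFC 6749). No JWT introspection is involved, so
    opaque tokens work as well as JWTs with non-standard claims."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    token = resp["access_token"]
    # Some providers return expires_in as a string; coerce it.
    expires_in_s = int(resp["expires_in"])
    return token, now_ms + expires_in_s * 1000
```

Passing `now_ms` explicitly is just for testability; in the callback it would be the current wall-clock time at refresh.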

Proposed Configuration

  sinks:
    kafka_out:
      type: kafka
      bootstrap_servers: "broker:9097"
      topic: "my-topic"
      encoding:
        codec: json
      librdkafka_options:
        "security.protocol": "SASL_SSL"
        "sasl.mechanism": "OAUTHBEARER"
        "sasl.oauthbearer.method": "default"
        "sasl.oauthbearer.token.endpoint.url": "http://localhost:9898/oauth2/token"

Version

No response

Labels

sink: kafka (anything `kafka` sink related), source: kafka (anything `kafka` source related)
