Skip to content

Commit 270ecaf

Browse files
author
amuraru
committed
OAUTH support for AdobeIMS
1 parent 5a18f45 commit 270ecaf

10 files changed

+1168
-29
lines changed

docs/oauth-configuration.md

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# OAuth Configuration Guide
2+
3+
kminion supports OAUTHBEARER SASL mechanism with two flavors:
4+
5+
1. **Generic OAuth** - Standard OAuth 2.0 client credentials flow
6+
2. **Adobe IMS OAuth** - Adobe Identity Management System integration
7+
8+
## Generic OAuth Configuration
9+
10+
For standard OAuth 2.0 providers, configure the following:
11+
12+
```yaml
13+
kafka:
14+
sasl:
15+
enabled: true
16+
mechanism: "OAUTHBEARER"
17+
oauth:
18+
# Leave type empty for generic OAuth
19+
type: ""
20+
tokenEndpoint: "https://your-oauth-provider.com/oauth/token"
21+
clientId: "your-client-id"
22+
clientSecret: "your-client-secret"
23+
scope: "kafka"
24+
```
25+
26+
### Environment Variables
27+
28+
```bash
29+
KAFKA_SASL_ENABLED=true
30+
KAFKA_SASL_MECHANISM=OAUTHBEARER
31+
KAFKA_SASL_OAUTH_TYPE=""
32+
KAFKA_SASL_OAUTH_TOKENENDPOINT=https://your-oauth-provider.com/oauth/token
33+
KAFKA_SASL_OAUTH_CLIENTID=your-client-id
34+
KAFKA_SASL_OAUTH_CLIENTSECRET=your-client-secret
35+
KAFKA_SASL_OAUTH_SCOPE=kafka
36+
```
37+
38+
## Adobe IMS OAuth Configuration
39+
40+
For Adobe Identity Management System, configure the following:
41+
42+
```yaml
43+
kafka:
44+
sasl:
45+
enabled: true
46+
mechanism: "OAUTHBEARER"
47+
oauth:
48+
type: "AdobeIMS"
49+
tokenEndpoint: "https://ims-na1.adobelogin.com"
50+
clientId: "your-ims-client-id"
51+
clientSecret: "your-ims-client-secret"
52+
additional:
53+
clientCode: "your-ims-code"
54+
```
55+
56+
### Environment Variables
57+
58+
```bash
59+
KAFKA_SASL_ENABLED=true
60+
KAFKA_SASL_MECHANISM=OAUTHBEARER
61+
KAFKA_SASL_OAUTH_TYPE=AdobeIMS
62+
KAFKA_SASL_OAUTH_TOKENENDPOINT=https://ims-na1.adobelogin.com
63+
KAFKA_SASL_OAUTH_CLIENTID=your-ims-client-id
64+
KAFKA_SASL_OAUTH_CLIENTSECRET=your-ims-client-secret
65+
KAFKA_SASL_OAUTH_ADDITIONAL_CLIENTCODE=your-ims-code
66+
```
67+
68+
## How It Works
69+
70+
### Generic OAuth Flow
71+
72+
1. kminion uses the client credentials grant type
73+
2. Sends a POST request to the token endpoint with client ID and secret
74+
3. Receives an access token
75+
4. Uses the token for Kafka authentication
76+
77+
### Adobe IMS Flow
78+
79+
1. kminion uses the Adobe IMS Go SDK (`github.com/adobe/ims-go`)
80+
2. Creates an IMS client with the configured endpoint
81+
3. Requests a token using the IMS-specific authentication flow
82+
4. Stores both the access token and refresh token
83+
5. Uses the access token for Kafka authentication
84+
6. **Automatically refreshes the token** when it expires in less than 15 minutes
85+
7. Updates the refresh token if a new one is provided during refresh
86+
87+
## Switching Between Providers
88+
89+
The `type` field determines which OAuth provider to use:
90+
91+
- **Empty or omitted**: Uses generic OAuth with `tokenEndpoint`, `clientId`, `clientSecret`, and `scope`
92+
- **"AdobeIMS"**: Uses Adobe IMS with `tokenEndpoint`, `clientId`, `clientSecret`, and `additional.clientCode`
93+
94+
### Field Reuse
95+
96+
Both generic OAuth and Adobe IMS share common fields:
97+
- `tokenEndpoint`: OAuth token endpoint URL (for Adobe IMS, this is the IMS endpoint)
98+
- `clientId`: OAuth client ID
99+
- `clientSecret`: OAuth client secret
100+
101+
Provider-specific fields go in the `additional` block:
102+
- For Adobe IMS: `clientCode` (authorization code)
103+
104+
## Validation
105+
106+
- **Generic OAuth**: Validates that `tokenEndpoint`, `clientId`, and `clientSecret` are provided
107+
- **Adobe IMS**: Validation happens during client creation when connecting to Kafka
108+
109+
## Token Refresh Behavior
110+
111+
### Generic OAuth
112+
113+
Generic OAuth fetches a new token on every authentication request using the client credentials flow. This is simple but may result in more frequent token requests to the OAuth provider.
114+
115+
### Adobe IMS
116+
117+
Adobe IMS implements intelligent token refresh with resilient error handling:
118+
119+
- **Initial Token**: Obtained during startup using the authorization code with retry logic
120+
- **Refresh Token**: Stored securely and used for token renewal
121+
- **Automatic Refresh**: Tokens are automatically refreshed when they expire in less than 15 minutes
122+
- **Thread-Safe**: Token refresh is protected by mutex to prevent race conditions in concurrent environments
123+
- **Long-Running Support**: Designed for long-running Kafka clients that need to maintain connections for extended periods
124+
- **Retry Logic**: Automatic retry with exponential backoff for transient network errors
125+
126+
The 15-minute refresh threshold ensures that:
127+
- Tokens are refreshed proactively before expiration
128+
- There's sufficient time to complete the refresh operation
129+
- Kafka connections remain authenticated without interruption
130+
131+
### Error Resilience
132+
133+
Adobe IMS token operations (both initial fetch and refresh) include automatic retry logic for transient errors:
134+
135+
**Retryable Errors:**
136+
- Network timeouts (connection timeout, i/o timeout)
137+
- DNS resolution failures (temporary errors, timeouts)
138+
- Connection refused errors (service not ready)
139+
- Connection reset errors (connection dropped)
140+
- Network dial errors (can't establish connection)
141+
- Closed network connection errors
142+
- IMS HTTP errors (408, 429, 500, 502, 503, 504)
143+
144+
**Retry Strategy:**
145+
- Maximum 3 retry attempts
146+
- Exponential backoff: 1s, 2s, 4s, 8s, 16s (capped at 30s)
147+
- Non-retryable errors (e.g., invalid credentials) fail immediately
148+
- Context cancellation is respected during retry backoff
149+
150+
This ensures that temporary network issues or IMS service hiccups don't cause permanent authentication failures.
151+
152+
## Dependencies
153+
154+
- Generic OAuth: Built-in HTTP client
155+
- Adobe IMS: `github.com/adobe/ims-go` v0.19.1+
156+

docs/reference-config.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,20 @@ kafka:
6161
enableFast: true
6262
# OAUTHBEARER config properties
6363
oauth:
64+
# Type of OAuth provider. Valid values: AdobeIMS (leave empty for generic OAuth)
65+
type: ""
66+
# OAuth token endpoint (used by both generic OAuth and provider-specific types like AdobeIMS)
6467
tokenEndpoint: ""
68+
# Client ID (used by both generic OAuth and provider-specific types)
6569
clientId: ""
70+
# Client secret (used by both generic OAuth and provider-specific types)
6671
clientSecret: ""
72+
# Scope for generic OAuth (not used by AdobeIMS)
6773
scope: ""
74+
# Additional provider-specific configuration
75+
# For AdobeIMS: clientCode is required
76+
additional:
77+
clientCode: ""
6878

6979
minion:
7080
consumerGroups:
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Example configurations for OAuth authentication with Kafka
2+
3+
# Example 1: Generic OAuth (e.g., Keycloak, Okta, Auth0)
4+
---
5+
kafka:
6+
brokers:
7+
- kafka-broker-1:9092
8+
- kafka-broker-2:9092
9+
clientId: "kminion"
10+
11+
sasl:
12+
enabled: true
13+
mechanism: "OAUTHBEARER"
14+
oauth:
15+
# Leave type empty for generic OAuth
16+
type: ""
17+
tokenEndpoint: "https://your-oauth-provider.com/oauth/token"
18+
clientId: "your-kafka-client-id"
19+
clientSecret: "your-kafka-client-secret"
20+
scope: "kafka"
21+
22+
# Example 2: Adobe IMS OAuth
23+
---
24+
kafka:
25+
brokers:
26+
- kafka-broker-1:9092
27+
- kafka-broker-2:9092
28+
clientId: "kminion"
29+
30+
sasl:
31+
enabled: true
32+
mechanism: "OAUTHBEARER"
33+
oauth:
34+
type: "AdobeIMS"
35+
tokenEndpoint: "https://ims-na1.adobelogin.com"
36+
clientId: "your-ims-client-id"
37+
clientSecret: "your-ims-client-secret"
38+
additional:
39+
clientCode: "your-ims-authorization-code"
40+
41+
# Example 3: Generic OAuth with TLS
42+
---
43+
kafka:
44+
brokers:
45+
- kafka-broker-1:9093
46+
- kafka-broker-2:9093
47+
clientId: "kminion"
48+
49+
tls:
50+
enabled: true
51+
caFilepath: "/path/to/ca-cert.pem"
52+
certFilepath: "/path/to/client-cert.pem"
53+
keyFilepath: "/path/to/client-key.pem"
54+
55+
sasl:
56+
enabled: true
57+
mechanism: "OAUTHBEARER"
58+
oauth:
59+
type: ""
60+
tokenEndpoint: "https://your-oauth-provider.com/oauth/token"
61+
clientId: "your-kafka-client-id"
62+
clientSecret: "your-kafka-client-secret"
63+
scope: "kafka"
64+
65+
# Example 4: Adobe IMS OAuth with TLS
66+
---
67+
kafka:
68+
brokers:
69+
- kafka-broker-1:9093
70+
- kafka-broker-2:9093
71+
clientId: "kminion"
72+
73+
tls:
74+
enabled: true
75+
caFilepath: "/path/to/ca-cert.pem"
76+
77+
sasl:
78+
enabled: true
79+
mechanism: "OAUTHBEARER"
80+
oauth:
81+
type: "AdobeIMS"
82+
tokenEndpoint: "https://ims-na1.adobelogin.com"
83+
clientId: "your-ims-client-id"
84+
clientSecret: "your-ims-client-secret"
85+
additional:
86+
clientCode: "your-ims-authorization-code"
87+

go.mod

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/cloudhut/kminion/v2
33
go 1.25
44

55
require (
6+
github.com/adobe/ims-go v0.19.1
67
github.com/google/uuid v1.6.0
78
github.com/jcmturner/gokrb5/v8 v8.4.4
89
github.com/jellydator/ttlcache/v2 v2.11.1
@@ -14,7 +15,7 @@ require (
1415
github.com/stretchr/testify v1.11.1
1516
github.com/twmb/franz-go v1.19.5
1617
github.com/twmb/franz-go/pkg/kadm v1.16.1
17-
github.com/twmb/franz-go/pkg/kmsg v1.11.2
18+
github.com/twmb/franz-go/pkg/kmsg v1.12.0
1819
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0
1920
go.uber.org/atomic v1.11.0
2021
go.uber.org/automaxprocs v1.6.0
@@ -27,6 +28,7 @@ require (
2728
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2829
github.com/davecgh/go-spew v1.1.1 // indirect
2930
github.com/fsnotify/fsnotify v1.8.0 // indirect
31+
github.com/golang-jwt/jwt/v5 v5.3.0 // indirect
3032
github.com/hashicorp/go-uuid v1.0.3 // indirect
3133
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
3234
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
@@ -44,9 +46,9 @@ require (
4446
github.com/prometheus/procfs v0.16.1 // indirect
4547
go.uber.org/multierr v1.11.0 // indirect
4648
go.yaml.in/yaml/v2 v2.4.2 // indirect
47-
golang.org/x/crypto v0.42.0 // indirect
48-
golang.org/x/net v0.44.0 // indirect
49-
golang.org/x/sys v0.36.0 // indirect
49+
golang.org/x/crypto v0.43.0 // indirect
50+
golang.org/x/net v0.46.0 // indirect
51+
golang.org/x/sys v0.37.0 // indirect
5052
google.golang.org/protobuf v1.36.8 // indirect
5153
gopkg.in/yaml.v3 v3.0.1 // indirect
5254
)

go.sum

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
22
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
33
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
4+
github.com/adobe/ims-go v0.19.1 h1:K2/cgCkM/MZO7Nvv+yYjrQKpT6Tuyw1sMUxsUxU7BB4=
5+
github.com/adobe/ims-go v0.19.1/go.mod h1:cOGPuM+RrWjl9DAo3Z/fU/OOHt+PGpLHwYgidu0dpGQ=
46
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
57
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
68
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -63,6 +65,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6
6365
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
6466
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
6567
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
68+
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
69+
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
6670
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
6771
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
6872
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -298,8 +302,8 @@ github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7Eqflv
298302
github.com/twmb/franz-go/pkg/kadm v1.16.1 h1:IEkrhTljgLHJ0/hT/InhXGjPdmWfFvxp7o/MR7vJ8cw=
299303
github.com/twmb/franz-go/pkg/kadm v1.16.1/go.mod h1:Ue/ye1cc9ipsQFg7udFbbGiFNzQMqiH73fGC2y0rwyc=
300304
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
301-
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
302-
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
305+
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
306+
github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
303307
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 h1:alKdbddkPw3rDh+AwmUEwh6HNYgTvDSFIe/GWYRR9RM=
304308
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0/go.mod h1:k8BoBjyUbFj34f0rRbn+Ky12sZFAPbmShrg0karAIMo=
305309
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -334,8 +338,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
334338
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
335339
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
336340
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
337-
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
338-
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
341+
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
342+
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
339343
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
340344
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
341345
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@@ -372,8 +376,8 @@ golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY
372376
golang.org/x/net v0.0.0-20220812174116-3211cb980234/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
373377
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
374378
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
375-
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
376-
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
379+
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
380+
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
377381
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
378382
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
379383
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -425,8 +429,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
425429
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
426430
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
427431
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
428-
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
429-
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
432+
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
433+
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
430434
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
431435
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
432436
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=

0 commit comments

Comments
 (0)