Skip to content

Commit a050275

Browse files
authored
Added supoort for oauth2 access token for service to service (#63)
1 parent 5a2dbf1 commit a050275

File tree

12 files changed

+363
-5
lines changed

12 files changed

+363
-5
lines changed

DEVELOPER_GUIDE.md

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ Use the command below to set up a Sink connector to a Authenticated Streaming Co
231231
}' http://localhost:8083/connectors
232232
```
233233
234-
2. Using jwt_token
234+
2. Using jwt_token **[DEPRECATED]**
235235
- Convert private.key from adobe console to PKCS8 private using command
236236
```shell
237237
openssl pkcs8 -topk8 -inform PEM -outform DER -in private.key -out private-pkcs8.key -nocrypt
@@ -272,6 +272,46 @@ Use the command below to set up a Sink connector to a Authenticated Streaming Co
272272
1. key: `x-adobe-flow-id`, value: `341fd4f0-cdec-4912-1ab6-fb54aeb41286`
273273
2. key: `x-adobe-dataset-id`, value: `3096fbfd5978431948af3ba3`
274274
275+
Use config -
276+
```json
277+
"aep.connection.endpoint.headers": "{\"x-adobe-flow-id\":\"341fd4f0-cdec-4912-1ab6-fb54aeb41286\", \"x-adobe-dataset-id\": \"3096fbfd5978431948af3ba3\"}"
278+
```
279+
#### note : jwt_token authentication is deprecated
280+
281+
282+
3. Using oauth2_access_token
283+
- Create http connector
284+
```shell
285+
curl -s -X POST \
286+
-H "Content-Type: application/json" \
287+
--data '{
288+
"name": "aep-auth-sink-connector",
289+
"config": {
290+
"connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
291+
"topics": "connect-test",
292+
"tasks.max": 1,
293+
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
294+
"key.converter.schemas.enable": "false",
295+
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
296+
"value.converter.schemas.enable": "false",
297+
"aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
298+
"aep.flush.interval.seconds": 1,
299+
"aep.flush.bytes.kb": 4,
300+
"aep.connection.auth.enabled": true,
301+
"aep.connection.auth.token.type": "oauth2_access_token",
302+
"aep.connection.auth.client.id": "<client_id>",
303+
"aep.connection.auth.client.secret": "<client_secret>"
304+
"aep.connection.auth.endpoint": "<ims-url>",
305+
"aep.connection.endpoint.headers": "<optional-header-that-needs-to-be-passed-to-AEP>"
306+
}
307+
}' http://localhost:8083/connectors
308+
```
309+
310+
Note - `aep.connection.endpoint.headers` format should be JSON-encoded.
311+
Example: To send below 2 HTTP headers -
312+
1. key: `x-adobe-flow-id`, value: `341fd4f0-cdec-4912-1ab6-fb54aeb41286`
313+
2. key: `x-adobe-dataset-id`, value: `3096fbfd5978431948af3ba3`
314+
275315
Use config -
276316
```json
277317
"aep.connection.endpoint.headers": "{\"x-adobe-flow-id\":\"341fd4f0-cdec-4912-1ab6-fb54aeb41286\", \"x-adobe-dataset-id\": \"3096fbfd5978431948af3ba3\"}"

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ task copyDependencies(type: Copy) {
247247
"delivery_guarantee": [ "at_least_once"],
248248
"kafka_connect_api": true,
249249
"single_message_transforms": true,
250-
"supported_encodings": [ "json" ]
250+
"supported_encodings": [ "any" ]
251251
},
252252
"license": [
253253
{

streaming-connect-common/src/main/java/com/adobe/platform/streaming/auth/TokenType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
public enum TokenType {
2323

2424
ACCESS_TOKEN("access_token"),
25-
JWT_TOKEN("jwt_token");
25+
JWT_TOKEN("jwt_token"),
26+
OAUTH2_ACCESS_TOKEN("oauth2_access_token");
2627

2728
private static final Map<String, TokenType> TOKEN_TYPES = ImmutableMap.<String, TokenType>builder()
2829
.put(ACCESS_TOKEN.name, ACCESS_TOKEN)
2930
.put(JWT_TOKEN.name, JWT_TOKEN)
31+
.put(OAUTH2_ACCESS_TOKEN.name, OAUTH2_ACCESS_TOKEN)
3032
.build();
3133

3234
private String name;

streaming-connect-common/src/main/java/com/adobe/platform/streaming/auth/impl/AuthProviderFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ public static AuthProvider getAuthProvider(TokenType tokenType, Map<String, Stri
4040
case JWT_TOKEN:
4141
return getJWTAuthProvider(authProperties, authProxyConfiguration);
4242

43+
case OAUTH2_ACCESS_TOKEN:
44+
return getOAuth2IMSProvider(authProperties, authProxyConfiguration);
45+
4346
default:
4447
throw new AuthException("Invalid token type to get auth provider");
4548
}
@@ -61,6 +64,20 @@ private static AuthProvider getIMSAuthProvider(Map<String, String> authPropertie
6164
new IMSTokenProvider(endpoint, clientId, clientCode, clientSecret, authProxyConfiguration);
6265
}
6366

67+
private static AuthProvider getOAuth2IMSProvider(final Map<String, String> authProperties,
68+
final AuthProxyConfiguration authProxyConfiguration) {
69+
final String clientId = authProperties.get(AuthUtils.AUTH_CLIENT_ID);
70+
final String clientSecret = authProperties.get(AuthUtils.AUTH_CLIENT_SECRET);
71+
72+
Preconditions.checkNotNull(clientId, "Invalid client Id");
73+
Preconditions.checkNotNull(clientSecret, "Invalid client secret");
74+
75+
final String endpoint = authProperties.get(AuthUtils.AUTH_ENDPOINT);
76+
return StringUtils.isEmpty(endpoint) ?
77+
new OAuth2IMSTokenProvider(clientId, clientSecret, authProxyConfiguration) :
78+
new OAuth2IMSTokenProvider(endpoint, clientId, clientSecret, authProxyConfiguration);
79+
}
80+
6481
private static AuthProvider getJWTAuthProvider(Map<String, String> authProperties,
6582
AuthProxyConfiguration authProxyConfiguration) {
6683
final String clientId = authProperties.get(AuthUtils.AUTH_CLIENT_ID);

streaming-connect-common/src/main/java/com/adobe/platform/streaming/auth/impl/JWTTokenProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
/**
4646
* @author Adobe Inc.
4747
*/
48+
@Deprecated
4849
public class JWTTokenProvider extends AbstractAuthProvider {
4950

5051
private static final Logger LOG = LoggerFactory.getLogger(JWTTokenProvider.class);
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2024 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
package com.adobe.platform.streaming.auth.impl;
14+
15+
import com.adobe.platform.streaming.auth.AbstractAuthProvider;
16+
import com.adobe.platform.streaming.auth.AuthException;
17+
import com.adobe.platform.streaming.auth.AuthUtils;
18+
import com.adobe.platform.streaming.auth.TokenResponse;
19+
import com.adobe.platform.streaming.http.HttpException;
20+
import com.adobe.platform.streaming.http.HttpProducer;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
25+
/**
26+
* @author Adobe Inc.
27+
*/
28+
public class OAuth2IMSTokenProvider extends AbstractAuthProvider {
29+
30+
private static final Logger LOG = LoggerFactory.getLogger(OAuth2IMSTokenProvider.class);
31+
private static final String IMS_ENDPOINT_PATH = "/ims/token/v3";
32+
private static final String GRANT_TYPE = "client_credentials";
33+
private static final String SCOPE = "openid,AdobeID,read_organizations,additional_info.projectedProductContext," +
34+
"session";
35+
36+
private String endpoint = System.getenv(AuthUtils.AUTH_ENDPOINT);
37+
38+
private final String clientId;
39+
private final String clientSecret;
40+
private HttpProducer httpProducer;
41+
42+
OAuth2IMSTokenProvider(final String clientId, final String clientSecret,
43+
final AuthProxyConfiguration authProxyConfiguration) {
44+
this.clientId = clientId;
45+
this.clientSecret = clientSecret;
46+
this.httpProducer = HttpProducer.newBuilder(endpoint)
47+
.withProxyHost(authProxyConfiguration.getProxyHost())
48+
.withProxyPort(authProxyConfiguration.getProxyPort())
49+
.withProxyUser(authProxyConfiguration.getProxyUsername())
50+
.withProxyPassword(authProxyConfiguration.getProxyPassword())
51+
.build();
52+
}
53+
54+
OAuth2IMSTokenProvider(final String endpoint, final String clientId, final String clientSecret,
55+
final AuthProxyConfiguration authProxyConfiguration) {
56+
this(clientId, clientSecret, authProxyConfiguration);
57+
this.endpoint = endpoint;
58+
this.httpProducer = HttpProducer.newBuilder(endpoint)
59+
.withProxyHost(authProxyConfiguration.getProxyHost())
60+
.withProxyPort(authProxyConfiguration.getProxyPort())
61+
.withProxyUser(authProxyConfiguration.getProxyUsername())
62+
.withProxyPassword(authProxyConfiguration.getProxyPassword())
63+
.build();
64+
}
65+
66+
@Override
67+
protected TokenResponse getTokenResponse() throws AuthException {
68+
LOG.debug("refreshing expired oauth2 accessToken: {}", clientId);
69+
StringBuilder params = new StringBuilder()
70+
.append("grant_type=").append(GRANT_TYPE)
71+
.append("&client_id=").append(clientId)
72+
.append("&client_secret=").append(clientSecret)
73+
.append("&scope=").append(SCOPE);
74+
75+
try {
76+
final TokenResponse tokenResponse = httpProducer.post(IMS_ENDPOINT_PATH, params.toString().getBytes(),
77+
getContentHandler());
78+
// As the expiresIn time we get from the API is in seconds we need to convert this into milliseconds
79+
final long expiresInMilliSecond = tokenResponse.getExpiresIn() * 1000;
80+
final TokenResponse updatedTokenResponse = new TokenResponse(tokenResponse.getTokenType(), expiresInMilliSecond,
81+
tokenResponse.getRefreshToken(), tokenResponse.getAccessToken());
82+
return updatedTokenResponse;
83+
} catch (HttpException httpException) {
84+
throw new AuthException("Exception while fetching oauth2 access token", httpException);
85+
}
86+
}
87+
88+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2024 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
package com.adobe.platform.streaming.auth.impl;
14+
15+
import com.adobe.platform.streaming.auth.AuthException;
16+
import org.junit.jupiter.api.Test;
17+
18+
import static org.junit.jupiter.api.Assertions.assertThrows;
19+
20+
/**
21+
* @author Adobe Inc.
22+
*/
23+
class OAuth2IMSTokenProviderTest {
24+
25+
private static final String TEST_ENDPOINT = "https://ims-na1.adobelogin.com";
26+
private static final String TEST_CLIENT_ID = "testClientId";
27+
private static final String TEST_CLIENT_SECRET = "testClientSecret";
28+
29+
@Test
30+
void testGetTokenInvalidClientId() {
31+
OAuth2IMSTokenProvider imsTokenProvider = new OAuth2IMSTokenProvider(TEST_ENDPOINT, null,
32+
TEST_CLIENT_SECRET, AuthProxyConfiguration.builder().build());
33+
assertThrows(AuthException.class, imsTokenProvider::getToken);
34+
}
35+
36+
@Test
37+
void testGetTokenInvalidSecret() {
38+
OAuth2IMSTokenProvider imsTokenProvider = new OAuth2IMSTokenProvider(TEST_ENDPOINT, TEST_CLIENT_ID, null,
39+
AuthProxyConfiguration.builder().build());
40+
assertThrows(AuthException.class, imsTokenProvider::getToken);
41+
}
42+
43+
@Test
44+
void testGetToken() {
45+
OAuth2IMSTokenProvider imsTokenProvider = new OAuth2IMSTokenProvider(
46+
TEST_ENDPOINT,
47+
TEST_CLIENT_ID,
48+
TEST_CLIENT_SECRET,
49+
AuthProxyConfiguration.builder().build()
50+
);
51+
assertThrows(AuthException.class, imsTokenProvider::getTokenResponse);
52+
}
53+
54+
}

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/AbstractAEPPublisher.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,25 @@ private AuthProvider getAuthProvider(Map<String, String> props) {
112112
.equals(AEP_CONNECTION_AUTH_ENABLED_VALUE);
113113
LOG.info("Auth Enabled for DCS Published: {}", isAuthEnabled);
114114
if (isAuthEnabled) {
115-
TokenType tokenType = TokenType.getTokenType(props.getOrDefault(
115+
LOG.info("Auth Token Type = {%s}", AEP_CONNECTION_AUTH_TOKEN_TYPE);
116+
final TokenType tokenType = TokenType.getTokenType(props.getOrDefault(
116117
AEP_CONNECTION_AUTH_TOKEN_TYPE,
117118
TokenType.JWT_TOKEN.getName())
118119
);
119120

120-
return tokenType == TokenType.ACCESS_TOKEN ? getIMSTokenProvider(props) : getJWTTokenProvider(props);
121+
switch (tokenType) {
122+
case ACCESS_TOKEN:
123+
return getIMSTokenProvider(props);
124+
125+
case JWT_TOKEN:
126+
return getJWTTokenProvider(props);
127+
128+
case OAUTH2_ACCESS_TOKEN:
129+
return getOAuth2IMSTokenProvider(props);
130+
131+
default:
132+
throw new AuthException("Invalid token type to get auth provider");
133+
}
121134
}
122135
} catch (AuthException authException) {
123136
throw new IllegalArgumentException("Exception while instantiating the auth provider", authException);
@@ -140,6 +153,22 @@ private AuthProvider getIMSTokenProvider(Map<String, String> props) throws AuthE
140153
.build());
141154
}
142155

156+
private AuthProvider getOAuth2IMSTokenProvider(final Map<String, String> props) throws AuthException {
157+
LOG.info("Get auth token for type oauth2_access_token");
158+
159+
return AuthProviderFactory.getAuthProvider(TokenType.OAUTH2_ACCESS_TOKEN, ImmutableMap.<String, String>builder()
160+
.put(AuthUtils.AUTH_CLIENT_ID, props.get(AEP_CONNECTION_AUTH_CLIENT_ID))
161+
.put(AuthUtils.AUTH_CLIENT_SECRET, props.get(AEP_CONNECTION_AUTH_CLIENT_SECRET))
162+
.put(AuthUtils.AUTH_ENDPOINT, props.get(AEP_CONNECTION_AUTH_ENDPOINT))
163+
.build(), AuthProxyConfiguration.builder()
164+
.proxyHost(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_HOST, null))
165+
.proxyPort(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_PORT, 443))
166+
.proxyUsername(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_USER, null))
167+
.proxyPassword(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_PASSWORD, null))
168+
.build());
169+
}
170+
171+
@Deprecated
143172
private AuthProvider getJWTTokenProvider(Map<String, String> props) throws AuthException {
144173
return AuthProviderFactory.getAuthProvider(TokenType.JWT_TOKEN, ImmutableMap.<String, String>builder()
145174
.put(AuthUtils.AUTH_CLIENT_ID, props.get(AEP_CONNECTION_AUTH_CLIENT_ID))

0 commit comments

Comments
 (0)