
Commit bdd5a25

OneCricketeer and vivetiwa authored
[GH-52] Use Jackson for JSON handling (#54)
* [GH-52] Use Jackson for all JSON handling
* Add schema tests
* checkstyle + tests
* Update streaming-connect-sink/src/test/java/com/adobe/platform/streaming/sink/utils/SinkUtilsTest.java
* Delete aep-connector-with-schemas-enabled.json
* refactor: create JacksonFactory
* fix: ContentHandler compilation
* ISSUE-52 Changes:
  1. Add developer guide to ingest avro data
  2. In case of a null record value, let the JSON converter handle the conversion, to avoid NullPointerException and other schema conversions.
* ISSUE-52 Add TokenResponse generic for Java 8 compilation
* ISSUE-52 Java 8 compilation fix

Co-authored-by: vivetiwa <[email protected]>
1 parent 72ee40a commit bdd5a25
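The `JacksonFactory` refactor and null-value handling mentioned in the commit message suggest a pattern like the following. This is a minimal, hypothetical sketch — the class shape and helper names are illustrative, not the repository's actual code:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

// Hypothetical sketch of a centralized Jackson factory: one shared,
// thread-safe ObjectMapper replaces scattered per-class JSON handling.
public final class JacksonFactory {

  // A configured ObjectMapper is thread-safe, so a single shared instance suffices.
  public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  private JacksonFactory() {
    // static holder; no instances
  }

  // Illustrative helper: tolerate null input so callers avoid a
  // NullPointerException and can delegate null records to the converter instead.
  public static JsonNode parseOrNull(String json) throws IOException {
    return json == null ? null : OBJECT_MAPPER.readTree(json);
  }
}
```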

File tree

24 files changed
+426 -240 lines changed

DEVELOPER_GUIDE.md

Lines changed: 132 additions & 89 deletions
@@ -13,10 +13,10 @@ your local docker kafka topic.
 
 
 ## Build Docker locally and Run
-```bash
+```shell
 ./gradlew clean build
 docker build -t streaming-connect .
-docker-compose up -d
+docker compose up -d
 ```
 
 ## Configuration Options
@@ -31,8 +31,6 @@ AEP Sink connector configurations can be supplied in the call register the conne
 |-----------------------------------|-------------------------------------------------|---------------------------------------------------------|----------|-------------------------|
 | topics | comma separated list of topics | | yes | |
 | connector.class | classname of impl | com.adobe.platform.streaming.sink.impl.AEPSinkConnector | yes | |
-| key.converter.schemas.enable | enables conversion of schemas | false | no | |
-| value.converter.schemas.enable | enables conversion of schemas | false | no | |
 | aep.endpoint | aep streaming endpoint url | | yes | |
 | aep.connection.proxy.host | address of the proxy host to connect through | | no | |
 | aep.connection.proxy.port | port of the proxy host to connect through | 443 | no | |
@@ -51,17 +49,25 @@ AEP Sink connector configurations can be supplied in the call register the conne
 ## Step-by-Step Workflow
 
 ### Build
-```./gradlew clean build```
+```shell
+./gradlew clean build
+```
 
 ### Build docker
 
-```docker build -t streaming-connect .```
+```shell
+docker build -t streaming-connect .
+```
 
 ### Running Docker
-```docker-compose up -d```
+```shell
+docker compose up -d
+```
 
 ### Tail Docker logs
-```docker logs experience-platform-streaming-connect_kafka-connect_1 -f```
+```shell
+docker logs experience-platform-streaming-connect_kafka-connect_1 -f
+```
 
 ### Manage running connectors

@@ -70,7 +76,9 @@ connect instances.
 
 #### List of running connectors
 
-```curl -X GET http://localhost:8083/connectors```
+```shell
+curl -X GET http://localhost:8083/connectors
+```
 
 ### Create a Streaming Connection
 
@@ -84,7 +92,7 @@ Once you have an IMS access token and API key, it needs to be provided as part o
 Note that the sandbox-name is optional; if not provided, it defaults to the Production sandbox.
 
 
-```
+```shell
 curl -X POST https://platform.adobe.io/data/foundation/flowservice/connections \
   -H 'Authorization: Bearer {ACCESS_TOKEN}' \
   -H 'Content-Type: application/json' \
@@ -122,15 +130,15 @@ the one below. The `id` field in the response is the Connection Id.
 
 With the connection created, you can now retrieve your data collection URL from the connection information.
 
-```
+```shell
 curl -X GET https://platform.adobe.io/data/foundation/flowservice/connections/{CONNECTION_ID} \
   -H 'Authorization: Bearer {ACCESS_TOKEN}' \
   -H 'x-gw-ims-org-id: {IMS_ORG}' \
   -H 'x-api-key: {API_KEY}' \
   -H 'x-sandbox-name: {SANDBOX_NAME}'
 ```
 
-```
+```json
 {
   "items": [
     {
@@ -173,20 +181,22 @@ sinked to.
 Once the Connect server is running on port 8083, you can use REST APIs to launch multiple instances of connectors.
 
 #### Basic
-```
+```shell
 curl -s -X POST \
   -H "Content-Type: application/json" \
   --data '{
     "name": "aep-sink-connector",
     "config": {
+      "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
       "topics": "connect-test",
       "tasks.max": 1,
-      "aep.flush.interval.seconds": 1,
-      "aep.flush.bytes.kb": 4,
-      "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
+      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
       "key.converter.schemas.enable": "false",
+      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "value.converter.schemas.enable": "false",
-      "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}"
+      "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
+      "aep.flush.interval.seconds": 1,
+      "aep.flush.bytes.kb": 4
     }
 }' http://localhost:8083/connectors
 ```
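Not part of this diff, but once the connector above is registered, the standard Kafka Connect REST API can confirm it is running (connector name taken from the config above):

```shell
curl -s -X GET http://localhost:8083/connectors/aep-sink-connector/status
```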
@@ -196,92 +206,97 @@ curl -s -X POST \
 Use the command below to set up a Sink connector to an Authenticated Streaming Connection:
 
 1. Using access_token
-```
-curl -s -X POST \
-  -H "Content-Type: application/json" \
-  --data '{
-    "name": "aep-auth-sink-connector",
-    "config": {
-      "topics": "connect-test",
-      "tasks.max": 1,
-      "aep.flush.interval.seconds": 1,
-      "aep.flush.bytes.kb": 4,
-      "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
-      "key.converter.schemas.enable": "false",
-      "value.converter.schemas.enable": "false",
-      "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
-      "aep.connection.auth.enabled": true,
-      "aep.connection.auth.token.type": "access_token",
-      "aep.connection.auth.client.id": "<client_id>",
-      "aep.connection.auth.client.code": "<client_code>",
-      "aep.connection.auth.client.secret": "<client_secret>"
-    }
-}' http://localhost:8083/connectors
-```
+```shell
+curl -s -X POST \
+  -H "Content-Type: application/json" \
+  --data '{
+    "name": "aep-auth-sink-connector",
+    "config": {
+      "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
+      "topics": "connect-test",
+      "tasks.max": 1,
+      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+      "key.converter.schemas.enable": "false",
+      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+      "value.converter.schemas.enable": "false",
+      "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
+      "aep.flush.interval.seconds": 1,
+      "aep.flush.bytes.kb": 4,
+      "aep.connection.auth.enabled": true,
+      "aep.connection.auth.token.type": "access_token",
+      "aep.connection.auth.client.id": "<client_id>",
+      "aep.connection.auth.client.code": "<client_code>",
+      "aep.connection.auth.client.secret": "<client_secret>"
+    }
+}' http://localhost:8083/connectors
+```
 
 2. Using jwt_token
-   - Convert private.key from adobe console to PKCS8 private using command
-   ```
-   openssl pkcs8 -topk8 -inform PEM -outform DER -in private.key -out private-pkcs8.key -nocrypt
-   ```
-
-   - Create http connector
-   ```
-   curl -s -X POST \
-     -H "Content-Type: application/json" \
-     --data '{
-       "name": "aep-auth-sink-connector",
-       "config": {
-         "topics": "connect-test",
-         "tasks.max": 1,
-         "aep.flush.interval.seconds": 1,
-         "aep.flush.bytes.kb": 4,
-         "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
-         "key.converter.schemas.enable": "false",
-         "value.converter.schemas.enable": "false",
-         "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
-         "aep.connection.auth.enabled": true,
-         "aep.connection.auth.token.type": "jwt_token",
-         "aep.connection.auth.client.id": "<client_id>",
-         "aep.connection.auth.imsOrg": "<organization-id>",
-         "aep.connection.auth.accountKey": "<technical-account-id>",
-         "aep.connection.auth.filePath": "<path-to-private-pkcs8.key>",
-         "aep.connection.auth.endpoint": "<ims-url>",
-         "aep.connection.endpoint.headers": "<optional-header-that-needs-to-be-passed-to-AEP>"
-         "aep.connection.auth.client.secret": "<client_secret>"
-       }
-   }' http://localhost:8083/connectors
-   ```
-   Note - `aep.connection.endpoint.headers` format should be comma separated.
-   Example: To send below 2 http header -
-   1. key: x-adobe-flow-id, value: 341fd4f0-cdec-4912-1ab6-fb54aeb41286
-   2. key: x-adobe-dataset-id, value: 3096fbfd5978431948af3ba3
-
-   Use `aep.connection.endpoint.headers` value -
-   ```
-   {'x-adobe-flow-id':'341fd4f0-cdec-4912-1ab6-fb54aeb41286','x-adobe-dataset-id': '3096fbfd5978431948af3ba3'}
-   ```
-
+   - Convert private.key from adobe console to a PKCS8 private key using the command
+   ```shell
+   openssl pkcs8 -topk8 -inform PEM -outform DER -in private.key -out private-pkcs8.key -nocrypt
+   ```
+
+   - Create http connector
+   ```shell
+   curl -s -X POST \
+     -H "Content-Type: application/json" \
+     --data '{
+       "name": "aep-auth-sink-connector",
+       "config": {
+         "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
+         "topics": "connect-test",
+         "tasks.max": 1,
+         "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+         "key.converter.schemas.enable": "false",
+         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+         "value.converter.schemas.enable": "false",
+         "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
+         "aep.flush.interval.seconds": 1,
+         "aep.flush.bytes.kb": 4,
+         "aep.connection.auth.enabled": true,
+         "aep.connection.auth.token.type": "jwt_token",
+         "aep.connection.auth.client.id": "<client_id>",
+         "aep.connection.auth.imsOrg": "<organization-id>",
+         "aep.connection.auth.accountKey": "<technical-account-id>",
+         "aep.connection.auth.filePath": "<path-to-private-pkcs8.key>",
+         "aep.connection.auth.endpoint": "<ims-url>",
+         "aep.connection.endpoint.headers": "<optional-header-that-needs-to-be-passed-to-AEP>",
+         "aep.connection.auth.client.secret": "<client_secret>"
+       }
+   }' http://localhost:8083/connectors
+   ```
+
+   Note - `aep.connection.endpoint.headers` format should be JSON-encoded.
+   Example: to send the 2 HTTP headers below -
+   1. key: `x-adobe-flow-id`, value: `341fd4f0-cdec-4912-1ab6-fb54aeb41286`
+   2. key: `x-adobe-dataset-id`, value: `3096fbfd5978431948af3ba3`
+
+   Use config -
+   ```json
+   "aep.connection.endpoint.headers": "{\"x-adobe-flow-id\":\"341fd4f0-cdec-4912-1ab6-fb54aeb41286\", \"x-adobe-dataset-id\": \"3096fbfd5978431948af3ba3\"}"
+   ```
 
 #### Batching
 
 Use the command below to set up a Sink connector to batch up requests and reduce the number of network calls.
 
-```
+```shell
 curl -s -X POST \
   -H "Content-Type: application/json" \
   --data '{
     "name": "aep-batch-sink-connector",
     "config": {
+      "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
       "topics": "connect-test",
       "tasks.max": 1,
-      "aep.flush.interval.seconds": 1,
-      "aep.flush.bytes.kb": 20,
-      "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
+      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
       "key.converter.schemas.enable": "false",
+      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
-      "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
+      "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
+      "aep.flush.interval.seconds": 1,
+      "aep.flush.bytes.kb": 20
     }
 }' http://localhost:8083/connectors
 ```
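Also outside this diff: Kafka Connect exposes `PUT /connectors/{name}/config`, which accepts the flat config map (no `name`/`config` wrapper), so the flush settings above can be tuned on a live connector. A sketch, reusing the batching config above with a larger, illustrative buffer size:

```shell
curl -s -X PUT \
  -H "Content-Type: application/json" \
  --data '{
    "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
    "topics": "connect-test",
    "tasks.max": 1,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
    "aep.flush.interval.seconds": 1,
    "aep.flush.bytes.kb": 40
  }' http://localhost:8083/connectors/aep-batch-sink-connector/config
```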
@@ -291,16 +306,44 @@ To send error records to dead letter topic please use standard kafka connector e
 
 Kafka connect dead letter configurations : `https://docs.confluent.io/platform/current/connect/concepts.html#dead-letter-queue`
 
+#### Avro data ingestion
+Ingest avro data using the Confluent schema registry and the Avro converter.
+Steps to ingest avro data:
+1. Install the Confluent Avro converter by following this document - https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter/
+2. Use the schema registry REST endpoint to add or update the avro schema. Please follow the documentation to add a schema - https://docs.confluent.io/platform/7.5/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration:~:text=the%20command%20below.-,curl%20%2DX%20POST%20%2DH,-%22Content%2DType%3A%20application
+3. Create the connector with the following configuration:
+```shell
+curl -s -X POST \
+  -H 'Content-Type: application/json' \
+  --data '{
+    "name": "aep-sink-connector",
+    "config": {
+      "topics": "connect-test",
+      "tasks.max": 1,
+      "connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
+      "key.converter": "io.confluent.connect.avro.AvroConverter",
+      "key.converter.schema.registry.url": "{SCHEMA_REGISTRY_URL}",
+      "value.converter": "io.confluent.connect.avro.AvroConverter",
+      "value.converter.schema.registry.url": "{SCHEMA_REGISTRY_URL}",
+      "aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
+      "aep.flush.interval.seconds": 1,
+      "aep.flush.bytes.kb": 20
+    }
+}' http://localhost:8083/connectors
+```
+Replace `{SCHEMA_REGISTRY_URL}` with the schema registry REST endpoint.
+4. Once the connector is up, post avro data to topic `connect-test` to send data to AEP.
+
 #### Proxy host configuration
 There are 2 ways to route requests to the aep endpoint through a proxy server:
 1. **Using Environment Variable** : Export proxyHost and proxyPort on each kafka node, then restart the kafka connect node.
 
 For HTTPS use the following:
-```
+```shell
 export KAFKA_OPTS="-Dhttps.proxyHost=127.0.0.1 -Dhttps.proxyPort=8085 -Dhttps.proxyUser=proxyUsername -Dhttps.proxyPassword=proxyPassword"
 ```
 For HTTP use the following:
-```
+```shell
 export KAFKA_OPTS="-Dhttp.proxyHost=127.0.0.1 -Dhttp.proxyPort=8085 -Dhttp.proxyUser=proxyUsername -Dhttp.proxyPassword=proxyPassword"
 ```
 2. **Using Connector Properties** : While creating the connector, set the following properties; default values are mentioned in [connect configurations](#configuration-options).
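Step 2 of the Avro section above references the Schema Registry REST API without showing the call. A minimal sketch, assuming the default TopicNameStrategy subject name for topic `connect-test` and a trivial record schema (both illustrative):

```shell
curl -X POST {SCHEMA_REGISTRY_URL}/subjects/connect-test-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"ExampleRecord\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"}'
```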
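For option 2 above, the relevant keys are the `aep.connection.proxy.*` entries from the configuration table earlier in this guide; a sketch of the fragment to add to the connector config (host and port values are illustrative):

```json
"aep.connection.proxy.host": "127.0.0.1",
"aep.connection.proxy.port": 8085
```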
@@ -323,7 +366,7 @@ In order to test the flow, you can use the following curl command to post a mess
 Kafka rest proxy. Please ensure that the curl command uses your inlet endpoint, and the schema of the XDM message
 corresponding to your setup.
 
-```bash
+```shell
 curl -X POST \
   http://localhost:8082/topics/connect-test \
   -H 'Content-Type: application/vnd.kafka.json.v2+json' \
@@ -380,4 +423,4 @@ by the AEP Streaming Sink Connector and sent the AEP Streaming inlet.
 [blogpost]: https://medium.com/adobetech/using-postman-for-jwt-authentication-on-adobe-i-o-7573428ffe7f
 [connect-apis]: https://docs.confluent.io/current/connect/references/restapi.html
 [io-console]: https://console.adobe.io/
-[tutorial]: https://www.adobe.io/apis/experienceplatform/home/tutorials/alltutorials.html#!api-specification/markdown/narrative/tutorials/authenticate_to_acp_tutorial/authenticate_to_acp_tutorial.md
+[tutorial]: https://github.com/AdobeDocs/experience-platform-docs/blob/master/developer/markdown/narrative/tutorials/authenticate_to_acp_tutorial/authenticate_to_acp_tutorial.md

build.gradle

Lines changed: 0 additions & 3 deletions
@@ -12,7 +12,6 @@
 
 buildscript {
   repositories {
-    jcenter()
     mavenCentral()
   }
 
@@ -34,7 +33,6 @@ plugins {
 description 'Adobe Experience Platform Streaming Connect'
 
 repositories {
-  jcenter()
   mavenCentral()
 }
 
@@ -91,7 +89,6 @@ subprojects {
   }
 
   repositories {
-    jcenter()
     mavenCentral()
 
   publishing {
