Description
I am trying to use Kafka-compatible Redpanda with the HTTP sink connector, using the following configuration:
{
"name": "http-sink-test",
"config": {
"connector.class": "io.aiven.kafka.connect.http.HttpSinkConnector",
"topics.regex": "abc",
"http.authorization.type": "oauth2",
"http.url": "https://xxx/data",
"http.headers.content.type": "application/json",
"oauth2.access.token.url": "https://xxx/token",
"oauth2.client.id": "yyyy",
"oauth2.client.secret": "zzz",
"oauth2.client.authorization.mode": "header",
"oauth2.response.token.property": "access_token",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
I get the following error in the connector status:
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.42.3.119:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:522)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:499)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:326)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:522)\n\tat 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)\n\t... 14 more\nCaused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Message': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (byte[])"Message 100"; line: 1, column: 9]\n\tat org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)\n\t... 18 more\nCaused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Message': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (byte[])"Message 100"; line: 1, column: 9]\n\tat com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)\n\tat com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)\n\tat com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703)\n\tat com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3090)\n\tat org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)\n\t... 19 more\n"
}
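Reading the innermost `Caused by` in the trace, the record value appears to be the plain text `Message 100`, which the `JsonConverter` cannot parse as JSON. The sketch below is only an illustration in Python, not the connector's own code: it runs the same kind of parse check on that value and on a shortened copy of a JSON body. The helper name `is_valid_json` is hypothetical.

```python
import json


def is_valid_json(raw: bytes) -> bool:
    """Return True if the raw record bytes decode and parse as JSON."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False


# Value quoted in the trace: [Source: (byte[])"Message 100"; line: 1, column: 9]
record_value = b"Message 100"

# Shortened JSON object, assumed here to stand in for a well-formed record value
json_body = b'{"value": 4, "quality": 192, "status": "OK"}'

print(is_valid_json(record_value))  # False
print(is_valid_json(json_body))     # True
```

If this reading is right, the failure happens while converting the record read from the topic, before any HTTP request is sent.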
Note that the following curl command, run inside the http-sink-pod, works correctly:
curl --location 'https://mvp-preview2-idp-ra.centralus.cloudapp.azure.com:443/idp/timeseries/v1/b8a18df1-8840-4156-9897-0801ed52c66d/sources/69060a7b-f7a7-4289-a64d-6658d913b86b/data' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer tokenvalue' \
--data '{
"value": 4,
"quality": 192,
"status": "OK",
"timeStamp": "2023-08-10T15:52:52.116Z",
"annotations": {
"productColor": "red",
"productCode": "765665.1"
}
}'
Can you please help me understand what went wrong? Do I need to change anything in the configuration?