-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathkafka-connector-sink.yaml
More file actions
73 lines (63 loc) · 3.14 KB
/
kafka-connector-sink.yaml
File metadata and controls
73 lines (63 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# Secret holding the Google BigQuery credentials used by the sink connector.
# The KafkaConnector config in this file reads each key through
# ${file:/mnt/googlebigquery-credential:<key>} references.
# NOTE(review): this presumably requires the Secret to be mounted into the
# Kafka Connect pods at /mnt/googlebigquery-credential - confirm against the
# KafkaConnect resource, which is not part of this file.
kind: Secret
apiVersion: v1
metadata:
  name: googlebigquery-credential
  namespace: <namespace>
stringData:
  clientEmail: <clientEmail>
  projectId: <projectId>
  datasetId: <datasetId>
  privateKey: <privateKey>
type: Opaque
---
# KafkaConnector that sinks records from the 'employee-upsert' topic into the
# Google BigQuery 'Employee' table using the Connectivity Pack sink connector.
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  labels:
    eventstreams.ibm.com/cluster: <name of Kafka Connect cluster>
  name: <name of connector>
  namespace: <namespace>
spec:
  autoRestart:
    # Enables automatic restarting of Kafka connectors in case of errors or failures.
    # For more details, see: https://strimzi.io/blog/2023/01/25/auto-restarting-connectors/
    enabled: true
    maxRestarts: <number of times the connector attempts to restart in case of an error or failure>
  # Connector class name
  class: com.ibm.eventstreams.connect.connectivitypack.sink.ConnectivityPackSinkConnector
  # `tasksMax` should be equal to the number of object-action combinations.
  # In this example it is 1 (Employee - UPSERT).
  tasksMax: 1
  config:
    # Which system to connect to, for example, googlebigquery
    connectivitypack.sink: googlebigquery
    # Credentials to access the sink system (this example shows auth-type 'BASIC').
    # Each value is resolved from the matching key of the
    # 'googlebigquery-credential' Secret defined in this file.
    connectivitypack.sink.authType: BASIC
    connectivitypack.sink.credentials.clientEmail: ${file:/mnt/googlebigquery-credential:clientEmail}
    connectivitypack.sink.credentials.projectId: ${file:/mnt/googlebigquery-credential:projectId}
    connectivitypack.sink.credentials.datasetId: ${file:/mnt/googlebigquery-credential:datasetId}
    connectivitypack.sink.credentials.privateKey: ${file:/mnt/googlebigquery-credential:privateKey}
    # Object and associated action to be performed.
    # The sink object is the object the connector performs the action on. Each object
    # corresponds to a sink system object, such as a Google BigQuery 'Table'.
    connectivitypack.sink.object: 'Employee'
    # Required for actions UPSERT and UPDATE only
    connectivitypack.sink.object.key: employee_id
    # Specifies the action to be performed on the 'Employee' table.
    connectivitypack.sink.action: UPSERT
    # The topic from which the connector reads messages.
    topics: 'employee-upsert'
    # Single Message Transforms (SMTs). They are applied in the order listed here,
    # so CastFields operates on the already-renamed field names.
    transforms: 'RenameFields,FilterFields,CastFields'
    transforms.RenameFields.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    # Mapping of <field name in Kafka topic record : column name in Google BigQuery Table>
    transforms.RenameFields.renames: 'first_name__c:first_name,last_name__c:last_name,date_of_birth__c:date_of_birth,gender__c:gender,email__c:email,Salary__c:salary'
    transforms.FilterFields.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    # List of fields to be excluded from upserting to the table
    transforms.FilterFields.exclude: 'Id,OwnerId,IsDeleted,CreatedDate,Name,CreatedById,LastModifiedDate,LastModifiedById,SystemModstamp'
    transforms.CastFields.type: org.apache.kafka.connect.transforms.Cast$Value
    # Casting required for numeric data types.
    transforms.CastFields.spec: 'salary:string'
    # For handling bad records
    errors.tolerance: all
    errors.log.enable: true