-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathkafka-source.kamelet.yaml
117 lines (117 loc) · 6.53 KB
/
kafka-source.kamelet.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
name: kafka-source
annotations:
camel.apache.org/catalog.version: "1.10.10"
camel.apache.org/kamelet.icon: ""
camel.apache.org/provider: "Red Hat"
camel.apache.org/kamelet.group: "Kafka"
labels:
camel.apache.org/kamelet.type: "source"
spec:
definition:
title: "Kafka Source"
description: |-
Receive data from Kafka topics.
required:
- topic
- bootstrapServers
- user
- password
type: object
properties:
topic:
title: Topic Names
description: Comma separated list of Kafka topic names
type: string
bootstrapServers:
title: Brokers
description: Comma separated list of Kafka Broker URLs
type: string
securityProtocol:
title: Security Protocol
description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported
type: string
default: SASL_SSL
saslMechanism:
title: SASL Mechanism
description: The Simple Authentication and Security Layer (SASL) Mechanism used.
type: string
default: PLAIN
user:
title: Username
description: Username to authenticate to Kafka
type: string
password:
title: Password
description: Password to authenticate to kafka
type: string
format: password
x-descriptors:
- urn:alm:descriptor:com.tectonic.ui:password
- urn:camel:group:credentials
autoCommitEnable:
title: Auto Commit Enable
description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer
type: boolean
default: true
x-descriptors:
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
allowManualCommit:
title: Allow Manual Commit
description: Whether to allow doing manual commits
type: boolean
default: false
x-descriptors:
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
pollOnError:
title: Poll On Error Behavior
description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP
type: string
default: "ERROR_HANDLER"
autoOffsetReset:
title: Auto Offset Reset
description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none
type: string
default: "latest"
consumerGroup:
title: Consumer Group
description: A string that uniquely identifies the group of consumers to which this source belongs
type: string
example: "my-group-id"
deserializeHeaders:
title: Automatically Deserialize Headers
description: When enabled the Kamelet source will deserialize all message headers to String representation.
type: boolean
default: false
x-descriptors:
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
dependencies:
- "mvn:org.apache.camel.kamelets:camel-kamelets-utils:1.10.10"
- "camel:core"
- "camel:kafka"
- "camel:kamelet"
template:
from:
uri: "kafka:{{topic}}"
parameters:
brokers: "{{?bootstrapServers}}"
securityProtocol: "{{securityProtocol}}"
saslMechanism: "{{saslMechanism}}"
saslJaasConfig: "org.apache.kafka.common.security.plain.PlainLoginModule required username='{{user}}' password='{{password}}';"
autoCommitEnable: "{{autoCommitEnable}}"
allowManualCommit: "{{allowManualCommit}}"
pollOnError: "{{pollOnError}}"
autoOffsetReset: "{{autoOffsetReset}}"
groupId: "{{?consumerGroup}}"
steps:
- set-property:
name: deserializeHeaders
constant: "{{deserializeHeaders}}"
- choice:
when:
- simple: "${exchangeProperty.deserializeHeaders} == 'true'"
steps:
- bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
- to: "kamelet:sink"