Skip to content

Commit 2d191ba

Browse files
alunguamuraru
andcommitted
Add support for dedicated envoy deployment per brokerConfigGroup (#2)
* No LB on envoy * Add support for dedicated envoy deployment per brokerConfigGroup Use a different label for each envoy deployment Add support to bring your own Load Balancer LB will not be managed by Kafka-Operator. HostnameOverride should be mandatory with BringYourOwnLB Add external listeners per broker config group Create Envoy if all config groups define an EL Envoy ingress should be created if there is a global External Listener defined, or if each broker config group defines its own External Listener. The broker config group External Listener will always override the global External Listener. Reconcile External Listeners only if needed. Fix LoadBalancer overwrite for ExternalListener LB address / IP should be used if HostnameOverride is not used Cosmetic fixes Co-authored-by: Adi Muraru <[email protected]>
1 parent b718da3 commit 2d191ba

25 files changed

+1750
-189
lines changed

charts/kafka-operator/crds/operator-kafka-crd.yaml

Lines changed: 564 additions & 24 deletions
Large diffs are not rendered by default.

config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml

Lines changed: 428 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
apiVersion: kafka.banzaicloud.io/v1beta1
2+
kind: KafkaCluster
3+
metadata:
4+
labels:
5+
controller-tools.k8s.io: "1.0"
6+
name: kafka
7+
spec:
8+
headlessServiceEnabled: true
9+
zkAddresses:
10+
- "zookeeper-client.zookeeper:2181"
11+
service.beta.kubernetes.io/aws-load-balancer-type: nlb
12+
oneBrokerPerNode: false
13+
clusterImage: "banzaicloud/kafka:2.13-2.4.0"
14+
readOnlyConfig: |
15+
auto.create.topics.enable=false
16+
brokerConfigGroups:
17+
az1:
18+
storageConfigs:
19+
- mountPath: "/kafka-logs"
20+
pvcSpec:
21+
accessModes:
22+
- ReadWriteOnce
23+
resources:
24+
requests:
25+
storage: 10Gi
26+
listenersConfig:
27+
externalListeners:
28+
- type: "plaintext"
29+
name: "external"
30+
externalStartingPort: 19190
31+
containerPort: 9094
32+
az2:
33+
storageConfigs:
34+
- mountPath: "/kafka-logs"
35+
pvcSpec:
36+
accessModes:
37+
- ReadWriteOnce
38+
resources:
39+
requests:
40+
storage: 10Gi
41+
listenersConfig:
42+
externalListeners:
43+
- type: "plaintext"
44+
name: "external"
45+
externalStartingPort: 19290
46+
containerPort: 9094
47+
az3:
48+
storageConfigs:
49+
- mountPath: "/kafka-logs"
50+
pvcSpec:
51+
accessModes:
52+
- ReadWriteOnce
53+
resources:
54+
requests:
55+
storage: 10Gi
56+
listenersConfig:
57+
externalListeners:
58+
- type: "plaintext"
59+
name: "external"
60+
externalStartingPort: 19390
61+
containerPort: 9094
62+
brokers:
63+
- id: 1
64+
brokerConfigGroup: "az1"
65+
- id: 2
66+
brokerConfigGroup: "az2"
67+
- id: 3
68+
brokerConfigGroup: "az3"
69+
rollingUpgradeConfig:
70+
failureThreshold: 1
71+
listenersConfig:
72+
internalListeners:
73+
- type: "plaintext"
74+
name: "plaintext"
75+
containerPort: 29092
76+
usedForInnerBrokerCommunication: true
77+
- type: "plaintext"
78+
name: "controller"
79+
containerPort: 29093
80+
usedForInnerBrokerCommunication: false
81+
usedForControllerCommunication: true
82+
cruiseControlConfig:
83+
config: |
84+
# Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
85+
#
86+
# This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details.
87+
# Configuration for the metadata client.
88+
# =======================================
89+
# The maximum interval in milliseconds between two metadata refreshes.
90+
#metadata.max.age.ms=300000
91+
# Client id for the Cruise Control. It is used for the metadata client.
92+
#client.id=kafka-cruise-control
93+
# The size of TCP send buffer bytes for the metadata client.
94+
#send.buffer.bytes=131072
95+
# The size of TCP receive buffer size for the metadata client.
96+
#receive.buffer.bytes=131072
97+
# The time to wait before disconnect an idle TCP connection.
98+
#connections.max.idle.ms=540000
99+
# The time to wait before reconnect to a given host.
100+
#reconnect.backoff.ms=50
101+
# The time to wait for a response from a host after sending a request.
102+
#request.timeout.ms=30000
103+
# Configurations for the load monitor
104+
# =======================================
105+
# The number of metric fetcher threads to fetch metrics for the Kafka cluster
106+
num.metric.fetchers=1
107+
# The metric sampler class
108+
metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler
109+
# Configurations for CruiseControlMetricsReporterSampler
110+
metric.reporter.topic.pattern=__CruiseControlMetrics
111+
# The sample store class name
112+
sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore
113+
# The config for the Kafka sample store to save the partition metric samples
114+
partition.metric.sample.store.topic=__KafkaCruiseControlPartitionMetricSamples
115+
# The config for the Kafka sample store to save the model training samples
116+
broker.metric.sample.store.topic=__KafkaCruiseControlModelTrainingSamples
117+
# The replication factor of Kafka metric sample store topic
118+
sample.store.topic.replication.factor=2
119+
# The config for the number of Kafka sample store consumer threads
120+
num.sample.loading.threads=8
121+
# The partition assignor class for the metric samplers
122+
metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor
123+
# The metric sampling interval in milliseconds
124+
metric.sampling.interval.ms=120000
125+
# The partition metrics window size in milliseconds
126+
partition.metrics.window.ms=300000
127+
# The number of partition metric windows to keep in memory
128+
num.partition.metrics.windows=1
129+
# The minimum partition metric samples required for a partition in each window
130+
min.samples.per.partition.metrics.window=1
131+
# The broker metrics window size in milliseconds
132+
broker.metrics.window.ms=300000
133+
# The number of broker metric windows to keep in memory
134+
num.broker.metrics.windows=20
135+
# The minimum broker metric samples required for a broker in each window
136+
min.samples.per.broker.metrics.window=1
137+
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD and non-JBOD broker capacities)
138+
capacity.config.file=config/capacity.json
139+
#capacity.config.file=config/capacityJBOD.json
140+
# Configurations for the analyzer
141+
# =======================================
142+
# The list of goals to optimize the Kafka cluster for with pre-computed proposals
143+
default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
144+
# The list of supported goals
145+
goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal
146+
# The list of supported hard goals
147+
hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
148+
# The minimum percentage of well monitored partitions out of all the partitions
149+
min.monitored.partition.percentage=0.95
150+
# The balance threshold for CPU
151+
cpu.balance.threshold=1.1
152+
# The balance threshold for disk
153+
disk.balance.threshold=1.1
154+
# The balance threshold for network inbound utilization
155+
network.inbound.balance.threshold=1.1
156+
# The balance threshold for network outbound utilization
157+
network.outbound.balance.threshold=1.1
158+
# The balance threshold for the replica count
159+
replica.count.balance.threshold=1.1
160+
# The capacity threshold for CPU in percentage
161+
cpu.capacity.threshold=0.8
162+
# The capacity threshold for disk in percentage
163+
disk.capacity.threshold=0.8
164+
# The capacity threshold for network inbound utilization in percentage
165+
network.inbound.capacity.threshold=0.8
166+
# The capacity threshold for network outbound utilization in percentage
167+
network.outbound.capacity.threshold=0.8
168+
# The threshold to define the cluster to be in a low CPU utilization state
169+
cpu.low.utilization.threshold=0.0
170+
# The threshold to define the cluster to be in a low disk utilization state
171+
disk.low.utilization.threshold=0.0
172+
# The threshold to define the cluster to be in a low network inbound utilization state
173+
network.inbound.low.utilization.threshold=0.0
174+
# The threshold to define the cluster to be in a low network outbound utilization state
175+
network.outbound.low.utilization.threshold=0.0
176+
# The metric anomaly percentile upper threshold
177+
metric.anomaly.percentile.upper.threshold=90.0
178+
# The metric anomaly percentile lower threshold
179+
metric.anomaly.percentile.lower.threshold=10.0
180+
# How often should the cached proposal be expired and recalculated if necessary
181+
proposal.expiration.ms=60000
182+
# The maximum number of replicas that can reside on a broker at any given time.
183+
max.replicas.per.broker=10000
184+
# The number of threads to use for proposal candidate precomputing.
185+
num.proposal.precompute.threads=1
186+
# the topics that should be excluded from the partition movement.
187+
#topics.excluded.from.partition.movement
188+
# Configurations for the executor
189+
# =======================================
190+
# The max number of partitions to move in/out on a given broker at a given time.
191+
num.concurrent.partition.movements.per.broker=10
192+
# The interval between two execution progress checks.
193+
execution.progress.check.interval.ms=10000
194+
# Configurations for anomaly detector
195+
# =======================================
196+
# The goal violation notifier class
197+
anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier
198+
# The metric anomaly finder class
199+
metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder
200+
# The anomaly detection interval
201+
anomaly.detection.interval.ms=10000
202+
# The goal violation to detect.
203+
anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
204+
# The interested metrics for metric anomaly analyzer.
205+
metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_MAX,BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_LOG_FLUSH_TIME_MS_MAX,BROKER_LOG_FLUSH_TIME_MS_MEAN
206+
## Adjust accordingly if your metrics reporter is an older version and does not produce these metrics.
207+
#metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH
208+
# The zk path to store failed broker information.
209+
failed.brokers.zk.path=/CruiseControlBrokerList
210+
# Topic config provider class
211+
topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaTopicConfigProvider
212+
# The cluster configurations for the KafkaTopicConfigProvider
213+
cluster.configs.file=config/clusterConfigs.json
214+
# The maximum time in milliseconds to store the response and access details of a completed user task.
215+
completed.user.task.retention.time.ms=21600000
216+
# The maximum time in milliseconds to retain the demotion history of brokers.
217+
demotion.history.retention.time.ms=86400000
218+
# The maximum number of completed user tasks for which the response and access details will be cached.
219+
max.cached.completed.user.tasks=100
220+
# The maximum number of user tasks for concurrently running in async endpoints across all users.
221+
max.active.user.tasks=5
222+
# Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled
223+
self.healing.enabled=true
224+
# Enable self healing for broker failure detector
225+
#self.healing.broker.failure.enabled=true
226+
# Enable self healing for goal violation detector
227+
#self.healing.goal.violation.enabled=true
228+
# Enable self healing for metric anomaly detector
229+
#self.healing.metric.anomaly.enabled=true
230+
# configurations for the webserver
231+
# ================================
232+
# HTTP listen port
233+
webserver.http.port=9090
234+
# HTTP listen address
235+
webserver.http.address=0.0.0.0
236+
# Whether CORS support is enabled for API or not
237+
webserver.http.cors.enabled=false
238+
# Value for Access-Control-Allow-Origin
239+
webserver.http.cors.origin=http://localhost:8080/
240+
# Value for Access-Control-Request-Method
241+
webserver.http.cors.allowmethods=OPTIONS,GET,POST
242+
# Headers that should be exposed to the Browser (Webapp)
243+
# This is a special header that is used by the
244+
# User Tasks subsystem and should be explicitly
245+
# Enabled when CORS mode is used as part of the
246+
# Admin Interface
247+
webserver.http.cors.exposeheaders=User-Task-ID
248+
# REST API default prefix
249+
# (dont forget the ending *)
250+
webserver.api.urlprefix=/kafkacruisecontrol/*
251+
# Location where the Cruise Control frontend is deployed
252+
webserver.ui.diskpath=./cruise-control-ui/dist/
253+
# URL path prefix for UI
254+
# (dont forget the ending *)
255+
webserver.ui.urlprefix=/*
256+
# Time After which request is converted to Async
257+
webserver.request.maxBlockTimeMs=10000
258+
# Default Session Expiry Period
259+
webserver.session.maxExpiryTimeMs=60000
260+
# Session cookie path
261+
webserver.session.path=/
262+
# Server Access Logs
263+
webserver.accesslog.enabled=true
264+
# Location of HTTP Request Logs
265+
webserver.accesslog.path=access.log
266+
# HTTP Request Log retention days
267+
webserver.accesslog.retention.days=14
268+
clusterConfig: |
269+
{
270+
"min.insync.replicas": 3
271+
}

pkg/errorfactory/errorfactory.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ type CruiseControlTaskTimeout struct{ error }
7070
// CruiseControlTaskFailure states that CC task was not found (CC restart?) or failed
7171
type CruiseControlTaskFailure struct{ error }
7272

73+
// KafkaConfigError states that the Kafka configuration is invalid
74+
type KafkaConfigError struct{ error }
75+
7376
// New creates a new error factory error
7477
func New(t interface{}, err error, msg string, wrapArgs ...interface{}) error {
7578
wrapped := errors.WrapIfWithDetails(err, msg, wrapArgs...)
@@ -110,6 +113,8 @@ func New(t interface{}, err error, msg string, wrapArgs ...interface{}) error {
110113
return CruiseControlTaskTimeout{wrapped}
111114
case CruiseControlTaskFailure:
112115
return CruiseControlTaskFailure{wrapped}
116+
case KafkaConfigError:
117+
return KafkaConfigError{wrapped}
113118
}
114119
return wrapped
115120
}

pkg/k8sutil/resource.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,42 @@ import (
3434
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
3535
)
3636

37+
func Delete(log logr.Logger, client runtimeClient.Client, target runtime.Object) error {
38+
targetType := reflect.TypeOf(target)
39+
current := target.DeepCopyObject()
40+
41+
key, err := runtimeClient.ObjectKeyFromObject(current)
42+
if err != nil {
43+
return errors.WithDetails(err, "kind", targetType)
44+
}
45+
log = log.WithValues("kind", targetType, "name", key.Name)
46+
47+
err = client.Get(context.TODO(), key, current)
48+
if err == nil {
49+
err = client.Delete(context.TODO(), current)
50+
if err != nil {
51+
return errorfactory.New(
52+
errorfactory.APIFailure{},
53+
err,
54+
"delete resource failed",
55+
"kind", targetType, "name", key.Name,
56+
)
57+
}
58+
} else if apierrors.IsNotFound(err) {
59+
log.V(1).Info("resource not found for delete")
60+
return nil
61+
} else {
62+
return errorfactory.New(
63+
errorfactory.APIFailure{},
64+
err,
65+
"getting resource failed",
66+
"kind", targetType, "name", key.Name,
67+
)
68+
}
69+
log.Info("resource deleted")
70+
return nil
71+
}
72+
3773
// Reconcile reconciles K8S resources
3874
func Reconcile(log logr.Logger, client runtimeClient.Client, desired runtime.Object, cr *v1beta1.KafkaCluster) error {
3975
desiredType := reflect.TypeOf(desired)

0 commit comments

Comments
 (0)