# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import math
import os.path
import re
import signal
import time
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from .config import KafkaConfig
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import config_property, quorum
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.minikdc import MiniKdc
from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH
from kafkatest.version import KafkaVersion
from kafkatest.version import get_version
from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config
class KafkaListener:
def __init__(self, name, port_number, security_protocol, open=False, sasl_mechanism = None):
self.name = name
self.port_number = port_number
self.security_protocol = security_protocol
self.open = open
self.sasl_mechanism = sasl_mechanism
def listener(self):
return "%s://:%s" % (self.name, str(self.port_number))
def advertised_listener(self, node):
return "%s://%s:%s" % (self.name, node.account.hostname, str(self.port_number))
def listener_security_protocol(self):
return "%s:%s" % (self.name, self.security_protocol)
class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
"""
Ducktape system test service for Brokers and KRaft Controllers
Metadata Quorums
----------------
Kafka can use either ZooKeeper or a KRaft Controller quorum for its
metadata. See the kafkatest.services.kafka.quorum.ServiceQuorumInfo
class for details.
Attributes
----------
quorum_info : kafkatest.services.kafka.quorum.ServiceQuorumInfo
Information about the service and its metadata quorum
num_nodes_broker_role : int
The number of nodes in the service that include 'broker'
in process.roles (0 when using Zookeeper)
num_nodes_controller_role : int
The number of nodes in the service that include 'controller'
in process.roles (0 when using Zookeeper)
controller_quorum : KafkaService
None when using ZooKeeper, otherwise the Kafka service for the
combined case or the isolated controller quorum service
instance for the isolated case
isolated_controller_quorum : KafkaService
None for the combined case or when using ZooKeeper, otherwise
the isolated controller quorum service instance
Kafka Security Protocols
------------------------
The security protocol advertised to clients and the inter-broker
security protocol can be set in the constructor and can be changed
afterwards as well. Set these attributes to make changes; they
take effect when starting each node:
security_protocol : str
default PLAINTEXT
client_sasl_mechanism : str
default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
interbroker_security_protocol : str
default PLAINTEXT
interbroker_sasl_mechanism : str
default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
ZooKeeper
---------
Create an instance of ZookeeperService when metadata_quorum is ZK
(ZK is the default if metadata_quorum is not a test parameter).
KRaft Quorums
------------
Set metadata_quorum accordingly (to COMBINED_KRAFT or ISOLATED_KRAFT).
Do not instantiate a ZookeeperService instance.
Starting Kafka will cause any isolated controller quorum to
automatically start first. Explicitly stopping Kafka does not stop
any isolated controller quorum, but Ducktape will stop both when
tearing down the test (it will stop Kafka first).
KRaft Security Protocols
--------------------------------
The broker-to-controller and inter-controller security protocols
will both initially be set to the inter-broker security protocol.
The broker-to-controller and inter-controller security protocols
must be identical for the combined case (an exception will be
thrown when trying to start the service if they are not identical).
The broker-to-controller and inter-controller security protocols
can differ in the isolated case.
Set these attributes for the combined case. Changes take effect
when starting each node:
controller_security_protocol : str
default PLAINTEXT
controller_sasl_mechanism : str
default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
intercontroller_security_protocol : str
default PLAINTEXT
intercontroller_sasl_mechanism : str
default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
Set the same attributes for the isolated case (changes take effect
when starting each quorum node), but you must first obtain the
service instance for the isolated quorum via one of the
'controller_quorum' or 'isolated_controller_quorum' attributes as
defined above.
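Example
-------
An illustrative sketch only (not taken from any particular test): `test_context`
is assumed to be the ducktape test context and `zk` an already-created
ZookeeperService instance; both names are placeholders supplied by the
surrounding test.
    kafka = KafkaService(test_context, num_nodes=3, zk=zk,
                         security_protocol=SecurityConfig.SASL_SSL,
                         interbroker_security_protocol=SecurityConfig.SSL,
                         topics={"test-topic": None})
    kafka.start()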
"""
PERSISTENT_ROOT = "/mnt/kafka"
STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log")
# Logs such as controller.log, server.log, etc all go here
OPERATIONAL_LOG_DIR = os.path.join(PERSISTENT_ROOT, "kafka-operational-logs")
OPERATIONAL_LOG_INFO_DIR = os.path.join(OPERATIONAL_LOG_DIR, "info")
OPERATIONAL_LOG_DEBUG_DIR = os.path.join(OPERATIONAL_LOG_DIR, "debug")
# Kafka log segments etc go here
DATA_LOG_DIR_PREFIX = os.path.join(PERSISTENT_ROOT, "kafka-data-logs")
DATA_LOG_DIR_1 = "%s-1" % (DATA_LOG_DIR_PREFIX)
DATA_LOG_DIR_2 = "%s-2" % (DATA_LOG_DIR_PREFIX)
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
METADATA_LOG_DIR = os.path.join(PERSISTENT_ROOT, "kafka-metadata-logs")
METADATA_SNAPSHOT_SEARCH_STR = "%s/__cluster_metadata-0/*.checkpoint" % METADATA_LOG_DIR
METADATA_FIRST_LOG = "%s/__cluster_metadata-0/00000000000000000000.log" % METADATA_LOG_DIR
# Kafka Authorizer
KRAFT_ACL_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
INTERBROKER_LISTENER_NAME = 'INTERNAL'
JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/admin_client_as_broker_jaas.conf"
KRB5_CONF = "java.security.krb5.conf=/mnt/security/krb5.conf"
SECURITY_PROTOCOLS = [SecurityConfig.PLAINTEXT, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL]
logs = {
"kafka_server_start_stdout_stderr": {
"path": STDOUT_STDERR_CAPTURE,
"collect_default": True},
"kafka_operational_logs_info": {
"path": OPERATIONAL_LOG_INFO_DIR,
"collect_default": True},
"kafka_operational_logs_debug": {
"path": OPERATIONAL_LOG_DEBUG_DIR,
"collect_default": False},
"kafka_data_1": {
"path": DATA_LOG_DIR_1,
"collect_default": False},
"kafka_data_2": {
"path": DATA_LOG_DIR_2,
"collect_default": False},
"kafka_cluster_metadata": {
"path": METADATA_LOG_DIR,
"collect_default": False},
"kafka_heap_dump_file": {
"path": HEAP_DUMP_FILE,
"collect_default": True}
}
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT,
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
jmx_attributes=None, zk_connect_timeout=18000, zk_session_timeout=18000, server_prop_overrides=None, zk_chroot=None,
zk_client_secure=False,
listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None,
extra_kafka_opts="", tls_version=None,
isolated_kafka=None,
controller_num_nodes_override=0,
allow_zk_with_kraft=False,
quorum_info_provider=None,
use_new_coordinator=None,
consumer_group_migration_policy=None,
dynamicRaftQuorum=False,
use_transactions_v2=False,
use_share_groups=None
):
"""
:param context: test context
:param int num_nodes: the number of nodes in the service. There are 4 possibilities:
1) Zookeeper quorum:
The number of brokers is defined by this parameter.
The broker.id values will be 1..num_nodes.
2) Combined KRaft quorum:
The number of nodes having a broker role is defined by this parameter.
The node.id values will be 1..num_nodes
The number of nodes having a controller role will by default be 1, 3, or 5 depending on num_nodes
(1 if num_nodes < 3, otherwise 3 if num_nodes < 5, otherwise 5). This calculation
can be overridden via controller_num_nodes_override, which must be between 1 and num_nodes,
inclusive, when non-zero. Here are some possibilities:
num_nodes = 1:
broker having node.id=1: broker.roles=broker+controller
num_nodes = 2:
broker having node.id=1: broker.roles=broker+controller
broker having node.id=2: broker.roles=broker
num_nodes = 3:
broker having node.id=1: broker.roles=broker+controller
broker having node.id=2: broker.roles=broker+controller
broker having node.id=3: broker.roles=broker+controller
num_nodes = 3, controller_num_nodes_override = 1
broker having node.id=1: broker.roles=broker+controller
broker having node.id=2: broker.roles=broker
broker having node.id=3: broker.roles=broker
3) Isolated KRaft quorum when instantiating the broker service:
The number of nodes, all of which will have broker.roles=broker, is defined by this parameter.
The node.id values will be 1..num_nodes
4) Isolated KRaft quorum when instantiating the controller service:
The number of nodes, all of which will have broker.roles=controller, is defined by this parameter.
The node.id values will be 3001..(3000 + num_nodes)
The value passed in is determined by the broker service when that is instantiated, and it uses the
same algorithm as described above: 1, 3, or 5 unless controller_num_nodes_override is provided.
:param ZookeeperService zk:
:param dict topics: which topics to create automatically
:param str security_protocol: security protocol for clients to use
:param str tls_version: version of the TLS protocol.
:param str interbroker_security_protocol: security protocol to use for broker-to-broker (and KRaft controller-to-controller) communication
:param str client_sasl_mechanism: sasl mechanism for clients to use
:param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker (and to-controller) communication
:param str authorizer_class_name: which authorizer class to use
:param str version: which kafka version to use. Defaults to "dev" branch
:param jmx_object_names:
:param jmx_attributes:
:param int zk_connect_timeout:
:param int zk_session_timeout:
:param list[list] server_prop_overrides: overrides for the kafka.properties file; entries whose second value is None or "" are filtered out
e.g: [["config1", "true"], ["config2", "1000"]]
:param str zk_chroot:
:param bool zk_client_secure: connect to Zookeeper over secure client port (TLS) when True
:param ListenerSecurityConfig listener_security_config: listener config to use
:param dict per_node_server_prop_overrides: overrides for kafka.properties file keyed by 1-based node number
e.g: {1: [["config1", "true"], ["config2", "1000"]], 2: [["config1", "false"], ["config2", "0"]]}
:param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
:param KafkaService isolated_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper
:param int controller_num_nodes_override: the number of controller nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
:param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
:param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
:param consumer_group_migration_policy: The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa.
:param dynamicRaftQuorum: When true, define controller.quorum.bootstrap.servers (rather than controller.quorum.voters) and bootstrap the first controller using the standalone flag
:param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890
:param use_share_groups: When true, enables the use of share groups introduced in KIP-932
"""
self.zk = zk
self.isolated_kafka = isolated_kafka
self.allow_zk_with_kraft = allow_zk_with_kraft
if quorum_info_provider is None:
self.quorum_info = quorum.ServiceQuorumInfo.from_test_context(self, context)
else:
self.quorum_info = quorum_info_provider(self)
self.controller_quorum = None # will define below if necessary
self.isolated_controller_quorum = None # will define below if necessary
self.dynamicRaftQuorum = False
# Set use_new_coordinator based on context and arguments.
# If not specified, the default config is used.
if use_new_coordinator is None:
arg_name = 'use_new_coordinator'
if context.injected_args is not None:
use_new_coordinator = context.injected_args.get(arg_name)
if use_new_coordinator is None:
use_new_coordinator = context.globals.get(arg_name)
# Set use_share_groups based on context and arguments.
# If not specified, the default config is used.
if use_share_groups is None:
arg_name = 'use_share_groups'
if context.injected_args is not None:
use_share_groups = context.injected_args.get(arg_name)
if use_share_groups is None:
use_share_groups = context.globals.get(arg_name)
# Assign the determined value.
self.use_new_coordinator = use_new_coordinator
self.use_transactions_v2 = use_transactions_v2
self.use_share_groups = use_share_groups
# Set consumer_group_migration_policy based on context and arguments.
if consumer_group_migration_policy is None:
arg_name = 'consumer_group_migration_policy'
if context.injected_args is not None:
consumer_group_migration_policy = context.injected_args.get(arg_name)
if consumer_group_migration_policy is None:
consumer_group_migration_policy = context.globals.get(arg_name)
self.consumer_group_migration_policy = consumer_group_migration_policy
if num_nodes < 1:
raise Exception("Must set a positive number of nodes: %i" % num_nodes)
self.num_nodes_broker_role = 0
self.num_nodes_controller_role = 0
if self.quorum_info.using_kraft:
self.dynamicRaftQuorum = dynamicRaftQuorum
# Used to ensure that no more than one controller bootstraps with the standalone flag
self.standalone_controller_bootstrapped = False
if self.quorum_info.has_brokers:
num_nodes_broker_role = num_nodes
if self.quorum_info.has_controllers:
self.num_nodes_controller_role = self.num_kraft_controllers(num_nodes_broker_role, controller_num_nodes_override)
if self.isolated_kafka:
raise Exception("Must not specify isolated Kafka service with combined Controller quorum")
else:
self.num_nodes_controller_role = num_nodes
if not self.isolated_kafka:
raise Exception("Must specify isolated Kafka service when instantiating isolated Controller service (should not happen)")
# Initially use the inter-broker security protocol for both
# broker-to-controller and inter-controller communication. Both can be explicitly changed later if desired.
# Note, however, that the two must be the same if the controller quorum is combined with the
# brokers. Different security protocols for the two are only supported with an isolated controller quorum.
self.controller_security_protocol = interbroker_security_protocol
self.controller_sasl_mechanism = interbroker_sasl_mechanism
self.intercontroller_security_protocol = interbroker_security_protocol
self.intercontroller_sasl_mechanism = interbroker_sasl_mechanism
# Ducktape tears down services in the reverse order in which they are created,
# so create a service for the isolated controller quorum (if we need one) first, before
# invoking Service.__init__(), so that Ducktape will tear down the quorum last; otherwise
# Ducktape will tear down the controller quorum first, which could lead to problems in
# Kafka and delays in tearing it down (and who knows what else -- it's simply better
# to correctly tear down Kafka first, before tearing down the isolated controller).
if self.quorum_info.has_controllers:
self.controller_quorum = self
else:
num_isolated_controller_nodes = self.num_kraft_controllers(num_nodes, controller_num_nodes_override)
self.isolated_controller_quorum = KafkaService(
context, num_isolated_controller_nodes, self.zk, security_protocol=self.controller_security_protocol,
interbroker_security_protocol=self.intercontroller_security_protocol,
client_sasl_mechanism=self.controller_sasl_mechanism, interbroker_sasl_mechanism=self.intercontroller_sasl_mechanism,
authorizer_class_name=authorizer_class_name, version=version, jmx_object_names=jmx_object_names,
jmx_attributes=jmx_attributes,
listener_security_config=listener_security_config,
extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
isolated_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft,
server_prop_overrides=server_prop_overrides, dynamicRaftQuorum=self.dynamicRaftQuorum
)
self.controller_quorum = self.isolated_controller_quorum
Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=KafkaService.PERSISTENT_ROOT)
self.security_protocol = security_protocol
self.tls_version = tls_version
self.client_sasl_mechanism = client_sasl_mechanism
self.topics = topics
self.minikdc = None
self.concurrent_start = True # start concurrently by default
self.authorizer_class_name = authorizer_class_name
self.zk_set_acl = False
if server_prop_overrides is None:
self.server_prop_overrides = []
else:
self.server_prop_overrides = server_prop_overrides
if per_node_server_prop_overrides is None:
self.per_node_server_prop_overrides = {}
else:
self.per_node_server_prop_overrides = per_node_server_prop_overrides
self.log_level = "DEBUG"
self.zk_chroot = zk_chroot
self.zk_client_secure = zk_client_secure
self.listener_security_config = listener_security_config
self.extra_kafka_opts = extra_kafka_opts
#
# In a heavily loaded and not very fast machine, it is
# sometimes necessary to give more time for the zk client
# to have its session established, especially if the client
# is authenticating and waiting for the SaslAuthenticated
# in addition to the SyncConnected event.
#
# The default value for zookeeper.connect.timeout.ms is
# 2 seconds; this constructor raises it (18 seconds by default),
# and it can be overridden by setting the corresponding
# constructor parameter.
self.zk_connect_timeout = zk_connect_timeout
# Also allow the session timeout to be provided explicitly,
# primarily so that test cases can depend on it when waiting
# e.g. brokers to deregister after a hard kill.
self.zk_session_timeout = zk_session_timeout
broker_only_port_mappings = {
KafkaService.INTERBROKER_LISTENER_NAME:
KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, config_property.FIRST_BROKER_PORT + 7, None, False)
}
controller_only_port_mappings = {}
for idx, sec_protocol in enumerate(KafkaService.SECURITY_PROTOCOLS):
name_for_controller = self.controller_listener_name(sec_protocol)
broker_only_port_mappings[sec_protocol] = KafkaListener(sec_protocol, config_property.FIRST_BROKER_PORT + idx, sec_protocol, False)
controller_only_port_mappings[name_for_controller] = KafkaListener(name_for_controller, config_property.FIRST_CONTROLLER_PORT + idx, sec_protocol, False)
if self.quorum_info.using_zk or self.quorum_info.has_brokers and not self.quorum_info.has_controllers: # ZK or KRaft broker-only
self.port_mappings = broker_only_port_mappings
elif self.quorum_info.has_brokers_and_controllers: # KRaft broker+controller
self.port_mappings = broker_only_port_mappings.copy()
self.port_mappings.update(controller_only_port_mappings)
else: # KRaft controller-only
self.port_mappings = controller_only_port_mappings
self.interbroker_listener = None
if self.quorum_info.using_zk or self.quorum_info.has_brokers:
self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener)
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self._security_config = None
for node in self.nodes:
node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
node.version = version
zk_broker_configs = {
config_property.PORT: config_property.FIRST_BROKER_PORT,
config_property.BROKER_ID: self.idx(node),
config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: zk_connect_timeout,
config_property.ZOOKEEPER_SESSION_TIMEOUT_MS: zk_session_timeout
}
kraft_broker_configs = {
config_property.PORT: config_property.FIRST_BROKER_PORT,
config_property.NODE_ID: self.idx(node)
}
kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
kraft_broker_plus_zk_configs.update(zk_broker_configs)
kraft_broker_plus_zk_configs.pop(config_property.BROKER_ID)
if node_quorum_info.service_quorum_info.using_zk:
node.config = KafkaConfig(**zk_broker_configs)
elif not node_quorum_info.has_broker_role: # KRaft controller-only role
controller_only_configs = {
config_property.NODE_ID: self.node_id_as_isolated_controller(node),
}
kraft_controller_plus_zk_configs = controller_only_configs.copy()
kraft_controller_plus_zk_configs.update(zk_broker_configs)
kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID)
if self.zk:
node.config = KafkaConfig(**kraft_controller_plus_zk_configs)
else:
node.config = KafkaConfig(**controller_only_configs)
else: # KRaft broker-only role or combined broker+controller roles
if self.zk:
node.config = KafkaConfig(**kraft_broker_plus_zk_configs)
else:
node.config = KafkaConfig(**kraft_broker_configs)
self.combined_nodes_started = 0
self.nodes_to_start = self.nodes
def node_id_as_isolated_controller(self, node):
"""
Generates the node id for a controller-only node, starting from config_property.FIRST_CONTROLLER_ID so as not
to overlap with broker id numbering.
This method does not do any validation to check this node is actually part of an isolated controller quorum.
"""
return self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1
def num_kraft_controllers(self, num_nodes_broker_role, controller_num_nodes_override):
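        """
        Return the number of KRaft controller nodes to use: controller_num_nodes_override
        when positive, otherwise 1, 3, or 5 based on the number of broker-role nodes
        (1 if fewer than 3, 3 if fewer than 5, otherwise 5).
        """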
if controller_num_nodes_override < 0:
raise Exception("controller_num_nodes_override must not be negative: %i" % controller_num_nodes_override)
if controller_num_nodes_override > num_nodes_broker_role and self.quorum_info.quorum_type == quorum.combined_kraft:
raise Exception("controller_num_nodes_override must not exceed the service's node count in the combined case: %i > %i" %
(controller_num_nodes_override, num_nodes_broker_role))
if controller_num_nodes_override:
return controller_num_nodes_override
if num_nodes_broker_role < 3:
return 1
if num_nodes_broker_role < 5:
return 3
return 5
def set_version(self, version):
for node in self.nodes:
node.version = version
def controller_listener_name(self, security_protocol_name):
return "CONTROLLER_%s" % security_protocol_name
@property
def interbroker_security_protocol(self):
# TODO: disentangle interbroker and intercontroller protocol information
return self.interbroker_listener.security_protocol if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.intercontroller_security_protocol
# this is required for backwards compatibility - there are a lot of tests that set this property explicitly
# meaning 'use one of the existing listeners that match given security protocol, do not use custom listener'
@interbroker_security_protocol.setter
def interbroker_security_protocol(self, security_protocol):
self.setup_interbroker_listener(security_protocol, use_separate_listener=False)
def setup_interbroker_listener(self, security_protocol, use_separate_listener=False):
self.listener_security_config.use_separate_interbroker_listener = use_separate_listener
if self.listener_security_config.use_separate_interbroker_listener:
# do not close existing port here since it is not used exclusively for interbroker communication
self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME]
self.interbroker_listener.security_protocol = security_protocol
else:
# close dedicated interbroker port, so it's not dangling in 'listeners' and 'advertised.listeners'
self.close_port(KafkaService.INTERBROKER_LISTENER_NAME)
self.interbroker_listener = self.port_mappings[security_protocol]
@property
def security_config(self):
if not self._security_config:
# we will later change the security protocols to PLAINTEXT if this is an isolated KRaft controller case since
# those security protocols are irrelevant there and we don't want to falsely indicate the use of SASL or TLS
security_protocol_to_use=self.security_protocol
interbroker_security_protocol_to_use=self.interbroker_security_protocol
# determine uses/serves controller sasl mechanisms
serves_controller_sasl_mechanism=None
serves_intercontroller_sasl_mechanism=None
uses_controller_sasl_mechanism=None
if self.quorum_info.has_brokers:
if self.controller_quorum.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
uses_controller_sasl_mechanism = self.controller_quorum.controller_sasl_mechanism
if self.quorum_info.has_controllers:
if self.intercontroller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
serves_intercontroller_sasl_mechanism = self.intercontroller_sasl_mechanism
uses_controller_sasl_mechanism = self.intercontroller_sasl_mechanism # won't change from above in combined case
if self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
serves_controller_sasl_mechanism = self.controller_sasl_mechanism
# determine if KRaft uses TLS
kraft_tls = False
if self.quorum_info.has_brokers and not self.quorum_info.has_controllers:
# KRaft broker only
kraft_tls = self.controller_quorum.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
if self.quorum_info.has_controllers:
# isolated or combined KRaft controller
kraft_tls = self.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS \
or self.intercontroller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
# clear irrelevant security protocols of SASL/TLS implications for the isolated controller quorum case
if self.quorum_info.has_controllers and not self.quorum_info.has_brokers:
security_protocol_to_use=SecurityConfig.PLAINTEXT
interbroker_security_protocol_to_use=SecurityConfig.PLAINTEXT
self._security_config = SecurityConfig(self.context, security_protocol_to_use, interbroker_security_protocol_to_use,
zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure,
client_sasl_mechanism=self.client_sasl_mechanism,
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
listener_security_config=self.listener_security_config,
tls_version=self.tls_version,
serves_controller_sasl_mechanism=serves_controller_sasl_mechanism,
serves_intercontroller_sasl_mechanism=serves_intercontroller_sasl_mechanism,
uses_controller_sasl_mechanism=uses_controller_sasl_mechanism,
kraft_tls=kraft_tls)
# Ensure we have the correct client security protocol and SASL mechanism because they may have been mutated
self._security_config.properties['security.protocol'] = self.security_protocol
self._security_config.properties['sasl.mechanism'] = self.client_sasl_mechanism
# Ensure we have the right inter-broker security protocol because it may have been mutated
# since we cached our security config (ignore if this is an isolated KRaft controller quorum case; the
# inter-broker security protocol is not used there).
if (self.quorum_info.using_zk or self.quorum_info.has_brokers):
# in case inter-broker SASL mechanism has changed without changing the inter-broker security protocol
self._security_config.properties['sasl.mechanism.inter.broker.protocol'] = self.interbroker_sasl_mechanism
if self._security_config.interbroker_security_protocol != self.interbroker_security_protocol:
self._security_config.interbroker_security_protocol = self.interbroker_security_protocol
self._security_config.calc_has_sasl()
self._security_config.calc_has_ssl()
for port in self.port_mappings.values():
if port.open:
self._security_config.enable_security_protocol(port.security_protocol, port.sasl_mechanism)
if self.quorum_info.using_zk:
if self.zk.zk_sasl:
self._security_config.enable_sasl()
self._security_config.zk_sasl = self.zk.zk_sasl
if self.zk_client_secure:
self._security_config.enable_ssl()
self._security_config.zk_tls = self.zk_client_secure
return self._security_config
def open_port(self, listener_name):
self.port_mappings[listener_name].open = True
def close_port(self, listener_name):
self.port_mappings[listener_name].open = False
def start_minikdc_if_necessary(self, add_principals=""):
has_sasl = self.security_config.has_sasl
if has_sasl:
if self.minikdc is None:
other_service = self.isolated_kafka if self.isolated_kafka else self.controller_quorum if self.quorum_info.using_kraft else None
if not other_service or not other_service.minikdc:
nodes_for_kdc = self.nodes.copy()
if other_service and other_service != self:
nodes_for_kdc += other_service.nodes
self.minikdc = MiniKdc(self.context, nodes_for_kdc, extra_principals = add_principals)
self.minikdc.start()
def alive(self, node):
return len(self.pids(node)) > 0
def start(self, add_principals="", nodes_to_skip=[], isolated_controllers_to_skip=[], timeout_sec=60, **kwargs):
"""
Start the Kafka broker and wait until it registers its ID in ZooKeeper
Startup will be skipped for any nodes in nodes_to_skip. These nodes can be started later via add_broker
"""
if self.quorum_info.using_zk and self.zk_client_secure and not self.zk.zk_client_secure_port:
raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
if not all([node in self.nodes for node in nodes_to_skip]):
raise Exception("nodes_to_skip should be a subset of this service's nodes")
if self.quorum_info.has_brokers_and_controllers and (
self.controller_security_protocol != self.intercontroller_security_protocol or
self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS and self.controller_sasl_mechanism != self.intercontroller_sasl_mechanism):
# This is not supported because both the broker and the controller take the first entry from
# controller.listener.names and the value from sasl.mechanism.controller.protocol;
# they share a single config, so they must both see/use identical values.
raise Exception("Combined KRaft Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" %
(self.controller_security_protocol, self.controller_sasl_mechanism,
self.intercontroller_security_protocol, self.intercontroller_sasl_mechanism))
if self.quorum_info.using_zk or self.quorum_info.has_brokers:
self.open_port(self.security_protocol)
self.interbroker_listener.open = True
# we have to wait to decide whether to open the controller port(s)
# because it could be dependent on the particular node in the
# combined case where the number of controllers could be less
# than the number of nodes in the service
self.start_minikdc_if_necessary(add_principals)
# save the nodes we want to start in a member variable so we know which nodes to start and which to skip
# in start_node
self.nodes_to_start = [node for node in self.nodes if node not in nodes_to_skip]
if self.quorum_info.using_zk:
self._ensure_zk_chroot()
if self.isolated_controller_quorum:
self.isolated_controller_quorum.start(nodes_to_skip=isolated_controllers_to_skip)
Service.start(self, **kwargs)
if self.concurrent_start:
# We didn't wait while starting each individual node, so wait for them all now
for node in self.nodes_to_start:
with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
monitor.offset = 0
self.wait_for_start(node, monitor, timeout_sec)
if self.quorum_info.using_zk:
self.logger.info("Waiting for brokers to register at ZK")
expected_broker_ids = set(self.nodes_to_start)
wait_until(lambda: {node for node in self.nodes_to_start if self.is_registered(node)} == expected_broker_ids,
timeout_sec=30, backoff_sec=1, err_msg="Kafka servers didn't register at ZK within 30 seconds")
# Create topics if necessary
if self.topics is not None:
for topic, topic_cfg in self.topics.items():
if topic_cfg is None:
topic_cfg = {}
topic_cfg["topic"] = topic
self.create_topic(topic_cfg)
self.concurrent_start = False # in case it was True and this method was invoked directly instead of via start_concurrently()
def start_concurrently(self, add_principals="", timeout_sec=60):
self.concurrent_start = True # ensure it is True in case it has been explicitly disabled elsewhere
self.start(add_principals = add_principals, timeout_sec=timeout_sec)
self.concurrent_start = False
def add_broker(self, node):
"""
Starts an individual node. add_broker should only be used for nodes skipped during initial kafka service startup
"""
if node in self.nodes_to_start:
raise Exception("Add broker should only be used for nodes that haven't already been started")
self.logger.debug(self.who_am_i() + ": killing processes and attempting to clean up before starting")
# Added precaution - kill running processes, clean persistent files
# try/except for each step, since each of these steps may fail if there are no processes
# to kill or no files to remove
try:
self.stop_node(node)
except Exception:
pass
try:
self.clean_node(node)
except Exception:
pass
if node not in self.nodes_to_start:
self.nodes_to_start += [node]
self.logger.debug("%s: starting node" % self.who_am_i(node))
# ensure we wait for the broker to start by setting concurrent start to False for the invocation of start_node()
orig_concurrent_start = self.concurrent_start
self.concurrent_start = False
self.start_node(node)
self.concurrent_start = orig_concurrent_start
def _ensure_zk_chroot(self):
self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
if self.zk_chroot:
if not self.zk_chroot.startswith('/'):
raise Exception("Zookeeper chroot must start with '/' but found " + self.zk_chroot)
parts = self.zk_chroot.split('/')[1:]
for i in range(len(parts)):
self.zk.create('/' + '/'.join(parts[:i+1]))
def set_protocol_and_port(self, node):
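        # Assemble the listeners, advertised.listeners, and listener.security.protocol.map
        # values for this node from the currently open port mappings, plus (for ZK or
        # broker nodes) the inter-broker bootstrap servers.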
listeners = []
advertised_listeners = []
protocol_map = []
controller_listener_names = self.controller_listener_name_list(node)
for port in self.port_mappings.values():
if port.open:
listeners.append(port.listener())
if (self.dynamicRaftQuorum and quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role) or \
port.name not in controller_listener_names:
advertised_listeners.append(port.advertised_listener(node))
protocol_map.append(port.listener_security_protocol())
controller_sec_protocol = self.isolated_controller_quorum.controller_security_protocol if self.isolated_controller_quorum \
else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
else None
if controller_sec_protocol:
protocol_map.append("%s:%s" % (self.controller_listener_name(controller_sec_protocol), controller_sec_protocol))
self.listeners = ','.join(listeners)
self.advertised_listeners = ','.join(advertised_listeners)
self.listener_security_protocol_map = ','.join(protocol_map)
if self.quorum_info.using_zk or self.quorum_info.has_brokers:
self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True)
def prop_file(self, node):
self.set_protocol_and_port(node)
#load template configs as dictionary
config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
security_config=self.security_config, num_nodes=self.num_nodes,
listener_security_config=self.listener_security_config,
use_share_groups=self.use_share_groups)
configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n')
if not l.startswith("#") and "=" in l )
#load specific test override configs
override_configs = KafkaConfig(**node.config)
if self.quorum_info.using_zk or self.quorum_info.has_brokers:
override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
if self.quorum_info.using_zk or self.zk:
override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
if self.zk_client_secure:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'true'
override_configs[config_property.ZOOKEEPER_CLIENT_CNXN_SOCKET] = 'org.apache.zookeeper.ClientCnxnSocketNetty'
else:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
if self.use_new_coordinator is not None:
override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = str(self.use_new_coordinator)
if self.consumer_group_migration_policy is not None:
override_configs[config_property.CONSUMER_GROUP_MIGRATION_POLICY] = str(self.consumer_group_migration_policy)
for prop in self.server_prop_overrides:
override_configs[prop[0]] = prop[1]
for prop in self.per_node_server_prop_overrides.get(self.idx(node), []):
override_configs[prop[0]] = prop[1]
if self.use_share_groups is not None and self.use_share_groups is True:
override_configs[config_property.SHARE_GROUP_ENABLE] = str(self.use_share_groups)
override_configs[config_property.UNSTABLE_API_VERSIONS_ENABLE] = str(self.use_share_groups)
override_configs[config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS] = 'classic,consumer,share'
#update template configs with test override configs
configs.update(override_configs)
filtered_configs = {k: v for k, v in configs.items() if v not in [None, ""]}
prop_file = self.render_configs(filtered_configs)
return prop_file
def render_configs(self, configs):
"""Render self as a series of lines key=val\n, and do so in a consistent order. """
keys = [k for k in configs.keys()]
keys.sort()
s = ""
for k in keys:
s += "%s=%s\n" % (k, str(configs[k]))
return s
def start_cmd(self, node):
"""
To bring up Kafka using the native image, pass the following in the ducktape options:
--globals '{"kafka_mode": "native"}'
"""
kafka_mode = self.context.globals.get("kafka_mode", "")
cmd = f"export KAFKA_MODE={kafka_mode}; "
cmd += "export JMX_PORT=%d; " % self.jmx_port
cmd += "export KAFKA_LOG4J_OPTS=\"%s%s\"; " % (get_log4j_config_param(node), os.path.join(self.PERSISTENT_ROOT, get_log4j_config(node)))
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
self.logs["kafka_heap_dump_file"]["path"]
security_kafka_opts = self.security_config.kafka_opts.strip('\"')
cmd += fix_opts_for_new_jvm(node)
cmd += "export KAFKA_OPTS=\"%s %s %s\"; " % (heap_kafka_opts, security_kafka_opts, self.extra_kafka_opts)
cmd += "%s %s 1>> %s 2>> %s &" % \
(self.path.script("kafka-server-start.sh", node),
KafkaService.CONFIG_FILE,
KafkaService.STDOUT_STDERR_CAPTURE,
KafkaService.STDOUT_STDERR_CAPTURE)
return cmd
def controller_listener_name_list(self, node):
if self.quorum_info.using_zk:
return []
broker_to_controller_listener_name = self.controller_listener_name(self.controller_quorum.controller_security_protocol)
# Brokers always use the first controller listener, so include a second, inter-controller listener if and only if:
# 1) the node is a controller node
# 2) the inter-controller listener name differs from the broker-to-controller listener name
return [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)] \
if (quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role and
self.controller_quorum.intercontroller_security_protocol != self.controller_quorum.controller_security_protocol) \
else [broker_to_controller_listener_name]
def start_node(self, node, timeout_sec=60, **kwargs):
if node not in self.nodes_to_start:
return
node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
if self.quorum_info.has_controllers:
for controller_listener in self.controller_listener_name_list(node):
if self.node_quorum_info.has_controller_role:
self.open_port(controller_listener)
else: # combined case where node doesn't have a controller
self.close_port(controller_listener)
self.security_config.setup_node(node)
if self.quorum_info.using_zk or self.quorum_info.has_brokers: # TODO: SCRAM currently unsupported for controller quorum
self.maybe_setup_broker_scram_credentials(node)
if self.quorum_info.using_kraft:
# define controller.quorum.bootstrap.servers or controller.quorum.voters text
security_protocol_to_use = self.controller_quorum.controller_security_protocol
first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
if self.dynamicRaftQuorum:
self.controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
else:
self.controller_quorum_voters = ','.join(["{}@{}:{}".format(self.controller_quorum.idx(node) +
first_node_id - 1,
node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match the isolated quorum if one exists
if self.isolated_controller_quorum:
self.controller_sasl_mechanism = self.isolated_controller_quorum.controller_sasl_mechanism
prop_file = self.prop_file(node)
self.logger.info("kafka.properties:")
self.logger.info(prop_file)
node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
node.account.create_file(os.path.join(self.PERSISTENT_ROOT, get_log4j_config(node)),
self.render(get_log4j_config(node), log_dir=KafkaService.OPERATIONAL_LOG_DIR))
if self.quorum_info.using_kraft:
# format log directories if necessary
kafka_storage_script = self.path.script("kafka-storage.sh", node)
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
if self.dynamicRaftQuorum:
if self.node_quorum_info.has_controller_role:
if self.standalone_controller_bootstrapped:
cmd += " --no-initial-controllers"
else:
cmd += " --standalone"
self.standalone_controller_bootstrapped = True
if self.use_transactions_v2:
cmd += " --feature transaction.version=2"
else:
if get_version(node).supports_feature_command():
cmd += " --feature transaction.version=0"
self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)
cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService %s on %s with command: %s" %\
("concurrently" if self.concurrent_start else "serially", str(node.account), cmd))
if self.node_quorum_info.has_controller_role and self.node_quorum_info.has_broker_role:
self.combined_nodes_started += 1
if self.concurrent_start:
node.account.ssh(cmd) # and then don't wait for the startup message
else:
with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
node.account.ssh(cmd)
self.wait_for_start(node, monitor, timeout_sec)
def wait_for_start(self, node, monitor, timeout_sec=60):
# Kafka 1.0.0 and higher don't have a space between "Kafka" and "Server"
monitor.wait_until("Kafka\s*Server.*started", timeout_sec=timeout_sec, backoff_sec=.25,
err_msg="Kafka server didn't finish startup in %d seconds" % timeout_sec)
if self.quorum_info.using_zk or self.quorum_info.has_brokers: # TODO: SCRAM currently unsupported for controller quorum
# Credentials for inter-broker communication are created before starting Kafka.
# Client credentials are created after starting Kafka so that both loading of
# existing credentials from ZK and dynamic update of credentials in Kafka are tested.
# We use the admin client and connect as the broker user when creating the client (non-broker) credentials
# if Kafka supports KIP-554, otherwise we use ZooKeeper.
self.maybe_setup_client_scram_credentials(node)
self.start_jmx_tool(self.idx(node), node)
if not self.pids(node):
raise Exception("No process ids recorded on node %s" % node.account.hostname)
def upgrade_metadata_version(self, new_version):
self.run_metadata_features_command("upgrade", new_version)
def downgrade_metadata_version(self, new_version):
self.run_metadata_features_command("downgrade", new_version)
def run_metadata_features_command(self, op, new_version):
cmd = self.path.script("kafka-features.sh ")
cmd += "--bootstrap-server %s " % self.bootstrap_servers()
cmd += "%s --metadata %s" % (op, new_version)
self.logger.info("Running %s command...\n%s" % (op, cmd))
self.nodes[0].account.ssh(cmd)
def run_features_command(self, op, feature, new_version):
cmd = self.path.script("kafka-features.sh ")
cmd += "--bootstrap-server %s " % self.bootstrap_servers()
cmd += "%s --feature %s=%s" % (op, feature, new_version)
self.logger.info("Running %s command...\n%s" % (op, cmd))
self.nodes[0].account.ssh(cmd)
def pids(self, node):
"""Return process ids associated with running processes on the given node."""
try:
return node.account.java_pids(self.java_class_name())
except (RemoteCommandError, ValueError) as e:
return []
def signal_node(self, node, sig=signal.SIGTERM):
pids = self.pids(node)
for pid in pids:
node.account.signal(pid, sig)
def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
leader = self.leader(topic, partition)
self.signal_node(leader, sig)
def controllers_required_for_quorum(self):
"""
Assume N is the total number of controller nodes in the cluster and that N is positive.
For N=1, we need 1 controller to be running to have a quorum
For N=2, we need 2 controllers
For N=3, we need 2 controllers
For N=4, we need 3 controllers
For N=5, we need 3 controllers
:return: the number of controller nodes that must be started for there to be a quorum
"""
return math.ceil((1 + self.num_nodes_controller_role) / 2)
def stop_node(self, node, clean_shutdown=True, timeout_sec=60):
pids = self.pids(node)
cluster_has_combined_controllers = self.quorum_info.has_brokers and self.quorum_info.has_controllers
force_sigkill_due_to_too_few_combined_controllers =\
clean_shutdown and cluster_has_combined_controllers\
and self.combined_nodes_started < self.controllers_required_for_quorum()
if force_sigkill_due_to_too_few_combined_controllers:
self.logger.info("Forcing node to stop via SIGKILL due to too few combined KRaft controllers: %i/%i" %\