17
17
package kafka .api
18
18
19
19
import kafka .utils .TestUtils
20
- import org .apache .kafka .clients .admin .NewPartitionReassignment
21
- import org .apache .kafka .clients .producer .{ProducerConfig , ProducerRecord }
20
+ import org .apache .kafka .clients .admin .{ Admin , NewPartitionReassignment , TopicDescription }
21
+ import org .apache .kafka .clients .producer .{ProducerConfig , ProducerRecord , RecordMetadata }
22
22
import org .apache .kafka .common .TopicPartition
23
23
import org .apache .kafka .server .config .{ReplicationConfigs , ServerLogConfigs }
24
- import org .junit .jupiter .api .Assertions .assertEquals
24
+ import org .junit .jupiter .api .Assertions .{ assertEquals , assertNotEquals }
25
25
import org .junit .jupiter .params .ParameterizedTest
26
26
import org .junit .jupiter .params .provider .ValueSource
27
27
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
33
33
34
34
class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
35
35
val producerCount : Int = 1
36
- val brokerCount : Int = 2
36
+ val brokerCount : Int = 3
37
37
38
38
serverConfig.put(ServerLogConfigs .NUM_PARTITIONS_CONFIG , 2 .toString)
39
39
serverConfig.put(ReplicationConfigs .DEFAULT_REPLICATION_FACTOR_CONFIG , 2 .toString)
@@ -84,4 +84,94 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
84
84
assertEquals(topic, producer.send(new ProducerRecord (topic, null , " value" .getBytes(StandardCharsets .UTF_8 ))).get.topic())
85
85
}
86
86
87
+ /**
88
+ * Tests that Producer produce to new topic id after recreation.
89
+ *
90
+ * Producer will attempt to send messages to the partition specified in each record, and should
91
+ * succeed as long as the metadata has been updated with new topic id.
92
+ */
93
+ @ ParameterizedTest
94
+ @ ValueSource (strings = Array (" zk" , " kraft" ))
95
+ def testSendWithRecreatedTopic (quorum : String ): Unit = {
96
+ val numRecords = 10
97
+ val topic = " topic"
98
+ createTopic(topic)
99
+ val admin = createAdminClient()
100
+ val topicId = getTopicMetadata(admin, topic).topicId()
101
+ val producer = createProducer()
102
+
103
+ (1 to numRecords).foreach { i =>
104
+ val resp = producer.send(new ProducerRecord (topic, null , (" value" + i).getBytes(StandardCharsets .UTF_8 ))).get
105
+ assertEquals(topic, resp.topic())
106
+ }
107
+ // Start topic deletion
108
+ deleteTopic(topic, listenerName)
109
+
110
+ // Verify that the topic is deleted when no metadata request comes in
111
+ TestUtils .verifyTopicDeletion(zkClientOrNull, topic, 2 , brokers)
112
+ createTopic(topic)
113
+ assertNotEquals(topicId, getTopicMetadata(admin, topic).topicId())
114
+
115
+ // Producer should be able to send messages even after topic gets recreated
116
+ val recordMetadata : RecordMetadata = producer.send(new ProducerRecord (topic, null , " value" .getBytes(StandardCharsets .UTF_8 ))).get
117
+ assertEquals(topic, recordMetadata.topic())
118
+ assertEquals(0 , recordMetadata.offset())
119
+ }
120
+
121
+ /**
122
+ * Tests that Producer produce to topic during reassignment where topic metadata change on broker side.
123
+ *
124
+ * Producer will attempt to send messages to the partition specified in each record, and should
125
+ * succeed as long as the metadata on the leader has been updated with new topic id.
126
+ */
127
+ @ ParameterizedTest
128
+ @ ValueSource (strings = Array (" zk" , " kraft" ))
129
+ def testSendWithTopicReassignmentIsMidWay (quorum : String ): Unit = {
130
+ val numRecords = 10
131
+ val topic = " topic"
132
+ val partition0 : TopicPartition = new TopicPartition (topic, 0 )
133
+ val partition1 = new TopicPartition (topic, 1 )
134
+ val admin : Admin = createAdminClient()
135
+
136
+ // Create topic with leader as 0 for the 2 partitions.
137
+ createTopicWithAssignment(topic, Map (0 -> Seq (0 , 1 ), 1 -> Seq (0 , 1 )))
138
+ TestUtils .assertLeader(admin, partition1, 0 )
139
+
140
+ val topicDetails = getTopicMetadata(admin, topic)
141
+ assertEquals(0 , topicDetails.partitions().get(0 ).leader().id())
142
+ val producer = createProducer()
143
+
144
+ (1 to numRecords).foreach { i =>
145
+ val resp = producer.send(new ProducerRecord (topic, null , (" value" + i).getBytes(StandardCharsets .UTF_8 ))).get
146
+ assertEquals(topic, resp.topic())
147
+ }
148
+
149
+ val reassignment = Map (
150
+ partition0 -> Optional .of(new NewPartitionReassignment (util.Arrays .asList(2 , 1 ))),
151
+ partition1 -> Optional .of(new NewPartitionReassignment (util.Arrays .asList(2 , 1 )))
152
+ )
153
+
154
+ // Change assignment of one of the replicas from 0 to 2
155
+ admin.alterPartitionReassignments(reassignment.asJava).all().get()
156
+
157
+ TestUtils .waitUntilTrue(
158
+ () => partitionLeader(admin, partition0) == 2 && partitionLeader(admin, partition1) == 2 ,
159
+ s " Expected preferred leader to become 2, but is ${partitionLeader(admin, partition0)} and ${partitionLeader(admin, partition1)}" ,
160
+ 10000 )
161
+ TestUtils .assertLeader(admin, partition1, 2 )
162
+ assertEquals(topicDetails.topicId(), getTopicMetadata(admin, topic).topicId())
163
+
164
+ // Producer should be able to send messages even after topic gets reassigned
165
+ assertEquals(topic, producer.send(new ProducerRecord (topic, null , " value" .getBytes(StandardCharsets .UTF_8 ))).get.topic())
166
+ }
167
+
168
+ def getTopicMetadata (admin : Admin , topic : String ): TopicDescription = {
169
+ admin.describeTopics(util.Collections .singletonList(topic)).allTopicNames().get().get(topic)
170
+ }
171
+
172
+ def partitionLeader (admin : Admin , topicPartition : TopicPartition ): Int = {
173
+ val partitionMetadata = getTopicMetadata(admin, topicPartition.topic).partitions.get(topicPartition.partition)
174
+ val preferredLeaderMetadata = partitionMetadata.leader()
175
+ preferredLeaderMetadata.id
176
+ }
87
177
}
0 commit comments