17
17
package kafka .api
18
18
19
19
import kafka .utils .TestUtils
20
- import org .apache .kafka .clients .admin .NewPartitionReassignment
21
- import org .apache .kafka .clients .producer .{ProducerConfig , ProducerRecord }
20
+ import org .apache .kafka .clients .admin .{ Admin , NewPartitionReassignment , TopicDescription }
21
+ import org .apache .kafka .clients .producer .{ProducerConfig , ProducerRecord , RecordMetadata }
22
22
import org .apache .kafka .common .TopicPartition
23
23
import org .apache .kafka .server .config .{ReplicationConfigs , ServerLogConfigs }
24
- import org .junit .jupiter .api .Assertions .assertEquals
24
+ import org .junit .jupiter .api .Assertions .{ assertEquals , assertNotEquals }
25
25
import org .junit .jupiter .params .ParameterizedTest
26
26
import org .junit .jupiter .params .provider .ValueSource
27
27
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
33
33
34
34
class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
35
35
val producerCount : Int = 1
36
- val brokerCount : Int = 2
36
+ val brokerCount : Int = 3
37
37
38
38
serverConfig.put(ServerLogConfigs .NUM_PARTITIONS_CONFIG , 2 .toString)
39
39
serverConfig.put(ReplicationConfigs .DEFAULT_REPLICATION_FACTOR_CONFIG , 2 .toString)
@@ -84,4 +84,93 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
84
84
assertEquals(topic, producer.send(new ProducerRecord (topic, null , " value" .getBytes(StandardCharsets .UTF_8 ))).get.topic())
85
85
}
86
86
87
+ /**
88
+ * Tests that Producer produce to new topic id after recreation.
89
+ *
90
+ * Producer will attempt to send messages to the partition specified in each record, and should
91
+ * succeed as long as the metadata has been updated with new topic id.
92
+ */
93
+ @ ParameterizedTest
94
+ @ ValueSource (strings = Array (" zk" , " kraft" ))
95
+ def testSendWithRecreatedTopic (quorum : String ): Unit = {
96
+ val numRecords = 10
97
+ val topic = " topic"
98
+ createTopic(topic)
99
+ val admin = createAdminClient()
100
+ val topicId = getTopicMetadata(admin, topic).topicId()
101
+ val producer = createProducer()
102
+
103
+ (1 to numRecords).foreach { i =>
104
+ val resp = producer.send(new ProducerRecord (topic, null , (" value" + i).getBytes(StandardCharsets .UTF_8 ))).get
105
+ assertEquals(topic, resp.topic())
106
+ }
107
+ // Start topic deletion
108
+ deleteTopic(topic, listenerName)
109
+
110
+ // Verify that the topic is deleted when no metadata request comes in
111
+ TestUtils .verifyTopicDeletion(zkClientOrNull, topic, 2 , brokers)
112
+ createTopic(topic)
113
+ assertNotEquals(topicId, getTopicMetadata(admin, topic).topicId())
114
+
115
+ // Producer should be able to send messages even after topic gets deleted
116
+ val recordMetadata : RecordMetadata = producer.send(new ProducerRecord (topic, null , " value" .getBytes(StandardCharsets .UTF_8 ))).get
117
+ assertEquals(topic, recordMetadata.topic())
118
+ }
119
+
120
+ /**
121
+ * Tests that Producer produce to topic during reassignment where topic metadata change on broker side.
122
+ *
123
+ * Producer will attempt to send messages to the partition specified in each record, and should
124
+ * succeed as long as the metadata on the leader has been updated with new topic id.
125
+ */
126
+ @ ParameterizedTest
127
+ @ ValueSource (strings = Array (" zk" , " kraft" ))
128
+ def testSendWithTopicReassignmentIsMidWay (quorum : String ): Unit = {
129
+ val numRecords = 10
130
+ val topic = " topic"
131
+ val partition0 : TopicPartition = new TopicPartition (topic, 0 )
132
+ val partition1 = new TopicPartition (topic, 1 )
133
+ val admin : Admin = createAdminClient()
134
+
135
+ // Create topic with leader as 0 for the 2 partitions.
136
+ createTopicWithAssignment(topic, Map (0 -> Seq (0 , 1 ), 1 -> Seq (0 , 1 )))
137
+ TestUtils .assertLeader(admin, partition1, 0 )
138
+
139
+ val topicDetails = getTopicMetadata(admin, topic)
140
+ assertEquals(0 , topicDetails.partitions().get(0 ).leader().id())
141
+ val producer = createProducer()
142
+
143
+ (1 to numRecords).foreach { i =>
144
+ val resp = producer.send(new ProducerRecord (topic, null , (" value" + i).getBytes(StandardCharsets .UTF_8 ))).get
145
+ assertEquals(topic, resp.topic())
146
+ }
147
+
148
+ val reassignment = Map (
149
+ partition0 -> Optional .of(new NewPartitionReassignment (util.Arrays .asList(2 , 1 ))),
150
+ partition1 -> Optional .of(new NewPartitionReassignment (util.Arrays .asList(2 , 1 )))
151
+ )
152
+
153
+ // Change assignment of one of the replicas from 0 to 2
154
+ admin.alterPartitionReassignments(reassignment.asJava).all().get()
155
+
156
+ TestUtils .waitUntilTrue(
157
+ () => partitionLeader(admin, partition0) == 2 && partitionLeader(admin, partition1) == 2 ,
158
+ s " Expected preferred leader to become 2, but is ${partitionLeader(admin, partition0)} and ${partitionLeader(admin, partition1)}" ,
159
+ 10000 )
160
+ TestUtils .assertLeader(admin, partition1, 2 )
161
+ assertEquals(topicDetails.topicId(), getTopicMetadata(admin, topic).topicId())
162
+
163
+ // Producer should be able to send messages even after topic gets reassigned
164
+ assertEquals(topic, producer.send(new ProducerRecord (topic, null , " value" .getBytes(StandardCharsets .UTF_8 ))).get.topic())
165
+ }
166
+
167
+ def getTopicMetadata (admin : Admin , topic : String ): TopicDescription = {
168
+ admin.describeTopics(util.Collections .singletonList(topic)).allTopicNames().get().get(topic)
169
+ }
170
+
171
+ def partitionLeader (admin : Admin , topicPartition : TopicPartition ): Int = {
172
+ val partitionMetadata = getTopicMetadata(admin, topicPartition.topic).partitions.get(topicPartition.partition)
173
+ val preferredLeaderMetadata = partitionMetadata.leader()
174
+ preferredLeaderMetadata.id
175
+ }
87
176
}
0 commit comments