1
1
package kafka
2
2
3
3
import (
4
+ "errors"
4
5
"fmt"
5
6
"log"
6
7
"strconv"
@@ -144,8 +145,10 @@ func TestAcc_TopicAlterReplicationFactor(t *testing.T) {
144
145
145
146
r .Test (t , r.TestCase {
146
147
ProviderFactories : overrideProviderFactory (),
147
- PreCheck : func () { testAccPreCheck (t ) },
148
- CheckDestroy : testAccCheckTopicDestroy ,
148
+ PreCheck : func () {
149
+ testAccPreCheck (t )
150
+ },
151
+ CheckDestroy : testAccCheckTopicDestroy ,
149
152
Steps : []r.TestStep {
150
153
{
151
154
Config : cfg (t , bs , fmt .Sprintf (testResourceTopic_updateRF , topicName , 1 , 7 )),
@@ -251,6 +254,8 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
251
254
}
252
255
kafkaConfig .Producer .Return .Errors = true
253
256
kafkaConfig .Producer .Return .Successes = true
257
+ kafkaConfig .Metadata .Full = true
258
+
254
259
kafkaConfig .Producer .RequiredAcks = sarama .WaitForAll
255
260
kafkaConfig .Producer .Timeout = 90 * time .Second
256
261
kafkaConfig .Producer .Retry .Max = 5
@@ -269,12 +274,14 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
269
274
270
275
// rety 5 times
271
276
retries := 5
277
+ produceErrs := make ([]error , 0 , retries )
272
278
for i := 0 ; i < retries ; i ++ {
273
279
if errs := producer .SendMessages (messages ); errs != nil {
280
+ produceErrs = append (produceErrs , errs )
274
281
for _ , err := range errs .(sarama.ProducerErrors ) {
275
282
log .Println ("[ERROR] Write to kafka failed: " , err )
276
283
if i == retries - 1 {
277
- return err
284
+ return errors . Join ( produceErrs ... )
278
285
}
279
286
}
280
287
} else {
0 commit comments