Skip to content

Commit 5db451a

Browse files
fix: tester should fail when messages cannot be written (#227)
* fix: tester fails when messages cannot be written * fix writeRate * lower batchTimeout
1 parent 783435a commit 5db451a

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

cmd/topicctl/subcmd/tester.go

+20-13
Original file line numberDiff line numberDiff line change
@@ -158,21 +158,23 @@ func runTestWriter(ctx context.Context) error {
158158
return errors.New("Stopping because of user response")
159159
}
160160

161+
batchSize := 5
162+
161163
writer := kafka.NewWriter(
162164
kafka.WriterConfig{
163-
Brokers: []string{connector.Config.BrokerAddr},
164-
Dialer: connector.Dialer,
165-
Topic: testerConfig.topic,
166-
Balancer: &kafka.LeastBytes{},
167-
Async: true,
168-
QueueCapacity: 5,
169-
BatchSize: 5,
165+
Brokers: []string{connector.Config.BrokerAddr},
166+
Dialer: connector.Dialer,
167+
Topic: testerConfig.topic,
168+
Balancer: &kafka.LeastBytes{},
169+
Async: false,
170+
BatchSize: batchSize,
171+
BatchTimeout: 1 * time.Nanosecond,
170172
},
171173
)
172174
defer writer.Close()
173175

174176
index := 0
175-
tickDuration := time.Duration(1000.0/float64(testerConfig.writeRate)) * time.Millisecond
177+
tickDuration := time.Duration(1000.0/float64(testerConfig.writeRate/batchSize)) * time.Millisecond
176178
sendTicker := time.NewTicker(tickDuration)
177179
logTicker := time.NewTicker(5 * time.Second)
178180

@@ -183,17 +185,22 @@ func runTestWriter(ctx context.Context) error {
183185
case <-ctx.Done():
184186
return nil
185187
case <-sendTicker.C:
186-
err := writer.WriteMessages(
187-
ctx,
188-
kafka.Message{
188+
msgs := []kafka.Message{}
189+
190+
for i := 0; i < 5; i++ {
191+
msgs = append(msgs, kafka.Message{
189192
Key: []byte(fmt.Sprintf("msg_%d", index)),
190193
Value: []byte(fmt.Sprintf("Contents of test message %d", index)),
191-
},
194+
})
195+
index++
196+
}
197+
err := writer.WriteMessages(
198+
ctx,
199+
msgs...,
192200
)
193201
if err != nil {
194202
return err
195203
}
196-
index++
197204
case <-logTicker.C:
198205
log.Infof("%d messages sent", index)
199206
}

0 commit comments

Comments
 (0)