diff --git a/cmd/topicctl/subcmd/tester.go b/cmd/topicctl/subcmd/tester.go index 03bd18e..e6b9788 100644 --- a/cmd/topicctl/subcmd/tester.go +++ b/cmd/topicctl/subcmd/tester.go @@ -158,21 +158,23 @@ func runTestWriter(ctx context.Context) error { return errors.New("Stopping because of user response") } + batchSize := 5 + writer := kafka.NewWriter( kafka.WriterConfig{ - Brokers: []string{connector.Config.BrokerAddr}, - Dialer: connector.Dialer, - Topic: testerConfig.topic, - Balancer: &kafka.LeastBytes{}, - Async: true, - QueueCapacity: 5, - BatchSize: 5, + Brokers: []string{connector.Config.BrokerAddr}, + Dialer: connector.Dialer, + Topic: testerConfig.topic, + Balancer: &kafka.LeastBytes{}, + Async: false, + BatchSize: batchSize, + BatchTimeout: 1 * time.Nanosecond, }, ) defer writer.Close() index := 0 - tickDuration := time.Duration(1000.0/float64(testerConfig.writeRate)) * time.Millisecond + tickDuration := time.Duration(1000.0/float64(testerConfig.writeRate/batchSize)) * time.Millisecond sendTicker := time.NewTicker(tickDuration) logTicker := time.NewTicker(5 * time.Second) @@ -183,17 +185,22 @@ func runTestWriter(ctx context.Context) error { case <-ctx.Done(): return nil case <-sendTicker.C: - err := writer.WriteMessages( - ctx, - kafka.Message{ + msgs := []kafka.Message{} + + for i := 0; i < 5; i++ { + msgs = append(msgs, kafka.Message{ Key: []byte(fmt.Sprintf("msg_%d", index)), Value: []byte(fmt.Sprintf("Contents of test message %d", index)), - }, + }) + index++ + } + err := writer.WriteMessages( + ctx, + msgs..., ) if err != nil { return err } - index++ case <-logTicker.C: log.Infof("%d messages sent", index) }