@@ -44,22 +44,42 @@ func produceWithJava(t *testing.T, topic string, codec CompressionCodec, message
4444 stdin , err := cmd .StdinPipe ()
4545 require .NoError (t , err )
4646
47+ stderr , err := cmd .StderrPipe ()
48+ require .NoError (t , err )
49+
50+ var stderrOutput strings.Builder
51+ var wg sync.WaitGroup
52+ wg .Add (1 )
53+ go func () {
54+ defer wg .Done ()
55+ s := bufio .NewScanner (stderr )
56+ for s .Scan () {
57+ stderrOutput .WriteString (s .Text () + "\n " )
58+ }
59+ }()
60+
4761 require .NoError (t , cmd .Start ())
4862
4963 for _ , msg := range messages {
5064 _ , err := fmt .Fprintln (stdin , msg )
5165 if err != nil {
5266 stdin .Close ()
5367 waitErr := cmd .Wait ()
68+ wg .Wait ()
5469 if waitErr != nil {
55- err = fmt .Errorf ("failed to write message: %w; Java producer failed: %w" , err , waitErr )
70+ err = fmt .Errorf ("failed to write message: %w; Java producer failed: %w; stderr: %s " , err , waitErr , stderrOutput . String () )
5671 }
5772 }
5873 require .NoError (t , err )
5974 }
6075 stdin .Close ()
6176
62- require .NoError (t , cmd .Wait (), "Java producer failed" )
77+ err = cmd .Wait ()
78+ wg .Wait ()
79+ if err != nil {
80+ t .Logf ("Java producer stderr: %s" , stderrOutput .String ())
81+ require .NoError (t , err , "Java producer failed" )
82+ }
6383}
6484
6585func consumeWithSarama (t * testing.T , topic string , startOffset int64 , count int ) []string {
@@ -253,7 +273,7 @@ func kafkaVersionAtLeast(requiredVersion string) bool {
253273
254274func javaProducerArgs (topic string , codec CompressionCodec ) []string {
255275 args := make ([]string , 0 , 8 )
256- if kafkaVersionAtLeast ("2.0 .0" ) {
276+ if kafkaVersionAtLeast ("2.5 .0" ) {
257277 args = append (args , "--bootstrap-server" , brokerAddr )
258278 } else {
259279 args = append (args , "--broker-list" , brokerAddr )
0 commit comments