@@ -5,7 +5,7 @@ use pretty_assertions::assert_eq;
55use rstest:: rstest;
66use std:: time:: Duration ;
77use std:: time:: Instant ;
8- use test_cases:: produce_consume_partitions1 ;
8+ use test_cases:: produce_consume_partitions1_topic_already_exists ;
99use test_cases:: produce_consume_partitions3;
1010use test_cases:: { assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls} ;
1111use test_helpers:: connection:: kafka:: node:: run_node_smoke_test_scram;
@@ -31,9 +31,12 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
3131 let connection_builder = KafkaConnectionBuilder :: new ( driver, "127.0.0.1:9192" ) ;
3232 test_cases:: standard_test_suite ( & connection_builder) . await ;
3333
34+ let mut expected_events = vec ! [ ] ;
35+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
36+
3437 tokio:: time:: timeout (
3538 Duration :: from_secs ( 10 ) ,
36- shotover. shutdown_and_then_consume_events ( & [ ] ) ,
39+ shotover. shutdown_and_then_consume_events ( & expected_events ) ,
3740 )
3841 . await
3942 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -74,9 +77,12 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
7477 let connection_builder = KafkaConnectionBuilder :: new ( driver, "127.0.0.1:9192" ) ;
7578 test_cases:: standard_test_suite ( & connection_builder) . await ;
7679
80+ let mut expected_events = vec ! [ ] ;
81+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
82+
7783 tokio:: time:: timeout (
7884 Duration :: from_secs ( 10 ) ,
79- shotover. shutdown_and_then_consume_events ( & [ ] ) ,
85+ shotover. shutdown_and_then_consume_events ( & expected_events ) ,
8086 )
8187 . await
8288 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -169,7 +175,15 @@ async fn passthrough_encode(#[case] driver: KafkaDriver) {
169175 let connection_builder = KafkaConnectionBuilder :: new ( driver, "127.0.0.1:9192" ) ;
170176 test_cases:: standard_test_suite ( & connection_builder) . await ;
171177
172- shotover. shutdown_and_then_consume_events ( & [ ] ) . await ;
178+ let mut expected_events = vec ! [ ] ;
179+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
180+
181+ tokio:: time:: timeout (
182+ Duration :: from_secs ( 10 ) ,
183+ shotover. shutdown_and_then_consume_events ( & expected_events) ,
184+ )
185+ . await
186+ . expect ( "Shotover did not shutdown within 10s" ) ;
173187}
174188
175189#[ cfg( feature = "alpha-transforms" ) ]
@@ -189,7 +203,15 @@ async fn passthrough_sasl_plain(#[case] driver: KafkaDriver) {
189203 KafkaConnectionBuilder :: new ( driver, "127.0.0.1:9192" ) . use_sasl_plain ( "user" , "password" ) ;
190204 test_cases:: standard_test_suite ( & connection_builder) . await ;
191205
192- shotover. shutdown_and_then_consume_events ( & [ ] ) . await ;
206+ let mut expected_events = vec ! [ ] ;
207+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
208+
209+ tokio:: time:: timeout (
210+ Duration :: from_secs ( 10 ) ,
211+ shotover. shutdown_and_then_consume_events ( & expected_events) ,
212+ )
213+ . await
214+ . expect ( "Shotover did not shutdown within 10s" ) ;
193215}
194216
195217#[ cfg( feature = "alpha-transforms" ) ]
@@ -210,7 +232,12 @@ async fn passthrough_sasl_plain_python() {
210232 )
211233 . await ;
212234
213- shotover. shutdown_and_then_consume_events ( & [ ] ) . await ;
235+ tokio:: time:: timeout (
236+ Duration :: from_secs ( 10 ) ,
237+ shotover. shutdown_and_then_consume_events ( & [ ] ) ,
238+ )
239+ . await
240+ . expect ( "Shotover did not shutdown within 10s" ) ;
214241}
215242
216243#[ rstest]
@@ -287,9 +314,12 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
287314
288315 assert_inaccessible_peers_metric_emitted_on_port ( 0 , 9001 ) . await ;
289316
317+ let mut expected_events = vec ! [ ] ;
318+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
319+
290320 tokio:: time:: timeout (
291321 Duration :: from_secs ( 10 ) ,
292- shotover. shutdown_and_then_consume_events ( & [ ] ) ,
322+ shotover. shutdown_and_then_consume_events ( & expected_events ) ,
293323 )
294324 . await
295325 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -311,16 +341,19 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
311341 . await ;
312342
313343 // Shotover can reasonably hit many kinds of errors due to a kafka node down so ignore all of them.
344+ let mut expected_events = vec ! [
345+ EventMatcher :: new( )
346+ . with_level( Level :: Error )
347+ . with_count( Count :: Any ) ,
348+ EventMatcher :: new( )
349+ . with_level( Level :: Warn )
350+ . with_count( Count :: Any ) ,
351+ ] ;
352+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
353+
314354 tokio:: time:: timeout (
315355 Duration :: from_secs ( 10 ) ,
316- shotover. shutdown_and_then_consume_events ( & [
317- EventMatcher :: new ( )
318- . with_level ( Level :: Error )
319- . with_count ( Count :: Any ) ,
320- EventMatcher :: new ( )
321- . with_level ( Level :: Warn )
322- . with_count ( Count :: Any ) ,
323- ] ) ,
356+ shotover. shutdown_and_then_consume_events ( & expected_events) ,
324357 )
325358 . await
326359 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -350,9 +383,12 @@ async fn cluster_1_rack_single_shotover_broker_idle_timeout(#[case] driver: Kafk
350383 // So instead we rely on a test case hits the timeout with plenty of buffer to avoid the race condition.
351384 test_cases:: test_broker_idle_timeout ( & connection_builder, shotover. pid ( ) ) . await ;
352385
386+ let mut expected_events = vec ! [ ] ;
387+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
388+
353389 tokio:: time:: timeout (
354390 Duration :: from_secs ( 10 ) ,
355- shotover. shutdown_and_then_consume_events ( & [ ] ) ,
391+ shotover. shutdown_and_then_consume_events ( & expected_events ) ,
356392 )
357393 . await
358394 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -384,10 +420,13 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
384420 test_cases:: cluster_test_suite ( & connection_builder) . await ;
385421 assert_inaccessible_peers_metric_emitted_on_port ( 0 , 9001 ) . await ;
386422
423+ let mut expected_events = multi_shotover_events ( ) ;
424+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
425+
387426 for shotover in shotovers {
388427 tokio:: time:: timeout (
389428 Duration :: from_secs ( 10 ) ,
390- shotover. shutdown_and_then_consume_events ( & multi_shotover_events ( ) ) ,
429+ shotover. shutdown_and_then_consume_events ( & expected_events ) ,
391430 )
392431 . await
393432 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -444,6 +483,7 @@ async fn cluster_1_rack_multi_shotover_with_1_shotover_down(#[case] driver: Kafk
444483 . with_message ( r#"Shotover peer 127.0.0.1:9191 is down"# )
445484 . with_count ( Count :: GreaterThanOrEqual ( 1 ) ) ,
446485 ) ;
486+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
447487
448488 for shotover in shotovers {
449489 tokio:: time:: timeout (
@@ -508,6 +548,8 @@ async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: Kaf
508548 ) ;
509549 }
510550
551+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
552+
511553 for shotover in shotovers {
512554 tokio:: time:: timeout (
513555 Duration :: from_secs ( 10 ) ,
@@ -558,15 +600,7 @@ async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver:
558600 . with_count ( Count :: GreaterThanOrEqual ( 1 ) ) ,
559601 ) ;
560602
561- if driver. is_cpp ( ) {
562- expected_events. push (
563- EventMatcher :: new ( )
564- . with_level ( Level :: Warn )
565- . with_target ( "shotover::server" )
566- . with_message ( r#"failed to receive message on tcp stream: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }"# )
567- . with_count ( Count :: Any ) ,
568- ) ;
569- }
603+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
570604
571605 for shotover in shotovers {
572606 tokio:: time:: timeout (
@@ -613,10 +647,13 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
613647 test_cases:: describe_log_dirs ( & connection_builder) . await ;
614648 }
615649
650+ let mut expected_events = multi_shotover_events ( ) ;
651+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
652+
616653 for shotover in shotovers {
617654 tokio:: time:: timeout (
618655 Duration :: from_secs ( 10 ) ,
619- shotover. shutdown_and_then_consume_events ( & multi_shotover_events ( ) ) ,
656+ shotover. shutdown_and_then_consume_events ( & expected_events ) ,
620657 )
621658 . await
622659 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -661,10 +698,13 @@ async fn cluster_2_racks_multi_shotover_rebalance_protocol(#[case] driver: Kafka
661698 . await ;
662699 }
663700
701+ let mut expected_events = multi_shotover_events ( ) ;
702+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
703+
664704 for shotover in shotovers {
665705 tokio:: time:: timeout (
666706 Duration :: from_secs ( 10 ) ,
667- shotover. shutdown_and_then_consume_events ( & multi_shotover_events ( ) ) ,
707+ shotover. shutdown_and_then_consume_events ( & expected_events ) ,
668708 )
669709 . await
670710 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -697,6 +737,9 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
697737 }
698738 ) ;
699739
740+ let mut expected_events = vec ! [ ] ;
741+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
742+
700743 tokio:: time:: timeout (
701744 Duration :: from_secs ( 10 ) ,
702745 shotover. shutdown_and_then_consume_events ( & [ ] ) ,
@@ -766,8 +809,11 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
766809 // admin requests sent by regular user remain unsuccessful
767810 let connection_super = KafkaConnectionBuilder :: new ( driver, "127.0.0.1:9192" )
768811 . use_sasl_scram ( "super_user" , "super_password" ) ;
769- produce_consume_partitions1 ( & connection_super, "c3220ff0-9390-425d-a56d-9d880a339c8c" )
770- . await ;
812+ produce_consume_partitions1_topic_already_exists (
813+ & connection_super,
814+ "c3220ff0-9390-425d-a56d-9d880a339c8c" ,
815+ )
816+ . await ;
771817 assert_topic_creation_is_denied_due_to_acl ( & connection_basic) . await ;
772818 assert_connection_fails_with_incorrect_password ( driver, "basic_user" ) . await ;
773819 assert_connection_fails_with_incorrect_password ( driver, "super_user" ) . await ;
@@ -917,7 +963,7 @@ async fn cluster_sasl_scram_over_mtls_multi_shotover(#[case] driver: KafkaDriver
917963
918964 // Wait 20s since we started the initial run to ensure that we hit the 15s token lifetime limit
919965 tokio:: time:: sleep_until ( ( instant + Duration :: from_secs ( 20 ) ) . into ( ) ) . await ;
920- test_cases:: produce_consume_partitions1 (
966+ test_cases:: produce_consume_partitions1_topic_already_exists (
921967 & connection_builder,
922968 "d4f992d1-05c4-4252-b699-509102338519" ,
923969 )
@@ -1056,10 +1102,13 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) {
10561102 }
10571103 ) ;
10581104
1105+ let mut expected_events = vec ! [ ] ;
1106+ workaround_rdkafka_connection_reset_bug ( driver, & mut expected_events) ;
1107+
10591108 for shotover in shotovers {
10601109 tokio:: time:: timeout (
10611110 Duration :: from_secs ( 10 ) ,
1062- shotover. shutdown_and_then_consume_events ( & multi_shotover_events ( ) ) ,
1111+ shotover. shutdown_and_then_consume_events ( & expected_events ) ,
10631112 )
10641113 . await
10651114 . expect ( "Shotover did not shutdown within 10s" ) ;
@@ -1126,3 +1175,19 @@ async fn assert_inaccessible_peers_metric_emitted_on_port(
11261175 )
11271176 . await ;
11281177}
1178+
1179+ fn workaround_rdkafka_connection_reset_bug (
1180+ driver : KafkaDriver ,
1181+ expected_events : & mut Vec < EventMatcher > ,
1182+ ) {
1183+ if driver. is_cpp ( ) {
1184+ // This is buggy behaviour from the driver, it should gracefully close connections instead of just disappearing.
1185+ expected_events. push (
1186+ EventMatcher :: new ( )
1187+ . with_level ( Level :: Warn )
1188+ . with_target ( "shotover::server" )
1189+ . with_message ( r#"failed to receive message on tcp stream: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }"# )
1190+ . with_count ( Count :: Any ) ,
1191+ ) ;
1192+ }
1193+ }
0 commit comments