Skip to content

Commit 82d3a6c

Browse files
committed
fix
1 parent 27884af commit 82d3a6c

File tree

6 files changed

+207
-108
lines changed

6 files changed

+207
-108
lines changed

shotover-proxy/tests/kafka_int_tests/mod.rs

Lines changed: 97 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use pretty_assertions::assert_eq;
55
use rstest::rstest;
66
use std::time::Duration;
77
use std::time::Instant;
8-
use test_cases::produce_consume_partitions1;
8+
use test_cases::produce_consume_partitions1_topic_already_exists;
99
use test_cases::produce_consume_partitions3;
1010
use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls};
1111
use test_helpers::connection::kafka::node::run_node_smoke_test_scram;
@@ -31,9 +31,12 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
3131
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
3232
test_cases::standard_test_suite(&connection_builder).await;
3333

34+
let mut expected_events = vec![];
35+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
36+
3437
tokio::time::timeout(
3538
Duration::from_secs(10),
36-
shotover.shutdown_and_then_consume_events(&[]),
39+
shotover.shutdown_and_then_consume_events(&expected_events),
3740
)
3841
.await
3942
.expect("Shotover did not shutdown within 10s");
@@ -74,9 +77,12 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
7477
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
7578
test_cases::standard_test_suite(&connection_builder).await;
7679

80+
let mut expected_events = vec![];
81+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
82+
7783
tokio::time::timeout(
7884
Duration::from_secs(10),
79-
shotover.shutdown_and_then_consume_events(&[]),
85+
shotover.shutdown_and_then_consume_events(&expected_events),
8086
)
8187
.await
8288
.expect("Shotover did not shutdown within 10s");
@@ -169,7 +175,15 @@ async fn passthrough_encode(#[case] driver: KafkaDriver) {
169175
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
170176
test_cases::standard_test_suite(&connection_builder).await;
171177

172-
shotover.shutdown_and_then_consume_events(&[]).await;
178+
let mut expected_events = vec![];
179+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
180+
181+
tokio::time::timeout(
182+
Duration::from_secs(10),
183+
shotover.shutdown_and_then_consume_events(&expected_events),
184+
)
185+
.await
186+
.expect("Shotover did not shutdown within 10s");
173187
}
174188

175189
#[cfg(feature = "alpha-transforms")]
@@ -189,7 +203,15 @@ async fn passthrough_sasl_plain(#[case] driver: KafkaDriver) {
189203
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_plain("user", "password");
190204
test_cases::standard_test_suite(&connection_builder).await;
191205

192-
shotover.shutdown_and_then_consume_events(&[]).await;
206+
let mut expected_events = vec![];
207+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
208+
209+
tokio::time::timeout(
210+
Duration::from_secs(10),
211+
shotover.shutdown_and_then_consume_events(&expected_events),
212+
)
213+
.await
214+
.expect("Shotover did not shutdown within 10s");
193215
}
194216

195217
#[cfg(feature = "alpha-transforms")]
@@ -210,7 +232,12 @@ async fn passthrough_sasl_plain_python() {
210232
)
211233
.await;
212234

213-
shotover.shutdown_and_then_consume_events(&[]).await;
235+
tokio::time::timeout(
236+
Duration::from_secs(10),
237+
shotover.shutdown_and_then_consume_events(&[]),
238+
)
239+
.await
240+
.expect("Shotover did not shutdown within 10s");
214241
}
215242

216243
#[rstest]
@@ -287,9 +314,12 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
287314

288315
assert_inaccessible_peers_metric_emitted_on_port(0, 9001).await;
289316

317+
let mut expected_events = vec![];
318+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
319+
290320
tokio::time::timeout(
291321
Duration::from_secs(10),
292-
shotover.shutdown_and_then_consume_events(&[]),
322+
shotover.shutdown_and_then_consume_events(&expected_events),
293323
)
294324
.await
295325
.expect("Shotover did not shutdown within 10s");
@@ -311,16 +341,19 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
311341
.await;
312342

313343
// Shotover can reasonably hit many kinds of errors due to a kafka node down so ignore all of them.
344+
let mut expected_events = vec![
345+
EventMatcher::new()
346+
.with_level(Level::Error)
347+
.with_count(Count::Any),
348+
EventMatcher::new()
349+
.with_level(Level::Warn)
350+
.with_count(Count::Any),
351+
];
352+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
353+
314354
tokio::time::timeout(
315355
Duration::from_secs(10),
316-
shotover.shutdown_and_then_consume_events(&[
317-
EventMatcher::new()
318-
.with_level(Level::Error)
319-
.with_count(Count::Any),
320-
EventMatcher::new()
321-
.with_level(Level::Warn)
322-
.with_count(Count::Any),
323-
]),
356+
shotover.shutdown_and_then_consume_events(&expected_events),
324357
)
325358
.await
326359
.expect("Shotover did not shutdown within 10s");
@@ -350,9 +383,12 @@ async fn cluster_1_rack_single_shotover_broker_idle_timeout(#[case] driver: Kafk
350383
// So instead we rely on a test case hits the timeout with plenty of buffer to avoid the race condition.
351384
test_cases::test_broker_idle_timeout(&connection_builder, shotover.pid()).await;
352385

386+
let mut expected_events = vec![];
387+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
388+
353389
tokio::time::timeout(
354390
Duration::from_secs(10),
355-
shotover.shutdown_and_then_consume_events(&[]),
391+
shotover.shutdown_and_then_consume_events(&expected_events),
356392
)
357393
.await
358394
.expect("Shotover did not shutdown within 10s");
@@ -384,10 +420,13 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
384420
test_cases::cluster_test_suite(&connection_builder).await;
385421
assert_inaccessible_peers_metric_emitted_on_port(0, 9001).await;
386422

423+
let mut expected_events = multi_shotover_events();
424+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
425+
387426
for shotover in shotovers {
388427
tokio::time::timeout(
389428
Duration::from_secs(10),
390-
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
429+
shotover.shutdown_and_then_consume_events(&expected_events),
391430
)
392431
.await
393432
.expect("Shotover did not shutdown within 10s");
@@ -444,6 +483,7 @@ async fn cluster_1_rack_multi_shotover_with_1_shotover_down(#[case] driver: Kafk
444483
.with_message(r#"Shotover peer 127.0.0.1:9191 is down"#)
445484
.with_count(Count::GreaterThanOrEqual(1)),
446485
);
486+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
447487

448488
for shotover in shotovers {
449489
tokio::time::timeout(
@@ -508,6 +548,8 @@ async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: Kaf
508548
);
509549
}
510550

551+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
552+
511553
for shotover in shotovers {
512554
tokio::time::timeout(
513555
Duration::from_secs(10),
@@ -558,15 +600,7 @@ async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver:
558600
.with_count(Count::GreaterThanOrEqual(1)),
559601
);
560602

561-
if driver.is_cpp() {
562-
expected_events.push(
563-
EventMatcher::new()
564-
.with_level(Level::Warn)
565-
.with_target("shotover::server")
566-
.with_message(r#"failed to receive message on tcp stream: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }"#)
567-
.with_count(Count::Any),
568-
);
569-
}
603+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
570604

571605
for shotover in shotovers {
572606
tokio::time::timeout(
@@ -613,10 +647,13 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
613647
test_cases::describe_log_dirs(&connection_builder).await;
614648
}
615649

650+
let mut expected_events = multi_shotover_events();
651+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
652+
616653
for shotover in shotovers {
617654
tokio::time::timeout(
618655
Duration::from_secs(10),
619-
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
656+
shotover.shutdown_and_then_consume_events(&expected_events),
620657
)
621658
.await
622659
.expect("Shotover did not shutdown within 10s");
@@ -661,10 +698,13 @@ async fn cluster_2_racks_multi_shotover_rebalance_protocol(#[case] driver: Kafka
661698
.await;
662699
}
663700

701+
let mut expected_events = multi_shotover_events();
702+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
703+
664704
for shotover in shotovers {
665705
tokio::time::timeout(
666706
Duration::from_secs(10),
667-
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
707+
shotover.shutdown_and_then_consume_events(&expected_events),
668708
)
669709
.await
670710
.expect("Shotover did not shutdown within 10s");
@@ -697,6 +737,9 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
697737
}
698738
);
699739

740+
let mut expected_events = vec![];
741+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
742+
700743
tokio::time::timeout(
701744
Duration::from_secs(10),
702745
shotover.shutdown_and_then_consume_events(&[]),
@@ -766,8 +809,11 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
766809
// admin requests sent by regular user remain unsuccessful
767810
let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
768811
.use_sasl_scram("super_user", "super_password");
769-
produce_consume_partitions1(&connection_super, "c3220ff0-9390-425d-a56d-9d880a339c8c")
770-
.await;
812+
produce_consume_partitions1_topic_already_exists(
813+
&connection_super,
814+
"c3220ff0-9390-425d-a56d-9d880a339c8c",
815+
)
816+
.await;
771817
assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
772818
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
773819
assert_connection_fails_with_incorrect_password(driver, "super_user").await;
@@ -917,7 +963,7 @@ async fn cluster_sasl_scram_over_mtls_multi_shotover(#[case] driver: KafkaDriver
917963

918964
// Wait 20s since we started the initial run to ensure that we hit the 15s token lifetime limit
919965
tokio::time::sleep_until((instant + Duration::from_secs(20)).into()).await;
920-
test_cases::produce_consume_partitions1(
966+
test_cases::produce_consume_partitions1_topic_already_exists(
921967
&connection_builder,
922968
"d4f992d1-05c4-4252-b699-509102338519",
923969
)
@@ -1056,10 +1102,13 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) {
10561102
}
10571103
);
10581104

1105+
let mut expected_events = vec![];
1106+
workaround_rdkafka_connection_reset_bug(driver, &mut expected_events);
1107+
10591108
for shotover in shotovers {
10601109
tokio::time::timeout(
10611110
Duration::from_secs(10),
1062-
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
1111+
shotover.shutdown_and_then_consume_events(&expected_events),
10631112
)
10641113
.await
10651114
.expect("Shotover did not shutdown within 10s");
@@ -1126,3 +1175,19 @@ async fn assert_inaccessible_peers_metric_emitted_on_port(
11261175
)
11271176
.await;
11281177
}
1178+
1179+
fn workaround_rdkafka_connection_reset_bug(
1180+
driver: KafkaDriver,
1181+
expected_events: &mut Vec<EventMatcher>,
1182+
) {
1183+
if driver.is_cpp() {
1184+
// This is buggy behaviour from the driver, it should gracefully close connections instead of just disappearing.
1185+
expected_events.push(
1186+
EventMatcher::new()
1187+
.with_level(Level::Warn)
1188+
.with_target("shotover::server")
1189+
.with_message(r#"failed to receive message on tcp stream: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }"#)
1190+
.with_count(Count::Any),
1191+
);
1192+
}
1193+
}

0 commit comments

Comments
 (0)