Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ async fn cluster_1_rack_single_shotover_broker_idle_timeout(#[case] driver: Kafk
// But for an integration test this would lead to flakey tests which is unacceptable.
//
// So instead we rely on a test case hits the timeout with plenty of buffer to avoid the race condition.
test_cases::test_broker_idle_timeout(&connection_builder).await;
test_cases::test_broker_idle_timeout(&connection_builder, shotover.pid()).await;

tokio::time::timeout(
Duration::from_secs(10),
Expand Down Expand Up @@ -858,15 +858,17 @@ async fn cluster_sasl_scram_over_mtls_nodejs_and_python() {
shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover::server")
.with_message(r#"encountered an error when flushing the chain kafka for shutdown

Caused by:
0: KafkaSinkCluster transform failed
1: Failed to receive responses (without sending requests)
2: Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted.
3: Failed to receive from ControlConnection
4: The other side of this connection closed the connection"#)
.with_count(Count::Times(2))]),
.with_message_regex(concat!(
r"connection was unexpectedly terminated\s+",
r"Caused by:\s+",
r"0: Chain failed to send and/or receive messages, the connection will now be closed\.\s+",
r"1: KafkaSinkCluster transform failed\s+",
r"2: Failed to receive responses \(without sending requests\)\s+",
r"3: Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted\.\s+",
r"4: Failed to receive from ControlConnection\s+",
r"5: The other side of this connection closed the connection",
))
.with_count(Count::Any)]),
)
.await
.expect("Shotover did not shutdown within 10s");
Expand Down
47 changes: 46 additions & 1 deletion shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1567,7 +1567,10 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
}
}

pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilder) {
pub async fn test_broker_idle_timeout(
connection_builder: &KafkaConnectionBuilder,
shotover_pid: i32,
) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics_and_wait(&[NewTopic {
Expand Down Expand Up @@ -1595,6 +1598,9 @@ pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilde
// write to some open shotover connections,
// ensuring shotover reopens any connections closed by the broker due to idle timeout.
test_produce_consume_10_times(&mut producer, &mut consumer).await;

// Verify that Shotover cleaned up the dead connections and there are no CLOSE_WAIT leaks.
assert_no_close_wait_connections(shotover_pid);
}

async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: &mut KafkaConsumer) {
Expand Down Expand Up @@ -2100,3 +2106,42 @@ pub async fn test_no_out_of_rack_request(connection_builder: &KafkaConnectionBui
0
);
}

/// Counts CLOSE_WAIT connections for a given PID using lsof.
///
/// A connection in CLOSE_WAIT means the remote peer closed its end but the
/// local process has not yet closed its own socket — i.e. a potential leak.
///
/// # Errors
/// Returns an error if the `lsof` command fails to run.
pub fn count_close_wait_connections_for_process(pid: i32) -> anyhow::Result<usize> {
    use test_helpers::run_command;

    // Use lsof to get network connections for the specific PID
    // -i: network files
    // -P: don't resolve port numbers to names
    // -n: don't resolve hostnames
    // -a: AND the conditions together
    let output = run_command("lsof", &["-i", "-P", "-n", "-a", "-p", &pid.to_string()])?;

    // lsof output format (columns vary, but typically):
    // COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
    // The NAME column contains the connection info like "127.0.0.1:54321->172.16.1.2:9092 (CLOSE_WAIT)"
    //
    // Count every output line that reports a CLOSE_WAIT state instead of
    // maintaining a mutable counter by hand.
    Ok(output
        .lines()
        .filter(|line| line.contains("CLOSE_WAIT"))
        .count())
}

/// Asserts that there are no CLOSE_WAIT connections for the given PID.
///
/// Used after exercising connection teardown to verify the process cleaned up
/// sockets that the remote peer already closed.
///
/// # Panics
/// Panics if `lsof` could not be queried, or if one or more CLOSE_WAIT
/// connections are found for `pid`.
pub fn assert_no_close_wait_connections(pid: i32) {
    let count = count_close_wait_connections_for_process(pid)
        .expect("Failed to check for CLOSE_WAIT connections");

    // Inline format args (Rust 2021 idiom) render the same message as the
    // original positional `{}` arguments.
    assert_eq!(
        count, 0,
        "Found {count} CLOSE_WAIT connection(s) for PID {pid}. \
        This indicates connections that were closed by the remote peer but not yet closed by the local process."
    );
}
26 changes: 17 additions & 9 deletions shotover/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,28 +281,32 @@ fn spawn_read_write_tasks<
// 2. The reader task detects that in_rx has dropped and terminates, the last out_tx instance is dropped
// 3. The writer task detects that the last out_tx is dropped by out_rx returning None and terminates
//
// Destination closes connection and then shotover tries to receive:
// 1. The reader task detects that the client has closed the connection via reader returning None and terminates,
// Destination closes connection (proactive cleanup via force_run_chain):
// 1. The reader task detects that the destination has closed the connection via reader returning None
// 1.1. connection_closed_tx is sent `ConnectionError::OtherSideClosed`
// 1.2. in_tx and the first out_tx are dropped
// 2. The `Connection::recv/recv_try` detects that in_tx is dropped by in_rx returning None and returns the ConnectionError::OtherSideClosed received from connection_closed_rx.
// 2.1. `Connection::recv/recv_try` prevents any future sends or receives by storing the ConnectionError
// 3. Once the user handles the error by dropping the Connection out_tx is dropped, the writer task detects this by out_rx returning None causing the task to terminate.
// 3.1. The writer task could also close early by detecting that the client has closed the connection via writer returning BrokenPipe
// 1.2. in_tx is dropped, closing the channel
// 1.3. force_run_chain.notify_one() is called to wake up the transform chain
// 1.4. The reader task terminates
// 2. The transform chain wakes up and runs with empty requests
// 2.1. The transform is responsible for detecting the closed connection and cleaning it up
// 3. When the Connection is dropped, out_tx is dropped
// 4. The writer task detects that out_tx is dropped by out_rx returning None and terminates
// 4.1. The writer task could also terminate early by detecting a BrokenPipe/ConnectionReset error when writing
//
// Destination closes connection and then shotover tries to send:
// if a send or recv has not been attempted yet the send will appear to have succeeded.
// if a recv was already attempted, then the logic is the same as the above example.
// if a send was already attempted, then the following logic occurs:
// 1. Connection::send sends a message to the writer task via out_tx.
// 2. The writer task attempts to send the mesage to the writer but it returns a BrokenPipe or ConnectionReset error.
// 3.1 The writer task task sends an OtherSideClosed error to the Connection.
// 2. The writer task attempts to send the message to the writer but it returns a BrokenPipe or ConnectionReset error.
// 3.1 The writer task sends an OtherSideClosed error to the Connection.
// 3.2 The writer task terminates.
// 4. Connection::send sends a message to the writer task via out_tx but detects the writer task terminated due to out_tx returning None.
// 4.1 Connection::send checks connection_closed_rx for the error, stores it and returns it to the caller.

let connection_closed_tx2 = connection_closed_tx.clone();
let request_pending2 = request_pending.clone();
let force_run_chain2 = force_run_chain.clone();
tokio::spawn(
async move {
match reader_task::<C, _>(
Expand All @@ -322,6 +326,10 @@ fn spawn_read_write_tasks<
// This ensures the handle side logic will always have an
// error available to consult as to why the `in_` channel was closed.
std::mem::drop(in_tx);
// Notify the transform chain about the error so it can run recv_responses()
// and detect the closed connection. This must happen after dropping in_tx
// so that when the transform wakes up, the channel is already closed.
force_run_chain2.notify_one();
}
}
}
Expand Down
79 changes: 41 additions & 38 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2239,54 +2239,57 @@ The connection to the client has been closed."

// Convert all received PendingRequestState::Sent into PendingRequestState::Received
for (connection_destination, connection) in &mut self.connections.connections {
// skip recv when no pending requests to avoid timeouts on old connections
if connection.pending_requests_count() != 0 {
self.temp_responses_buffer.clear();
match connection
.try_recv_into(&mut self.temp_responses_buffer)
.with_context(|| format!("Failed to receive from {connection_destination:?}"))
{
Ok(()) => {
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestState::Sent { index, request } =
&mut pending_request.state
{
if &pending_request.destination == connection_destination {
// Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent
// All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.state = PendingRequestState::Received {
response: response.take().unwrap(),
request: request.take(),
};
} else {
*index -= 1;
}
self.temp_responses_buffer.clear();
match connection
.try_recv_into(&mut self.temp_responses_buffer)
.with_context(|| format!("Failed to receive from {connection_destination:?}"))
{
Ok(()) => {
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestState::Sent { index, request } =
&mut pending_request.state
{
if &pending_request.destination == connection_destination {
// Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent
// All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.state = PendingRequestState::Received {
response: response.take().unwrap(),
request: request.take(),
};
} else {
*index -= 1;
}
}
}
}
}
Err(err) => connection_errors.push((*connection_destination, err)),
}
Err(err) => connection_errors.push((*connection_destination, err)),
}
}

for (destination, err) in connection_errors {
// Since pending_requests_count > 0 we expect this to return an Err.
self.connections
.handle_connection_error(
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
&self.nodes,
destination,
err,
)
.await?;
if let Some(connection) = self.connections.connections.get(&destination) {
if connection.pending_requests_count() == 0 {
self.connections.connections.remove(&destination);
} else {
// Has pending requests - need to handle the error properly
self.connections
.handle_connection_error(
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
&self.nodes,
destination,
err,
)
.await?;
}
}
}

// Remove and return all PendingRequestState::Received that are ready to be received.
Expand Down
12 changes: 12 additions & 0 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ impl KafkaProducerJava {
}
}

/// Closes the wrapped Java `KafkaProducer` when the Rust handle is dropped,
/// so its broker connections are not left dangling.
impl Drop for KafkaProducerJava {
    fn drop(&mut self) {
        // Synchronously invoke `close` on the JVM producer object.
        // block_in_place allows this blocking JNI call to run from async
        // context without stalling other tasks on the worker thread.
        // NOTE(review): tokio::task::block_in_place panics on a
        // current_thread runtime or outside any runtime — confirm drops only
        // happen on a multi-thread runtime.
        tokio::task::block_in_place(|| self.producer.call("close", vec![]));
    }
}

pub struct KafkaConsumerJava {
jvm: Jvm,
consumer: Value,
Expand Down Expand Up @@ -1082,6 +1088,12 @@ impl KafkaAdminJava {
}
}

/// Closes the wrapped Java admin client when the Rust handle is dropped,
/// so its broker connections are not left dangling.
impl Drop for KafkaAdminJava {
    fn drop(&mut self) {
        // Synchronously invoke `close` on the JVM admin object.
        // block_in_place allows this blocking JNI call to run from async
        // context without stalling other tasks on the worker thread.
        // NOTE(review): tokio::task::block_in_place panics on a
        // current_thread runtime or outside any runtime — confirm drops only
        // happen on a multi-thread runtime.
        tokio::task::block_in_place(|| self.admin.call("close", vec![]));
    }
}

fn topic_partition_to_java(jvm: &Jvm, tp: &TopicPartition) -> Value {
jvm.construct(
"org.apache.kafka.common.TopicPartition",
Expand Down
Loading