Skip to content

Commit 1a0cc81

Browse files
make run retry on connection error
1 parent 609b5ef commit 1a0cc81

File tree

1 file changed

+46
-41
lines changed

1 file changed

+46
-41
lines changed

shotover/src/source_task.rs

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -248,48 +248,53 @@ impl<C: CodecBuilder + 'static> SourceTask<C> {
248248
tokio::select! {
249249
// Accept new connection
250250
stream_result = Self::accept(&mut self.listener) => {
251-
let stream = stream_result?;
252-
253-
debug!("got socket");
254-
self.available_connections_gauge.set(self.limit_connections.available_permits() as f64);
255-
self.connections_opened.increment(1);
256-
257-
let client_details = stream.peer_addr()
258-
.map(|p| p.ip().to_string())
259-
.unwrap_or_else(|_| "Unknown Peer".to_string());
260-
tracing::info!("New connection from {}", client_details);
261-
262-
let force_run_chain = Arc::new(Notify::new());
263-
let context = TransformContextBuilder{
264-
force_run_chain: force_run_chain.clone(),
265-
client_details:client_details.clone(),
266-
};
267-
268-
// Create a unique shutdown channel for this connection
269-
let (connection_shutdown_tx, connection_shutdown_rx) = watch::channel(false);
270-
271-
let handler = Handler{
272-
chain: self.chain_builder.build(context),
273-
codec: self.codec.clone(),
274-
shutdown: Shutdown::new(connection_shutdown_rx),
275-
tls: self.tls.clone(),
276-
pending_requests: PendingRequests::new(self.codec.protocol()),
277-
timeout: self.timeout,
278-
_permit: permit,
279-
};
280-
// Spawn a new task to process the connections.
281-
let handle = tokio::spawn(async move{
282-
// Process the connection. If an error is encountered, log it.
283-
if let Err(err) = handler.run(stream, transport, force_run_chain, client_details).await{
284-
error!("{:?}", err.context("connection was unexpectedly terminated"));
251+
match stream_result {
252+
Ok(stream) => {
253+
debug!("got socket");
254+
self.available_connections_gauge.set(self.limit_connections.available_permits() as f64);
255+
self.connections_opened.increment(1);
256+
257+
let client_details = stream.peer_addr()
258+
.map(|p| p.ip().to_string())
259+
.unwrap_or_else(|_| "Unknown Peer".to_string());
260+
tracing::info!("New connection from {}", client_details);
261+
262+
let force_run_chain = Arc::new(Notify::new());
263+
let context = TransformContextBuilder{
264+
force_run_chain: force_run_chain.clone(),
265+
client_details:client_details.clone(),
266+
};
267+
268+
// Create a unique shutdown channel for this connection
269+
let (connection_shutdown_tx, connection_shutdown_rx) = watch::channel(false);
270+
271+
let handler = Handler{
272+
chain: self.chain_builder.build(context),
273+
codec: self.codec.clone(),
274+
shutdown: Shutdown::new(connection_shutdown_rx),
275+
tls: self.tls.clone(),
276+
pending_requests: PendingRequests::new(self.codec.protocol()),
277+
timeout: self.timeout,
278+
_permit: permit,
279+
};
280+
// Spawn a new task to process the connections.
281+
let handle = tokio::spawn(async move{
282+
// Process the connection. If an error is encountered, log it.
283+
if let Err(err) = handler.run(stream, transport, force_run_chain, client_details).await{
284+
error!("{:?}", err.context("connection was unexpectedly terminated"));
285+
}
286+
}.in_current_span());
287+
let tracked_connection = TrackedConnection::new(handle, connection_shutdown_tx);
288+
self.connection_handles.push(tracked_connection);
289+
// Only prune the list every so often
290+
// theres no point in doing it every iteration because most likely none of the handles will have completed
291+
if self.connection_count.is_multiple_of(1000) {
292+
self.connection_handles.retain(|x| !x.is_finished());
293+
}
294+
}
295+
Err(err) => {
296+
error!(cause = %err, "failed to accept connection, retrying");
285297
}
286-
}.in_current_span());
287-
let tracked_connection = TrackedConnection::new(handle, connection_shutdown_tx);
288-
self.connection_handles.push(tracked_connection);
289-
// Only prune the list every so often
290-
// theres no point in doing it every iteration because most likely none of the handles will have completed
291-
if self.connection_count.is_multiple_of(1000) {
292-
self.connection_handles.retain(|x| !x.is_finished());
293298
}
294299
Ok::<(), anyhow::Error>(())
295300
},

0 commit comments

Comments
 (0)