Skip to content

Commit cb6d345

Browse files
committed
synchronize client io
1 parent 298c581 commit cb6d345

File tree

1 file changed

+26
-17
lines changed

1 file changed

+26
-17
lines changed

src/client.rs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::net::{IpAddr, Shutdown, TcpStream, ToSocketAddrs};
88
use std::str::FromStr;
99
use std::sync::{
1010
atomic::{AtomicU32, Ordering},
11-
RwLock,
11+
Mutex, RwLock,
1212
};
1313
use std::time::Duration;
1414

@@ -132,6 +132,8 @@ pub struct Client {
132132
notif_recv: Receiver<notif::Notification>,
133133
/// Active notification handles: these will be closed on Drop
134134
notif_handles: RwLock<BTreeSet<(AmsAddr, notif::Handle)>>,
135+
/// Mutex around IO operations
136+
io_mutex: Mutex<()>,
135137
/// If we opened our local port with the router
136138
source_port_opened: bool,
137139
}
@@ -140,7 +142,8 @@ impl Drop for Client {
140142
fn drop(&mut self) {
141143
// the notif_handles lock should only be poisioned in panics coming
142144
// from std code, so a panic is probably acceptable here.
143-
let handles = self.notif_handles
145+
let handles = self
146+
.notif_handles
144147
.get_mut()
145148
.expect("notification handle cache lock was poisoned");
146149

@@ -264,6 +267,7 @@ impl Client {
264267
read_timeout: timeouts.read,
265268
notif_handles: RwLock::default(),
266269
source_port_opened,
270+
io_mutex: Mutex::new(()),
267271
})
268272
}
269273

@@ -336,21 +340,26 @@ impl Client {
336340
for buf in data_in {
337341
request.extend_from_slice(buf);
338342
}
339-
// &T impls Write for T: Write, so no &mut self required.
340-
(&self.socket).write_all(&request).ctx("sending request")?;
341-
342-
// Get a reply from the reader thread, with timeout or not.
343-
let reply = if let Some(tmo) = self.read_timeout {
344-
self.reply_recv
345-
.recv_timeout(tmo)
346-
.map_err(|_| io::ErrorKind::TimedOut.into())
347-
.ctx("receiving reply (route set?)")?
348-
} else {
349-
self.reply_recv
350-
.recv()
351-
.map_err(|_| io::ErrorKind::UnexpectedEof.into())
352-
.ctx("receiving reply (route set?)")?
353-
}?;
343+
344+
let reply = {
345+
let _io_guard = self.io_mutex.lock().expect("attempted to read after an io panic");
346+
347+
// &T impls Write for T: Write, so no &mut self required.
348+
(&self.socket).write_all(&request).ctx("sending request")?;
349+
350+
// Get a reply from the reader thread, with timeout or not.
351+
if let Some(tmo) = self.read_timeout {
352+
self.reply_recv
353+
.recv_timeout(tmo)
354+
.map_err(|_| io::ErrorKind::TimedOut.into())
355+
.ctx("receiving reply (route set?)")?
356+
} else {
357+
self.reply_recv
358+
.recv()
359+
.map_err(|_| io::ErrorKind::UnexpectedEof.into())
360+
.ctx("receiving reply (route set?)")?
361+
}?
362+
};
354363

355364
// Validate the incoming reply. The reader thread already made sure that
356365
// it is consistent and addressed to us.

0 commit comments

Comments
 (0)