Skip to content

Commit 939de26

Browse files
committed
synchronize client io
1 parent 298c581 commit 939de26

File tree

1 file changed

+26
-15
lines changed

1 file changed

+26
-15
lines changed

src/client.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::str::FromStr;
99
use std::sync::{
1010
atomic::{AtomicU32, Ordering},
1111
RwLock,
12+
Mutex
1213
};
1314
use std::time::Duration;
1415

@@ -132,6 +133,8 @@ pub struct Client {
132133
notif_recv: Receiver<notif::Notification>,
133134
/// Active notification handles: these will be closed on Drop
134135
notif_handles: RwLock<BTreeSet<(AmsAddr, notif::Handle)>>,
136+
/// Mutex around IO operations
137+
io_mutex: Mutex<()>,
135138
/// If we opened our local port with the router
136139
source_port_opened: bool,
137140
}
@@ -264,6 +267,7 @@ impl Client {
264267
read_timeout: timeouts.read,
265268
notif_handles: RwLock::default(),
266269
source_port_opened,
270+
io_mutex: Mutex::new(())
267271
})
268272
}
269273

@@ -336,21 +340,28 @@ impl Client {
336340
for buf in data_in {
337341
request.extend_from_slice(buf);
338342
}
339-
// &T impls Write for T: Write, so no &mut self required.
340-
(&self.socket).write_all(&request).ctx("sending request")?;
341-
342-
// Get a reply from the reader thread, with timeout or not.
343-
let reply = if let Some(tmo) = self.read_timeout {
344-
self.reply_recv
345-
.recv_timeout(tmo)
346-
.map_err(|_| io::ErrorKind::TimedOut.into())
347-
.ctx("receiving reply (route set?)")?
348-
} else {
349-
self.reply_recv
350-
.recv()
351-
.map_err(|_| io::ErrorKind::UnexpectedEof.into())
352-
.ctx("receiving reply (route set?)")?
353-
}?;
343+
344+
let reply = {
345+
let _io_guard = self.io_mutex
346+
.lock()
347+
.expect("attempted to read after an io panic");
348+
349+
// &T impls Write for T: Write, so no &mut self required.
350+
(&self.socket).write_all(&request).ctx("sending request")?;
351+
352+
// Get a reply from the reader thread, with timeout or not.
353+
if let Some(tmo) = self.read_timeout {
354+
self.reply_recv
355+
.recv_timeout(tmo)
356+
.map_err(|_| io::ErrorKind::TimedOut.into())
357+
.ctx("receiving reply (route set?)")?
358+
} else {
359+
self.reply_recv
360+
.recv()
361+
.map_err(|_| io::ErrorKind::UnexpectedEof.into())
362+
.ctx("receiving reply (route set?)")?
363+
}?
364+
};
354365

355366
// Validate the incoming reply. The reader thread already made sure that
356367
// it is consistent and addressed to us.

0 commit comments

Comments
 (0)