Skip to content

Commit cd18691

Browse files
committed
Implement ReaderState enum
1 parent fde35de commit cd18691

File tree

1 file changed

+64
-93
lines changed

1 file changed

+64
-93
lines changed

lightning-net/src/lib.rs

Lines changed: 64 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -338,31 +338,20 @@ impl SocketDescriptor for SyncSocketDescriptor {
338338
return 0;
339339
}
340340

341-
if self.write_data_tx.is_empty() {
342-
// The data must be copied into the channel since a &[u8] reference
343-
// cannot be sent across threads. This incurs a small amount of overhead.
344-
match self.write_data_tx.try_send(data.to_vec()) {
345-
Ok(()) => data.len(),
346-
Err(e) => match e {
347-
TrySendError::Full(_) => {
348-
// This could only happen if another Sender pushed into
349-
// the channel in between the if check above and now - a
350-
// TOCTTOU error. This really shouldn't happen, but
351-
// let's just proceed normally: pause reads and return 0
352-
let _ = self.reader_cmd_tx.try_send(ReaderCommand::PauseRead);
353-
0
354-
}
355-
TrySendError::Disconnected(_) => {
356-
// This might happen if the Writer detected a disconnect and
357-
// shut down on its own. Return 0.
358-
0
359-
}
360-
},
341+
// The data must be copied into the channel since a &[u8] reference
342+
// cannot be sent across threads. This incurs a small amount of overhead.
343+
match self.write_data_tx.try_send(data.to_vec()) {
344+
Ok(()) => data.len(),
345+
Err(TrySendError::Full(_)) => {
346+
// Writes are processing; pause reads.
347+
let _ = self.reader_cmd_tx.try_send(ReaderCommand::PauseRead);
348+
0
349+
}
350+
Err(TrySendError::Disconnected(_)) => {
351+
// This might happen if the Writer detected a disconnect and
352+
// shut down on its own. Return 0.
353+
0
361354
}
362-
} else {
363-
// Writes are processing; pause reads.
364-
let _ = self.reader_cmd_tx.try_send(ReaderCommand::PauseRead);
365-
0
366355
}
367356
}
368357

@@ -423,6 +412,16 @@ impl SocketDescriptor for SyncSocketDescriptor {
423412
}
424413
}
425414

415+
/// The states that the Reader can be in.
416+
enum ReaderState {
417+
/// Ready state; Reader is blocked on read().
418+
Reading,
419+
/// Reading is paused; Reader is blocked on recv().
420+
Paused,
421+
/// Reader will shut down in the next iteration of the run() event loop.
422+
ShuttingDown,
423+
}
424+
426425
/// An actor that synchronously handles the read() events emitted by the socket.
427426
struct Reader<CMH, RMH, L, UMH>
428427
where
@@ -435,7 +434,7 @@ where
435434
peer_manager: Arc<PeerManager<SyncSocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>,
436435
descriptor: SyncSocketDescriptor,
437436
reader_cmd_rx: Receiver<ReaderCommand>,
438-
read_paused: bool,
437+
state: ReaderState,
439438
}
440439
impl<CMH, RMH, L, UMH> Reader<CMH, RMH, L, UMH>
441440
where
@@ -455,7 +454,7 @@ where
455454
peer_manager,
456455
descriptor,
457456
reader_cmd_rx,
458-
read_paused: false,
457+
state: ReaderState::Reading,
459458
}
460459
}
461460

@@ -469,30 +468,20 @@ where
469468
/// read events accordingly.
470469
/// - In between each event, do a non-blocking check for `ReaderCommand`s.
471470
fn run(&mut self) {
471+
use ReaderState::*;
472+
472473
// 8KB is nice and big but also should never cause any issues with stack
473474
// overflowing.
474475
let mut buf = [0; 8192];
475476

476477
loop {
477-
// Every time this line is reached, read_paused == false.
478-
// Do a non-blocking try_recv() to check for commands
479-
if self.do_try_recv() {
480-
break;
481-
}
478+
self.do_try_recv();
482479

483-
if self.read_paused {
484-
// To avoid a busy loop while reading is paused, block on the
485-
// reader_cmd channel until we are told to resume reading again
486-
// or until we receive a shut down command.
487-
if self.do_recv() {
488-
break;
489-
}
490-
} else {
491-
// Reading is not paused; block on the next read.
492-
if self.do_read(&mut buf) {
493-
break;
494-
}
495-
}
480+
match self.state {
481+
Reading => self.do_read(&mut buf),
482+
Paused => self.do_recv(),
483+
ShuttingDown => break,
484+
};
496485
}
497486

498487
// Shut down the underlying stream. It's fine if it was already closed.
@@ -505,65 +494,50 @@ where
505494
}
506495

507496
/// Checks for a command in a non-blocking manner, handling the command
508-
/// accordingly if there was one.
509-
///
510-
/// Returns a bool indicating whether the Reader should shut down.
511-
fn do_try_recv(&mut self) -> bool {
497+
/// if there was one.
498+
fn do_try_recv(&mut self) {
512499
match self.reader_cmd_rx.try_recv() {
513-
Ok(cmd) => match cmd {
514-
ReaderCommand::PauseRead => {
515-
self.read_paused = true;
516-
}
517-
ReaderCommand::ResumeRead => {
518-
self.read_paused = false;
519-
}
520-
ReaderCommand::Shutdown => return true,
521-
},
500+
Ok(cmd) => self.handle_command(cmd),
522501
Err(e) => match e {
523502
TryRecvError::Empty => {}
524-
TryRecvError::Disconnected => return true,
503+
TryRecvError::Disconnected => self.state = ReaderState::ShuttingDown,
525504
},
526-
}
527-
528-
false
505+
};
529506
}
530507

531-
/// Blocks on the command channel and handles the command accordingly.
532-
///
533-
/// Returns a bool indicating whether the Reader should shut down.
534-
fn do_recv(&mut self) -> bool {
508+
/// Blocks on the command channel and handles the command.
509+
fn do_recv(&mut self) {
535510
match self.reader_cmd_rx.recv() {
536-
Ok(cmd) => match cmd {
537-
ReaderCommand::PauseRead => {
538-
self.read_paused = true;
539-
}
540-
ReaderCommand::ResumeRead => {
541-
self.read_paused = false;
542-
}
543-
ReaderCommand::Shutdown => return true,
544-
},
545-
Err(_) => {
546-
// The channel is disconnected, break and shut down
547-
return true;
548-
}
549-
}
511+
Ok(cmd) => self.handle_command(cmd),
512+
Err(_) => self.state = ReaderState::ShuttingDown,
513+
};
514+
}
550515

551-
false
516+
/// Handles a `ReaderCommand`.
517+
fn handle_command(&mut self, cmd: ReaderCommand) {
518+
use ReaderCommand::*;
519+
use ReaderState::*;
520+
521+
match cmd {
522+
PauseRead if !matches!(self.state, ShuttingDown) => self.state = Paused,
523+
ResumeRead if !matches!(self.state, ShuttingDown) => self.state = Reading,
524+
Shutdown => self.state = ShuttingDown,
525+
_ => {}
526+
}
552527
}
553528

554529
/// Blocks on read() and handles the response accordingly.
555-
///
556-
/// Returns a bool indicating whether the Reader should shut down.
557-
fn do_read(&mut self, buf: &mut [u8; 8192]) -> bool {
530+
fn do_read(&mut self, buf: &mut [u8; 8192]) {
558531
use std::io::ErrorKind::*;
532+
use ReaderState::*;
559533

560534
match self.inner.read(buf) {
561535
Ok(0) => {
562536
// Peer disconnected or TcpStream::shutdown was called.
563537
// Notify the PeerManager then shutdown
564538
self.peer_manager.socket_disconnected(&self.descriptor);
565539
self.peer_manager.process_events();
566-
return true;
540+
self.state = ShuttingDown;
567541
}
568542
Ok(bytes_read) => {
569543
// Register the read event with the PeerManager
@@ -573,33 +547,30 @@ where
573547
{
574548
Ok(pause_read) => {
575549
if pause_read {
576-
self.read_paused = true;
550+
self.state = Paused;
577551
}
578552
}
579553
Err(_) => {
580-
// Rust-Lightning told us to disconnect; do it
581-
// No need to notify PeerManager in this case
582-
return true;
554+
// Rust-Lightning told us to disconnect;
555+
// no need to notify PeerManager in this case
556+
self.state = ShuttingDown;
583557
}
584558
}
585559

586-
// As noted in the read_event() docs, call process_events().
587560
self.peer_manager.process_events()
588561
}
589562
Err(e) => match e.kind() {
590563
TimedOut | Interrupted => {
591-
// Retry
564+
// Acceptable error; retry
592565
}
593566
_ => {
594567
// For all other errors, notify PeerManager and shut down
595568
self.peer_manager.socket_disconnected(&self.descriptor);
596569
self.peer_manager.process_events();
597-
return true;
570+
self.state = ShuttingDown;
598571
}
599572
},
600573
}
601-
602-
false
603574
}
604575
}
605576

0 commit comments

Comments
 (0)