Skip to content

Commit bac3320

Browse files
committed
install_programs: broadcast to all backends via new BackendBroadcaster
1 parent 38af7e4 commit bac3320

File tree

2 files changed

+20
-8
lines changed

2 files changed

+20
-8
lines changed

src/ipc/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub trait Ipc: 'static + Send {
3434
pub trait Backend<T: Ipc> {
3535
//fn new(sock: T, continue_listening: Arc<atomic::AtomicBool>) -> Self;
3636
fn sender(&self) -> BackendSender<T>;
37+
fn broadcaster(&self) -> BackendBroadcaster<T>;
3738
fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool>;
3839
fn next(&mut self) -> Option<Msg>;
3940
}
@@ -153,7 +154,7 @@ where
153154
}
154155

155156
MultiBackend {
156-
last_recvd: Some(0), // TODO need a better solution for this, should be none
157+
last_recvd: None,
157158
continue_listening,
158159
sel,
159160
backends,
@@ -178,10 +179,16 @@ impl<T: Ipc> Backend<T> for MultiBackend<T> {
178179
fn sender(&self) -> BackendSender<T> {
179180
match self.last_recvd {
180181
Some(i) => self.backends[i as usize].clone(),
181-
None => panic!("Called sender but no messages have been received yet!")
182+
None => {
183+
panic!("No messages have been received yet, so there is no corresponding sender")
184+
}
182185
}
183186
}
184187

188+
fn broadcaster(&self) -> BackendBroadcaster<T> {
189+
BackendBroadcaster(self.backends.clone())
190+
}
191+
185192
/// Return a copy of the flag variable that indicates that the
186193
/// `Backend` should continue listening (i.e., not exit).
187194
fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool> {
@@ -213,6 +220,10 @@ impl<T: Ipc> Backend<T> for SingleBackend<T> {
213220
BackendSender(Arc::downgrade(&self.sock))
214221
}
215222

223+
fn broadcaster(&self) -> BackendBroadcaster<T> {
224+
BackendBroadcaster(vec![self.sender()])
225+
}
226+
216227
/// Return a copy of the flag variable that indicates that the
217228
/// `Backend` should continue listening (i.e., not exit).
218229
fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool> {

src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ mod errors;
102102
pub use crate::errors::*;
103103

104104
use crate::ipc::Ipc;
105-
use crate::ipc::{BackendBuilder, BackendSender};
105+
use crate::ipc::{BackendBroadcaster, BackendBuilder, BackendSender};
106106
use crate::lang::{Bin, Reg, Scope};
107107
use crate::serialize::Msg;
108108

@@ -239,7 +239,7 @@ where
239239
instrs: bin,
240240
};
241241
let buf = serialize::serialize(&msg)?;
242-
sender.send_msg(&buf[..])?;
242+
sender.broadcast_msg(&buf[..])?;
243243
Ok(())
244244
}
245245

@@ -474,12 +474,12 @@ where
474474
fn install_programs<I: Ipc + Sync>(
475475
programs: &HashMap<&'static str, String>,
476476
mut scope_map: &mut Rc<HashMap<String, Scope>>,
477-
b: BackendSender<I>,
477+
b: &BackendBroadcaster<I>,
478478
) -> Result<()> {
479479
for (program_name, program) in programs.iter() {
480480
match lang::compile(program.as_bytes(), &[]) {
481481
Ok((bin, sc)) => {
482-
match send_and_install(0, &b, bin, &sc) {
482+
match send_and_install(0, b, bin, &sc) {
483483
Ok(_) => {}
484484
Err(e) => {
485485
return Err(Error(format!(
@@ -539,7 +539,8 @@ where
539539
let mut scope_map = Rc::new(HashMap::<String, Scope>::default());
540540

541541
let programs = alg.datapath_programs();
542-
match install_programs(&programs, &mut scope_map, b.sender()) {
542+
let broadcaster = b.broadcaster();
543+
match install_programs(&programs, &mut scope_map, &broadcaster) {
543544
Ok(()) => {} // great, the datapath is ready, keep going!
544545
Err(_) => {
545546
if let Some(log) = cfg.logger.as_ref() {
@@ -561,7 +562,7 @@ where
561562
info!(log, "got message from datapath, installing programs...");
562563
}
563564
// got a msg from the datapath, it must be up, so let's try to install programs again
564-
install_programs(&programs, &mut scope_map, b.sender()).unwrap();
565+
install_programs(&programs, &mut scope_map, &broadcaster).unwrap();
565566
}
566567
}
567568
if let Some(log) = cfg.logger.as_ref() {

0 commit comments

Comments
 (0)