Skip to content

Commit 417ebce

Browse files
committed
install_programs: broadcast to all backends via new BackendBroadcaster
1 parent 9b0589b commit 417ebce

File tree

2 files changed

+20
-8
lines changed

2 files changed

+20
-8
lines changed

src/ipc/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub trait Ipc: 'static + Send {
3434
pub trait Backend<T: Ipc> {
3535
//fn new(sock: T, continue_listening: Arc<atomic::AtomicBool>) -> Self;
3636
fn sender(&self) -> BackendSender<T>;
37+
fn broadcaster(&self) -> BackendBroadcaster<T>;
3738
fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool>;
3839
fn next(&mut self) -> Option<Msg>;
3940
}
@@ -149,7 +150,7 @@ where
149150
}
150151

151152
MultiBackend {
152-
last_recvd: Some(0), // TODO need a better solution for this, should be none
153+
last_recvd: None,
153154
continue_listening,
154155
sel,
155156
backends,
@@ -174,10 +175,16 @@ impl<T: Ipc> Backend<T> for MultiBackend<T> {
174175
fn sender(&self) -> BackendSender<T> {
175176
match self.last_recvd {
176177
Some(i) => self.backends[i as usize].clone(),
177-
None => panic!("Called sender but no messages have been received yet!")
178+
None => {
179+
panic!("No messages have been received yet, so there is no corresponding sender")
180+
}
178181
}
179182
}
180183

184+
fn broadcaster(&self) -> BackendBroadcaster<T> {
185+
BackendBroadcaster(self.backends.clone())
186+
}
187+
181188
/// Return a copy of the flag variable that indicates that the
182189
/// `Backend` should continue listening (i.e., not exit).
183190
fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool> {
@@ -209,6 +216,10 @@ impl<T: Ipc> Backend<T> for SingleBackend<T> {
209216
BackendSender(Arc::downgrade(&self.sock))
210217
}
211218

219+
fn broadcaster(&self) -> BackendBroadcaster<T> {
220+
BackendBroadcaster(vec![self.sender()])
221+
}
222+
212223
/// Return a copy of the flag variable that indicates that the
213224
/// `Backend` should continue listening (i.e., not exit).
214225
fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool> {

src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ mod errors;
102102
pub use crate::errors::*;
103103

104104
use crate::ipc::Ipc;
105-
use crate::ipc::{BackendBuilder, BackendSender};
105+
use crate::ipc::{BackendBroadcaster, BackendBuilder, BackendSender};
106106
use crate::lang::{Bin, Reg, Scope};
107107
use crate::serialize::Msg;
108108

@@ -239,7 +239,7 @@ where
239239
instrs: bin,
240240
};
241241
let buf = serialize::serialize(&msg)?;
242-
sender.send_msg(&buf[..])?;
242+
sender.broadcast_msg(&buf[..])?;
243243
Ok(())
244244
}
245245

@@ -474,12 +474,12 @@ where
474474
fn install_programs<I: Ipc + Sync>(
475475
programs: &HashMap<&'static str, String>,
476476
mut scope_map: &mut Rc<HashMap<String, Scope>>,
477-
b: BackendSender<I>,
477+
b: &BackendBroadcaster<I>,
478478
) -> Result<()> {
479479
for (program_name, program) in programs.iter() {
480480
match lang::compile(program.as_bytes(), &[]) {
481481
Ok((bin, sc)) => {
482-
match send_and_install(0, &b, bin, &sc) {
482+
match send_and_install(0, b, bin, &sc) {
483483
Ok(_) => {}
484484
Err(e) => {
485485
return Err(Error(format!(
@@ -539,7 +539,8 @@ where
539539
let mut scope_map = Rc::new(HashMap::<String, Scope>::default());
540540

541541
let programs = alg.datapath_programs();
542-
match install_programs(&programs, &mut scope_map, b.sender()) {
542+
let broadcaster = b.broadcaster();
543+
match install_programs(&programs, &mut scope_map, &broadcaster) {
543544
Ok(()) => {} // great, the datapath is ready, keep going!
544545
Err(_) => {
545546
if let Some(log) = cfg.logger.as_ref() {
@@ -561,7 +562,7 @@ where
561562
info!(log, "got message from datapath, installing programs...");
562563
}
563564
// got a msg from the datapath, it must be up, so let's try to install programs again
564-
install_programs(&programs, &mut scope_map, b.sender()).unwrap();
565+
install_programs(&programs, &mut scope_map, &broadcaster).unwrap();
565566
}
566567
}
567568
if let Some(log) = cfg.logger.as_ref() {

0 commit comments

Comments
 (0)