Skip to content
This repository was archived by the owner on Feb 3, 2023. It is now read-only.

Bounded #2164

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open

Bounded #2164

Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions crates/conductor_lib/src/conductor/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
port_utils::{try_with_port, INTERFACE_CONNECT_ATTEMPTS_MAX},
Holochain,
};
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{bounded, Receiver, Sender};
use holochain_common::paths::DNA_EXTENSION;
use holochain_core::{logger::Logger, signal::Signal};
use holochain_core_types::{
Expand Down Expand Up @@ -61,6 +61,8 @@ use holochain_net::p2p_config::{BackendConfig, P2pBackendKind, P2pConfig};

pub const MAX_DYNAMIC_PORT: u16 = std::u16::MAX;

use crate::CHANNEL_SIZE;

/// Special string to be printed on stdout, which clients must parse
/// in order to discover which port the interface bound to.
/// DO NOT CHANGE!
Expand Down Expand Up @@ -168,8 +170,6 @@ pub fn notify(msg: String) {
println!("{}", msg);
}

#[autotrace]
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CONDUCTOR_LIB)]
impl Conductor {
pub fn from_config(config: Configuration) -> Self {
lib3h_sodium::check_init();
Expand Down Expand Up @@ -243,8 +243,8 @@ impl Conductor {
pub fn spawn_stats_thread(&mut self) {
self.stop_stats_thread();
let instances = self.instances.clone();
let (kill_switch_tx, kill_switch_rx) = unbounded();
let (stats_tx, stats_rx) = unbounded();
let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE);
let (stats_tx, stats_rx) = bounded(CHANNEL_SIZE);
self.stats_thread_kill_switch = Some(kill_switch_tx);
self.stats_signal_receiver = Some(stats_rx);
thread::Builder::new()
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Conductor {
let instance_signal_receivers = self.instance_signal_receivers.clone();
let signal_tx = self.signal_tx.clone();
let config = self.config.clone();
let (kill_switch_tx, kill_switch_rx) = unbounded();
let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE);
self.signal_multiplexer_kill_switch = Some(kill_switch_tx);
self.spawn_stats_thread();
let stats_signal_receiver = self.stats_signal_receiver.clone().expect(
Expand Down Expand Up @@ -784,7 +784,7 @@ impl Conductor {
)> {
match self.config.tracing.clone().unwrap_or_default() {
TracingConfiguration::Jaeger(jaeger_config) => {
let (span_tx, span_rx) = crossbeam_channel::unbounded();
let (span_tx, span_rx) = crossbeam_channel::bounded(CHANNEL_SIZE);
let service_name = format!("{}-{}", jaeger_config.service_name, id);
let mut reporter = ht::reporter::JaegerCompactReporter::new(&service_name).unwrap();
if let Some(s) = jaeger_config.socket_address {
Expand Down Expand Up @@ -834,7 +834,7 @@ impl Conductor {
context_builder = context_builder.with_p2p_config(self.get_p2p_config());

// Signal config:
let (sender, receiver) = unbounded();
let (sender, receiver) = bounded(CHANNEL_SIZE);
self.instance_signal_receivers
.write()
.unwrap()
Expand Down Expand Up @@ -1341,7 +1341,7 @@ impl Conductor {
fn spawn_interface_thread(&self, interface_config: InterfaceConfiguration) -> Sender<()> {
let dispatcher = self.make_interface_handler(&interface_config);
// The "kill switch" is the channel which allows the interface to be stopped from outside its thread
let (kill_switch_tx, kill_switch_rx) = unbounded();
let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE);

let (broadcaster, _handle) = run_interface(&interface_config, dispatcher, kill_switch_rx)
.map_err(|error| {
Expand Down
5 changes: 3 additions & 2 deletions crates/conductor_lib/src/conductor/passphrase_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crossbeam_channel::{unbounded, Sender};
use crossbeam_channel::{bounded, Sender};
use holochain_core_types::error::HolochainError;
use holochain_locksmith::Mutex;
use lib3h_sodium::secbuf::SecBuf;
Expand All @@ -11,6 +11,7 @@ use std::{
time::{Duration, Instant},
};

use crate::CHANNEL_SIZE;
#[cfg(unix)]
use std::io::{BufRead, BufReader};
#[cfg(unix)]
Expand All @@ -34,7 +35,7 @@ pub struct PassphraseManager {
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CONDUCTOR_LIB)]
impl PassphraseManager {
pub fn new(passphrase_service: Arc<Mutex<dyn PassphraseService + Send>>) -> Self {
let (kill_switch_tx, kill_switch_rx) = unbounded::<()>();
let (kill_switch_tx, kill_switch_rx) = bounded::<()>(CHANNEL_SIZE);
let pm = PassphraseManager {
passphrase_cache: Arc::new(Mutex::new(None)),
passphrase_service,
Expand Down
2 changes: 2 additions & 0 deletions crates/conductor_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,6 @@ pub mod static_server_impls;

pub use crate::holochain::Holochain;

const CHANNEL_SIZE: usize = 1000;

new_relic_setup!("NEW_RELIC_LICENSE_KEY");
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use nickel::{
Response, StaticFilesHandler,
};

use crate::CHANNEL_SIZE;

pub struct NickelStaticServer {
shutdown_signal: Option<Sender<()>>,
config: UiInterfaceConfiguration,
Expand All @@ -38,7 +40,7 @@ impl ConductorStaticFileServer for NickelStaticServer {
}

fn start(&mut self) -> HolochainResult<()> {
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = crossbeam_channel::bounded(CHANNEL_SIZE);

self.shutdown_signal = Some(tx);
self.running = true;
Expand Down
13 changes: 7 additions & 6 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{
persister::Persister,
signal::{Signal, SignalSender},
state::StateWrapper,
CHANNEL_SIZE,
};
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{bounded, Receiver, Sender};
use futures::{
executor::ThreadPool,
task::{noop_waker_ref, Poll},
Expand Down Expand Up @@ -50,8 +51,8 @@ use std::{
#[cfg(test)]
use test_utils::mock_signing::mock_conductor_api;

pub type ActionSender = ht::channel::SpanSender<ActionWrapper>;
pub type ActionReceiver = ht::channel::SpanReceiver<ActionWrapper>;
pub type ActionSender = crossbeam_channel::Sender<ActionWrapper>;
pub type ActionReceiver = crossbeam_channel::Receiver<ActionWrapper>;

pub struct P2pNetworkWrapper(Arc<Mutex<Option<P2pNetwork>>>);

Expand Down Expand Up @@ -306,13 +307,13 @@ impl Context {
pub fn is_action_channel_open(&self) -> bool {
self.action_channel
.clone()
.map(|tx| tx.send_wrapped(ActionWrapper::new(Action::Ping)).is_ok())
.map(|tx| tx.send(ActionWrapper::new(Action::Ping)).is_ok())
.unwrap_or(false)
}

pub fn action_channel_error(&self, msg: &str) -> Option<HolochainError> {
match &self.action_channel {
Some(tx) => match tx.send_wrapped(ActionWrapper::new(Action::Ping)) {
Some(tx) => match tx.send(ActionWrapper::new(Action::Ping)) {
Ok(()) => None,
Err(_) => Some(HolochainError::LifecycleError(msg.into())),
},
Expand Down Expand Up @@ -343,7 +344,7 @@ impl Context {
/// got mutated.
/// This enables blocking/parking the calling thread until the application state got changed.
pub fn create_observer(&self) -> Receiver<()> {
let (tick_tx, tick_rx) = unbounded();
let (tick_tx, tick_rx) = bounded(CHANNEL_SIZE);
self.observer_channel()
.send(Observer { ticker: tick_tx })
.expect("Observer channel not initialized");
Expand Down
36 changes: 10 additions & 26 deletions crates/core/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ use crate::{
signal::Signal,
state::{State, StateWrapper},
workflows::{application, run_holding_workflow},
CHANNEL_SIZE,
};
#[cfg(test)]
use crate::{
network::actions::initialize_network::initialize_network_with_spoofed_dna,
nucleus::actions::initialize::initialize_chain,
};
use clokwerk::{ScheduleHandle, Scheduler, TimeUnits};
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{bounded, Receiver, Sender};
use holochain_core_types::{
dna::Dna,
error::{HcResult, HolochainError},
};
use holochain_locksmith::RwLock;
#[cfg(test)]
use holochain_persistence_api::cas::content::Address;
use holochain_tracing::{self as ht, channel::lax_send_wrapped};
use snowflake::ProcessUniqueId;
use std::{
sync::{
Expand Down Expand Up @@ -160,8 +160,8 @@ impl Instance {

/// Returns recievers for actions and observers that get added to this instance
fn initialize_channels(&mut self) -> (ActionReceiver, Receiver<Observer>) {
let (tx_action, rx_action) = unbounded::<ht::SpanWrap<ActionWrapper>>();
let (tx_observer, rx_observer) = unbounded::<Observer>();
let (tx_action, rx_action) = bounded::<ActionWrapper>(CHANNEL_SIZE);
let (tx_observer, rx_observer) = bounded::<Observer>(CHANNEL_SIZE);
self.action_channel = Some(tx_action.into());
self.observer_channel = Some(tx_observer);

Expand All @@ -188,7 +188,7 @@ impl Instance {
let mut sync_self = self.clone();
let sub_context = self.initialize_context(context);

let (kill_sender, kill_receiver) = crossbeam_channel::unbounded();
let (kill_sender, kill_receiver) = crossbeam_channel::bounded(CHANNEL_SIZE);
self.kill_switch = Some(kill_sender);
let instance_is_alive = sub_context.instance_is_alive.clone();
instance_is_alive.store(true, Ordering::Relaxed);
Expand All @@ -200,7 +200,7 @@ impl Instance {
))
.spawn(move || {
let mut state_observers: Vec<Observer> = Vec::new();
let mut unprocessed_action: Option<ht::SpanWrap<ActionWrapper>> = None;
let mut unprocessed_action: Option<ActionWrapper> = None;
while kill_receiver.try_recv().is_err() {
if let Some(action_wrapper) = unprocessed_action.take().or_else(|| rx_action.recv_timeout(Duration::from_secs(1)).ok()) {
// Add new observers
Expand All @@ -211,11 +211,6 @@ impl Instance {
if should_process {
match sync_self.process_action(&action_wrapper, &sub_context) {
Ok(()) => {
let tag = ht::Tag::new("action", format!("{:?}", action));
let _guard = action_wrapper.follower_(&sub_context.tracer, "action_loop thread", |s| s.tag(tag).start()).map(|span| {

ht::push_span(span)
});
sync_self.emit_signals(&sub_context, &action_wrapper);
// Tick all observers and remove those that have lost their receiving part
state_observers= state_observers
Expand Down Expand Up @@ -253,20 +248,9 @@ impl Instance {
/// returns the new vector of observers
pub(crate) fn process_action(
&self,
action_wrapper: &ht::SpanWrap<ActionWrapper>,
action_wrapper: &ActionWrapper,
context: &Arc<Context>,
) -> Result<(), HolochainError> {
let span = action_wrapper
.follower(&context.tracer, "begin process_action")
.unwrap_or_else(|| {
context
.tracer
.span("ROOT: process_action")
.tag(ht::debug_tag("action_wrapper", action_wrapper))
.start()
.into()
});
let _trace_guard = ht::push_span(span);
context.redux_wants_write.store(true, Relaxed);
// Mutate state
{
Expand All @@ -280,7 +264,7 @@ impl Instance {
HolochainError::Timeout(format!("timeout src: {}:{}", file!(), line!()))
})?;

new_state = state.reduce(action_wrapper.data.clone());
new_state = state.reduce(action_wrapper.clone());

// Change the state
*state = new_state;
Expand All @@ -306,7 +290,7 @@ impl Instance {
}

fn start_holding_loop(&mut self, context: Arc<Context>) {
let (kill_sender, kill_receiver) = crossbeam_channel::unbounded();
let (kill_sender, kill_receiver) = crossbeam_channel::bounded(CHANNEL_SIZE);
self.kill_switch_holding = Some(kill_sender);
thread::Builder::new()
.name(format!(
Expand Down Expand Up @@ -492,7 +476,7 @@ impl Drop for Instance {
/// Panics if the channels passed are disconnected.
#[autotrace]
pub fn dispatch_action(action_channel: &ActionSender, action_wrapper: ActionWrapper) {
lax_send_wrapped(action_channel.clone(), action_wrapper, "dispatch_action");
action_channel.send(action_wrapper).ok();
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ pub mod state_dump;
pub mod wasm_engine;
pub mod workflows;

const CHANNEL_SIZE: usize = 1000;

new_relic_setup!("NEW_RELIC_LICENSE_KEY");
3 changes: 2 additions & 1 deletion crates/core/src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This logger is the logger that's attached to each Holochain application
//! which is separate from standard logging via the log crate warn! info! debug! logging that
//! gets emitted globaly from the conductor.
use crate::CHANNEL_SIZE;
use chrono::Local;
use crossbeam_channel;
use holochain_locksmith::Mutex;
Expand Down Expand Up @@ -56,7 +57,7 @@ impl ChannelLogger {
ChannelLogger { id, sender }
}
pub fn setup() -> (Sender, Receiver) {
crossbeam_channel::unbounded()
crossbeam_channel::bounded(CHANNEL_SIZE)
}
}
pub fn default_handler(msg: String) {
Expand Down
14 changes: 7 additions & 7 deletions crates/core/src/nucleus/actions/call_zome_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use holochain_core_types::{

use holochain_json_api::json::JsonString;
use holochain_persistence_api::cas::content::{Address, AddressableContent};
use holochain_tracing::channel::lax_send_wrapped;

use holochain_dpki::utils::Verify;

Expand Down Expand Up @@ -86,7 +85,7 @@ pub async fn call_zome_function(
// Signal (currently mainly to the nodejs_waiter) that we are about to start a zome function:
context
.action_channel()
.send_wrapped(ActionWrapper::new(Action::QueueZomeFunctionCall(
.send(ActionWrapper::new(Action::QueueZomeFunctionCall(
zome_call.clone(),
)))
.expect("action channel to be open");
Expand Down Expand Up @@ -305,11 +304,12 @@ pub fn spawn_zome_function(context: Arc<Context>, zome_call: ZomeFnCall) {
context,
"actions/call_zome_fn: sending ReturnZomeFunctionResult action."
);
lax_send_wrapped(
context.action_channel().clone(),
ActionWrapper::new(Action::ReturnZomeFunctionResult(response)),
"call_zome_function",
);
context
.action_channel()
.send(ActionWrapper::new(Action::ReturnZomeFunctionResult(
response,
)))
.ok();
log_debug!(
context,
"actions/call_zome_fn: sent ReturnZomeFunctionResult action."
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/nucleus/actions/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub async fn initialize_chain(
fn dispatch_error_result(context: &Arc<Context>, err: HolochainError) {
context
.action_channel()
.send_wrapped(ActionWrapper::new(Action::ReturnInitializationResult(Err(
.send(ActionWrapper::new(Action::ReturnInitializationResult(Err(
err.to_string(),
))))
.expect("Action channel not usable in initialize_chain()");
Expand Down Expand Up @@ -174,7 +174,7 @@ pub async fn initialize_chain(

context_clone
.action_channel()
.send_wrapped(ActionWrapper::new(Action::ReturnInitializationResult(
.send(ActionWrapper::new(Action::ReturnInitializationResult(
initialization_result,
)))
.expect("Action channel not usable in initialize_chain()");
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/nucleus/reducers/init_application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ pub mod tests {
state::{NucleusState, NucleusStatus},
},
state::test_store,
CHANNEL_SIZE,
};
use crossbeam_channel::unbounded;
use crossbeam_channel::bounded;
use holochain_core_types::dna::Dna;
use holochain_tracing as ht;
use std::sync::Arc;

#[test]
Expand All @@ -58,8 +58,8 @@ pub mod tests {
let dna = Dna::new();
let action_wrapper = ActionWrapper::new(Action::InitializeChain(dna.clone()));
let nucleus = Arc::new(NucleusState::new()); // initialize to bogus value
let (sender, _receiver) = unbounded::<ht::SpanWrap<ActionWrapper>>();
let (tx_observer, _observer) = unbounded::<Observer>();
let (sender, _receiver) = bounded::<ActionWrapper>(CHANNEL_SIZE);
let (tx_observer, _observer) = bounded::<Observer>(CHANNEL_SIZE);
let context = test_context_with_channels("jimmy", &sender.into(), &tx_observer, None);
let root_state = test_store(context);

Expand Down
Loading