Skip to content
This repository was archived by the owner on Feb 3, 2023. It is now read-only.

Sim2h recording & playback of connections and messages #2030

Open
wants to merge 43 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
85d963e
Add two queue sizes to StatusData
maackle Jan 2, 2020
70851a8
Add status data for Limbo connections
maackle Jan 2, 2020
d3809c5
Merge branch 'develop' into sim2h-status-additions
zippy Jan 2, 2020
d9796b4
Break out into separate DebugData and print periodically
maackle Jan 2, 2020
5f53838
Opt-in to debug dumps
maackle Jan 2, 2020
82a1e8f
Merge branch 'sim2h-status-additions' of github.com:holochain/holocha…
maackle Jan 2, 2020
b070760
Add debug_dump flag to sim2h_server
maackle Jan 2, 2020
b538e7d
Merge remote-tracking branch 'origin/develop' into sim2h-status-addit…
maackle Jan 2, 2020
feac4d8
fmt
maackle Jan 2, 2020
608701d
Change Job -> Client, beginnings of sim2h_walkman
maackle Dec 30, 2019
3c05ea1
Hook up walkman sim2h logging
maackle Dec 31, 2019
237e20b
Almost there
maackle Dec 31, 2019
c114c9d
Separate sim2h_client crate, more hookup
maackle Dec 31, 2019
9adb032
fmt
maackle Dec 31, 2019
8f6c14d
First pass at cassette playback
maackle Dec 31, 2019
f398187
Use SignedWireMessage, bypassing client agent identity
maackle Dec 31, 2019
3d56bb6
Add cassette compile command
maackle Dec 31, 2019
763470d
Can playback cassette correctly. But is it useful?
maackle Jan 1, 2020
c98953a
README
maackle Jan 1, 2020
5cce189
Add installation blip
maackle Jan 1, 2020
4ebb89c
README
maackle Jan 1, 2020
18fa7f6
Update alpha1 -> alpha2
maackle Jan 2, 2020
a9c6996
WIP write an integration test using DebugData
maackle Jan 2, 2020
659250c
Merge remote-tracking branch 'origin/develop' into sim2h-walkman
maackle Jan 24, 2020
be0cc05
Post-merge fixups
maackle Jan 24, 2020
e56fae3
fmt
maackle Jan 24, 2020
4152af9
Add build info to README
maackle Jan 24, 2020
9279840
Update README.md
maackle Jan 24, 2020
ebf8e5b
Update README.md
maackle Jan 24, 2020
e0dff9c
Use delays informed by timestamps of recorded events during playback
maackle Jan 25, 2020
3170995
Merge branch 'sim2h-walkman' of github.com:holochain/holochain-rust i…
maackle Jan 25, 2020
c7fbce3
Merge branch 'develop' of https://github.com/holochain/holochain-rust…
freesig Jan 28, 2020
55f13a2
fix unused import
freesig Jan 28, 2020
160c449
add back in tracing
freesig Jan 28, 2020
37a5a50
format code
freesig Jan 28, 2020
86e412a
Merge branch 'develop' of https://github.com/holochain/holochain-rust…
freesig Jan 30, 2020
03463c4
add compression
freesig Jan 30, 2020
b5de6e8
Merge branch 'develop' of https://github.com/holochain/holochain-rust…
freesig Jan 30, 2020
8506dc6
lazy env check
freesig Jan 30, 2020
aef5e0a
Fix clippy
maackle Jan 31, 2020
8cb865b
Remove get_debug_data test
maackle Jan 31, 2020
057ac49
Merge remote-tracking branch 'origin/develop' into sim2h-walkman
maackle Feb 4, 2020
4ac173e
POC do reconnect on disconnect, collab with @freesig
maackle Feb 4, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ node_modules
package-lock.json
**/*.rs.bk
*.log
*.cassette
net_ipc/tests/node-p2p-ipc
build_docs_key
*.orig
Expand Down
437 changes: 274 additions & 163 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ members = [
"crates/trycp_server",
"crates/sim1h",
"crates/sim2h",
"crates/sim2h_client",
"crates/sim2h_server",
"crates/walkman_cli",
"crates/walkman_types",
"crates/wasm_utils"
]
exclude = [
Expand Down
1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ holochain_locksmith = { version = "=0.0.42-alpha5", path = "../locksmith" }
holochain_tracing_macros = "0.0.14"
newrelic="0.2"
sim2h = { version = "=0.0.42-alpha5", path = "../sim2h" }
sim2h_client = { version = "=0.0.42-alpha5", path = "../sim2h_client" }
lib3h_crypto_api = "=0.0.31"
in_stream = { version = "=0.0.42-alpha5", path = "../in_stream" }
url2 = "=0.0.4"
Expand Down
2 changes: 2 additions & 0 deletions crates/cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod package;
pub mod run;
mod sim2h_client;
pub mod test;
mod walkman;

pub use self::{
chain_log::{chain_list, chain_log},
Expand All @@ -18,4 +19,5 @@ pub use self::{
run::{get_interface_type_string, hc_run_bundle_configuration, hc_run_configuration, run},
sim2h_client::sim2h_client,
test::{test, TEST_DIR_NAME},
walkman::walkman,
};
113 changes: 5 additions & 108 deletions crates/cli/src/cli/sim2h_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ use crate::NEW_RELIC_LICENSE_KEY;
use dns_lookup::lookup_host;
use in_stream::*;
use lib3h_crypto_api::CryptoSystem;
use lib3h_protocol::data_types::*;
use lib3h_sodium::SodiumCryptoSystem;
use sim2h::{
crypto::{Provenance, SignedWireMessage},
WireMessage, WIRE_VERSION,
};
use std::sync::{Arc, Mutex};
use sim2h::{WireMessage, WIRE_VERSION};
use sim2h_client::Sim2hClient;
use url2::prelude::*;

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CLI)]
Expand Down Expand Up @@ -36,8 +32,8 @@ pub fn sim2h_client(url_string: String, message_string: String) -> Result<(), St
let url = Url2::parse(format!("{}://{}:{}", url.scheme(), ip, maybe_port.unwrap()));

println!("connecting to: {}", url);
let mut job = Job::new(&url)?;
job.send_wire(match message_string.as_ref() {
let mut client = Sim2hClient::new(&url)?;
client.send_wire(match message_string.as_ref() {
"ping" => WireMessage::Ping,
"hello" => WireMessage::Hello(WIRE_VERSION),
"status" => WireMessage::Status,
Expand All @@ -54,7 +50,7 @@ pub fn sim2h_client(url_string: String, message_string: String) -> Result<(), St
loop {
std::thread::sleep(std::time::Duration::from_millis(10));
let mut frame = WsFrame::default();
match job.connection.read(&mut frame) {
match client.connection().read(&mut frame) {
Ok(_) => {
if let WsFrame::Binary(b) = frame {
let msg: WireMessage = serde_json::from_slice(&b).unwrap();
Expand All @@ -78,102 +74,3 @@ pub fn sim2h_client(url_string: String, message_string: String) -> Result<(), St
thread_local! {
pub static CRYPTO: Box<dyn CryptoSystem> = Box::new(SodiumCryptoSystem::new());
}
struct Job {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this section is moved or if that's what we want @maackle ?

agent_id: String,
#[allow(dead_code)]
pub_key: Arc<Mutex<Box<dyn lib3h_crypto_api::Buffer>>>,
sec_key: Arc<Mutex<Box<dyn lib3h_crypto_api::Buffer>>>,
connection: InStreamWss<InStreamTcp>,
// wss_connection: InStreamWss<InStreamTls<InStreamTcp>>,
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CLI)]
impl Job {
pub fn new(connect_uri: &Url2) -> Result<Self, String> {
let (pub_key, sec_key) = CRYPTO.with(|crypto| {
let mut pub_key = crypto.buf_new_insecure(crypto.sign_public_key_bytes());
let mut sec_key = crypto.buf_new_secure(crypto.sign_secret_key_bytes());
crypto.sign_keypair(&mut pub_key, &mut sec_key).unwrap();
(pub_key, sec_key)
});
let enc = hcid::HcidEncoding::with_kind("hcs0").map_err(|e| format!("{}", e))?;
let agent_id = enc.encode(&*pub_key).unwrap();
println!("Generated agent id: {}", agent_id);
let connection = await_in_stream_connect(connect_uri)
.map_err(|e| format!("Error awaiting connection: {}", e))?;
println!("Await successfull");
let out = Self {
agent_id,
pub_key: Arc::new(Mutex::new(pub_key)),
sec_key: Arc::new(Mutex::new(sec_key)),
connection,
};

Ok(out)
}

/// sign a message and send it to sim2h
pub fn send_wire(&mut self, message: WireMessage) {
println!("Sending wire message to sim2h: {:?}", message);
let payload: Opaque = message.into();
let payload_buf: Box<dyn lib3h_crypto_api::Buffer> = Box::new(payload.clone().as_bytes());
let sig = base64::encode(
&*CRYPTO
.with(|crypto| {
let mut sig = crypto.buf_new_insecure(crypto.sign_bytes());
crypto
.sign(&mut sig, &payload_buf, &*self.sec_key.lock().unwrap())
.unwrap();
sig
})
.read_lock(),
);
let signed_message = SignedWireMessage {
provenance: Provenance::new(self.agent_id.clone().into(), sig.into()),
payload,
};
let to_send: Opaque = signed_message.into();
self.connection.write(to_send.as_bytes().into()).unwrap();
}
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CLI)]
fn await_in_stream_connect(connect_uri: &Url2) -> Result<InStreamWss<InStreamTcp>, String> {
let timeout = std::time::Instant::now()
.checked_add(std::time::Duration::from_millis(60000))
.unwrap();

let mut read_frame = WsFrame::default();

// keep trying to connect
loop {
// let config = WssConnectConfig::new(TlsConnectConfig::new(TcpConnectConfig::default()));
let config = WssConnectConfig::new(TcpConnectConfig::default());
let mut connection =
InStreamWss::connect(connect_uri, config).map_err(|e| format!("{}", e))?;
connection.write(WsFrame::Ping(b"".to_vec())).unwrap();

loop {
let mut err = false;
match connection.read(&mut read_frame) {
Ok(_) => return Ok(connection),
Err(e) if e.would_block() => (),
Err(_) => {
err = true;
}
}

if std::time::Instant::now() >= timeout {
Err("could not connect within timeout".to_string())?
}

if err {
break;
}

std::thread::sleep(std::time::Duration::from_millis(10));
}

std::thread::sleep(std::time::Duration::from_millis(500));
}
}
6 changes: 6 additions & 0 deletions crates/cli/src/cli/walkman.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use std::path::PathBuf;
// use crate::cli::sim2h_client::Sim2hClient;

pub fn walkman(_cassette: PathBuf) {
println!("run walkman");
}
13 changes: 10 additions & 3 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ extern crate holochain_persistence_api;
extern crate holochain_persistence_file;
extern crate json_patch;
extern crate lib3h_crypto_api;
extern crate lib3h_protocol;
extern crate lib3h_sodium;
extern crate sim2h;
extern crate sim2h_client;
extern crate structopt;
#[macro_use]
extern crate failure;
Expand Down Expand Up @@ -164,6 +164,11 @@ enum Cli {
/// message to send to the sim2h server ('ping' or 'status')
message: String,
},
Walkman {
#[structopt()]
/// Path to walkman cassette file for playback
cassette: PathBuf,
},
}
arg_enum! {
#[derive(Debug)]
Expand Down Expand Up @@ -330,10 +335,12 @@ fn run() -> HolochainResult<()> {
}

Cli::Sim2hClient { url, message } => {
println!("url: {}", &url);
println!("message: {}", &message);
cli::sim2h_client(url, message)?;
}

Cli::Walkman { cassette } => {
cli::walkman(cassette);
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/metrics/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl LoggerMetricPublisher {
impl MetricPublisher for LoggerMetricPublisher {
fn publish(&mut self, metric: &Metric) {
let log_line: LogLine = metric.into();
debug!("{}", log_line);
trace!("{}", log_line);
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/sim1h/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ holochain_tracing = "=0.0.13"
uuid = { version = "0.4", features = ["v4"] }
log = "0.4.8"
env_logger = "=0.6.1"
lazy_static = "1.2.0"
lazy_static = "=1.4.0"
holochain_persistence_api = "=0.0.13"
holochain_json_api = "=0.0.17"
url = "=2.1.0"
Expand Down
6 changes: 5 additions & 1 deletion crates/sim2h/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ holochain_tracing = "=0.0.13"
holochain_core_types = { version = "=0.0.42-alpha5", path = "../core_types" }
holochain_locksmith = { version = "=0.0.42-alpha5", path = "../locksmith" }
holochain_metrics = { version = "=0.0.42-alpha5", path = "../metrics" }
holochain_walkman_types = { version = "=0.0.42-alpha5", path = "../walkman_types" }
holochain_common = { version = "=0.0.42-alpha5", path = "../common" }
holochain_tracing_macros = "0.0.14"
in_stream = { version = "=0.0.42-alpha5", path = "../in_stream" }
Expand All @@ -52,5 +53,8 @@ native-tls = "=0.2.3"
openssl = "=0.10.25"
tungstenite = "=0.9.2"
threadpool = "=1.7.1"
chashmap = "=2.2.2"
newrelic="0.2"

[dev-dependencies]
sim2h_client = { version = "=0.0.42-alpha5", path = "../sim2h_client" }
chashmap = "=2.2.2"
13 changes: 13 additions & 0 deletions crates/sim2h/src/debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DebugData {
pub limbo: DebugLimboData,
pub msg_queue_size: usize,
pub wss_queue_size: usize,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DebugLimboData {
pub total_connections: usize,
pub total_messages: usize,
pub max_messages: usize,
}
33 changes: 27 additions & 6 deletions crates/sim2h/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ extern crate num_cpus;
extern crate serde;
#[macro_use]
extern crate lazy_static;
extern crate holochain_walkman_types;
extern crate newrelic;

#[macro_use]
Expand All @@ -21,6 +22,7 @@ mod naive_sharding;
pub mod cache;
pub mod connection_state;
pub mod crypto;
pub mod debug;
pub mod error;
use lib3h_protocol::types::{AgentPubKey, AspectHash, EntryHash};
mod message_log;
Expand All @@ -31,6 +33,11 @@ pub use crate::message_log::MESSAGE_LOGGER;
use crate::{crypto::*, error::*, naive_sharding::entry_location};
use cache::*;
use connection_state::*;
use holochain_locksmith::Mutex;
use holochain_metrics::{config::MetricPublisherConfig, Metric};
use holochain_walkman_types::{walkman_log_sim2h, WalkmanSim2hEvent};
use in_stream::*;
use lib3h::rrdht_util::*;
use lib3h_crypto_api::CryptoSystem;
use lib3h_protocol::{
data_types::{
Expand All @@ -50,21 +57,29 @@ use futures::{
future::{BoxFuture, FutureExt},
stream::StreamExt,
};
use in_stream::*;
use log::*;
use rand::{seq::SliceRandom, thread_rng};
use std::{
collections::{HashMap, HashSet},
convert::TryFrom,
sync::Arc,
time::Instant,
};

use holochain_locksmith::Mutex;
use holochain_metrics::{config::MetricPublisherConfig, Metric};
lazy_static! {
static ref HOLOCHAIN_WALKMAN_SIM2H: bool = std::env::var("HOLOCHAIN_WALKMAN_SIM2H").is_ok();
}

/// if we can't acquire a lock in 20 seconds, panic!
const MAX_LOCK_TIMEOUT: u64 = 20000;

fn walkman_log<F: FnOnce() -> WalkmanSim2hEvent>(event: F) {
if *HOLOCHAIN_WALKMAN_SIM2H {
let log_item = event();
let json =
serde_json::to_string(&walkman_log_sim2h(log_item)).expect("Serialized walkman event");
println!("<walkman>{}</walkman>", json);
}
}
//set up license_key
new_relic_setup!("NEW_RELIC_LICENSE_KEY");

Expand Down Expand Up @@ -99,7 +114,6 @@ impl<T> SendExt<T> for crossbeam_channel::Sender<T> {
}
}
}

const RETRY_FETCH_MISSING_ASPECTS_INTERVAL_MS: u64 = 30000; // 30 seconds

fn conn_lifecycle(desc: &str, uuid: &str, obj: &ConnectionState, uri: &Lib3hUri) {
Expand Down Expand Up @@ -405,7 +419,7 @@ pub struct Sim2h {
connection_mgr_evt_recv: ConnectionMgrEventRecv,
num_ticks: u64,
/// when should we try to resync nodes that are still missing aspect data
missing_aspects_resync: std::time::Instant,
missing_aspects_resync: Instant,
dht_algorithm: DhtAlgorithm,
recv_com: tokio::sync::mpsc::UnboundedReceiver<Sim2hCom>,
sim2h_handle: Sim2hHandle,
Expand Down Expand Up @@ -502,6 +516,7 @@ impl Sim2h {
.connection_states
.insert(url.clone(), (nanoid::simple(), ConnectionState::new()));

walkman_log(|| WalkmanSim2hEvent::Connect(url.to_string()));
state.connection_mgr.connect(url, wss);
}
});
Expand Down Expand Up @@ -806,6 +821,12 @@ impl Sim2h {
Ok((signed_message.provenance.source().into(), wire_message))
})() {
Ok((source, wire_message)) => {
walkman_log(|| {
let signed_message = SignedWireMessage::try_from(payload.clone()).unwrap();
let msg_serialized = serde_json::to_string(&signed_message)
.expect("SignedWireMessage serialized");
WalkmanSim2hEvent::Message(url.to_string(), msg_serialized)
});
sim2h_handle.handle_message(url, wire_message, source)
}
Err(error) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/sim2h/src/sim2h_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::*;
use lib3h::rrdht_util::*;

pub(crate) struct Sim2hState {
pub(crate) crypto: Box<dyn CryptoSystem>,
Expand Down Expand Up @@ -86,6 +85,7 @@ impl Sim2hState {
}
}
}
walkman_log(|| WalkmanSim2hEvent::Disconnect(uri.0.to_string()));
trace!("disconnect done");
}

Expand Down
Loading