-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathssl.rs
More file actions
157 lines (137 loc) · 6.1 KB
/
ssl.rs
File metadata and controls
157 lines (137 loc) · 6.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use std::path::PathBuf;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use futures_time::future::FutureExt;
use log::{error, info, warn};
use rcgen::{BasicConstraints, CertificateParams, DnType, DnValue, IsCa, Issuer, KeyPair, KeyUsagePurpose, SanType, PKCS_ECDSA_P256_SHA256};
use shvbroker::brokerimpl::{BrokerImpl, LastLogin, run_broker};
use shvbroker::config::{BrokerConfig, Listen};
use shvclient::clientapi::{RpcCallDirExists, RpcCallDirList};
use shvclient::{ClientCommandSender, ClientEvent, ClientEventsReceiver};
use shvrpc::client::ClientConfig;
use futures::channel::mpsc::unbounded;
use tempfile::TempDir;
use url::Url;
/// Host and port on which the test broker listens and to which the test client connects.
const BROKER_ADDRESS: &str = "localhost:37568";
async fn start_broker(broker_config: BrokerConfig, broker_address: &str) {
let access_config = broker_config.access.clone();
let broker_config = Arc::new(broker_config);
let (sender, reciever) = unbounded();
shvclient::runtime::spawn_task(async {
run_broker(BrokerImpl::new(broker_config, access_config, LastLogin::default(), sender, None), reciever)
.await
.expect("broker accept_loop failed");
}).detach();
// Wait for the broker
let start = std::time::Instant::now();
while start.elapsed() < std::time::Duration::from_secs(5) {
if smol::net::TcpStream::connect(broker_address).await.is_ok() {
return;
}
smol::Timer::after(std::time::Duration::from_millis(200)).await;
}
panic!("Could not start the broker");
}
async fn start_client(ca_crt_path: impl Into<String>) -> Option<(ClientCommandSender, ClientEventsReceiver)> {
let (tx, rx) = futures::channel::oneshot::channel();
let ca_crt_path = ca_crt_path.into();
shvclient::runtime::spawn_task(async move {
let client_config = ClientConfig {
url: Url::parse(&format!("ssl://admin:admin@{BROKER_ADDRESS}?ca={ca_crt_path}")).expect("Url must be correct"),
device_id: None,
mount: None,
heartbeat_interval: Duration::from_mins(1),
reconnect_interval: None,
};
shvclient::client::Client::new_plain()
.run_with_init(&client_config, |commands_tx, events_rx| {
tx.send((commands_tx, events_rx))
.unwrap_or_else(|(commands_tx, _)| {
warn!("Client channels dropped before handed to the caller. Terminating the client");
commands_tx.terminate_client();
});
})
.await
.unwrap_or_else(|e| error!("Client finished with error: {e}"));
}
).detach();
rx.await.ok()
}
/// Temporary directory shared by every test in this binary, created lazily on
/// first access.
///
/// # Panics
///
/// The first access panics if the directory cannot be created.
// NOTE(review): Rust never drops statics, so presumably this directory is not
// removed when the test process exits — confirm if cleanup matters.
static TEST_TEMP_DIR: LazyLock<TempDir> =
    LazyLock::new(|| TempDir::new().expect("failed to create global test tempdir"));
/// Generates a throw-away root CA plus a `localhost` server certificate signed by
/// it, and writes the CA certificate, server certificate and server private key
/// as PEM files (`ca.crt`, `server.crt`, `server.key`) into `TEST_TEMP_DIR`.
///
/// Returns: (root_ca_path, server_cert_path, server_key_path)
///
/// # Errors
///
/// Propagates any key-generation, certificate-signing or file-write failure.
fn generate_test_cert_files() -> anyhow::Result<(PathBuf, PathBuf, PathBuf)> {
    const SERVER_HOST: &str = "localhost";
    let in_temp_dir = |file_name: &str| TEST_TEMP_DIR.path().join(file_name);

    // --- Root CA: self-signed, allowed to sign certificates and CRLs ---
    let ca_key = KeyPair::generate_for(&PKCS_ECDSA_P256_SHA256)?;
    let mut ca_params = CertificateParams::default();
    ca_params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
    ca_params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
    ca_params.distinguished_name.push(DnType::CommonName, "Local Test Root");
    let ca_cert = ca_params.self_signed(&ca_key)?;
    let issuer = Issuer::from_params(&ca_params, &ca_key);

    // --- Server certificate for `localhost` / 127.0.0.1 ---
    let server_key = KeyPair::generate_for(&PKCS_ECDSA_P256_SHA256)?;
    let mut server_params = CertificateParams::new(vec![SERVER_HOST.to_owned()])?;
    server_params.distinguished_name.push(
        DnType::CommonName,
        DnValue::Utf8String(SERVER_HOST.to_owned()),
    );
    server_params.subject_alt_names = vec![
        SanType::DnsName(SERVER_HOST.try_into()?),
        SanType::IpAddress(std::net::Ipv4Addr::LOCALHOST.into()),
    ];
    // Issue server cert signed by our CA (using the server public key and the issuer)
    let server_cert = server_params.signed_by(&server_key, &issuer)?;

    // --- Persist everything as PEM ---
    let ca_crt_path = in_temp_dir("ca.crt");
    std::fs::write(&ca_crt_path, ca_cert.pem())?;

    let server_crt_path = in_temp_dir("server.crt");
    std::fs::write(&server_crt_path, server_cert.pem())?;

    let server_key_path = in_temp_dir("server.key");
    std::fs::write(&server_key_path, server_key.serialize_pem())?;

    Ok((ca_crt_path, server_crt_path, server_key_path))
}
/// Builds a broker configuration with a single SSL listener on `BROKER_ADDRESS`.
///
/// `cert` and `key` are inserted verbatim into the `cert` and `key` query
/// parameters of the listen URL; every other setting keeps its default value.
///
/// # Panics
///
/// Panics if the resulting listen URL cannot be parsed.
fn create_broker_config(cert: &str, key: &str) -> BrokerConfig {
    let listen_url = format!("ssl://{BROKER_ADDRESS}?cert={cert}&key={key}");
    let ssl_listener = Listen {
        url: Url::parse(&listen_url).expect("Url must be correct"),
    };

    BrokerConfig {
        listen: vec![ssl_listener],
        ..Default::default()
    }
}
/// End-to-end SSL smoke test: generates certificates, starts a broker with the
/// server certificate, connects a client that trusts the generated CA, and then
/// issues two RPC calls over the connection.
#[test]
fn ssl() {
    shvclient::runtime::block_on(async {
        simple_logger::SimpleLogger::new()
            .with_level(log::LevelFilter::Debug)
            .init()
            .unwrap();

        // Broker side: server certificate + private key.
        let (ca_crt_path, server_crt_path, server_key_path) =
            generate_test_cert_files().expect("Cannot generate test certificates");
        let server_crt = server_crt_path.to_str().unwrap();
        let server_key = server_key_path.to_str().unwrap();
        start_broker(create_broker_config(server_crt, server_key), BROKER_ADDRESS).await;

        // Client side: only the CA certificate is needed.
        let (client_cmd, mut client_events) = start_client(ca_crt_path.to_str().unwrap())
            .await
            .expect("Client start");

        // The first event must be `Connected`, and it must arrive within 5 seconds.
        let first_event = client_events
            .wait_for_event()
            .timeout(futures_time::time::Duration::from_secs(5))
            .await;
        match first_event {
            Ok(Ok(ClientEvent::Connected(..))) => {}
            Ok(_) => panic!("Client connection to broker error"),
            Err(err) => panic!("Client connection to broker timed out: {err}"),
        }

        // Listing `.app` must return a non-empty result.
        let res = RpcCallDirList::new(".app")
            .timeout(Duration::from_secs(3))
            .exec_full(&client_cmd)
            .await;
        info!(".app:dir:\n{res:?}");
        assert!(!res.unwrap().is_empty());

        // `.broker/currentClient` must expose the `subscriptions` method.
        let has_subscriptions = RpcCallDirExists::new(".broker/currentClient", "subscriptions")
            .timeout(Duration::from_secs(3))
            .exec(&client_cmd)
            .await;
        assert!(has_subscriptions.unwrap());
    });
}