-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.rs
122 lines (99 loc) · 3.32 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::node::Node;
/// This modules implements the most basic form of distributed system, a single node server that handles
/// client requests to a key/value store. There is no replication and this no fault-tolerance.
use clap::Parser;
use lib::network::Receiver;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::task::JoinHandle;
mod node;
pub const CHANNEL_CAPACITY: usize = 1_000;
#[derive(Parser)]
#[clap(author, version, about)]
struct Cli {
/// The network port of the node where to send txs.
#[clap(short, long, value_parser, value_name = "UINT", default_value_t = 6100)]
port: u16,
/// The network address of the node where to send txs.
#[clap(short, long, value_parser, value_name = "UINT", default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))]
address: IpAddr,
}
#[tokio::main(flavor = "multi_thread")]
async fn main() {
let cli = Cli::parse();
log::info!("Node socket: {}:{}", cli.address, cli.port);
simple_logger::SimpleLogger::new().env().init().unwrap();
let address = SocketAddr::new(cli.address, cli.port);
let node = Node::new();
let (network_handle, _) = spawn_node_tasks(address, node).await;
network_handle.await.unwrap();
}
async fn spawn_node_tasks(
client_address: SocketAddr,
mut node: Node,
) -> (JoinHandle<()>, JoinHandle<()>) {
// listen for client command tcp connections
let (client_tcp_receiver, client_channel_receiver) = Receiver::new(client_address);
let client_handle = tokio::spawn(async move {
client_tcp_receiver.run().await;
});
let node_handle = tokio::spawn(async move {
node.run(client_channel_receiver).await;
});
(node_handle, client_handle)
}
#[cfg(test)]
mod tests {
use super::*;
use lib::command::ClientCommand;
use std::fs;
use tokio::time::{sleep, Duration};
#[tokio::test(flavor = "multi_thread")]
async fn test_server() {
fs::remove_dir_all(".db_single_node").unwrap_or_default();
let address: SocketAddr = "127.0.0.1:6182".parse().unwrap();
let node = Node::new();
spawn_node_tasks(address, node).await;
sleep(Duration::from_millis(10)).await;
let reply = ClientCommand::Get {
key: "k1".to_string(),
}
.send_to(address)
.await
.unwrap();
assert!(reply.is_none());
let reply = ClientCommand::Set {
key: "k1".to_string(),
value: "v1".to_string(),
}
.send_to(address)
.await
.unwrap();
assert!(reply.is_some());
assert_eq!("v1".to_string(), reply.unwrap());
let reply = ClientCommand::Get {
key: "k1".to_string(),
}
.send_to(address)
.await
.unwrap();
assert!(reply.is_some());
assert_eq!("v1".to_string(), reply.unwrap());
let reply = ClientCommand::Set {
key: "k1".to_string(),
value: "v2".to_string(),
}
.send_to(address)
.await
.unwrap();
assert!(reply.is_some());
assert_eq!("v2".to_string(), reply.unwrap());
let reply = ClientCommand::Get {
key: "k1".to_string(),
}
.send_to(address)
.await
.unwrap();
assert!(reply.is_some());
assert_eq!("v2".to_string(), reply.unwrap());
}
}