-
Notifications
You must be signed in to change notification settings - Fork 106
Expand file tree
/
Copy pathautobahn_client.rs
More file actions
126 lines (108 loc) · 3.27 KB
/
Copy pathautobahn_client.rs
File metadata and controls
126 lines (108 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2023 Divy Srivastava <dj.srivastava23@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use anyhow::Result;
use bytes::Bytes;
use fastwebsockets::FragmentCollector;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use http_body_util::Empty;
use hyper::header::CONNECTION;
use hyper::header::UPGRADE;
use hyper::upgrade::Upgraded;
use hyper::Request;
#[cfg(not(feature = "futures"))]
use hyper_util::rt::TokioIo as IoWrapper;
#[cfg(not(feature = "futures"))]
use tokio::net::TcpStream;
#[cfg(feature = "futures")]
use async_std::net::TcpStream;
#[cfg(feature = "futures")]
use fastwebsockets::FuturesIo as IoWrapper;
/// Executor handed to hyper so that it can run the futures it needs driven
/// in the background.
struct SpawnExecutor;

impl<F> hyper::rt::Executor<F> for SpawnExecutor
where
  F: Future + Send + 'static,
  F::Output: Send + 'static,
{
  /// Spawns `task` as a Tokio task and discards its join handle.
  fn execute(&self, task: F) {
    // NOTE(review): this always spawns on Tokio, even when the `futures`
    // feature selects `async_std::main` for the entry point — confirm a Tokio
    // runtime is available in that configuration.
    drop(tokio::task::spawn(task));
  }
}
/// Opens a WebSocket connection to the server listening on `localhost:9001`.
///
/// `path` is appended to `http://localhost:9001/` to form the request URI, so
/// it may carry a query string. After the TCP connection is established, a
/// client upgrade handshake is performed and the resulting socket is wrapped
/// in a `FragmentCollector`.
///
/// # Errors
///
/// Fails if the TCP connection cannot be made, the request cannot be built,
/// or the handshake is rejected.
async fn connect(path: &str) -> Result<FragmentCollector<IoWrapper<Upgraded>>> {
  let tcp = TcpStream::connect("localhost:9001").await?;

  // Assemble the HTTP upgrade request for the handshake.
  let uri = format!("http://localhost:9001/{}", path);
  let key = fastwebsockets::handshake::generate_key();
  let request = Request::builder()
    .method("GET")
    .uri(uri)
    .header("Host", "localhost:9001")
    .header(UPGRADE, "websocket")
    .header(CONNECTION, "upgrade")
    .header("Sec-WebSocket-Key", key)
    .header("Sec-WebSocket-Version", "13")
    .body(Empty::<Bytes>::new())?;

  // The HTTP response returned alongside the socket is not needed here.
  let (socket, _response) =
    fastwebsockets::handshake::client(&SpawnExecutor, request, tcp).await?;

  Ok(FragmentCollector::new(socket))
}
/// Asks the server how many test cases it offers.
///
/// Connects to the `getCaseCount` path, reads a single frame, sends a close
/// frame with status code 1000, and then parses the received payload as a
/// UTF-8 encoded decimal `u32`.
///
/// # Errors
///
/// Fails on connection or frame I/O errors, if the payload is not valid
/// UTF-8, or if it does not parse as a `u32`.
async fn get_case_count() -> Result<u32> {
  let mut socket = connect("getCaseCount").await?;
  let reply = socket.read_frame().await?;

  // The close frame goes out before the payload is inspected, so it is sent
  // even when the payload later fails to parse.
  socket.write_frame(Frame::close(1000, &[])).await?;

  let text = std::str::from_utf8(&reply.payload)?;
  let count: u32 = text.parse()?;
  Ok(count)
}
/// Emits the given item twice, each copy gated on the `futures` feature so
/// that only one of them is compiled: annotated with `#[tokio::main]` when the
/// feature is disabled, and with `#[async_std::main]` when it is enabled.
macro_rules! runtime_main {
  ($($item:tt)*) => {
    #[cfg(not(feature = "futures"))]
    #[tokio::main]
    $($item)*

    #[cfg(feature = "futures")]
    #[async_std::main]
    $($item)*
  };
}
// Entry point. Fetches the number of cases, then for each case opens a fresh
// connection and echoes every Text and Binary frame back until a Close frame
// arrives or reading fails. Finally connects to the `updateReports` path and
// sends a close frame.
runtime_main! {
  async fn main() -> Result<()> {
    let total = get_case_count().await?;

    for case in 1..=total {
      let path = format!("runCase?case={}&agent=fastwebsockets", case);
      let mut socket = connect(&path).await?;

      loop {
        let frame = match socket.read_frame().await {
          Err(err) => {
            // A read failure ends this case: report it, send an empty close
            // frame, and move on to the next case.
            println!("Error: {}", err);
            socket.write_frame(Frame::close_raw(vec![].into())).await?;
            break;
          }
          Ok(frame) => frame,
        };

        if matches!(frame.opcode, OpCode::Close) {
          break;
        }

        // Data frames are echoed back as a single final frame with the same
        // opcode and payload; every other opcode is ignored.
        if matches!(frame.opcode, OpCode::Text | OpCode::Binary) {
          let echo = Frame::new(true, frame.opcode, None, frame.payload);
          socket.write_frame(echo).await?;
        }
      }
    }

    let mut socket = connect("updateReports?agent=fastwebsockets").await?;
    socket.write_frame(Frame::close(1000, &[])).await?;

    Ok(())
  }
}