Skip to content

Commit ae18c75

Browse files
committed
Move HTTP client into separate async task and communicate over channels
1 parent 9f506dc commit ae18c75

5 files changed

Lines changed: 74 additions & 19 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "thirtyfour"
3-
version = "0.21.0"
3+
version = "0.22.0"
44
authors = ["Steve Pryde <steve@stevepryde.com>"]
55
edition = "2018"
66
license = "MIT OR Apache-2.0"

src/session.rs

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,65 @@
11
use crate::common::command::FormatRequestData;
22
use crate::common::config::WebDriverConfig;
3-
use crate::error::WebDriverResult;
3+
use crate::error::{WebDriverError, WebDriverResult};
44
use crate::http::connection_async::WebDriverHttpClientAsync;
55
use crate::webdrivercommands::WebDriverCommands;
6-
use crate::SessionId;
6+
use crate::{RequestData, SessionId};
77
use async_trait::async_trait;
8-
use std::sync::Arc;
8+
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
9+
use futures::channel::oneshot;
10+
use futures::SinkExt;
11+
use futures::StreamExt;
12+
13+
#[derive(Debug)]
14+
pub enum SessionMessage {
15+
Request(RequestData, oneshot::Sender<WebDriverResult<serde_json::Value>>),
16+
}
17+
18+
pub fn spawn_session_task(
19+
conn: Box<dyn WebDriverHttpClientAsync>,
20+
) -> UnboundedSender<SessionMessage> {
21+
let (tx, rx) = unbounded();
22+
23+
#[cfg(feature = "async-std-runtime")]
24+
{
25+
async_std::task::spawn(session_runner(rx, conn));
26+
}
27+
28+
#[cfg(all(feature = "tokio-runtime", not(feature = "async-std-runtime")))]
29+
{
30+
tokio::spawn(session_runner(rx, conn));
31+
}
32+
33+
tx
34+
}
35+
36+
async fn session_runner(
37+
mut rx: UnboundedReceiver<SessionMessage>,
38+
conn: Box<dyn WebDriverHttpClientAsync>,
39+
) {
40+
// This will return None when the sender hangs up.
41+
while let Some(msg) = rx.next().await {
42+
match msg {
43+
SessionMessage::Request(data, tx) => {
44+
let ret = conn.execute(data).await;
45+
tx.send(ret).expect("Failed to send response");
46+
}
47+
}
48+
}
49+
}
950

1051
#[derive(Debug)]
1152
pub struct WebDriverSession {
1253
session_id: SessionId,
13-
conn: Arc<dyn WebDriverHttpClientAsync>,
54+
tx: UnboundedSender<SessionMessage>,
1455
config: WebDriverConfig,
1556
}
1657

1758
impl WebDriverSession {
18-
pub fn new(session_id: SessionId, conn: Arc<dyn WebDriverHttpClientAsync>) -> Self {
59+
pub fn new(session_id: SessionId, tx: UnboundedSender<SessionMessage>) -> Self {
1960
Self {
2061
session_id,
21-
conn,
62+
tx,
2263
config: WebDriverConfig::new(),
2364
}
2465
}
@@ -39,7 +80,21 @@ impl WebDriverSession {
3980
&self,
4081
request: Box<dyn FormatRequestData + Send + Sync>,
4182
) -> WebDriverResult<serde_json::Value> {
42-
self.conn.execute(request.format_request(&self.session_id)).await
83+
let (ret_tx, ret_rx) = oneshot::channel();
84+
self.tx
85+
.clone()
86+
.send(SessionMessage::Request(request.format_request(&self.session_id), ret_tx))
87+
.await
88+
.map_err(|e| {
89+
WebDriverError::UnknownResponse(format!("Failed to send request to server: {}", e))
90+
})?;
91+
92+
match ret_rx.await {
93+
Ok(x) => x,
94+
Err(oneshot::Canceled) => Err(WebDriverError::UnknownResponse(
95+
"Failed to get response from server".to_string(),
96+
)),
97+
}
4398
}
4499
}
45100

src/support.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ where
1313
rt.block_on(future)
1414
}
1515

16-
#[cfg(all(feature = "async-std-runtime"))]
16+
#[cfg(feature = "async-std-runtime")]
1717
pub fn block_on<F, T>(future: F) -> WebDriverResult<T>
1818
where
1919
F: Future<Output = WebDriverResult<T>>,
@@ -26,7 +26,7 @@ pub async fn sleep(duration: std::time::Duration) {
2626
tokio::time::delay_for(duration).await;
2727
}
2828

29-
#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))]
29+
#[cfg(feature = "async-std-runtime")]
3030
pub async fn sleep(duration: std::time::Duration) {
3131
async_std::task::sleep(duration).await;
3232
}

src/webdriver.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
use std::marker::PhantomData;
2-
use std::sync::Arc;
3-
41
use async_trait::async_trait;
5-
use futures::executor::block_on;
62
use log::error;
73
use serde::Serialize;
84
use serde_json::Value;
5+
use std::marker::PhantomData;
96

107
use crate::common::config::WebDriverConfig;
118
use crate::http::connection_async::WebDriverHttpClientAsync;
@@ -15,11 +12,13 @@ use crate::http::nulldriver_async::NullDriverAsync;
1512
use crate::http::reqwest_async::ReqwestDriverAsync;
1613
#[cfg(feature = "async-std-runtime")]
1714
use crate::http::surf_async::SurfDriverAsync;
15+
use crate::session::spawn_session_task;
1816
use crate::webdrivercommands::{start_session, WebDriverCommands};
1917
use crate::{
2018
common::command::Command, error::WebDriverResult, session::WebDriverSession,
2119
DesiredCapabilities, SessionId,
2220
};
21+
use futures::executor::block_on;
2322

2423
#[cfg(not(any(feature = "tokio-runtime", feature = "async-std-runtime")))]
2524
/// The WebDriver struct represents a browser session.
@@ -105,10 +104,12 @@ where
105104
where
106105
C: Serialize,
107106
{
108-
let conn = Arc::new(T::create(remote_server_addr)?);
109-
let (session_id, session_capabilities) = start_session(conn.clone(), capabilities).await?;
107+
let conn = T::create(remote_server_addr)?;
108+
let (session_id, session_capabilities) = start_session(&conn, capabilities).await?;
109+
let tx = spawn_session_task(Box::new(conn));
110+
110111
let driver = GenericWebDriver {
111-
session: WebDriverSession::new(session_id, conn),
112+
session: WebDriverSession::new(session_id, tx),
112113
capabilities: session_capabilities,
113114
quit_on_drop: true,
114115
phantom: PhantomData,

src/webdrivercommands.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
22
use std::path::Path;
3-
use std::sync::Arc;
43
use std::time::Duration;
54

65
#[cfg(feature = "async-std-runtime")]
@@ -29,7 +28,7 @@ use crate::{
2928
/// Start a new WebDriver session, returning the session id and the
3029
/// capabilities JSON that was received back from the server.
3130
pub async fn start_session<C>(
32-
conn: Arc<dyn WebDriverHttpClientAsync>,
31+
conn: &dyn WebDriverHttpClientAsync,
3332
capabilities: C,
3433
) -> WebDriverResult<(SessionId, serde_json::Value)>
3534
where

0 commit comments

Comments
 (0)