Skip to content

Commit

Permalink
chore(ext/node): use BufView natively in http2 (#21688)
Browse files Browse the repository at this point in the history
Node HTTP/2 was using the default h2 `Bytes` datatype when we can be
making using of `BufView` like we do in `Deno.serve`.

`fetch` and `Deno.serverHttp` can't make use of `BufView` because they
are using `reqwest` which is stuck on hyper 0.x at this time.
  • Loading branch information
mmastrac authored Dec 23, 2023
1 parent 36536c7 commit 1297c9a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cli/ops/jupyter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn op_jupyter_broadcast(
.new_message(&message_type)
.with_content(content)
.with_metadata(metadata)
.with_buffers(buffers.into_iter().map(|b| b.into()).collect())
.with_buffers(buffers.into_iter().map(|b| b.to_vec().into()).collect())
.send(&mut *iopub_socket.lock().await)
.await?;
}
Expand Down
7 changes: 4 additions & 3 deletions ext/fetch/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use deno_core::anyhow::Error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
Expand Down Expand Up @@ -233,7 +234,7 @@ unsafe impl Send for ResourceToBodyAdapter {}
unsafe impl Sync for ResourceToBodyAdapter {}

impl Stream for ResourceToBodyAdapter {
type Item = Result<BufView, Error>;
type Item = Result<Bytes, Error>;

fn poll_next(
self: Pin<&mut Self>,
Expand All @@ -250,9 +251,9 @@ impl Stream for ResourceToBodyAdapter {
Ok(buf) if buf.is_empty() => Poll::Ready(None),
Ok(_) => {
this.1 = Some(this.0.clone().read(64 * 1024));
Poll::Ready(Some(res))
Poll::Ready(Some(res.map(|b| b.to_vec().into())))
}
_ => Poll::Ready(Some(res)),
_ => Poll::Ready(Some(res.map(|b| b.to_vec().into()))),
},
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions ext/http/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ fn http_response(
Some(data) => {
// If a buffer was passed, but isn't compressible, we use it to
// construct a response body.
Ok((HttpResponseWriter::Closed, Bytes::from(data).into()))
Ok((HttpResponseWriter::Closed, data.to_vec().into()))
}
None if compressing => {
// Create a one way pipe that implements tokio's async io traits. To do
Expand Down Expand Up @@ -881,7 +881,7 @@ async fn op_http_write_resource(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(view);
let bytes = view.to_vec().into();
if let Err(err) = body.sender().send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
Expand Down Expand Up @@ -930,7 +930,7 @@ async fn op_http_write(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(buf);
let bytes = Bytes::from(buf.to_vec());
match body.sender().send_data(bytes).await {
Ok(_) => Ok(()),
Err(err) => {
Expand Down
20 changes: 11 additions & 9 deletions ext/node/ops/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use deno_core::futures::future::poll_fn;
use deno_core::op2;
use deno_core::serde::Serialize;
use deno_core::AsyncRefCell;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
Expand All @@ -34,7 +35,7 @@ use reqwest::header::HeaderValue;
use url::Url;

pub struct Http2Client {
pub client: AsyncRefCell<h2::client::SendRequest<Bytes>>,
pub client: AsyncRefCell<h2::client::SendRequest<BufView>>,
pub url: Url,
}

Expand All @@ -46,7 +47,7 @@ impl Resource for Http2Client {

#[derive(Debug)]
pub struct Http2ClientConn {
pub conn: AsyncRefCell<h2::client::Connection<NetworkStream>>,
pub conn: AsyncRefCell<h2::client::Connection<NetworkStream, BufView>>,
cancel_handle: CancelHandle,
}

Expand All @@ -63,7 +64,7 @@ impl Resource for Http2ClientConn {
#[derive(Debug)]
pub struct Http2ClientStream {
pub response: AsyncRefCell<h2::client::ResponseFuture>,
pub stream: AsyncRefCell<h2::SendStream<Bytes>>,
pub stream: AsyncRefCell<h2::SendStream<BufView>>,
}

impl Resource for Http2ClientStream {
Expand All @@ -89,7 +90,7 @@ impl Resource for Http2ClientResponseBody {

#[derive(Debug)]
pub struct Http2ServerConnection {
pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, Bytes>>,
pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, BufView>>,
}

impl Resource for Http2ServerConnection {
Expand All @@ -99,7 +100,7 @@ impl Resource for Http2ServerConnection {
}

pub struct Http2ServerSendResponse {
pub send_response: AsyncRefCell<h2::server::SendResponse<Bytes>>,
pub send_response: AsyncRefCell<h2::server::SendResponse<BufView>>,
}

impl Resource for Http2ServerSendResponse {
Expand All @@ -123,7 +124,8 @@ pub async fn op_http2_connect(

let url = Url::parse(&url)?;

let (client, conn) = h2::client::handshake(network_stream).await?;
let (client, conn) =
h2::client::Builder::new().handshake(network_stream).await?;
let mut state = state.borrow_mut();
let client_rid = state.resource_table.add(Http2Client {
client: AsyncRefCell::new(client),
Expand All @@ -145,7 +147,7 @@ pub async fn op_http2_listen(
let stream =
take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?;

let conn = h2::server::handshake(stream).await?;
let conn = h2::server::Builder::new().handshake(stream).await?;
Ok(
state
.borrow_mut()
Expand Down Expand Up @@ -349,7 +351,7 @@ pub async fn op_http2_client_send_data(
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;

// TODO(bartlomieju): handle end of stream
stream.send_data(bytes::Bytes::from(data), false)?;
stream.send_data(data.to_vec().into(), false)?;
Ok(())
}

Expand All @@ -365,7 +367,7 @@ pub async fn op_http2_client_end_stream(
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;

// TODO(bartlomieju): handle end of stream
stream.send_data(bytes::Bytes::from(vec![]), true)?;
stream.send_data(BufView::empty(), true)?;
Ok(())
}

Expand Down

0 comments on commit 1297c9a

Please sign in to comment.