Skip to content

Commit 56f121e

Browse files
authored
Fix compression for deferred responses (#2986)
We replace tower-http's `CompressionLayer` with a custom stream transformation. This is necessary because tower-http uses async-compression, which buffers data until the end of the stream to then write it, ensuring a better compression. This is incompatible with the multipart protocol for `@defer`, which requires chunks to be sent as soon as possible. So we need to compress them independently. This extracts parts of the codec module of async-compression, which so far is not public, and makes a streaming wrapper above it that flushes the compressed data on every response in the stream. This is expected to be temporary, as we have in flight PRs for async-compression: - Nullus157/async-compression#155 - Nullus157/async-compression#178 With Nullus157/async-compression#150 we might be able to at least remove the vendored code
2 parents 1addfdc + 0fbaed1 commit 56f121e

File tree

20 files changed

+1090
-11
lines changed

20 files changed

+1090
-11
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
### Fix compression for deferred responses ([Issue #1572](https://github.com/apollographql/router/issues/1572))
2+
3+
We replace tower-http's `CompressionLayer` with a custom stream transformation. This is necessary because tower-http uses async-compression, which buffers data until the end of the stream to then write it, ensuring a better compression. This is incompatible with the multipart protocol for `@defer`, which requires chunks to be sent as soon as possible. So we need to compress them independently.
4+
5+
This extracts parts of the codec module of async-compression, which so far is not public, and makes a streaming wrapper above it that flushes the compressed data on every response in the stream.
6+
7+
By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2986

Cargo.lock

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ dependencies = [
286286
"axum",
287287
"backtrace",
288288
"base64 0.20.0",
289+
"brotli",
289290
"buildstructor 0.5.2",
290291
"bytes",
291292
"ci_info",
@@ -401,6 +402,8 @@ dependencies = [
401402
"wiremock",
402403
"wsl",
403404
"yaml-rust",
405+
"zstd",
406+
"zstd-safe",
404407
]
405408

406409
[[package]]
@@ -7073,3 +7076,33 @@ dependencies = [
70737076
"quote",
70747077
"syn 2.0.13",
70757078
]
7079+
7080+
[[package]]
7081+
name = "zstd"
7082+
version = "0.12.3+zstd.1.5.2"
7083+
source = "registry+https://github.com/rust-lang/crates.io-index"
7084+
checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806"
7085+
dependencies = [
7086+
"zstd-safe",
7087+
]
7088+
7089+
[[package]]
7090+
name = "zstd-safe"
7091+
version = "6.0.5+zstd.1.5.4"
7092+
source = "registry+https://github.com/rust-lang/crates.io-index"
7093+
checksum = "d56d9e60b4b1758206c238a10165fbcae3ca37b01744e394c463463f6529d23b"
7094+
dependencies = [
7095+
"libc",
7096+
"zstd-sys",
7097+
]
7098+
7099+
[[package]]
7100+
name = "zstd-sys"
7101+
version = "2.0.8+zstd.1.5.5"
7102+
source = "registry+https://github.com/rust-lang/crates.io-index"
7103+
checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c"
7104+
dependencies = [
7105+
"cc",
7106+
"libc",
7107+
"pkg-config",
7108+
]

apollo-router/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ yaml-rust = "0.4.5"
203203
wsl = "0.1.0"
204204
tokio-rustls = "0.23.4"
205205
http-serde = "1.1.2"
206+
memchr = "2.5.0"
207+
brotli = "3.3.4"
208+
zstd = "0.12.3"
209+
zstd-safe = "6.0.5"
206210

207211
[target.'cfg(macos)'.dependencies]
208212
uname = "0.1.1"

apollo-router/src/axum_factory/axum_http_server_factory.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ use futures::channel::oneshot;
1919
use futures::future::join;
2020
use futures::future::join_all;
2121
use futures::prelude::*;
22+
use http::header::ACCEPT_ENCODING;
23+
use http::header::CONTENT_ENCODING;
24+
use http::HeaderValue;
2225
use http::Request;
2326
use http_body::combinators::UnsyncBoxBody;
2427
use hyper::Body;
@@ -32,10 +35,6 @@ use tokio_rustls::TlsAcceptor;
3235
use tower::service_fn;
3336
use tower::BoxError;
3437
use tower::ServiceExt;
35-
use tower_http::compression::predicate::NotForContentType;
36-
use tower_http::compression::CompressionLayer;
37-
use tower_http::compression::DefaultPredicate;
38-
use tower_http::compression::Predicate;
3938
use tower_http::trace::TraceLayer;
4039

4140
use super::listeners::ensure_endpoints_consistency;
@@ -45,6 +44,7 @@ use super::listeners::ListenersAndRouters;
4544
use super::utils::decompress_request_body;
4645
use super::utils::PropagatingMakeSpan;
4746
use super::ListenAddrAndRouter;
47+
use crate::axum_factory::compression::Compressor;
4848
use crate::axum_factory::listeners::get_extra_listeners;
4949
use crate::axum_factory::listeners::serve_router_on_listen_addr;
5050
use crate::configuration::Configuration;
@@ -329,12 +329,7 @@ where
329329
))
330330
.layer(TraceLayer::new_for_http().make_span_with(PropagatingMakeSpan { entitlement }))
331331
.layer(Extension(service_factory))
332-
.layer(cors)
333-
// Compress the response body, except for multipart responses such as with `@defer`.
334-
// This is a work-around for https://github.com/apollographql/router/issues/1572
335-
.layer(CompressionLayer::new().compress_when(
336-
DefaultPredicate::new().and(NotForContentType::const_new("multipart/")),
337-
));
332+
.layer(cors);
338333

339334
let route = endpoints_on_main_listener
340335
.into_iter()
@@ -434,6 +429,11 @@ async fn handle_graphql(
434429

435430
let request: router::Request = http_request.into();
436431
let context = request.context.clone();
432+
let accept_encoding = request
433+
.router_request
434+
.headers()
435+
.get(ACCEPT_ENCODING)
436+
.cloned();
437437

438438
let res = service.oneshot(request).await;
439439
let dur = context.busy_time().await;
@@ -467,7 +467,24 @@ async fn handle_graphql(
467467
}
468468
Ok(response) => {
469469
tracing::info!(counter.apollo_router_session_count_active = -1,);
470-
response.response.into_response()
470+
let (mut parts, body) = response.response.into_parts();
471+
472+
let opt_compressor = accept_encoding
473+
.as_ref()
474+
.and_then(|value| value.to_str().ok())
475+
.and_then(|v| Compressor::new(v.split(',').map(|s| s.trim())));
476+
let body = match opt_compressor {
477+
None => body,
478+
Some(compressor) => {
479+
parts.headers.insert(
480+
CONTENT_ENCODING,
481+
HeaderValue::from_static(compressor.content_encoding()),
482+
);
483+
Body::wrap_stream(compressor.process(body))
484+
}
485+
};
486+
487+
http::Response::from_parts(parts, body).into_response()
471488
}
472489
}
473490
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
use std::fmt;
4+
use std::io::Error;
5+
use std::io::ErrorKind;
6+
use std::io::Result;
7+
8+
use brotli::enc::backward_references::BrotliEncoderParams;
9+
use brotli::enc::encode::BrotliEncoderCompressStream;
10+
use brotli::enc::encode::BrotliEncoderCreateInstance;
11+
use brotli::enc::encode::BrotliEncoderHasMoreOutput;
12+
use brotli::enc::encode::BrotliEncoderIsFinished;
13+
use brotli::enc::encode::BrotliEncoderOperation;
14+
use brotli::enc::encode::BrotliEncoderStateStruct;
15+
use brotli::enc::StandardAlloc;
16+
17+
use crate::axum_factory::compression::codec::Encode;
18+
use crate::axum_factory::compression::util::PartialBuffer;
19+
20+
pub(crate) struct BrotliEncoder {
21+
state: BrotliEncoderStateStruct<StandardAlloc>,
22+
}
23+
24+
impl BrotliEncoder {
25+
pub(crate) fn new(params: BrotliEncoderParams) -> Self {
26+
let mut state = BrotliEncoderCreateInstance(StandardAlloc::default());
27+
state.params = params;
28+
Self { state }
29+
}
30+
31+
fn encode(
32+
&mut self,
33+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
34+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
35+
op: BrotliEncoderOperation,
36+
) -> Result<()> {
37+
let in_buf = input.unwritten();
38+
let out_buf = output.unwritten_mut();
39+
40+
let mut input_len = 0;
41+
let mut output_len = 0;
42+
43+
if BrotliEncoderCompressStream(
44+
&mut self.state,
45+
op,
46+
&mut in_buf.len(),
47+
in_buf,
48+
&mut input_len,
49+
&mut out_buf.len(),
50+
out_buf,
51+
&mut output_len,
52+
&mut None,
53+
&mut |_, _, _, _| (),
54+
) <= 0
55+
{
56+
return Err(Error::new(ErrorKind::Other, "brotli error"));
57+
}
58+
59+
input.advance(input_len);
60+
output.advance(output_len);
61+
62+
Ok(())
63+
}
64+
}
65+
66+
impl Encode for BrotliEncoder {
67+
fn encode(
68+
&mut self,
69+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
70+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
71+
) -> Result<()> {
72+
self.encode(
73+
input,
74+
output,
75+
BrotliEncoderOperation::BROTLI_OPERATION_PROCESS,
76+
)
77+
}
78+
79+
fn flush(
80+
&mut self,
81+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
82+
) -> Result<bool> {
83+
self.encode(
84+
&mut PartialBuffer::new(&[][..]),
85+
output,
86+
BrotliEncoderOperation::BROTLI_OPERATION_FLUSH,
87+
)?;
88+
89+
Ok(BrotliEncoderHasMoreOutput(&self.state) == 0)
90+
}
91+
92+
fn finish(
93+
&mut self,
94+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
95+
) -> Result<bool> {
96+
self.encode(
97+
&mut PartialBuffer::new(&[][..]),
98+
output,
99+
BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
100+
)?;
101+
102+
Ok(BrotliEncoderIsFinished(&self.state) == 1)
103+
}
104+
}
105+
106+
impl fmt::Debug for BrotliEncoder {
107+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108+
f.debug_struct("BrotliEncoder")
109+
.field("compress", &"<no debug>")
110+
.finish()
111+
}
112+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
mod encoder;
4+
5+
pub(crate) use self::encoder::BrotliEncoder;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
use std::io::Result;
4+
5+
use flate2::Compression;
6+
7+
use crate::axum_factory::compression::codec::Encode;
8+
use crate::axum_factory::compression::codec::FlateEncoder;
9+
use crate::axum_factory::compression::util::PartialBuffer;
10+
11+
#[derive(Debug)]
12+
pub(crate) struct DeflateEncoder {
13+
inner: FlateEncoder,
14+
}
15+
16+
impl DeflateEncoder {
17+
pub(crate) fn new(level: Compression) -> Self {
18+
Self {
19+
inner: FlateEncoder::new(level, false),
20+
}
21+
}
22+
}
23+
24+
impl Encode for DeflateEncoder {
25+
fn encode(
26+
&mut self,
27+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
28+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
29+
) -> Result<()> {
30+
self.inner.encode(input, output)
31+
}
32+
33+
fn flush(
34+
&mut self,
35+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
36+
) -> Result<bool> {
37+
self.inner.flush(output)
38+
}
39+
40+
fn finish(
41+
&mut self,
42+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
43+
) -> Result<bool> {
44+
self.inner.finish(output)
45+
}
46+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
mod encoder;
4+
5+
pub(crate) use self::encoder::DeflateEncoder;

0 commit comments

Comments
 (0)