@@ -39,43 +39,6 @@ use pin_project_lite::pin_project;
3939use crate :: { body:: TakoBody , types:: BoxError } ;
4040
4141/// Compresses an HTTP body stream using the DEFLATE compression algorithm.
42- ///
43- /// This function converts any HTTP body into a DEFLATE-compressed streaming body using
44- /// the specified compression level. The compression is performed incrementally as data
45- /// flows through the stream, providing memory-efficient compression for responses of
46- /// any size.
47- ///
48- /// # Arguments
49- ///
50- /// * `body` - HTTP body to compress, must implement `Body<Data = Bytes, Error = BoxError>`
51- /// * `level` - DEFLATE compression level (0-9, where 9 provides maximum compression)
52- ///
53- /// # Compression Levels
54- ///
55- /// - **0**: No compression (store only)
56- /// - **1-3**: Fast compression, lower compression ratio
57- /// - **4-6**: Balanced compression and speed (recommended for most use cases)
58- /// - **7-9**: High compression, slower processing (best for static content)
59- ///
60- /// # Examples
61- ///
62- /// ```rust
63- /// use tako::plugins::compression::deflate_stream::stream_deflate;
64- /// use http_body_util::Full;
65- /// use bytes::Bytes;
66- ///
67- /// // Balanced compression for general web content
68- /// let body = Full::from(Bytes::from("JSON API response data"));
69- /// let compressed = stream_deflate(body, 6);
70- ///
71- /// // Fast compression for real-time data
72- /// let realtime_body = Full::from(Bytes::from("Live data stream"));
73- /// let fast_compressed = stream_deflate(realtime_body, 1);
74- ///
75- /// // Maximum compression for static assets
76- /// let static_body = Full::from(Bytes::from("Large static file content..."));
77- /// let max_compressed = stream_deflate(static_body, 9);
78- /// ```
7942pub fn stream_deflate < B > ( body : B , level : u32 ) -> TakoBody
8043where
8144 B : Body < Data = Bytes , Error = BoxError > + Send + ' static ,
@@ -87,75 +50,16 @@ where
8750
8851pin_project ! {
8952 /// Streaming DEFLATE compressor that wraps an inner data stream.
90- ///
91- /// `DeflateStream` provides on-the-fly DEFLATE compression for streaming data sources.
92- /// It maintains an internal encoder state and buffer to efficiently compress data
93- /// as it flows through the stream. The implementation handles proper stream
94- /// finalization and ensures all compressed data is flushed when the input ends.
95- ///
96- /// # Examples
97- ///
98- /// ```rust
99- /// use tako::plugins::compression::deflate_stream::DeflateStream;
100- /// use futures_util::stream;
101- /// use bytes::Bytes;
102- ///
103- /// # fn example() {
104- /// // Create a stream of data chunks
105- /// let data_stream = stream::iter(vec![
106- /// Ok(Bytes::from("First data chunk")),
107- /// Ok(Bytes::from("Second data chunk")),
108- /// Ok(Bytes::from("Final data chunk")),
109- /// ]);
110- ///
111- /// // Wrap with DEFLATE compression at level 6
112- /// let compressed_stream = DeflateStream::new(data_stream, 6);
113- /// # }
114- /// ```
11553 pub struct DeflateStream <S > {
116- /// Inner stream providing source data for compression.
11754 #[ pin] inner: S ,
118- /// DEFLATE encoder with internal buffer for compressed output.
11955 encoder: DeflateEncoder <Vec <u8 >>,
120- /// Current position in the encoder's output buffer.
12156 pos: usize ,
122- /// Flag indicating whether the input stream has ended.
12357 done: bool ,
12458 }
12559}
12660
12761impl < S > DeflateStream < S > {
12862 /// Creates a new DEFLATE compression stream with the specified compression level.
129- ///
130- /// The encoder is initialized with the specified compression level, which controls
131- /// the trade-off between compression ratio and processing speed. Higher levels
132- /// provide better compression at the cost of increased CPU usage.
133- ///
134- /// # Arguments
135- ///
136- /// * `inner` - Source stream to compress
137- /// * `level` - DEFLATE compression level (0-9, clamped to valid range)
138- ///
139- /// # Examples
140- ///
141- /// ```rust
142- /// use tako::plugins::compression::deflate_stream::DeflateStream;
143- /// use futures_util::stream;
144- /// use bytes::Bytes;
145- ///
146- /// # fn example() {
147- /// let source = stream::iter(vec![Ok(Bytes::from("test data"))]);
148- ///
149- /// // Fast compression for dynamic content
150- /// let fast_stream = DeflateStream::new(source.clone(), 1);
151- ///
152- /// // Balanced compression for general use
153- /// let balanced_stream = DeflateStream::new(source.clone(), 6);
154- ///
155- /// // Maximum compression for static content
156- /// let max_stream = DeflateStream::new(source, 9);
157- /// # }
158- /// ```
15963 pub fn new ( inner : S , level : u32 ) -> Self {
16064 Self {
16165 inner,
@@ -173,53 +77,6 @@ where
17377 type Item = Result < Bytes , BoxError > ;
17478
17579 /// Polls the stream for the next compressed data chunk.
176- ///
177- /// This method implements the core streaming compression logic with the following steps:
178- /// 1. Returns any buffered compressed data immediately if available
179- /// 2. Polls the inner stream for new input data when buffer is empty
180- /// 3. Compresses new input data and flushes it to the buffer
181- /// 4. Finalizes compression when the input stream ends
182- /// 5. Handles errors and backpressure appropriately
183- ///
184- /// The implementation prioritizes memory efficiency by returning compressed data
185- /// as soon as it's available rather than accumulating large buffers.
186- ///
187- /// # Returns
188- ///
189- /// - `Poll::Ready(Some(Ok(Bytes)))` - A compressed chunk of data
190- /// - `Poll::Ready(Some(Err(BoxError)))` - An error occurred during compression
191- /// - `Poll::Ready(None)` - The stream has finished and all data has been compressed
192- /// - `Poll::Pending` - The stream is not ready and should be polled again later
193- ///
194- /// # Examples
195- ///
196- /// ```rust,no_run
197- /// use tako::plugins::compression::deflate_stream::DeflateStream;
198- /// use futures_util::{stream, StreamExt};
199- /// use bytes::Bytes;
200- ///
201- /// # async fn example() {
202- /// let data = stream::iter(vec![
203- /// Ok(Bytes::from("chunk1")),
204- /// Ok(Bytes::from("chunk2")),
205- /// ]);
206- ///
207- /// let mut compressed = DeflateStream::new(data, 6);
208- ///
209- /// // Process compressed chunks as they become available
210- /// while let Some(result) = compressed.next().await {
211- /// match result {
212- /// Ok(compressed_chunk) => {
213- /// println!("Compressed {} bytes", compressed_chunk.len());
214- /// }
215- /// Err(e) => {
216- /// eprintln!("Compression error: {}", e);
217- /// break;
218- /// }
219- /// }
220- /// }
221- /// # }
222- /// ```
22380 fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
22481 let mut this = self . project ( ) ;
22582
0 commit comments