11//! The [`IncrementalReader`] struct, which handles reading to delimiters without line buffering.
22
3+ use std:: borrow:: Cow ;
34use std:: pin:: Pin ;
45
56use aho_corasick:: AhoCorasick ;
67use line_span:: LineSpans ;
78use miette:: miette;
89use miette:: IntoDiagnostic ;
9- use miette:: WrapErr ;
1010use tokio:: io:: AsyncRead ;
1111use tokio:: io:: AsyncReadExt ;
1212use tokio:: io:: AsyncWrite ;
1313use tokio:: io:: AsyncWriteExt ;
1414
1515use crate :: aho_corasick:: AhoCorasickExt ;
1616use crate :: buffers:: LINE_BUFFER_CAPACITY ;
17+ use crate :: buffers:: SPLIT_UTF8_CODEPOINT_CAPACITY ;
1718use crate :: buffers:: VEC_BUFFER_CAPACITY ;
1819
1920/// A tool for incrementally reading from a stream like stdout (and forwarding that stream to a
@@ -32,12 +33,15 @@ use crate::buffers::VEC_BUFFER_CAPACITY;
3233pub struct IncrementalReader < R , W > {
3334 /// The wrapped reader.
3435 reader : Pin < Box < R > > ,
36+ /// The wrapped writer, if any.
37+ writer : Option < Pin < Box < W > > > ,
3538 /// Lines we've already read since the last marker/chunk.
3639 lines : String ,
3740 /// The line currently being written to.
3841 line : String ,
39- /// The wrapped writer, if any.
40- writer : Option < Pin < Box < W > > > ,
42+ /// We're not guaranteed that the data we read at one time is aligned on a UTF-8 boundary. If
43+ /// that's the case, we store the data here until we get more data.
44+ non_utf8 : Vec < u8 > ,
4145}
4246
4347impl < R , W > IncrementalReader < R , W >
4953 pub fn new ( reader : R ) -> Self {
5054 Self {
5155 reader : Box :: pin ( reader) ,
56+ writer : None ,
5257 lines : String :: with_capacity ( VEC_BUFFER_CAPACITY * LINE_BUFFER_CAPACITY ) ,
5358 line : String :: with_capacity ( LINE_BUFFER_CAPACITY ) ,
54- writer : None ,
59+ non_utf8 : Vec :: with_capacity ( SPLIT_UTF8_CODEPOINT_CAPACITY ) ,
5560 }
5661 }
5762
@@ -97,15 +102,8 @@ where
97102 Err ( miette ! ( "End-of-file reached" ) )
98103 }
99104 Ok ( n) => {
100- let decoded = std:: str:: from_utf8 ( & opts. buffer [ ..n] )
101- . into_diagnostic ( )
102- . wrap_err_with ( || {
103- format ! (
104- "Read invalid UTF-8: {:?}" ,
105- String :: from_utf8_lossy( & opts. buffer[ ..n] )
106- )
107- } ) ?;
108- match self . consume_str ( decoded, opts) . await ? {
105+ let decoded = self . decode ( & opts. buffer [ ..n] ) ;
106+ match self . consume_str ( & decoded, opts) . await ? {
109107 Some ( lines) => {
110108 tracing:: trace!( data = decoded, "Decoded data" ) ;
111109 tracing:: trace!( lines = lines. len( ) , "Got chunk" ) ;
@@ -121,6 +119,49 @@ where
121119 }
122120 }
123121
122+ fn decode ( & mut self , buffer : & [ u8 ] ) -> String {
123+ // Do we have data we failed to decode?
124+ let buffer = if self . non_utf8 . is_empty ( ) {
125+ Cow :: Borrowed ( buffer)
126+ } else {
127+ // We have some data that failed to decode when we read it, add the data we just read
128+ // and hope that completes a UTF-8 boundary:
129+ let mut non_utf8 = std:: mem:: replace (
130+ & mut self . non_utf8 ,
131+ Vec :: with_capacity ( SPLIT_UTF8_CODEPOINT_CAPACITY ) ,
132+ ) ;
133+ non_utf8. extend ( buffer) ;
134+ Cow :: Owned ( non_utf8)
135+ } ;
136+
137+ match std:: str:: from_utf8 ( & buffer) {
138+ Ok ( data) => data. to_owned ( ) ,
139+ Err ( err) => {
140+ match err. error_len ( ) {
141+ Some ( _) => {
142+ // An unexpected byte was encountered; this is a "real" UTF-8 decode
143+ // failure that we can't recover from by reading more data.
144+ //
145+ // As a backup, we'll log an error and decode the rest lossily.
146+ tracing:: error!( "Failed to decode UTF-8 from `ghci`: {err}.\n \
147+ This is a bug, please report it upstream: https://github.com/MercuryTechnologies/ghciwatch/issues/new") ;
148+ String :: from_utf8_lossy ( & buffer) . into_owned ( )
149+ }
150+ None => {
151+ // End of input reached unexpectedly.
152+ let valid_utf8 = & buffer[ ..err. valid_up_to ( ) ] ;
153+ self . non_utf8 . extend ( & buffer[ err. valid_up_to ( ) ..] ) ;
154+ unsafe {
155+ // Safety: We already confirmed the input contains valid UTF-8 up to
156+ // this index.
157+ std:: str:: from_utf8_unchecked ( valid_utf8) . to_owned ( )
158+ }
159+ }
160+ }
161+ }
162+ }
163+ }
164+
124165 /// Consumes a string, adding it to the internal buffer.
125166 ///
126167 /// If one of the lines in `data` begins with `end_marker`, the lines in the internal buffer
@@ -372,8 +413,8 @@ mod tests {
372413 /// Basic test. Reads data from the reader, gets the first chunk.
373414 #[ tokio:: test]
374415 async fn test_read_until ( ) {
375- let fake_reader = FakeReader :: with_str_chunks ( [ indoc ! (
376- "Build profile: -w ghc-9.6.1 -O0
416+ let fake_reader = FakeReader :: with_byte_chunks ( [ indoc ! (
417+ b "Build profile: -w ghc-9.6.1 -O0
377418 In order, the following will be built (use -v for more details):
378419 - mwb-0 (lib:test-dev) (ephemeral targets)
379420 Preprocessing library 'test-dev' for mwb-0..
@@ -411,8 +452,8 @@ mod tests {
411452 /// Same as `test_read_until` but with `FindAt::Anywhere`.
412453 #[ tokio:: test]
413454 async fn test_read_until_find_anywhere ( ) {
414- let fake_reader = FakeReader :: with_str_chunks ( [ indoc ! (
415- "Build profile: -w ghc-9.6.1 -O0
455+ let fake_reader = FakeReader :: with_byte_chunks ( [ indoc ! (
456+ b "Build profile: -w ghc-9.6.1 -O0
416457 In order, the following will be built (use -v for more details):
417458 - mwb-0 (lib:test-dev) (ephemeral targets)
418459 Preprocessing library 'test-dev' for mwb-0..
@@ -557,29 +598,29 @@ mod tests {
557598 /// chunks.
558599 #[ tokio:: test]
559600 async fn test_read_until_incremental ( ) {
560- let fake_reader = FakeReader :: with_str_chunks ( [
561- "Build profile: -w ghc-9.6.1 -O0\n " ,
562- "In order, the following will be built (use -v for more details):\n " ,
563- " - mwb-0 (lib:test-dev) (ephemeral targets)\n " ,
564- "Preprocessing library 'test-dev' for mwb-0..\n " ,
565- "GH" ,
566- "C" ,
567- "i" ,
568- "," ,
569- " " ,
570- "v" ,
571- "e" ,
572- "r" ,
573- "s" ,
574- "i" ,
575- "o" ,
576- "n" ,
577- " " ,
578- "9" ,
579- ".6.1: https://www.haskell.org/ghc/ :? for help\n " ,
580- "Loaded GHCi configuration from .ghci-mwb" ,
581- "Ok, 5699 modules loaded." ,
582- "ghci> " ,
601+ let fake_reader = FakeReader :: with_byte_chunks ( [
602+ b "Build profile: -w ghc-9.6.1 -O0\n ",
603+ b "In order, the following will be built (use -v for more details):\n ",
604+ b " - mwb-0 (lib:test-dev) (ephemeral targets)\n ",
605+ b "Preprocessing library 'test-dev' for mwb-0..\n ",
606+ b "GH",
607+ b "C",
608+ b "i",
609+ b ",",
610+ b " ",
611+ b "v",
612+ b "e",
613+ b "r",
614+ b "s",
615+ b "i",
616+ b "o",
617+ b "n",
618+ b " ",
619+ b "9",
620+ b ".6.1: https://www.haskell.org/ghc/ :? for help\n ",
621+ b "Loaded GHCi configuration from .ghci-mwb",
622+ b "Ok, 5699 modules loaded.",
623+ b "ghci> ",
583624 ] ) ;
584625 let mut reader = IncrementalReader :: new ( fake_reader) . with_writer ( tokio:: io:: sink ( ) ) ;
585626 let end_marker = AhoCorasick :: from_anchored_patterns ( [ "GHCi, version " ] ) ;
@@ -607,4 +648,93 @@ mod tests {
607648
608649 assert_eq ! ( reader. buffer( ) , String :: new( ) ) ;
609650 }
651+
652+ /// Test that we can keep reading when a chunk from `read()` splits a UTF-8 boundary.
653+ async fn utf8_boundary < const N : usize > ( chunks : [ & ' static [ u8 ] ; N ] , decoded : & ' static str ) {
654+ let fake_reader = FakeReader :: with_byte_chunks ( chunks) ;
655+ let mut reader = IncrementalReader :: new ( fake_reader) . with_writer ( tokio:: io:: sink ( ) ) ;
656+ let end_marker = AhoCorasick :: from_anchored_patterns ( [ "ghci> " ] ) ;
657+ let mut buffer = vec ! [ 0 ; LINE_BUFFER_CAPACITY ] ;
658+
659+ assert_eq ! (
660+ reader
661+ . read_until( & mut ReadOpts {
662+ end_marker: & end_marker,
663+ find: FindAt :: LineStart ,
664+ writing: WriteBehavior :: Hide ,
665+ buffer: & mut buffer
666+ } )
667+ . await
668+ . unwrap( ) ,
669+ decoded,
670+ "Failed to decode codepoint {decoded:?} when split across two chunks: {chunks:?}" ,
671+ ) ;
672+
673+ assert_eq ! ( reader. buffer( ) , String :: new( ) ) ;
674+ }
675+
676+ #[ tokio:: test]
677+ async fn test_read_utf8_boundary_u_00a9 ( ) {
678+ // U+00A9 ©
679+ // 2 bytes, 1 test case.
680+ utf8_boundary ( [ b"\xc2 " , b"\xa9 \n ghci> " ] , "©\n " ) . await ;
681+ }
682+
683+ #[ tokio:: test]
684+ async fn test_read_utf8_boundary_u_2194 ( ) {
685+ // U+2194 ↔
686+ // 3 bytes, 2 test cases.
687+ utf8_boundary ( [ b"\xe2 " , b"\x86 \x94 \n ghci> " ] , "↔\n " ) . await ;
688+ utf8_boundary ( [ b"\xe2 \x86 " , b"\x94 \n ghci> " ] , "↔\n " ) . await ;
689+ }
690+
691+ #[ tokio:: test]
692+ async fn test_read_utf8_boundary_u_1f436 ( ) {
693+ // U+1F436 🐶
694+ // 4 bytes, 3 test cases.
695+ utf8_boundary ( [ b"\xf0 " , b"\x9f \x90 \xb6 \n ghci> " ] , "🐶\n " ) . await ;
696+ utf8_boundary ( [ b"\xf0 \x9f " , b"\x90 \xb6 \n ghci> " ] , "🐶\n " ) . await ;
697+ utf8_boundary ( [ b"\xf0 \x9f \x90 " , b"\xb6 \n ghci> " ] , "🐶\n " ) . await ;
698+ }
699+
700+ #[ tokio:: test]
701+ async fn test_read_invalid_utf8_overlong ( ) {
702+ // Overlong sequence, U+20AC € encoded as 4 bytes.
703+ // We get four U+FFFD � replacement characters out, one for each byte in the sequence.
704+ utf8_boundary ( [ b"\xf0 " , b"\x82 \x82 \xac \n ghci> " ] , "����\n " ) . await ;
705+ utf8_boundary ( [ b"\xf0 \x82 " , b"\x82 \xac \n ghci> " ] , "����\n " ) . await ;
706+ utf8_boundary ( [ b"\xf0 \x82 \x82 " , b"\xac \n ghci> " ] , "����\n " ) . await ;
707+ }
708+
709+ #[ tokio:: test]
710+ async fn test_read_invalid_utf8_surrogate_pair_half ( ) {
711+ // Half of a surrogate pair, invalid in UTF-8. (U+D800)
712+ utf8_boundary ( [ b"\xed " , b"\xa0 \x80 \n ghci> " ] , "���\n " ) . await ;
713+ utf8_boundary ( [ b"\xed \xa0 " , b"\x80 \n ghci> " ] , "���\n " ) . await ;
714+ }
715+
716+ #[ tokio:: test]
717+ async fn test_read_invalid_utf8_unexpected_continuation ( ) {
718+ // An unexpected continuation byte.
719+ utf8_boundary ( [ b"\xa0 \x80 \n ghci> " ] , "��\n " ) . await ;
720+ utf8_boundary ( [ b"\xa0 " , b"\x80 \n ghci> " ] , "��\n " ) . await ;
721+ }
722+
723+ #[ tokio:: test]
724+ async fn test_read_invalid_utf8_missing_continuation ( ) {
725+ // Missing continuation byte.
726+ // Weirdly, these only come out as one replacement character, not the three we might
727+ // naïvely expect.
728+ utf8_boundary ( [ b"\xf0 " , b"\x9f \x90 \n ghci> " ] , "�\n " ) . await ;
729+ utf8_boundary ( [ b"\xf0 \x9f " , b"\x90 \n ghci> " ] , "�\n " ) . await ;
730+ }
731+
732+ #[ tokio:: test]
733+ async fn test_read_invalid_utf8_invalid_byte ( ) {
734+ // Invalid byte (no defined meaning in UTF-8).
735+ utf8_boundary ( [ b"\xc0 \n ghci> " ] , "�\n " ) . await ;
736+ utf8_boundary ( [ b"\xc1 \n ghci> " ] , "�\n " ) . await ;
737+ utf8_boundary ( [ b"\xf5 \n ghci> " ] , "�\n " ) . await ;
738+ utf8_boundary ( [ b"\xff \n ghci> " ] , "�\n " ) . await ;
739+ }
610740}
0 commit comments