@@ -5,6 +5,9 @@ import { SerialTransport } from "./types"
55export class SerialSource implements UnderlyingSource < Uint8Array > {
66 type : undefined
77 controller : ReadableStreamController < Uint8Array > = null
8+ buffer : Uint8Array = null
9+ bufferUsed : number = 0
10+ wantData : boolean = false
811
912 public constructor (
1013 private transport_ : SerialTransport ,
@@ -24,9 +27,48 @@ export class SerialSource implements UnderlyingSource<Uint8Array> {
2427 start ( controller : ReadableStreamController < Uint8Array > ) {
2528 debugLog ( "STREAM" , "source" , "start()" )
2629 this . controller = controller
30+
31+ const bufferSize = controller . desiredSize
32+ this . buffer = new Uint8Array ( bufferSize )
33+ this . bufferUsed = 0
34+ this . wantData = false
35+
2736 this . transport_ . sourceFeedData = ( data ) => {
28- controller . enqueue ( data )
37+ if ( this . bufferUsed + data . length >= bufferSize ) {
38+ // the buffer would overflow
39+ const newSize = bufferSize - this . bufferUsed
40+ const newData = data . slice ( 0 , newSize )
41+ // cut from data whatever fits in the buffer
42+ data = data . slice ( newSize )
43+ // write it at the end
44+ this . buffer . set ( newData , this . bufferUsed )
45+ // reset the buffer usage
46+ this . bufferUsed = 0
47+ // pass the entire buffer to the controller
48+ controller . enqueue ( new Uint8Array ( this . buffer ) )
49+ }
50+ if ( data . length > 0 ) {
51+ // the buffer will NOT overflow anymore, whatever's left in 'data' will fit
52+ this . buffer . set ( data , this . bufferUsed )
53+ this . bufferUsed += data . length
54+ // if reader is waiting for data, enqueue it using pull()
55+ if ( this . wantData ) this . pull ( controller )
56+ }
57+ }
58+ }
59+
60+ pull ( controller : ReadableStreamController < Uint8Array > ) {
61+ if ( this . bufferUsed == 0 ) {
62+ // nothing to read, but the reader is waiting for data
63+ this . wantData = true
64+ return
2965 }
66+ // consume the entire buffer
67+ const newData = this . buffer . slice ( 0 , this . bufferUsed )
68+ this . bufferUsed = 0
69+ // enqueue after consume, since it may unblock and call pull()
70+ controller . enqueue ( newData )
71+ this . wantData = false
3072 }
3173
3274 cancel ( _reason ?: any ) {
0 commit comments