@@ -12,7 +12,7 @@ declare var Buffer: any;
12
12
const CUTOFF = 8192 ;
13
13
14
14
export class Pipe {
15
- buf : Buffer = new Buffer ( 0 ) ;
15
+ bufs : Buffer [ ] = [ ] ;
16
16
refcount : number = 1 ; // maybe more accurately a reader count
17
17
readWaiter : Function = undefined ;
18
18
writeWaiter : Function = undefined ;
@@ -23,14 +23,20 @@ export class Pipe {
23
23
this . writeBuffer ( b , ( err : number , len ?: number ) => { } ) ;
24
24
}
25
25
26
+ get bufferLength ( ) : number {
27
+ let len = 0 ;
28
+
29
+ for ( let i = 0 ; i < this . bufs . length ; i ++ )
30
+ len += this . bufs [ i ] . length ;
31
+
32
+ return len ;
33
+ }
34
+
26
35
writeBuffer ( b : Buffer , cb : RWCallback ) : void {
27
- this . buf = Buffer . concat ( [ this . buf , b ] ) ;
28
- if ( this . readWaiter ) {
29
- let waiter = this . readWaiter ;
30
- this . readWaiter = undefined ;
31
- waiter ( ) ;
32
- }
33
- if ( this . buf . length <= CUTOFF ) {
36
+ this . bufs . push ( b ) ;
37
+ this . releaseReader ( ) ;
38
+
39
+ if ( this . bufferLength <= CUTOFF ) {
34
40
cb ( 0 , b . length ) ;
35
41
} else {
36
42
if ( this . writeWaiter ) {
@@ -43,29 +49,29 @@ export class Pipe {
43
49
}
44
50
45
51
read ( buf : Buffer , off : number , len : number , pos : number , cb : RWCallback ) : void {
46
- if ( this . buf . length || this . closed ) {
47
- let n = this . buf . copy ( buf , pos , off , off + len ) ;
48
- if ( this . buf . length === off + n )
49
- this . buf = new Buffer ( 0 ) ;
50
- else
51
- this . buf = this . buf . slice ( off + n ) ;
52
- releaseWriter ( ) ;
52
+ if ( off !== 0 ) {
53
+ console . log ( 'ERROR: Pipe.read w/ non-zero offset' ) ;
54
+ }
55
+
56
+ if ( this . bufs . length || this . closed ) {
57
+ let n = this . copy ( buf , len , pos ) ;
58
+ this . releaseWriter ( ) ;
53
59
return cb ( undefined , n ) ;
54
60
}
61
+
55
62
// at this point, we're waiting on more data or an EOF.
56
63
this . readWaiter = ( ) => {
57
- let n = this . buf . copy ( buf , pos , off , off + len ) ;
58
- if ( this . buf . length === off + n )
59
- this . buf = new Buffer ( 0 ) ;
60
- else
61
- this . buf = this . buf . slice ( off + n ) ;
62
- releaseWriter ( ) ;
64
+ let n = this . copy ( buf , len , pos ) ;
65
+ this . releaseWriter ( ) ;
63
66
cb ( undefined , n ) ;
64
67
} ;
65
68
}
66
69
67
70
readSync ( ) : Buffer {
68
- return this . buf ;
71
+ let len = this . bufferLength ;
72
+ let buf = new Buffer ( len ) ;
73
+ this . copy ( buf , len , 0 ) ;
74
+ return buf ;
69
75
}
70
76
71
77
ref ( ) : void {
@@ -86,6 +92,28 @@ export class Pipe {
86
92
this . readWaiter = undefined ;
87
93
}
88
94
95
+ private copy ( dst : Buffer , len : number , pos : number ) : number {
96
+ let result = 0 ;
97
+ // ensure pos is a number
98
+ pos = pos ? pos : 0 ;
99
+
100
+ while ( this . bufs . length > 0 && len > 0 ) {
101
+ let src = this . bufs [ 0 ] ;
102
+
103
+ let n = src . copy ( dst , pos ) ;
104
+ pos += n ;
105
+ result += n ;
106
+ len -= n ;
107
+
108
+ if ( src . length === n )
109
+ this . bufs . shift ( ) ;
110
+ else
111
+ this . bufs [ 0 ] = src . slice ( n ) ;
112
+ }
113
+
114
+ return result ;
115
+ }
116
+
89
117
// if any writers are blocked (because the buffer was at
90
118
// capacity) unblock them
91
119
private releaseWriter ( ) : void {
@@ -95,6 +123,14 @@ export class Pipe {
95
123
waiter ( ) ;
96
124
}
97
125
}
126
+
127
+ private releaseReader ( ) : void {
128
+ if ( this . readWaiter ) {
129
+ let waiter = this . readWaiter ;
130
+ this . readWaiter = undefined ;
131
+ waiter ( ) ;
132
+ }
133
+ }
98
134
}
99
135
100
136
export function isPipe ( f : IFile ) : f is PipeFile {
0 commit comments