11import fs from 'fs'
22import path from 'path'
33import { Readable , Writable } from 'stream'
4+ import { pipeline } from 'stream/promises'
45
56const EOF = Symbol ( 'EOF' )
67
@@ -15,6 +16,8 @@ class FileParser {
1516 private bytesRead = 0
1617 private file : string
1718 private delimiter : string
19+ private fh : fs . promises . FileHandle | undefined
20+ private eof = false
1821
1922 constructor ( file : string , delimiter : string ) {
2023 this . file = file
@@ -38,45 +41,49 @@ class FileParser {
3841 return this . checkBuffer ( )
3942 }
4043
41- const fh = await fs . promises . open ( this . file , 'r' )
44+ if ( this . eof ) {
45+ return EOF
46+ }
47+
48+ if ( ! this . fh ) {
49+ this . fh = await fs . promises . open ( this . file , 'r' )
50+ }
51+
4252 const cBuffer = Buffer . alloc ( 512 )
4353 let readed : { bytesRead : number }
4454
45- try {
46- while (
47- ( readed = await fh . read ( cBuffer , 0 , 512 , this . bytesRead ) ) . bytesRead > 0
48- ) {
49- this . bbuffer = Buffer . concat ( [
50- this . bbuffer ,
51- cBuffer . subarray ( 0 , readed . bytesRead ) ,
52- ] )
53- this . bytesRead += readed . bytesRead
54- const dIndex = this . bbuffer . indexOf ( this . delimiter )
55- if ( dIndex === - 1 ) {
56- continue
57- }
58- this . buffer = this . bbuffer . subarray ( 0 , dIndex + 1 ) . toString ( 'utf8' )
59- this . bbuffer = this . bbuffer . subarray ( dIndex + 1 )
60- return this . checkBuffer ( )
61- }
62-
63- if ( this . bbuffer . length > 0 && this . bbuffer . includes ( this . delimiter ) ) {
64- this . buffer = this . bbuffer . toString ( 'utf8' )
65- this . bbuffer = Buffer . from ( '' )
66- return this . checkBuffer ( )
55+ while (
56+ ( readed = await this . fh . read ( cBuffer , 0 , 512 , this . bytesRead ) ) . bytesRead >
57+ 0
58+ ) {
59+ this . bbuffer = Buffer . concat ( [
60+ this . bbuffer ,
61+ cBuffer . subarray ( 0 , readed . bytesRead ) ,
62+ ] )
63+ this . bytesRead += readed . bytesRead
64+ const dIndex = this . bbuffer . indexOf ( this . delimiter )
65+ if ( dIndex === - 1 ) {
66+ continue
6767 }
68+ this . buffer = this . bbuffer . subarray ( 0 , dIndex + 1 ) . toString ( 'utf8' )
69+ this . bbuffer = this . bbuffer . subarray ( dIndex + 1 )
70+ return this . checkBuffer ( )
71+ }
6872
69- return EOF
70- } finally {
71- await fh . close ( )
73+ if ( this . bbuffer . length > 0 && this . bbuffer . includes ( this . delimiter ) ) {
74+ this . buffer = this . bbuffer . toString ( 'utf8' )
75+ this . bbuffer = Buffer . from ( '' )
76+ return this . checkBuffer ( )
7277 }
78+
79+ this . eof = true
80+ await this . fh . close ( )
81+ return EOF
7382 }
7483}
7584
7685function swap ( harr : HeapItem [ ] , a : number , b : number ) {
77- const temp = harr [ a ]
78- harr [ a ] = harr [ b ] !
79- harr [ b ] = temp
86+ ; [ harr [ a ] , harr [ b ] ] = [ harr [ b ] , harr [ a ] ]
8087}
8188
8289function compare ( a : string | typeof EOF , b : string | typeof EOF ) : number {
@@ -86,13 +93,7 @@ function compare(a: string | typeof EOF, b: string | typeof EOF): number {
8693 if ( b === EOF ) {
8794 return - 1
8895 }
89- if ( a < b ) {
90- return - 1
91- }
92- if ( a === b ) {
93- return 0
94- }
95- return 1
96+ return a < b ? - 1 : a > b ? 1 : 0
9697}
9798
9899function heapify ( harr : HeapItem [ ] , i : number , heapSize : number ) {
@@ -189,19 +190,8 @@ async function mergeSortedFiles(
189190 const flen = filesPath . length
190191
191192 if ( flen === 1 ) {
192- await new Promise < void > ( ( resolve , reject ) => {
193- const rs = fs . createReadStream ( filesPath [ 0 ] , 'utf8' )
194- rs . on ( 'open' , ( ) => {
195- rs . pipe ( output )
196- } )
197- rs . on ( 'error' , err => {
198- output . end ( )
199- reject ( err )
200- } )
201- rs . on ( 'end' , ( ) => {
202- resolve ( )
203- } )
204- } )
193+ const rs = fs . createReadStream ( filesPath [ 0 ] , 'utf8' )
194+ await pipeline ( rs , output )
205195 return
206196 }
207197
0 commit comments