@@ -269,7 +269,10 @@ class Response extends Writable {
269269 // Wait for this chunk to be written to the client
270270 let drained = false
271271 return this . drain ( ( ) => {
272- if ( this . completed ) return true
272+ if ( this . completed ) {
273+ callback ( )
274+ return true
275+ }
273276 // Call the callback once the chunk is drained
274277 if ( ! drained ) {
275278 drained = true
@@ -382,72 +385,86 @@ class Response extends Writable {
382385 }
383386
384387 /**
388+ * Writes a given chunk to the client over uWS with the appropriate writing method.
389+ * Note! This method uses `uWS.tryEnd()` when a `total_size` is provided.
390+ * Note! This method uses `uWS.write()` when a `total_size` is not provided.
391+ *
385392 * @private
393+ * @param {Buffer } chunk
394+ * @param {Number= } total_size
395+ * @returns {Array<Boolean> } [sent, finished]
396+ */
397+ _uws_write_chunk ( chunk , totalSize ) {
398+ // The specific uWS method to stream the chunk to the client differs depending on if we have a total_size or not
399+ if ( totalSize ) {
400+ // Attempt to stream the current chunk using uWS.tryEnd with a total size
401+ return this . raw_response . tryEnd ( chunk , totalSize )
402+ } else {
403+ // Attempt to stream the current chunk uWS.write()
404+ const sent = this . raw_response . write ( chunk )
405+
406+ // Since we are streaming without a total size, we are not finished
407+ return [ sent , false ]
408+ }
409+ }
410+
411+ /**
386412 * Streams individual chunk from a stream.
387413 * Delivers with chunked transfer without content-length header when no total_size is specified.
388414 * Delivers with backpressure handling and content-length header when a total_size is specified.
389415 *
416+ * @private
390417 * @param {Readable } stream
391418 * @param {Buffer } chunk
392- * @param {Number= } totalSize
419+ * @param {Number= } total_size
393420 */
394421 _stream_chunk ( stream , chunk , totalSize ) {
395422 // Ensure the client is still connected and request is pending
396423 if ( ! this . completed ) {
397- // Attempt to stream the chunk using appropriate uWS.Response chunk serving method
398- // This will depend on whether a total_size is specified or not
399- let sent , finished
400- const lastOffset = this . raw_response . getWriteOffset ( )
401- if ( totalSize ) {
402- // Attempt to stream the current chunk using uWS.tryEnd with a total size
403- const [ ok , done ] = this . raw_response . tryEnd ( chunk , totalSize )
404- sent = ok
405- finished = done
406- } else {
407- // Attempt to stream the current chunk uWS.write()
408- sent = this . raw_response . write ( chunk )
409-
410- // Since we are streaming without a total size, we are not finished
411- finished = false
412- }
413-
424+ // Write the chunk to the client using the appropriate uWS chunk writing method
425+ const [ sent , finished ] = this . _uws_write_chunk ( chunk , totalSize )
414426 if ( finished ) {
415- // If streaming has finished, we can destroy the readable stream just to be safe
427+ // Destroy the readable stream as no more writing will occur
416428 if ( ! stream . destroyed ) stream . destroy ( )
417429 } else if ( ! sent ) {
418- // Pause the readable stream to prevent any further data from being read
419- stream . pause ( )
430+ // Remember the initial write offset for future backpressure sliced chunks
431+ const writeOffset = this . write_offset
432+ // Pause the readable stream to prevent any further data from being read as chunk was not fully sent
433+ if ( stream . readable && ! stream . isPaused ( ) ) stream . pause ( )
420434
421- this . raw_response . stream_lastOffset = lastOffset
422- this . raw_response . stream_chunk = chunk
423-
424- // Bind a drain handler which will resume the once the backpressure is cleared
435+ // Bind a drain handler to relieve backpressure
436+ // Note! This callback may be called as many times as neccessary to send a full chunk when using the tryEnd method
425437 this . drain ( ( offset ) => {
438+ // Check if the response has been completed / connection has been closed
426439 if ( this . completed ) {
440+ // Destroy the readable stream as no more writing will occur
427441 if ( ! stream . destroyed ) stream . destroy ( )
428442
443+ // Return true to signify this was a no-op
429444 return true
430445 }
431446
447+ // If we have a total size then we need to serve sliced chunks as uWS does not buffer under the hood
432448 if ( totalSize ) {
433- const [ ok , done ] = this . raw_response . tryEnd ( this . raw_response . stream_chunk . slice ( offset - this . raw_response . stream_lastOffset ) , totalSize )
434- if ( done ) {
449+ // Slice the chunk to the correct offset and send it to the client
450+ const [ flushed , ended ] = this . _uws_write_chunk ( chunk . slice ( offset - writeOffset ) , totalSize )
451+ if ( ended ) {
452+ // Destroy the readable stream as no more writing will occur
435453 if ( ! stream . destroyed ) stream . destroy ( )
436- } else if ( ok ) {
437- // We sent a chunk and it was not the last one, so let's resume reading.
438- // Timeout is still disabled, so we can spend any amount of time waiting
439- // for more chunks to send.
454+ } else if ( flushed ) {
455+ // Resume the readable stream to allow more data to be read
440456 if ( stream . readable && stream . isPaused ( ) ) stream . resume ( )
441457 }
442458
443- // We always have to return true/false in onWritable.
444- // If you did not send anything, return true for success.
445- return ok
446- } else {
447- if ( stream . readable && stream . isPaused ( ) ) stream . resume ( )
448-
449- return true
459+ // Return the flushed boolean as that signifies whether this specific chunk was fully sent
460+ return flushed
450461 }
462+
463+ // Resume the readable stream to allow more data to be read
464+ if ( stream . readable && stream . isPaused ( ) ) stream . resume ( )
465+
466+ // Return true to signify this was a no-op
467+ return true
451468 } )
452469 }
453470 }
@@ -469,9 +486,7 @@ class Response extends Writable {
469486 if ( ! this . completed ) {
470487 this . chunked = true
471488 // Bind an 'abort' event handler which will destroy the consumed stream if request is aborted
472- this . on ( 'abort' , ( ) => {
473- if ( ! readable . destroyed ) readable . destroy ( )
474- } )
489+ this . once ( 'abort' , ( ) => ! readable . destroyed && readable . destroy ( ) )
475490
476491 // Initiate response as we will begin writing body chunks
477492 this . _initiate_response ( )
@@ -605,6 +620,16 @@ class Response extends Writable {
605620 return this . completed
606621 }
607622
623+ /**
624+ * Returns the current response body content write offset in bytes.
625+ * Use in conjunction with the drain() offset handler to retry writing failed chunks.
626+ * Note! This method will return `-1` after the Response has been completed and the connection has been closed.
627+ * @returns {Number }
628+ */
629+ get write_offset ( ) {
630+ return this . completed ? - 1 : this . raw_response . getWriteOffset ( )
631+ }
632+
608633 /**
609634 * Returns a "Server-Sent Events" connection object to allow for SSE functionality.
610635 * This property will only be available for GET requests as per the SSE specification.
0 commit comments