@@ -21,14 +21,7 @@ export function withEgressHandler (handler) {
2121 return handler ( req , env , ctx )
2222 }
2323
24- let response
25- try {
26- response = await handler ( req , env , ctx )
27- } catch ( error ) {
28- console . error ( 'Error in egress tracker handler:' , error )
29- throw error
30- }
31-
24+ const response = await handler ( req , env , ctx )
3225 if ( ! response . ok || ! response . body ) {
3326 return response
3427 }
@@ -38,17 +31,22 @@ export function withEgressHandler (handler) {
3831 serviceURL : env . ACCOUNTING_SERVICE_URL
3932 } )
4033
41- const { readable, writable } = createEgressPassThroughStream ( ctx , accounting , dataCid )
42-
43- try {
44- ctx . waitUntil ( response . body . pipeTo ( writable ) )
45- } catch ( error ) {
46- console . error ( 'Error in egress tracker handler:' , error )
47- // Original response in case of an error to avoid breaking the chain and serve the content
48- return response
49- }
34+ const responseBody = response . body . pipeThrough (
35+ createByteCountStream ( ( totalBytesServed ) => {
36+ // Non-blocking call to the accounting service to record egress
37+ if ( totalBytesServed > 0 ) {
38+ ctx . waitUntil (
39+ accounting
40+ . record ( dataCid , totalBytesServed , new Date ( ) . toISOString ( ) )
41+ . catch ( error => {
42+ console . error ( 'Error while recording egress:' , error )
43+ } )
44+ )
45+ }
46+ } )
47+ )
5048
51- return new Response ( readable , {
49+ return new Response ( responseBody , {
5250 status : response . status ,
5351 statusText : response . statusText ,
5452 headers : response . headers
@@ -60,58 +58,39 @@ export function withEgressHandler (handler) {
6058 * Creates a TransformStream to count bytes served to the client.
6159 * It records egress when the stream is finalized without an error.
6260 *
63- * @param {import('@web3-storage/gateway-lib/middleware').Context } ctx - The context object.
64- * @param {AccountingService } accounting - The accounting service instance to record egress.
65- * @param {import('@web3-storage/gateway-lib/handlers').CID } dataCid - The CID of the served content.
66- * @returns {TransformStream } - The created TransformStream.
61+ * @param {(totalBytesServed: number) => void } onClose
62+ * @template {Uint8Array} T
63+ * @returns {TransformStream<T, T> } - The created TransformStream.
6764 */
68- function createEgressPassThroughStream ( ctx , accounting , dataCid ) {
65+ function createByteCountStream ( onClose ) {
6966 let totalBytesServed = 0
7067
7168 return new TransformStream ( {
72- /**
73- * The start function is called when the stream is being initialized.
74- * It resets the total bytes served to 0.
75- */
76- start ( ) {
77- totalBytesServed = 0
78- } ,
7969 /**
8070 * The transform function is called for each chunk of the response body.
8171 * It enqueues the chunk and updates the total bytes served.
8272 * If an error occurs, it signals an error to the controller and logs it.
8373 * The bytes are not counted in case of enqueuing an error.
84- * @param {Uint8Array } chunk
85- * @param {TransformStreamDefaultController } controller
8674 */
8775 async transform ( chunk , controller ) {
8876 try {
8977 controller . enqueue ( chunk )
9078 totalBytesServed += chunk . byteLength
9179 } catch ( error ) {
92- console . error ( 'Error while counting egress bytes:' , error )
80+ console . error ( 'Error while counting bytes:' , error )
9381 controller . error ( error )
9482 }
9583 } ,
9684
9785 /**
9886 * The flush function is called when the stream is being finalized,
9987 * which is when the response is being sent to the client.
100- * So before the response is sent, we record the egress.
101- * It is called only once and it triggers a non-blocking call to the accounting service.
88+ * So before the response is sent, we record the egress using the callback.
10289 * If an error occurs, the egress is not recorded.
103- * NOTE: The flush function is NOT called in case of an stream error.
90+ * NOTE: The flush function is NOT called in case of a stream error.
10491 */
105- async flush ( controller ) {
106- try {
107- // Non-blocking call to the accounting service to record egress
108- if ( totalBytesServed > 0 ) {
109- ctx . waitUntil ( accounting . record ( dataCid , totalBytesServed , new Date ( ) . toISOString ( ) ) )
110- }
111- } catch ( error ) {
112- console . error ( 'Error while recording egress:' , error )
113- controller . error ( error )
114- }
92+ async flush ( ) {
93+ onClose ( totalBytesServed )
11594 }
11695 } )
11796}
0 commit comments