Commit 847829b

feat: egress tracker middleware (#120)
### Egress Tracker Middleware

**Summary:**
This PR introduces an egress tracking middleware for the Freeway project, enabling accurate measurement and recording of egress data for served content. The middleware counts the bytes sent in each response body and records them with the accounting service against the content ID (CID) that was served.

**Key Changes:**
- **Egress Middleware (`withEgressTracker`)**:
  - Wraps response handlers to track and count bytes sent to the client.
  - Controlled by the `FF_EGRESS_TRACKER_ENABLED` feature flag, enabling or disabling egress tracking as needed. It is disabled by default.
- **Accounting Service Integration**:
  - Logs egress data with the accounting service, using either an `ACCOUNTING_SERVICE` from the context or a new instance based on the `ACCOUNTING_SERVICE_URL` environment variable.
  - Egress data is linked to the CID of the served content, ensuring precise tracking. (The actual accounting service implementation, integrating `w3up-client` for the new `usage/record` capability, will follow in a separate PR.)
- **Efficient Byte Counting via `TransformStream`**:
  - Utilizes a `TransformStream` (`createByteCountStream`) to passively count bytes in the response body without altering content.
  - On stream completion, the `flush` method records total egress to the accounting service using `ctx.waitUntil()` for non-blocking calls.

**Error Handling**:
- Logs errors encountered during data streaming and halts byte counting without interrupting the original response chain. This ensures resilience even in cases of partial or interrupted streams.

**Testing**:
- Added thorough tests to validate egress recording across scenarios, including complete responses, interrupted streams, and error cases.

**Next Steps**:
- Integration tests for verifying egress tracking accuracy and accounting service interactions in various streaming conditions (planned for a future PR).
- `w3up-client` integration to execute the new `usage/record` capability in subsequent development.
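
The pass-through byte counting described above can be illustrated with a minimal sketch. It assumes a runtime with the standard `TransformStream` and `Response` globals (Cloudflare Workers or Node.js 18+). The names `createCountingStream` and `countResponseBytes` are hypothetical and exist only for this illustration; they are not part of the PR.

```javascript
// Hypothetical helper modeled on the byte-counting stream described above.
function createCountingStream (onClose) {
  let total = 0
  return new TransformStream({
    transform (chunk, controller) {
      controller.enqueue(chunk) // forward the chunk unchanged
      total += chunk.byteLength // count only what was forwarded
    },
    flush () {
      onClose(total) // runs once, when the source closes cleanly
    }
  })
}

// Drains a Response body through the counter, as a client would.
async function countResponseBytes (response) {
  let recorded = null
  const counted = response.body.pipeThrough(
    createCountingStream((bytes) => { recorded = bytes })
  )
  const text = await new Response(counted).text()
  return { text, recorded }
}

countResponseBytes(new Response('hello world')).then(({ text, recorded }) => {
  console.log(text, recorded) // logs: hello world 11
})
```

The body reaches the consumer unmodified, and the total is only reported from `flush`, so a stream that never completes reports nothing.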
1 parent 4796ed3 commit 847829b

10 files changed, with 596 additions and 23 deletions


src/bindings.d.ts

Lines changed: 8 additions & 4 deletions
@@ -2,19 +2,23 @@ import { CID } from '@web3-storage/gateway-lib/handlers'
 import { Environment as RateLimiterEnvironment } from './middleware/withRateLimit.types.ts'
 import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHandler.types.ts'
 import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts'
-
+import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts'
+import { UnknownLink } from 'multiformats'
 export interface Environment
   extends CarBlockEnvironment,
     RateLimiterEnvironment,
-    ContentClaimsDagulaEnvironment {
+    ContentClaimsDagulaEnvironment,
+    EgressTrackerEnvironment {
   VERSION: string
+  CONTENT_CLAIMS_SERVICE_URL?: string
+  ACCOUNTING_SERVICE_URL: string
 }
 
 export interface AccountingService {
-  record: (cid: CID, options: GetCIDRequestConfig) => Promise<void>
+  record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
   getTokenMetadata: (token: string) => Promise<TokenMetadata | null>
 }
 
 export interface Accounting {
-  create: ({ serviceURL }: { serviceURL?: string }) => AccountingService
+  create: ({ serviceURL }: { serviceURL: string }) => AccountingService
 }

src/index.js

Lines changed: 5 additions & 1 deletion
@@ -22,7 +22,8 @@ import {
   withCarBlockHandler,
   withRateLimit,
   withNotFound,
-  withLocator
+  withLocator,
+  withEgressTracker
 } from './middleware/index.js'
 
 /**
@@ -57,6 +58,9 @@ export default {
       // Rate-limit requests
       withRateLimit,
 
+      // Track egress bytes
+      withEgressTracker,
+
       // Fetch data
       withCarBlockHandler,
       withNotFound,
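
The position of `withEgressTracker` in this list matters: it sits after rate limiting and before the handlers that produce the body. The sketch below shows why, assuming the composer treats the first listed middleware as the outermost wrapper. `compose`, `trace`, and `tracePipeline` are hypothetical stand-ins written for this illustration, not the project's actual composer or middleware.

```javascript
// Hypothetical composer: the first middleware listed becomes the outermost wrapper.
const compose = (...middlewares) => (handler) =>
  middlewares.reduceRight((next, middleware) => middleware(next), handler)

// Runs a three-stage pipeline and resolves with the order in which stages ran.
function tracePipeline () {
  const calls = []
  // Stand-in middleware that only records when it runs.
  const trace = (name) => (handler) => async (req, env, ctx) => {
    calls.push(`${name}:before`)
    const response = await handler(req, env, ctx)
    calls.push(`${name}:after`)
    return response
  }
  const pipeline = compose(
    trace('withRateLimit'),
    trace('withEgressTracker'),
    trace('withCarBlockHandler')
  )(async () => {
    calls.push('serve')
    return 'response'
  })
  return pipeline({}, {}, {}).then(() => calls)
}

tracePipeline().then((calls) => console.log(calls.join('\n')))
```

Under that assumption, a request rejected by the rate limiter never reaches the tracker, and the tracker receives the response from the inner handlers before it is returned to the client, which is the point where the body can be wrapped for counting.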

src/middleware/index.js

Lines changed: 1 addition & 0 deletions
@@ -5,3 +5,4 @@ export { withRateLimit } from './withRateLimit.js'
 export { withVersionHeader } from './withVersionHeader.js'
 export { withNotFound } from './withNotFound.js'
 export { withLocator } from './withLocator.js'
+export { withEgressTracker } from './withEgressTracker.js'

src/middleware/withEgressTracker.js

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+import { Accounting } from '../services/accounting.js'
+
+/**
+ * @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
+ * @import { Environment } from './withEgressTracker.types.js'
+ * @import { AccountingService } from '../bindings.js'
+ * @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: AccountingService }} EgressTrackerContext
+ */
+
+/**
+ * The egress tracking handler must be enabled after the rate limiting handler,
+ * and before any handler that serves the response body. It uses the CID of the
+ * served content to record the egress in the accounting service, and it counts
+ * the bytes served with a TransformStream to determine the egress amount.
+ *
+ * @type {Middleware<EgressTrackerContext, EgressTrackerContext, Environment>}
+ */
+export function withEgressTracker (handler) {
+  return async (req, env, ctx) => {
+    if (env.FF_EGRESS_TRACKER_ENABLED !== 'true') {
+      return handler(req, env, ctx)
+    }
+
+    const response = await handler(req, env, ctx)
+    if (!response.ok || !response.body) {
+      return response
+    }
+
+    const { dataCid } = ctx
+    const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({
+      serviceURL: env.ACCOUNTING_SERVICE_URL
+    })
+
+    const responseBody = response.body.pipeThrough(
+      createByteCountStream((totalBytesServed) => {
+        // Non-blocking call to the accounting service to record egress
+        if (totalBytesServed > 0) {
+          ctx.waitUntil(
+            accounting.record(dataCid, totalBytesServed, new Date().toISOString())
+          )
+        }
+      })
+    )
+
+    return new Response(responseBody, {
+      status: response.status,
+      statusText: response.statusText,
+      headers: response.headers
+    })
+  }
+}
+
+/**
+ * Creates a TransformStream to count bytes served to the client.
+ * It records egress when the stream is finalized without an error.
+ *
+ * @param {(totalBytesServed: number) => void} onClose
+ * @template {Uint8Array} T
+ * @returns {TransformStream<T, T>} - The created TransformStream.
+ */
+function createByteCountStream (onClose) {
+  let totalBytesServed = 0
+
+  return new TransformStream({
+    /**
+     * The transform function is called for each chunk of the response body.
+     * It enqueues the chunk and updates the total bytes served.
+     * If an error occurs, it signals an error to the controller and logs it.
+     * The bytes are not counted in case of enqueuing an error.
+     */
+    async transform (chunk, controller) {
+      try {
+        controller.enqueue(chunk)
+        totalBytesServed += chunk.byteLength
+      } catch (error) {
+        console.error('Error while counting bytes:', error)
+        controller.error(error)
+      }
+    },
+
+    /**
+     * The flush function is called when the stream is being finalized,
+     * which is when the response is being sent to the client.
+     * So before the response is sent, we record the egress using the callback.
+     * If an error occurs, the egress is not recorded.
+     * NOTE: The flush function is NOT called in case of a stream error.
+     */
+    async flush () {
+      onClose(totalBytesServed)
+    }
+  })
+}
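
The optional `ACCOUNTING_SERVICE` on the context makes the middleware straightforward to exercise with a test double. The following is a rough sketch under stated assumptions, not the PR's test suite: `withEgressTrackerSketch` is a condensed copy of the middleware with the feature-flag check and the `Accounting.create` fallback left out, and `createStubContext`, `runWithBody`, and `interruptedBody` are hypothetical helpers. It assumes a runtime with the standard `Response`, `ReadableStream`, and `TransformStream` globals.

```javascript
// Condensed sketch of the middleware: the feature-flag check and the
// Accounting.create() fallback are omitted, so ctx.ACCOUNTING_SERVICE is required.
function withEgressTrackerSketch (handler) {
  return async (req, env, ctx) => {
    const response = await handler(req, env, ctx)
    if (!response.ok || !response.body) return response

    let totalBytesServed = 0
    const counter = new TransformStream({
      transform (chunk, controller) {
        controller.enqueue(chunk)
        totalBytesServed += chunk.byteLength
      },
      // Only reached when the body closes cleanly, never after a stream error.
      flush () {
        if (totalBytesServed > 0) {
          ctx.waitUntil(
            ctx.ACCOUNTING_SERVICE.record(ctx.dataCid, totalBytesServed, new Date().toISOString())
          )
        }
      }
    })

    return new Response(response.body.pipeThrough(counter), {
      status: response.status,
      statusText: response.statusText,
      headers: response.headers
    })
  }
}

// Hypothetical test double: collects recorded egress and waitUntil promises.
function createStubContext (dataCid) {
  const records = []
  const pending = []
  return {
    dataCid,
    records,
    pending,
    waitUntil: (promise) => { pending.push(promise) },
    ACCOUNTING_SERVICE: {
      record: async (resource, bytes, servedAt) => {
        records.push({ resource, bytes, servedAt })
      },
      getTokenMetadata: async () => null
    }
  }
}

// Drives one request through the sketch and returns what the stub recorded.
async function runWithBody (body) {
  const ctx = createStubContext('example-cid') // placeholder, not a real CID
  const tracked = withEgressTrackerSketch(async () => new Response(body))
  const response = await tracked({}, {}, ctx)
  await response.arrayBuffer().catch(() => {}) // consume the body like a client
  await Promise.all(ctx.pending)
  return ctx.records
}

// A body that delivers one chunk and then fails, simulating an interrupted stream.
function interruptedBody () {
  let sent = false
  return new ReadableStream({
    pull (controller) {
      if (!sent) {
        sent = true
        controller.enqueue(new Uint8Array([1, 2, 3]))
      } else {
        controller.error(new Error('connection lost'))
      }
    }
  })
}
```

Calling `runWithBody('abc')` should yield a single record of 3 bytes for the placeholder CID, while `runWithBody(interruptedBody())` should yield no records, because `flush` is skipped when the stream errors.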

src/middleware/withEgressTracker.types.ts

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
+
+export interface Environment extends MiddlewareEnvironment {
+  ACCOUNTING_SERVICE_URL: string
+  FF_EGRESS_TRACKER_ENABLED: string
+}

src/middleware/withRateLimit.js

Lines changed: 6 additions & 13 deletions
@@ -12,6 +12,7 @@ import { Accounting } from '../services/accounting.js'
  *   RateLimitService,
  *   RateLimitExceeded
  * } from './withRateLimit.types.js'
+ * @typedef {Context & { ACCOUNTING_SERVICE?: import('../bindings.js').AccountingService }} RateLimiterContext
  */
 
 /**
@@ -20,7 +21,7 @@ import { Accounting } from '../services/accounting.js'
  * it can be enabled or disabled using the FF_RATE_LIMITER_ENABLED flag.
  * Every successful request is recorded in the accounting service.
  *
- * @type {Middleware<Context, Context, Environment>}
+ * @type {Middleware<RateLimiterContext, RateLimiterContext, Environment>}
  */
 export function withRateLimit (handler) {
   return async (req, env, ctx) => {
@@ -33,20 +34,14 @@ export function withRateLimit (handler) {
     const isRateLimitExceeded = await rateLimitService.check(dataCid, req)
     if (isRateLimitExceeded === RATE_LIMIT_EXCEEDED.YES) {
       throw new HttpError('Too Many Requests', { status: 429 })
-    } else {
-      const accounting = Accounting.create({
-        serviceURL: env.ACCOUNTING_SERVICE_URL
-      })
-      // NOTE: non-blocking call to the accounting service
-      ctx.waitUntil(accounting.record(dataCid, req))
-      return handler(req, env, ctx)
     }
+    return handler(req, env, ctx)
   }
 }
 
 /**
  * @param {Environment} env
- * @param {Context} ctx
+ * @param {RateLimiterContext} ctx
  * @returns {RateLimitService}
  */
 function create (env, ctx) {
@@ -105,7 +100,7 @@ async function isRateLimited (rateLimitAPI, cid) {
 /**
  * @param {Environment} env
  * @param {string} authToken
- * @param {Context} ctx
+ * @param {RateLimiterContext} ctx
  * @returns {Promise<TokenMetadata | null>}
  */
 async function getTokenMetadata (env, authToken, ctx) {
@@ -116,9 +111,7 @@ async function getTokenMetadata (env, authToken, ctx) {
     return decode(cachedValue)
   }
 
-  const accounting = Accounting.create({
-    serviceURL: env.ACCOUNTING_SERVICE_URL
-  })
+  const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ serviceURL: env.ACCOUNTING_SERVICE_URL })
   const tokenMetadata = await accounting.getTokenMetadata(authToken)
   if (tokenMetadata) {
     // NOTE: non-blocking call to the auth token metadata cache

src/services/accounting.js

Lines changed: 2 additions & 2 deletions
@@ -3,8 +3,8 @@
  */
 export const Accounting = {
   create: ({ serviceURL }) => ({
-    record: async (cid, options) => {
-      console.log(`using ${serviceURL} to record a GET for ${cid} with options`, options)
+    record: async (cid, bytes, servedAt) => {
+      console.log(`using ${serviceURL} to record egress for ${cid} with total bytes: ${bytes} and servedAt: ${servedAt}`)
     },
 
     getTokenMetadata: async () => {

test/fixtures/worker-fixture.js

Lines changed: 6 additions & 2 deletions
@@ -12,6 +12,8 @@ const __dirname = path.dirname(__filename)
  */
 const wranglerEnv = process.env.WRANGLER_ENV || 'integration'
 
+const DEBUG = process.env.DEBUG === 'true'
+
 /**
  * Worker information object
  * @typedef {Object} WorkerInfo
@@ -41,7 +43,7 @@ export const mochaGlobalSetup = async () => {
     )
     console.log(`Output: ${await workerInfo.getOutput()}`)
     console.log('WorkerInfo:', workerInfo)
-    console.log('Test worker started!')
+    console.log(`Test worker started! ENV: ${wranglerEnv}, DEBUG: ${DEBUG}`)
   } catch (error) {
     console.error('Failed to start test worker:', error)
     throw error
@@ -59,7 +61,9 @@ export const mochaGlobalTeardown = async () => {
   try {
     const { stop } = workerInfo
     await stop?.()
-    // console.log('getOutput', getOutput()) // uncomment for debugging
+    if (DEBUG) {
+      console.log('getOutput', await workerInfo.getOutput())
+    }
     console.log('Test worker stopped!')
   } catch (error) {
     console.error('Failed to stop test worker:', error)
