Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@ import { CID } from '@web3-storage/gateway-lib/handlers'
import { Environment as RateLimiterEnvironment } from './middleware/withRateLimit.types.ts'
import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHandler.types.ts'
import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts'

import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts'
import { UnknownLink } from 'multiformats'
export interface Environment
extends CarBlockEnvironment,
RateLimiterEnvironment,
ContentClaimsDagulaEnvironment {
ContentClaimsDagulaEnvironment,
EgressTrackerEnvironment {
VERSION: string
CONTENT_CLAIMS_SERVICE_URL?: string
ACCOUNTING_SERVICE_URL: string
}

export interface AccountingService {
record: (cid: CID, options: GetCIDRequestConfig) => Promise<void>
record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
getTokenMetadata: (token: string) => Promise<TokenMetadata | null>
}

export interface Accounting {
create: ({ serviceURL }: { serviceURL?: string }) => AccountingService
create: ({ serviceURL }: { serviceURL: string }) => AccountingService
}
6 changes: 5 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import {
withCarBlockHandler,
withRateLimit,
withNotFound,
withLocator
withLocator,
withEgressTracker
} from './middleware/index.js'

/**
Expand Down Expand Up @@ -57,6 +58,9 @@ export default {
// Rate-limit requests
withRateLimit,

// Track egress bytes
withEgressTracker,

// Fetch data
withCarBlockHandler,
withNotFound,
Expand Down
1 change: 1 addition & 0 deletions src/middleware/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export { withRateLimit } from './withRateLimit.js'
export { withVersionHeader } from './withVersionHeader.js'
export { withNotFound } from './withNotFound.js'
export { withLocator } from './withLocator.js'
export { withEgressTracker } from './withEgressTracker.js'
92 changes: 92 additions & 0 deletions src/middleware/withEgressTracker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Accounting } from '../services/accounting.js'

/**
* @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
* @import { Environment } from './withEgressTracker.types.js'
* @import { AccountingService } from '../bindings.js'
* @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: AccountingService }} EgressTrackerContext
*/

/**
* The egress tracking handler must be enabled after the rate limiting handler,
* and before any handler that serves the response body. It uses the CID of the
* served content to record the egress in the accounting service, and it counts
* the bytes served with a TransformStream to determine the egress amount.
*
* @type {Middleware<EgressTrackerContext, EgressTrackerContext, Environment>}
*/
export function withEgressTracker (handler) {
return async (req, env, ctx) => {
if (env.FF_EGRESS_TRACKER_ENABLED !== 'true') {
return handler(req, env, ctx)
}

const response = await handler(req, env, ctx)
if (!response.ok || !response.body) {
return response
}

const { dataCid } = ctx
const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({
serviceURL: env.ACCOUNTING_SERVICE_URL
})

const responseBody = response.body.pipeThrough(
createByteCountStream((totalBytesServed) => {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
ctx.waitUntil(
accounting.record(dataCid, totalBytesServed, new Date().toISOString())
)
}
})
)

return new Response(responseBody, {
status: response.status,
statusText: response.statusText,
headers: response.headers
})
}
}

/**
* Creates a TransformStream to count bytes served to the client.
* It records egress when the stream is finalized without an error.
*
* @param {(totalBytesServed: number) => void} onClose
* @template {Uint8Array} T
* @returns {TransformStream<T, T>} - The created TransformStream.
*/
function createByteCountStream (onClose) {
let totalBytesServed = 0

return new TransformStream({
/**
* The transform function is called for each chunk of the response body.
* It enqueues the chunk and updates the total bytes served.
* If an error occurs, it signals an error to the controller and logs it.
* The bytes are not counted in case of enqueuing an error.
*/
async transform (chunk, controller) {
try {
controller.enqueue(chunk)
totalBytesServed += chunk.byteLength
} catch (error) {
console.error('Error while counting bytes:', error)
controller.error(error)
}
},

/**
* The flush function is called when the stream is being finalized,
* which is when the response is being sent to the client.
* So before the response is sent, we record the egress using the callback.
* If an error occurs, the egress is not recorded.
* NOTE: The flush function is NOT called in case of a stream error.
*/
async flush () {
onClose(totalBytesServed)
}
})
}
6 changes: 6 additions & 0 deletions src/middleware/withEgressTracker.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'

export interface Environment extends MiddlewareEnvironment {
ACCOUNTING_SERVICE_URL: string
FF_EGRESS_TRACKER_ENABLED: string
}
19 changes: 6 additions & 13 deletions src/middleware/withRateLimit.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Accounting } from '../services/accounting.js'
* RateLimitService,
* RateLimitExceeded
* } from './withRateLimit.types.js'
* @typedef {Context & { ACCOUNTING_SERVICE?: import('../bindings.js').AccountingService }} RateLimiterContext
*/

/**
Expand All @@ -20,7 +21,7 @@ import { Accounting } from '../services/accounting.js'
* it can be enabled or disabled using the FF_RATE_LIMITER_ENABLED flag.
* Every successful request is recorded in the accounting service.
*
* @type {Middleware<Context, Context, Environment>}
* @type {Middleware<RateLimiterContext, RateLimiterContext, Environment>}
*/
export function withRateLimit (handler) {
return async (req, env, ctx) => {
Expand All @@ -33,20 +34,14 @@ export function withRateLimit (handler) {
const isRateLimitExceeded = await rateLimitService.check(dataCid, req)
if (isRateLimitExceeded === RATE_LIMIT_EXCEEDED.YES) {
throw new HttpError('Too Many Requests', { status: 429 })
} else {
const accounting = Accounting.create({
serviceURL: env.ACCOUNTING_SERVICE_URL
})
// NOTE: non-blocking call to the accounting service
ctx.waitUntil(accounting.record(dataCid, req))
return handler(req, env, ctx)
}
return handler(req, env, ctx)
}
}

/**
* @param {Environment} env
* @param {Context} ctx
* @param {RateLimiterContext} ctx
* @returns {RateLimitService}
*/
function create (env, ctx) {
Expand Down Expand Up @@ -105,7 +100,7 @@ async function isRateLimited (rateLimitAPI, cid) {
/**
* @param {Environment} env
* @param {string} authToken
* @param {Context} ctx
* @param {RateLimiterContext} ctx
* @returns {Promise<TokenMetadata | null>}
*/
async function getTokenMetadata (env, authToken, ctx) {
Expand All @@ -116,9 +111,7 @@ async function getTokenMetadata (env, authToken, ctx) {
return decode(cachedValue)
}

const accounting = Accounting.create({
serviceURL: env.ACCOUNTING_SERVICE_URL
})
const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ serviceURL: env.ACCOUNTING_SERVICE_URL })
const tokenMetadata = await accounting.getTokenMetadata(authToken)
if (tokenMetadata) {
// NOTE: non-blocking call to the auth token metadata cache
Expand Down
4 changes: 2 additions & 2 deletions src/services/accounting.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
*/
export const Accounting = {
create: ({ serviceURL }) => ({
record: async (cid, options) => {
console.log(`using ${serviceURL} to record a GET for ${cid} with options`, options)
record: async (cid, bytes, servedAt) => {
console.log(`using ${serviceURL} to record egress for ${cid} with total bytes: ${bytes} and servedAt: ${servedAt}`)
},

getTokenMetadata: async () => {
Expand Down
8 changes: 6 additions & 2 deletions test/fixtures/worker-fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const __dirname = path.dirname(__filename)
*/
const wranglerEnv = process.env.WRANGLER_ENV || 'integration'

const DEBUG = process.env.DEBUG === 'true'

/**
* Worker information object
* @typedef {Object} WorkerInfo
Expand Down Expand Up @@ -41,7 +43,7 @@ export const mochaGlobalSetup = async () => {
)
console.log(`Output: ${await workerInfo.getOutput()}`)
console.log('WorkerInfo:', workerInfo)
console.log('Test worker started!')
console.log(`Test worker started! ENV: ${wranglerEnv}, DEBUG: ${DEBUG}`)
} catch (error) {
console.error('Failed to start test worker:', error)
throw error
Expand All @@ -59,7 +61,9 @@ export const mochaGlobalTeardown = async () => {
try {
const { stop } = workerInfo
await stop?.()
// console.log('getOutput', getOutput()) // uncomment for debugging
if (DEBUG) {
console.log('getOutput', await workerInfo.getOutput())
}
console.log('Test worker stopped!')
} catch (error) {
console.error('Failed to stop test worker:', error)
Expand Down
Loading