Skip to content

Commit 28dc194

Browse files
authored
feat: publish egress events to cf queue (#191)
## Overview This PR refactors the egress tracking system to use asynchronous batch processing instead of synchronous invocation execution. The freeway gateway now serializes egress tracking invocations and sends them to a Cloudflare Queue, where the [egress-consumer](https://github.com/storacha/egress-consumer) processes them in batches before forwarding them to the upload-api as a single CAR containing multiple invocations. ## Key Changes ### Freeway Gateway (Producer) Modified the egress tracking middleware to serialize invocations instead of executing them immediately: - Removed the direct connection to upload-api from withEgressClient - Invocations are now built, converted to an IPLD view using buildIPLDView(), and archived to bytes - Serialized invocations are sent to EGRESS_QUEUE as message payloads - Egress tracking is controlled by two feature flags: `FF_EGRESS_TRACKER_ENABLED` and `FF_EGRESS_TRACKER_ROLLOUT_PERCENTAGE` ## Flow 1. Gateway serves content and creates an egress record invocation 2. Invocation is archived to bytes and queued 3. Cloudflare Queue batches messages (configured batch size: 50, timeout: 30s) 4. Egress-consumer receives the batch and extracts the invocations 5. Consumer builds a CAR message with all invocations 6. A single POST to upload-api processes the entire batch 7. Upload-api executes each invocation and records egress in DynamoDB and Stripe ## Additional Changes - Added missing `dev` packages - Fixed type issues - Fixed build issues - Fixed tests - Updated `package-lock.json`
1 parent 7483d3d commit 28dc194

File tree

11 files changed

+3250
-1743
lines changed

11 files changed

+3250
-1743
lines changed

package-lock.json

Lines changed: 2923 additions & 1542 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
"main": "src/index.js",
2323
"types": "dist/src/index.d.ts",
2424
"scripts": {
25-
"build": "esbuild --bundle src/index.js --format=esm --external:node:buffer --external:node:events --external:node:async_hooks --sourcemap --minify --outfile=dist/worker.mjs && npm run build:tsc",
26-
"build:debug": "esbuild --bundle src/index.js --format=esm --external:node:buffer --external:node:events --external:node:async_hooks --outfile=dist/worker.mjs",
25+
"build": "esbuild --bundle src/index.js --format=esm --external:node:buffer --external:node:events --external:node:async_hooks --external:cloudflare:workers --sourcemap --minify --outfile=dist/worker.mjs && npm run build:tsc",
26+
"build:debug": "esbuild --bundle src/index.js --format=esm --external:node:buffer --external:node:events --external:node:async_hooks --external:cloudflare:workers --outfile=dist/worker.mjs",
2727
"build:tsc": "tsc --build",
2828
"dev": "npm run build:debug && miniflare dist/worker.mjs --watch --debug -m --r2-persist --global-async-io --global-timers",
2929
"lint": "standard",
@@ -36,12 +36,13 @@
3636
"test:unit:only": "npm run build:debug && mocha --experimental-vm-modules"
3737
},
3838
"dependencies": {
39+
"@ipld/dag-pb": "^4.1.5",
3940
"@microlabs/otel-cf-workers": "^1.0.0-rc.48",
4041
"@opentelemetry/api": "^1.9.0",
4142
"@opentelemetry/sdk-trace-base": "^1.27.0",
4243
"@storacha/capabilities": "^1.10.0",
4344
"@storacha/indexing-service-client": "^2.6.6",
44-
"@types/node": "^24.9.1",
45+
"@types/node": "^24.10.1",
4546
"@ucanto/client": "^9.0.2",
4647
"@ucanto/core": "^10.4.5",
4748
"@ucanto/interface": "^11.0.1",
@@ -57,12 +58,16 @@
5758
"multiformats": "^13.0.1"
5859
},
5960
"devDependencies": {
61+
"@storacha/blob-index": "^1.2.4",
6062
"@storacha/cli": "^1.6.2",
6163
"@storacha/client": "^1.8.2",
64+
"@storacha/upload-client": "^1.3.6",
6265
"@types/chai": "^5.0.0",
6366
"@types/mocha": "^10.0.9",
67+
"@types/node": "^24.10.1",
6468
"@types/node-fetch": "^2.6.11",
6569
"@types/sinon": "^17.0.3",
70+
"@web3-storage/content-claims": "^5.2.4",
6671
"@web3-storage/public-bucket": "^1.4.0",
6772
"carstream": "^2.2.0",
6873
"chai": "^5.1.1",
@@ -71,6 +76,7 @@
7176
"miniflare": "^4.20251011.1",
7277
"mocha": "^10.7.3",
7378
"multipart-byte-range": "^3.0.1",
79+
"sade": "^1.8.1",
7480
"sinon": "^19.0.2",
7581
"standard": "^17.1.2",
7682
"tree-kill": "^1.2.2",
@@ -81,5 +87,6 @@
8187
"ignore": [
8288
"*.ts"
8389
]
84-
}
90+
},
91+
"packageManager": "pnpm@10.14.0+sha512.ad27a79641b49c3e481a16a805baa71817a04bbe06a38d17e60e2eaee83f6a146c6a688125f5792e48dd5ba30e7da52a5cda4c3992b9ccf333f9ce223af84748"
8592
}

src/index.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ function config (env, _trigger) {
125125
}
126126
}
127127
return {
128+
// @ts-expect-error - NoopSpanProcessor extends SpanProcessor, but ts doesn't recognize it
128129
spanProcessors: new NoopSpanProcessor(),
129130
service: { name: 'freeway' }
130131
}
@@ -154,8 +155,13 @@ async function initializeHandler (env) {
154155
return async (request, env, ctx) => {
155156
const response = await finalHandler(request, env, ctx)
156157
const cacheControl = response.headers.get('Cache-Control') ?? ''
157-
response.headers.set('Cache-Control', cacheControl ? `${cacheControl}, no-transform` : 'no-transform')
158-
return response
158+
const newHeaders = new Headers(response.headers)
159+
newHeaders.set('Cache-Control', cacheControl ? `${cacheControl}, no-transform` : 'no-transform')
160+
return new Response(response.body, {
161+
status: response.status,
162+
statusText: response.statusText,
163+
headers: newHeaders
164+
})
159165
}
160166
}
161167

src/middleware/withAuthorizedSpace.js

Lines changed: 90 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,52 @@
11
import { Verifier } from '@ucanto/principal'
2-
import { ok, access, Unauthorized } from '@ucanto/validator'
2+
import { ok, access, fail, Unauthorized } from '@ucanto/validator'
33
import { HttpError } from '@web3-storage/gateway-lib/util'
44
import * as serve from '../capabilities/serve.js'
55
import { SpaceDID } from '@storacha/capabilities/utils'
66

7+
/**
 * Extracts a SpaceDID string from the various shapes a site's `space` value
 * may take.
 *
 * Shapes are checked in this order:
 *  1. a string starting with `did:` (returned as-is)
 *  2. an object exposing a `.did()` method (its return value is used)
 *  3. a typed array / DataView (passed through `SpaceDID.from`)
 *  4. anything else whose `String()` form starts with `did:`
 *
 * Never throws: any failure during extraction yields `undefined`.
 *
 * @param {any} space - The space value to extract a DID from
 * @returns {import('@storacha/capabilities/types').SpaceDID | undefined}
 */
function extractSpaceDID (space) {
  if (!space) return undefined

  try {
    // Already a string: it is either a DID or it is not usable at all
    // (String(space) would give the same text, so no further fallback applies).
    if (typeof space === 'string') {
      return space.startsWith('did:')
        ? /** @type {import('@storacha/capabilities/types').SpaceDID} */ (space)
        : undefined
    }

    // Object with a .did() method
    if (typeof space === 'object' && typeof /** @type {any} */ (space).did === 'function') {
      return /** @type {import('@storacha/capabilities/types').SpaceDID} */ (/** @type {any} */ (space).did())
    }

    // Binary view (e.g. Uint8Array)
    if (ArrayBuffer.isView(space)) {
      const spaceDID = SpaceDID.from(space)
      return /** @type {import('@storacha/capabilities/types').SpaceDID} */ (spaceDID.toString())
    }

    // Last resort: rely on the value's string conversion
    const spaceStr = String(space)
    if (spaceStr.startsWith('did:')) {
      return /** @type {import('@storacha/capabilities/types').SpaceDID} */ (spaceStr)
    }

    return undefined
  } catch (error) {
    // `process` is not guaranteed to exist as a global in every runtime.
    // Reading it unguarded would raise a ReferenceError from inside this
    // handler and turn a recoverable extraction failure into a thrown error,
    // so the lookup goes through globalThis with optional chaining.
    if (globalThis.process?.env?.DEBUG) {
      console.warn('Failed to extract space DID:', error, 'Raw space:', space)
    }
    return undefined
  }
}
}
49+
750
/**
851
* @import * as Ucanto from '@ucanto/interface'
952
* @import { IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
@@ -43,51 +86,64 @@ export function withAuthorizedSpace (handler) {
4386
// Legacy behavior: Site results which have no Space attached are from
4487
// before we started authorizing serving content explicitly. For these, we
4588
// always serve the content, but only if the request has no authorization
46-
// token.
89+
// token AND there are no sites with space information available.
90+
const sitesWithSpace = locRes.ok.site.filter((site) => site.space !== undefined)
91+
const sitesWithoutSpace = locRes.ok.site.filter((site) => site.space === undefined)
4792
const shouldServeLegacy =
48-
locRes.ok.site.some((site) => site.space === undefined) &&
93+
sitesWithSpace.length === 0 &&
94+
sitesWithoutSpace.length > 0 &&
4995
ctx.authToken === null
5096

5197
if (shouldServeLegacy) {
5298
return handler(request, env, ctx)
5399
}
54100

55101
// These Spaces all have the content we're to serve, if we're allowed to.
56-
const spaces = locRes.ok.site
57-
.map((site) => site.space)
58-
.filter((s) => s !== undefined)
102+
// Extract space DIDs from sites with space information
103+
const spaces = sitesWithSpace
104+
.map((site) => extractSpaceDID(site.space))
105+
.filter((space) => space !== undefined)
59106

60107
try {
61108
// First space to successfully authorize is the one we'll use.
62109
const { space: selectedSpace, delegationProofs } = await Promise.any(
63110
spaces.map(async (space) => {
64-
const result = await authorize(SpaceDID.from(space), ctx)
65-
if (result.error) throw result.error
111+
const result = await authorize(SpaceDID.from(space.toString()), ctx)
112+
if (result.error) {
113+
throw result.error
114+
}
66115
return result.ok
67116
})
68117
)
69118
return handler(request, env, {
70119
...ctx,
71-
space: SpaceDID.from(selectedSpace),
120+
space: SpaceDID.from(selectedSpace.toString()),
72121
delegationProofs,
73122
locator: locator.scopeToSpaces([selectedSpace])
74123
})
75124
} catch (error) {
76-
// If all Spaces failed to authorize, throw the first error.
77-
if (
78-
error instanceof AggregateError &&
79-
error.errors.every((e) => e instanceof Unauthorized)
80-
) {
81-
if (env.DEBUG === 'true') {
82-
console.log(
83-
[
84-
'Authorization Failures:',
85-
...error.errors.map((e) => e.message)
86-
].join('\n\n')
87-
)
88-
}
125+
// If all Spaces failed to authorize, return 404 (security through obscurity)
126+
if (error instanceof AggregateError) {
127+
// Check if all errors are authorization failures (not storage errors)
128+
const isAuthFailure = error.errors.every((e) =>
129+
e instanceof Unauthorized ||
130+
(e.message && e.message.includes('not authorized to serve'))
131+
)
89132

90-
throw new HttpError('Not Found', { status: 404, cause: error })
133+
if (isAuthFailure) {
134+
if (env.DEBUG === 'true') {
135+
console.log(
136+
[
137+
'Authorization Failures:',
138+
...error.errors.map((e) => e.message)
139+
].join('\n\n')
140+
)
141+
}
142+
// Don't reveal whether content exists in unauthorized spaces
143+
throw new HttpError('Not Found', { status: 404, cause: error })
144+
}
145+
// For storage or other errors, throw the AggregateError as-is
146+
throw error
91147
} else {
92148
throw error
93149
}
@@ -107,8 +163,17 @@ export function withAuthorizedSpace (handler) {
107163
const authorize = async (space, ctx) => {
108164
// Look up delegations that might authorize us to serve the content.
109165
const relevantDelegationsResult = await ctx.delegationsStorage.find(space)
110-
if (relevantDelegationsResult.error) return relevantDelegationsResult
166+
if (relevantDelegationsResult.error) {
167+
return relevantDelegationsResult
168+
}
169+
111170
const delegationProofs = relevantDelegationsResult.ok
171+
172+
// If no delegations found, the server is not authorized to serve this content
173+
if (!delegationProofs || delegationProofs.length === 0) {
174+
return fail('The gateway is not authorized to serve this content.')
175+
}
176+
112177
// Create an invocation of the serve capability.
113178
const invocation = await serve.transportHttp
114179
.invoke({
@@ -129,6 +194,7 @@ const authorize = async (space, ctx) => {
129194
principal: Verifier,
130195
validateAuthorization: () => ok({})
131196
})
197+
132198
if (accessResult.error) {
133199
return accessResult
134200
}

src/middleware/withDelegationsStorage.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ function createStorage (env) {
8888
try {
8989
await env.CONTENT_SERVE_DELEGATIONS_STORE.put(
9090
`${space}:${delegation.cid.toString()}`,
91-
value.ok.buffer,
91+
/** @type {ArrayBuffer} */ (value.ok.buffer),
9292
options
9393
)
9494
return ok({})

src/middleware/withEgressTracker.js

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
* @typedef {import('./withEgressTracker.types.js').Context} EgressTrackerContext
55
*/
66

7+
import { Space } from '@storacha/capabilities'
8+
import { SpaceDID } from '@storacha/capabilities/utils'
9+
import { DID } from '@ucanto/core'
10+
711
/**
812
* The egress tracking handler must be enabled after the rate limiting, authorized space,
913
* and egress client handlers, and before any handler that serves the response body.
@@ -18,14 +22,29 @@ export function withEgressTracker (handler) {
1822
return handler(req, env, ctx)
1923
}
2024

25+
// Check rollout percentage for gradual deployment
26+
const rolloutPercentage = parseInt(env.FF_EGRESS_TRACKER_ROLLOUT_PERCENTAGE || '100')
27+
const shouldTrack = Math.random() * 100 < rolloutPercentage
28+
if (!shouldTrack) {
29+
return handler(req, env, ctx)
30+
}
31+
2132
// If the space is not defined, it is a legacy request and we can't track egress
2233
const space = ctx.space
2334
if (!space) {
35+
console.log('Egress tracking skipped: no space context available (legacy request)')
2436
return handler(req, env, ctx)
2537
}
38+
console.log('Egress tracking enabled for space:', space)
2639

27-
if (!ctx.egressClient) {
28-
console.error('EgressClient is not defined')
40+
// Check if Cloudflare Queue is available for egress tracking
41+
if (!env.EGRESS_QUEUE) {
42+
console.error('EGRESS_QUEUE is not defined')
43+
return handler(req, env, ctx)
44+
}
45+
46+
if (!env.UPLOAD_SERVICE_DID) {
47+
console.error('UPLOAD_SERVICE_DID is not defined')
2948
return handler(req, env, ctx)
3049
}
3150

@@ -35,17 +54,44 @@ export function withEgressTracker (handler) {
3554
}
3655

3756
const responseBody = response.body.pipeThrough(
38-
createByteCountStream((totalBytesServed) => {
57+
createByteCountStream(async (totalBytesServed) => {
3958
if (totalBytesServed > 0) {
40-
// Non-blocking call to the accounting service to record egress
41-
ctx.waitUntil(
42-
ctx.egressClient.record(
43-
/** @type {import('@ucanto/principal/ed25519').DIDKey} */(space),
44-
ctx.dataCid,
45-
totalBytesServed,
46-
new Date()
59+
try {
60+
// Create UCAN invocation for egress record
61+
const invocation = Space.egressRecord.invoke({
62+
issuer: ctx.gatewayIdentity,
63+
audience: DID.parse(env.UPLOAD_SERVICE_DID),
64+
with: SpaceDID.from(space),
65+
nb: {
66+
resource: ctx.dataCid,
67+
bytes: totalBytesServed,
68+
servedAt: new Date().getTime()
69+
},
70+
expiration: Infinity,
71+
nonce: Date.now().toString(),
72+
proofs: ctx.delegationProofs
73+
})
74+
75+
// Serialize and send to Cloudflare Queue
76+
const delegation = await invocation.delegate()
77+
const archiveResult = await delegation.archive()
78+
if (archiveResult.error) {
79+
console.error('Failed to serialize egress invocation:', archiveResult.error)
80+
return
81+
}
82+
const serializedInvocation = archiveResult.ok
83+
84+
// Non-blocking call to queue the invocation
85+
ctx.waitUntil(
86+
env.EGRESS_QUEUE.send({
87+
messageId: delegation.cid,
88+
invocation: serializedInvocation,
89+
timestamp: Date.now()
90+
})
4791
)
48-
)
92+
} catch (error) {
93+
console.error('Failed to create or queue egress invocation:', error)
94+
}
4995
}
5096
})
5197
)
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
2-
import { EgressClientContext } from './withEgressClient.types.js'
3-
import { SpaceContext } from './withAuthorizedSpace.types.js'
2+
import { SpaceContext, DelegationProofsContext } from './withAuthorizedSpace.types.js'
3+
import { GatewayIdentityContext } from './withGatewayIdentity.types.js'
44

55
export interface Environment extends MiddlewareEnvironment {
66
FF_EGRESS_TRACKER_ENABLED: string
7+
FF_EGRESS_TRACKER_ROLLOUT_PERCENTAGE?: string
8+
EGRESS_QUEUE: Queue
9+
UPLOAD_SERVICE_DID: string
710
}
811

9-
export interface Context extends IpfsUrlContext, SpaceContext, EgressClientContext {
12+
export interface Context extends IpfsUrlContext, SpaceContext, GatewayIdentityContext, DelegationProofsContext {
1013
}

src/middleware/withUcanInvocationHandler.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export function withUcanInvocationHandler (handler) {
3030
headers: Object.fromEntries(request.headers)
3131
})
3232

33+
// @ts-expect-error - ByteView is compatible with BodyInit but TypeScript doesn't recognize it
3334
return new Response(body, { headers, status: status ?? 200 })
3435
}
3536
}

test/miniflare/freeway.spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { mockBucketService } from '../helpers/bucket.js'
2424
import { fromShardArchives } from '@storacha/blob-index/util'
2525
import { CAR_CODE } from '../../src/constants.js'
2626
import http from 'node:http'
27-
/** @import { Block, Position } from 'carstream' */
27+
/** @import { Block, Position } from 'carstream/api' */
2828

2929
/**
3030
* @param {{ arrayBuffer: () => Promise<ArrayBuffer> }} a

0 commit comments

Comments
 (0)