From 902d69c1405d77900ba1148385bb4f003e860c78 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Wed, 30 Oct 2024 11:11:13 -0300 Subject: [PATCH 1/2] refactor: accounting service --- src/bindings.d.ts | 12 +--- src/middleware/withAccountingService.js | 37 ++++++++++++ src/middleware/withAccountingService.types.ts | 19 ++++++ src/middleware/withEgressTracker.js | 15 ++--- src/middleware/withEgressTracker.types.ts | 10 +++- src/middleware/withRateLimit.js | 23 ++++--- src/middleware/withRateLimit.types.ts | 6 -- src/services/accounting.js | 16 ----- .../unit/middleware/withEgressTracker.spec.js | 60 +++++++++++-------- test/unit/middleware/withRateLimit.spec.js | 1 - wrangler.toml | 1 - 11 files changed, 121 insertions(+), 79 deletions(-) create mode 100644 src/middleware/withAccountingService.js create mode 100644 src/middleware/withAccountingService.types.ts delete mode 100644 src/services/accounting.js diff --git a/src/bindings.d.ts b/src/bindings.d.ts index 929045e..29789a2 100644 --- a/src/bindings.d.ts +++ b/src/bindings.d.ts @@ -4,6 +4,8 @@ import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHan import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts' import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts' import { UnknownLink } from 'multiformats' +import { DIDKey } from '@ucanto/principal/ed25519' + export interface Environment extends CarBlockEnvironment, RateLimiterEnvironment, @@ -11,14 +13,4 @@ export interface Environment EgressTrackerEnvironment { VERSION: string CONTENT_CLAIMS_SERVICE_URL?: string - ACCOUNTING_SERVICE_URL: string -} - -export interface AccountingService { - record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise - getTokenMetadata: (token: string) => Promise -} - -export interface Accounting { - create: ({ serviceURL }: { serviceURL: string }) => AccountingService } diff --git a/src/middleware/withAccountingService.js b/src/middleware/withAccountingService.js new file mode 100644 index 0000000..dd5d78b --- /dev/null +++ b/src/middleware/withAccountingService.js @@ -0,0 +1,37 @@ +/** + * @import { Middleware } from '@web3-storage/gateway-lib' + * @typedef {import('./withAccountingService.types.ts').AccountingServiceContext} AccountingServiceContext + * @typedef {import('./withAccountingService.types.ts').Environment} Environment + */ + +/** + * The accounting service handler exposes the method `record` to record the egress bytes for a given SpaceDID, Content CID, and servedAt timestamp. + * + * @type {Middleware} + */ +export function withAccountingService (handler) { + return async (req, env, ctx) => { + const accountingService = create(env, ctx) + + return handler(req, env, { ...ctx, accountingService }) + } +} + +/** + * @param {Environment} env + * @param {AccountingServiceContext} ctx + * @returns {import('./withAccountingService.types.ts').AccountingService} + */ +function create (env, ctx) { + return { + /** + * @param {import('@ucanto/principal/ed25519').DIDKey} space - The Space DID where the content was served + * @param {import('@ucanto/principal/ed25519').UnknownLink} resource - The link to the resource that was served + * @param {number} bytes - The number of bytes served + * @param {string} servedAt - The timestamp of when the content was served + */ + record: async (space, resource, bytes, servedAt) => { + console.log(`Record egress: ${space}, ${resource}, ${bytes}, ${servedAt}`) + } + } +} diff --git a/src/middleware/withAccountingService.types.ts b/src/middleware/withAccountingService.types.ts new file mode 100644 index 0000000..88abbc9 --- /dev/null +++ b/src/middleware/withAccountingService.types.ts @@ -0,0 +1,19 @@ +import { Environment as MiddlewareEnvironment, Context as MiddlewareContext } from '@web3-storage/gateway-lib' +import { DIDKey, UnknownLink } from '@ucanto/principal/ed25519' + +export interface Environment extends MiddlewareEnvironment { + //TODO: ucanto signer principal key +} + +export interface AccountingServiceContext extends MiddlewareContext { + accountingService?: AccountingService +} + +export interface TokenMetadata { + locationClaim?: unknown // TODO: figure out the right type to use for this - we probably need it for the private data case to verify auth + invalid?: boolean +} + +export interface AccountingService { + record: (space: DIDKey, resource: UnknownLink, bytes: number, servedAt: string) => Promise +} diff --git a/src/middleware/withEgressTracker.js b/src/middleware/withEgressTracker.js index 125f5df..43a9a26 100644 --- a/src/middleware/withEgressTracker.js +++ b/src/middleware/withEgressTracker.js @@ -1,10 +1,7 @@ -import { Accounting } from '../services/accounting.js' - /** - * @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib' + * @import { Middleware } from '@web3-storage/gateway-lib' * @import { Environment } from './withEgressTracker.types.js' - * @import { AccountingService } from '../bindings.js' - * @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: AccountingService }} EgressTrackerContext + * @typedef {import('./withEgressTracker.types.js').Context} EgressTrackerContext */ /** @@ -26,17 +23,13 @@ export function withEgressTracker (handler) { return response } - const { dataCid } = ctx - const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ - serviceURL: env.ACCOUNTING_SERVICE_URL - }) - const responseBody = response.body.pipeThrough( createByteCountStream((totalBytesServed) => { // Non-blocking call to the accounting service to record egress if (totalBytesServed > 0) { + const { space, dataCid: resource } = ctx ctx.waitUntil( - accounting.record(dataCid, totalBytesServed, new Date().toISOString()) + ctx.accountingService.record(space, resource, totalBytesServed, new Date().toISOString()) ) } }) diff --git a/src/middleware/withEgressTracker.types.ts b/src/middleware/withEgressTracker.types.ts index d764548..dee0c6b 100644 --- a/src/middleware/withEgressTracker.types.ts +++ b/src/middleware/withEgressTracker.types.ts @@ -1,6 +1,12 @@ -import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib' +import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib' +import { AccountingService } from './withAccountingService.types.js' +import { DIDKey, UnknownLink } from '@ucanto/client' export interface Environment extends MiddlewareEnvironment { - ACCOUNTING_SERVICE_URL: string FF_EGRESS_TRACKER_ENABLED: string } + +export interface Context extends IpfsUrlContext { + space: DIDKey + accountingService: AccountingService +} diff --git a/src/middleware/withRateLimit.js b/src/middleware/withRateLimit.js index 5487dcb..8da961e 100644 --- a/src/middleware/withRateLimit.js +++ b/src/middleware/withRateLimit.js @@ -1,6 +1,5 @@ import { HttpError } from '@web3-storage/gateway-lib/util' import { RATE_LIMIT_EXCEEDED } from '../constants.js' -import { Accounting } from '../services/accounting.js' /** * @import { Middleware } from '@web3-storage/gateway-lib' @@ -8,11 +7,10 @@ import { Accounting } from '../services/accounting.js' * @import { * Environment, * Context, - * TokenMetadata, * RateLimitService, * RateLimitExceeded * } from './withRateLimit.types.js' - * @typedef {Context & { ACCOUNTING_SERVICE?: import('../bindings.js').AccountingService }} RateLimiterContext + * @typedef {Context} RateLimiterContext */ /** @@ -101,7 +99,7 @@ async function isRateLimited (rateLimitAPI, cid) { * @param {Environment} env * @param {string} authToken * @param {RateLimiterContext} ctx - * @returns {Promise} + * @returns {Promise} */ async function getTokenMetadata (env, authToken, ctx) { const cachedValue = await env.AUTH_TOKEN_METADATA.get(authToken) @@ -111,8 +109,7 @@ async function getTokenMetadata (env, authToken, ctx) { return decode(cachedValue) } - const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ serviceURL: env.ACCOUNTING_SERVICE_URL }) - const tokenMetadata = await accounting.getTokenMetadata(authToken) + const tokenMetadata = findTokenMetadata(authToken) if (tokenMetadata) { // NOTE: non-blocking call to the auth token metadata cache ctx.waitUntil(env.AUTH_TOKEN_METADATA.put(authToken, encode(tokenMetadata))) @@ -122,9 +119,19 @@ async function getTokenMetadata (env, authToken, ctx) { return null } +/** + * @param {string} authToken + * @returns {import('./withAccountingService.types.js').TokenMetadata | null} + */ +function findTokenMetadata (authToken) { + // TODO I think this needs to check the content claims service (?) for any claims relevant to this token + // TODO do we have a plan for this? need to ask Hannah if the indexing service covers this? + return null +} + /** * @param {string} s - * @returns {TokenMetadata} + * @returns {import('./withAccountingService.types.js').TokenMetadata} */ function decode (s) { // TODO should this be dag-json? @@ -132,7 +139,7 @@ function decode (s) { } /** - * @param {TokenMetadata} m + * @param {import('./withAccountingService.types.js').TokenMetadata} m * @returns {string} */ function encode (m) { diff --git a/src/middleware/withRateLimit.types.ts b/src/middleware/withRateLimit.types.ts index 40749b0..efa8750 100644 --- a/src/middleware/withRateLimit.types.ts +++ b/src/middleware/withRateLimit.types.ts @@ -4,7 +4,6 @@ import { KVNamespace, RateLimit } from '@cloudflare/workers-types' import { RATE_LIMIT_EXCEEDED } from '../constants.js' export interface Environment extends MiddlewareEnvironment { - ACCOUNTING_SERVICE_URL: string RATE_LIMITER: RateLimit AUTH_TOKEN_METADATA: KVNamespace FF_RATE_LIMITER_ENABLED: string @@ -14,11 +13,6 @@ export interface Context extends IpfsUrlContext { authToken: string | null } -export interface TokenMetadata { - locationClaim?: unknown // TODO: figure out the right type to use for this - we probably need it for the private data case to verify auth - invalid?: boolean -} - export type RateLimitExceeded = typeof RATE_LIMIT_EXCEEDED[keyof typeof RATE_LIMIT_EXCEEDED] export interface RateLimitService { diff --git a/src/services/accounting.js b/src/services/accounting.js deleted file mode 100644 index 4fe2462..0000000 --- a/src/services/accounting.js +++ /dev/null @@ -1,16 +0,0 @@ -/** - * @type {import('../bindings.js').Accounting} - */ -export const Accounting = { - create: ({ serviceURL }) => ({ - record: async (cid, bytes, servedAt) => { - console.log(`using ${serviceURL} to record egress for ${cid} with total bytes: ${bytes} and servedAt: ${servedAt}`) - }, - - getTokenMetadata: async () => { - // TODO I think this needs to check the content claims service (?) for any claims relevant to this token - // TODO do we have a plan for this? need to ask Hannah if the indexing service covers this? - return null - } - }) -} diff --git a/test/unit/middleware/withEgressTracker.spec.js b/test/unit/middleware/withEgressTracker.spec.js index 9e6dcbf..eb7aaca 100644 --- a/test/unit/middleware/withEgressTracker.spec.js +++ b/test/unit/middleware/withEgressTracker.spec.js @@ -30,41 +30,38 @@ const env = /** @satisfies {import('../../../src/middleware/withEgressTracker.types.js').Environment} */ ({ DEBUG: 'true', - ACCOUNTING_SERVICE_URL: 'http://example.com', FF_EGRESS_TRACKER_ENABLED: 'true' }) const accountingRecordMethodStub = sinon.stub() .returns( - /** @type {import('../../../src/bindings.js').AccountingService['record']} */ - async (cid, bytes, servedAt) => { - console.log(`[mock] record called with cid: ${cid}, bytes: ${bytes}, servedAt: ${servedAt}`) + /** @type {import('../../../src/middleware/withAccountingService.types.js').AccountingService['record']} */ + async (space, resource, bytes, servedAt) => { + console.log(`[mock] record called with space: ${space}, resource: ${resource}, bytes: ${bytes}, servedAt: ${servedAt}`) }) /** * Mock implementation of the AccountingService. * - * @param {Object} options - * @param {string} options.serviceURL - The URL of the accounting service. - * @returns {import('../../../src/bindings.js').AccountingService} + * @returns {import('../../../src/middleware/withAccountingService.types.js').AccountingService} */ -const AccountingService = ({ serviceURL }) => { - console.log(`[mock] Accounting.create called with serviceURL: ${serviceURL}`) +const AccountingService = () => { + console.log('[mock] Accounting.create called') return { - record: accountingRecordMethodStub, - getTokenMetadata: sinon.stub().resolves(undefined) + record: accountingRecordMethodStub } } const ctx = - /** @satisfies {import('../../../src/middleware/withEgressTracker.js').EgressTrackerContext} */ + /** @satisfies {import('../../../src/middleware/withEgressTracker.types.js').Context} */ ({ + space: 'did:key:z6MkknBAHEGCWvBzAi4amdH5FXEXrdKoWF1UJuvc8Psm2Mda', dataCid: CID.parse('bafybeibv7vzycdcnydl5n5lbws6lul2omkm6a6b5wmqt77sicrwnhesy7y'), waitUntil: sinon.stub().returns(undefined), path: '', searchParams: new URLSearchParams(), - ACCOUNTING_SERVICE: AccountingService({ serviceURL: env.ACCOUNTING_SERVICE_URL }) + accountingService: AccountingService() }) describe('withEgressTracker', async () => { @@ -119,8 +116,9 @@ describe('withEgressTracker', async () => { expect(response.status).to.equal(200) expect(responseBody).to.equal('Hello, world!') expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the cid').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes) + expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) }).timeout(10_000) it('should record egress for a large file', async () => { @@ -142,8 +140,9 @@ describe('withEgressTracker', async () => { expect(response.status).to.equal(200) expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the cid').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes) + expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) }) it('should correctly track egress for responses with chunked transfer encoding', async () => { @@ -169,7 +168,9 @@ describe('withEgressTracker', async () => { expect(response.status).to.equal(200) expect(responseBody).to.equal('Hello, world!') expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes) + expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) }) it('should record egress bytes for a CAR file request', async () => { @@ -216,7 +217,9 @@ describe('withEgressTracker', async () => { // expect(blocks[0].bytes).to.deep.equal(carBytes) - FIXME (fforbeck): how to get the correct byte count? expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(expectedTotalBytes) + expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(expectedTotalBytes) }) it('should correctly track egress for delayed responses', async () => { @@ -242,7 +245,9 @@ describe('withEgressTracker', async () => { expect(response.status).to.equal(200) expect(responseBody).to.equal('Delayed response content') expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes) + expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) }).timeout(5000) }) @@ -330,8 +335,13 @@ describe('withEgressTracker', async () => { expect(responseBody2).to.equal('Goodbye, world!') expect(accountingRecordMethodStub.calledTwice, 'record should be called twice').to.be.true - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes for first request').to.equal(totalBytes1) - expect(accountingRecordMethodStub.args[1][1], 'second argument should be the total bytes for second request').to.equal(totalBytes2) + expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid for first request').to.equal(ctx.dataCid) + expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes for first request').to.equal(totalBytes1) + + expect(accountingRecordMethodStub.args[1][0], 'first argument should be the space').to.equal(ctx.space) + expect(accountingRecordMethodStub.args[1][1], 'second argument should be the cid for second request').to.equal(ctx.dataCid) + expect(accountingRecordMethodStub.args[1][2], 'third argument should be the total bytes for second request').to.equal(totalBytes2) }).timeout(10_000) }) @@ -356,7 +366,9 @@ describe('withEgressTracker', async () => { expect(response.status).to.equal(200) expect(responseBody).to.deep.equal({ message: 'Hello, JSON!' }) expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes) + expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) }).timeout(10_000) }) @@ -449,7 +461,7 @@ describe('withEgressTracker', async () => { const request = await createRequest() // Simulate an error in the accounting service record method - ctx.ACCOUNTING_SERVICE.record = sinon.stub().rejects(new Error('Accounting service error')) + ctx.accountingService.record = sinon.stub().rejects(new Error('Accounting service error')) const response = await handler(request, env, ctx) const responseBody = await response.text() diff --git a/test/unit/middleware/withRateLimit.spec.js b/test/unit/middleware/withRateLimit.spec.js index 917ba85..291a06a 100644 --- a/test/unit/middleware/withRateLimit.spec.js +++ b/test/unit/middleware/withRateLimit.spec.js @@ -40,7 +40,6 @@ const env = /** @satisfies {Environment} */ ({ DEBUG: 'false', - ACCOUNTING_SERVICE_URL: 'http://example.com/accounting-service', RATE_LIMITER: { limit: strictStub(sandbox, 'limit') }, diff --git a/wrangler.toml b/wrangler.toml index 7b2918a..90c5d31 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -132,7 +132,6 @@ DEBUG = "true" FF_RATE_LIMITER_ENABLED = "true" FF_EGRESS_TRACKER_ENABLED = "true" CONTENT_CLAIMS_SERVICE_URL = "https://staging.claims.web3.storage" -ACCOUNTING_SERVICE_URL = "https://example.com/service" [[env.integration.unsafe.bindings]] name = "RATE_LIMITER" From abcd7192db18fbad73c0e6f1ee08c93e2e1fdddc Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Wed, 30 Oct 2024 17:06:14 -0300 Subject: [PATCH 2/2] ucanto integration + fixing tests --- package-lock.json | 8 +- package.json | 4 +- src/middleware/withAccountingService.js | 37 --- src/middleware/withEgressTracker.js | 14 +- src/middleware/withEgressTracker.types.ts | 6 +- src/middleware/withRateLimit.js | 18 +- src/middleware/withRateLimit.types.ts | 2 + .../withUcantoClient.capabilities.js | 21 ++ src/middleware/withUcantoClient.js | 92 +++++++ ...ice.types.ts => withUcantoClient.types.ts} | 15 +- test/helpers/builder.js | 4 +- .../unit/middleware/withEgressTracker.spec.js | 245 ++++++++---------- test/unit/middleware/withRateLimit.spec.js | 6 +- 13 files changed, 256 insertions(+), 216 deletions(-) delete mode 100644 src/middleware/withAccountingService.js create mode 100644 src/middleware/withUcantoClient.capabilities.js create mode 100644 src/middleware/withUcantoClient.js rename src/middleware/{withAccountingService.types.ts => withUcantoClient.types.ts} (61%) diff --git a/package-lock.json b/package-lock.json index cbfc5ce..8e72974 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,9 @@ "version": "2.20.2", "license": "Apache-2.0 OR MIT", "dependencies": { + "@ucanto/client": "^9.0.1", + "@ucanto/principal": "^8.1.0", + "@ucanto/transport": "^9.1.1", "@web3-storage/blob-fetcher": "^2.2.0", "@web3-storage/gateway-lib": "^5.1.2", "dagula": "^8.0.0", @@ -22,7 +25,6 @@ "@types/mocha": "^10.0.9", "@types/node-fetch": "^2.6.11", "@types/sinon": "^17.0.3", - "@ucanto/principal": "^8.1.0", "@web3-storage/content-claims": "^5.0.0", "@web3-storage/public-bucket": "^1.1.0", "@web3-storage/upload-client": "^16.1.1", @@ -3403,7 +3405,6 @@ "version": "8.1.0", "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-8.1.0.tgz", "integrity": "sha512-n6WL9miVcN1PUq+e41hKUgZR0+Xn5sHHMQfXnt4YuLnGbh93tIgQkeGWmfUBJI+Y6C0vAFfaSCZnM6Z+kedskA==", - "dev": true, "dependencies": { "@ipld/dag-ucan": "^3.4.0", "multiformats": "^11.0.2" @@ -3413,7 +3414,6 @@ "version": "11.0.2", "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", - "dev": true, "engines": { "node": ">=16.0.0", "npm": ">=7.0.0" @@ -3423,7 +3423,6 @@ "version": "8.1.0", "resolved": "https://registry.npmjs.org/@ucanto/principal/-/principal-8.1.0.tgz", "integrity": "sha512-tSkqpxRXP/M+GXNKqQLCmMAP+7zX7l/tKb3uygAaQwTnev4nRauklXgWx6EYDK+2d8tiOyPdL3SlG54GQPFcLQ==", - "dev": true, "dependencies": { "@ipld/dag-ucan": "^3.4.0", "@noble/curves": "^1.2.0", @@ -3438,7 +3437,6 @@ "version": "11.0.2", "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", - "dev": true, "engines": { "node": ">=16.0.0", "npm": ">=7.0.0" diff --git a/package.json b/package.json index f051805..faa718b 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,9 @@ "author": "Alan Shaw", "license": "Apache-2.0 OR MIT", "dependencies": { + "@ucanto/client": "^9.0.1", + "@ucanto/principal": "^8.1.0", + "@ucanto/transport": "^9.1.1", "@web3-storage/blob-fetcher": "^2.2.0", "@web3-storage/gateway-lib": "^5.1.2", "dagula": "^8.0.0", @@ -48,7 +51,6 @@ "@types/mocha": "^10.0.9", "@types/node-fetch": "^2.6.11", "@types/sinon": "^17.0.3", - "@ucanto/principal": "^8.1.0", "@web3-storage/content-claims": "^5.0.0", "@web3-storage/public-bucket": "^1.1.0", "@web3-storage/upload-client": "^16.1.1", diff --git a/src/middleware/withAccountingService.js b/src/middleware/withAccountingService.js deleted file mode 100644 index dd5d78b..0000000 --- a/src/middleware/withAccountingService.js +++ /dev/null @@ -1,37 +0,0 @@ -/** - * @import { Middleware } from '@web3-storage/gateway-lib' - * @typedef {import('./withAccountingService.types.ts').AccountingServiceContext} AccountingServiceContext - * @typedef {import('./withAccountingService.types.ts').Environment} Environment - */ - -/** - * The accounting service handler exposes the method `record` to record the egress bytes for a given SpaceDID, Content CID, and servedAt timestamp. - * - * @type {Middleware} - */ -export function withAccountingService (handler) { - return async (req, env, ctx) => { - const accountingService = create(env, ctx) - - return handler(req, env, { ...ctx, accountingService }) - } -} - -/** - * @param {Environment} env - * @param {AccountingServiceContext} ctx - * @returns {import('./withAccountingService.types.ts').AccountingService} - */ -function create (env, ctx) { - return { - /** - * @param {import('@ucanto/principal/ed25519').DIDKey} space - The Space DID where the content was served - * @param {import('@ucanto/principal/ed25519').UnknownLink} resource - The link to the resource that was served - * @param {number} bytes - The number of bytes served - * @param {string} servedAt - The timestamp of when the content was served - */ - record: async (space, resource, bytes, servedAt) => { - console.log(`Record egress: ${space}, ${resource}, ${bytes}, ${servedAt}`) - } - } -} diff --git a/src/middleware/withEgressTracker.js b/src/middleware/withEgressTracker.js index 43a9a26..1ffc6cf 100644 --- a/src/middleware/withEgressTracker.js +++ b/src/middleware/withEgressTracker.js @@ -27,9 +27,8 @@ export function withEgressTracker (handler) { createByteCountStream((totalBytesServed) => { // Non-blocking call to the accounting service to record egress if (totalBytesServed > 0) { - const { space, dataCid: resource } = ctx ctx.waitUntil( - ctx.accountingService.record(space, resource, totalBytesServed, new Date().toISOString()) + ctx.ucantoClient.record(ctx.space, ctx.dataCid, totalBytesServed, new Date()) ) } }) @@ -44,15 +43,14 @@ export function withEgressTracker (handler) { } /** - * Creates a TransformStream to count bytes served to the client. - * It records egress when the stream is finalized without an error. + * Creates a TransformStream to count bytes in the response body. * - * @param {(totalBytesServed: number) => void} onClose + * @param {(totalBytes: number) => void} onClose * @template {Uint8Array} T * @returns {TransformStream} - The created TransformStream. */ function createByteCountStream (onClose) { - let totalBytesServed = 0 + let totalBytes = 0 return new TransformStream({ /** @@ -64,7 +62,7 @@ function createByteCountStream (onClose) { async transform (chunk, controller) { try { controller.enqueue(chunk) - totalBytesServed += chunk.byteLength + totalBytes += chunk.byteLength } catch (error) { console.error('Error while counting bytes:', error) controller.error(error) @@ -79,7 +77,7 @@ function createByteCountStream (onClose) { * NOTE: The flush function is NOT called in case of a stream error. */ async flush () { - onClose(totalBytesServed) + onClose(totalBytes) } }) } diff --git a/src/middleware/withEgressTracker.types.ts b/src/middleware/withEgressTracker.types.ts index dee0c6b..6954b89 100644 --- a/src/middleware/withEgressTracker.types.ts +++ b/src/middleware/withEgressTracker.types.ts @@ -1,6 +1,6 @@ import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib' -import { AccountingService } from './withAccountingService.types.js' -import { DIDKey, UnknownLink } from '@ucanto/client' +import { UCantoClient } from './withUcantoClient.types.js' +import { DIDKey } from '@ucanto/principal/ed25519' export interface Environment extends MiddlewareEnvironment { FF_EGRESS_TRACKER_ENABLED: string @@ -8,5 +8,5 @@ export interface Environment extends MiddlewareEnvironment { export interface Context extends IpfsUrlContext { space: DIDKey - accountingService: AccountingService + ucantoClient: UCantoClient } diff --git a/src/middleware/withRateLimit.js b/src/middleware/withRateLimit.js index 8da961e..9854d7d 100644 --- a/src/middleware/withRateLimit.js +++ b/src/middleware/withRateLimit.js @@ -99,7 +99,7 @@ async function isRateLimited (rateLimitAPI, cid) { * @param {Environment} env * @param {string} authToken * @param {RateLimiterContext} ctx - * @returns {Promise} + * @returns {Promise} */ async function getTokenMetadata (env, authToken, ctx) { const cachedValue = await env.AUTH_TOKEN_METADATA.get(authToken) @@ -109,7 +109,7 @@ async function getTokenMetadata (env, authToken, ctx) { return decode(cachedValue) } - const tokenMetadata = findTokenMetadata(authToken) + const tokenMetadata = await ctx.ucantoClient.getTokenMetadata(authToken) if (tokenMetadata) { // NOTE: non-blocking call to the auth token metadata cache ctx.waitUntil(env.AUTH_TOKEN_METADATA.put(authToken, encode(tokenMetadata))) @@ -119,19 +119,9 @@ async function getTokenMetadata (env, authToken, ctx) { return null } -/** - * @param {string} authToken - * @returns {import('./withAccountingService.types.js').TokenMetadata | null} - */ -function findTokenMetadata (authToken) { - // TODO I think this needs to check the content claims service (?) for any claims relevant to this token - // TODO do we have a plan for this? need to ask Hannah if the indexing service covers this? - return null -} - /** * @param {string} s - * @returns {import('./withAccountingService.types.js').TokenMetadata} + * @returns {import('./withUcantoClient.types.ts').TokenMetadata} */ function decode (s) { // TODO should this be dag-json? @@ -139,7 +129,7 @@ function decode (s) { } /** - * @param {import('./withAccountingService.types.js').TokenMetadata} m + * @param {import('./withUcantoClient.types.ts').TokenMetadata} m * @returns {string} */ function encode (m) { diff --git a/src/middleware/withRateLimit.types.ts b/src/middleware/withRateLimit.types.ts index efa8750..e72e768 100644 --- a/src/middleware/withRateLimit.types.ts +++ b/src/middleware/withRateLimit.types.ts @@ -2,6 +2,7 @@ import { CID } from '@web3-storage/gateway-lib/handlers' import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib' import { KVNamespace, RateLimit } from '@cloudflare/workers-types' import { RATE_LIMIT_EXCEEDED } from '../constants.js' +import { UCantoClient } from './withUcantoClient.types.js' export interface Environment extends MiddlewareEnvironment { RATE_LIMITER: RateLimit @@ -11,6 +12,7 @@ export interface Environment extends MiddlewareEnvironment { export interface Context extends IpfsUrlContext { authToken: string | null + ucantoClient: UCantoClient } export type RateLimitExceeded = typeof RATE_LIMIT_EXCEEDED[keyof typeof RATE_LIMIT_EXCEEDED] diff --git a/src/middleware/withUcantoClient.capabilities.js b/src/middleware/withUcantoClient.capabilities.js new file mode 100644 index 0000000..9f32488 --- /dev/null +++ b/src/middleware/withUcantoClient.capabilities.js @@ -0,0 +1,21 @@ +import { SpaceDID, equalWith } from '@web3-storage/capabilities/utils' +import { capability, Schema } from '@ucanto/validator' + +export const Usage = { + /** + * Capability can be invoked by an agent to record egress data for a given resource. + */ + record: capability({ + can: 'usage/record', + with: SpaceDID, + nb: Schema.struct({ + /** CID of the resource that was served. */ + resource: Schema.link(), + /** Amount of bytes served. */ + bytes: Schema.integer().greaterThan(0), + /** Timestamp of the event in seconds after Unix epoch. */ + servedAt: Schema.integer().greaterThan(-1) + }), + derives: equalWith + }) +} diff --git a/src/middleware/withUcantoClient.js b/src/middleware/withUcantoClient.js new file mode 100644 index 0000000..64d8740 --- /dev/null +++ b/src/middleware/withUcantoClient.js @@ -0,0 +1,92 @@ +import * as UCantoClient from '@ucanto/client' +import * as CAR from '@ucanto/transport/car' +import { SpaceDID } from '@web3-storage/capabilities/utils' +import { ed25519 } from '@ucanto/principal' +import { HTTP } from '@ucanto/transport' +import { Usage } from './withUcantoClient.capabilities.js' + +/** + * @import { Middleware } from '@web3-storage/gateway-lib' + * @typedef {import('./withUcantoClient.types.ts').UcantoClientContext} UcantoClientContext + * @typedef {import('./withUcantoClient.types.ts').Environment} Environment + */ + +/** + * The UCantoClient handler exposes the methods to invoke capabilities on the Upload API. + * + * @type {Middleware} + */ +export function withUcantoClient (handler) { + return async (req, env, ctx) => { + const ucantoClient = await create(env) + + return handler(req, env, { ...ctx, ucantoClient }) + } +} + +/** + * Creates a UCantoClient instance with the given environment. + * + * @param {Environment} env + * @returns {Promise} + */ +async function create (env) { + const service = ed25519.Verifier.parse(env.SERVICE_ID) + const principal = ed25519.Signer.parse(env.SIGNER_PRINCIPAL_KEY) + + const { connection } = await connectUcantoClient(env.UPLOAD_API_URL, principal) + + return { + /** + * @param {import('@ucanto/principal/ed25519').DIDKey} space - The Space DID where the content was served + * @param {import('@ucanto/principal/ed25519').UnknownLink} resource - The link to the resource that was served + * @param {number} bytes - The number of bytes served + * @param {Date} servedAt - The timestamp of when the content was served + */ + record: async (space, resource, bytes, servedAt) => { + const res = await Usage.record.invoke({ + issuer: principal, + audience: service, + with: SpaceDID.from(space), + nb: { + resource, + bytes, + servedAt: Math.floor(servedAt.getTime() / 1000) + } + }).execute(connection) + + if (res.out.error) { + console.error('Failed to record egress', res.out.error) + } + }, + + /** + * TODO: implement this function + * + * @param {string} authToken + * @returns {Promise} + */ + getTokenMetadata: async (authToken) => { + // TODO I think this needs to check the content claims service (?) for any claims relevant to this token + // TODO do we have a plan for this? need to ask Hannah if the indexing service covers this? + return undefined + } + } +} + +/** + * Creates a connection with the UCanto Server at the provided server URL. + * + * @param {string} serverUrl + * @param {import('@ucanto/principal/ed25519').EdSigner} principal + * + */ +async function connectUcantoClient (serverUrl, principal) { + const connection = await UCantoClient.connect({ + id: principal, + codec: CAR.outbound, + channel: HTTP.open({ url: new URL(serverUrl) }) + }) + + return { connection } +} diff --git a/src/middleware/withAccountingService.types.ts b/src/middleware/withUcantoClient.types.ts similarity index 61% rename from src/middleware/withAccountingService.types.ts rename to src/middleware/withUcantoClient.types.ts index 88abbc9..232e1f3 100644 --- a/src/middleware/withAccountingService.types.ts +++ b/src/middleware/withUcantoClient.types.ts @@ -2,11 +2,13 @@ import { Environment as MiddlewareEnvironment, Context as MiddlewareContext } fr import { DIDKey, UnknownLink } from '@ucanto/principal/ed25519' export interface Environment extends MiddlewareEnvironment { - //TODO: ucanto signer principal key + SERVICE_ID: string + SIGNER_PRINCIPAL_KEY: string + UPLOAD_API_URL: string } -export interface AccountingServiceContext extends MiddlewareContext { - accountingService?: AccountingService +export interface UcantoClientContext extends MiddlewareContext { + ucantoClient?: UCantoClient } export interface TokenMetadata { @@ -14,6 +16,9 @@ export interface TokenMetadata { invalid?: boolean } -export interface AccountingService { - record: (space: DIDKey, resource: UnknownLink, bytes: number, servedAt: string) => Promise +export interface UCantoClient { + record: (space: DIDKey, resource: UnknownLink, bytes: number, servedAt: Date) => Promise + getTokenMetadata: (token: string) => Promise } + + diff --git a/test/helpers/builder.js b/test/helpers/builder.js index d8a104d..33ef391 100644 --- a/test/helpers/builder.js +++ b/test/helpers/builder.js @@ -19,7 +19,9 @@ export class Builder { * @returns {Promise<{ root: import('multiformats').UnknownLink, shards: import('multiformats').Link[]}>} */ async add (input, options = {}) { - console.log('Adding ' + (Array.isArray(input) ? `${input.length} file${input.length === 1 ? '' : 's'}` : '1 blob') + '...') + if (process.env.DEBUG) { + console.log('Adding ' + (Array.isArray(input) ? `${input.length} file${input.length === 1 ? '' : 's'}` : '1 blob') + '...') + } const unixFsEncoder = Array.isArray(input) ? UnixFS.createDirectoryEncoderStream(input) : UnixFS.createFileEncoderStream(input) diff --git a/test/unit/middleware/withEgressTracker.spec.js b/test/unit/middleware/withEgressTracker.spec.js index eb7aaca..d2a4c51 100644 --- a/test/unit/middleware/withEgressTracker.spec.js +++ b/test/unit/middleware/withEgressTracker.spec.js @@ -13,55 +13,44 @@ import { Builder, toBlobKey } from '../../helpers/builder.js' import { CARReaderStream } from 'carstream' /** - * Creates a request with an optional authorization header. - * - * @param {Object} [options] - * @param {string} [options.authorization] The value for the `Authorization` - * header, if any. + * @typedef {import('../../../src/middleware/withEgressTracker.types.js').Environment} EgressTrackerEnvironment + * @typedef {import('../../../src/middleware/withEgressTracker.types.js').Context} EgressTrackerContext */ -const createRequest = async ({ authorization } = {}) => - new Request('http://doesnt-matter.com/', { - headers: new Headers( - authorization ? { Authorization: authorization } : {} - ) - }) const env = - /** @satisfies {import('../../../src/middleware/withEgressTracker.types.js').Environment} */ + /** @satisfies {EgressTrackerEnvironment} */ ({ DEBUG: 'true', FF_EGRESS_TRACKER_ENABLED: 'true' }) -const accountingRecordMethodStub = sinon.stub() - .returns( - /** @type {import('../../../src/middleware/withAccountingService.types.js').AccountingService['record']} */ - async (space, resource, bytes, servedAt) => { - console.log(`[mock] record called with space: ${space}, resource: ${resource}, bytes: ${bytes}, servedAt: ${servedAt}`) - }) +const recordEgressMock = sinon.fake() /** * Mock implementation of the AccountingService. * - * @returns {import('../../../src/middleware/withAccountingService.types.js').AccountingService} + * @returns {import('../../../src/middleware/withUcantoClient.types.js').UCantoClient} */ -const AccountingService = () => { - console.log('[mock] Accounting.create called') +const UCantoClient = () => { + if (process.env.DEBUG) { + console.log('[mock] UCantoClient created') + } return { - record: accountingRecordMethodStub + record: recordEgressMock, + getTokenMetadata: sinon.fake() } } const ctx = - /** @satisfies {import('../../../src/middleware/withEgressTracker.types.js').Context} */ + /** @satisfies {EgressTrackerContext} */ ({ space: 'did:key:z6MkknBAHEGCWvBzAi4amdH5FXEXrdKoWF1UJuvc8Psm2Mda', dataCid: CID.parse('bafybeibv7vzycdcnydl5n5lbws6lul2omkm6a6b5wmqt77sicrwnhesy7y'), waitUntil: sinon.stub().returns(undefined), path: '', searchParams: new URLSearchParams(), - accountingService: AccountingService() + ucantoClient: UCantoClient() }) describe('withEgressTracker', async () => { @@ -76,13 +65,17 @@ describe('withEgressTracker', async () => { bucketData = new Map() bucket = { put: async (/** @type {string} */ digest, /** @type {Uint8Array} */ bytes) => { - console.log(`[mock] bucket.put called with digest: ${digest}, bytes: ${bytes.byteLength}`) + if (process.env.DEBUG) { + console.log(`[mock] bucket.put called with digest: ${digest}, bytes: ${bytes.byteLength}`) + } bucketData.set(digest, bytes) return Promise.resolve() }, // @ts-expect-error - don't need to check the type of the fake bucket get: async (/** @type {string} */ blobKey) => { - console.log(`[mock] bucket.get called with digest: ${blobKey}`) + if (process.env.DEBUG) { + console.log(`[mock] bucket.get called with digest: ${blobKey}`) + } return Promise.resolve(bucketData.get(blobKey)) } } @@ -90,42 +83,36 @@ describe('withEgressTracker', async () => { }) afterEach(() => { - accountingRecordMethodStub.reset() + recordEgressMock.resetHistory() bucketData.clear() }) - describe('withEgressTracker -> Successful Requests', () => { + describe('-> Successful Requests', () => { it('should track egress bytes for a successful request', async () => { - const content = new TextEncoder().encode('Hello, world!') - const totalBytes = Buffer.byteLength(content) - const mockResponse = new Response(new ReadableStream({ - start (controller) { - controller.enqueue(content) - controller.close() - } - }), { status: 200 }) - - const innerHandler = sinon.stub().resolves(mockResponse) - - const handler = withEgressTracker(innerHandler) - const request = await createRequest() - const response = await handler(request, env, ctx) + const handler = withEgressTracker( + async () => new Response('Hello, world!', { status: 200 }) + ) + const response = await handler( + new Request('http://example.com/'), + env, + ctx + ) // Ensure the response body is fully consumed const responseBody = await response.text() expect(response.status).to.equal(200) expect(responseBody).to.equal('Hello, world!') - expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) - }).timeout(10_000) + expect(recordEgressMock.calledOnce, 'record should be called once').to.be.true + expect(recordEgressMock.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(recordEgressMock.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(recordEgressMock.args[0][2], 'third argument should be the total bytes').to.equal(13) + }) it('should record egress for a large file', async () => { const largeContent = new Uint8Array(100 * 1024 * 1024) // 100 MB const totalBytes = largeContent.byteLength const mockResponse = new Response(new ReadableStream({ - start (controller) { + start(controller) { controller.enqueue(largeContent) controller.close() } @@ -133,16 +120,16 @@ describe('withEgressTracker', async () => { const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) await response.text() // Consume the response body expect(response.status).to.equal(200) - expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) + expect(recordEgressMock.calledOnce, 'record should be called once').to.be.true + expect(recordEgressMock.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(recordEgressMock.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(recordEgressMock.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) }) it('should correctly track egress for responses with chunked transfer encoding', async () => { @@ -151,7 +138,7 @@ describe('withEgressTracker', async () => { const totalBytes = Buffer.byteLength(chunk1) + Buffer.byteLength(chunk2) const mockResponse = new Response(new ReadableStream({ - start (controller) { + start(controller) { controller.enqueue(chunk1) controller.enqueue(chunk2) controller.close() @@ -160,17 +147,17 @@ describe('withEgressTracker', async () => { const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) const responseBody = await response.text() expect(response.status).to.equal(200) expect(responseBody).to.equal('Hello, world!') - expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) + expect(recordEgressMock.calledOnce, 'record should be called once').to.be.true + expect(recordEgressMock.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(recordEgressMock.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(recordEgressMock.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) }) it('should record egress bytes for a CAR file request', async () => { @@ -188,7 +175,7 @@ describe('withEgressTracker', async () => { // Mock a response with the CAR file content const mockResponse = new Response(new ReadableStream({ - start (controller) { + start(controller) { controller.enqueue(carBytes) controller.close() } @@ -199,7 +186,7 @@ describe('withEgressTracker', async () => { const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) expect(response.status).to.equal(200) @@ -216,10 +203,10 @@ describe('withEgressTracker', async () => { })) // expect(blocks[0].bytes).to.deep.equal(carBytes) - FIXME (fforbeck): how to get the correct byte count? - expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(expectedTotalBytes) + expect(recordEgressMock.calledOnce, 'record should be called once').to.be.true + expect(recordEgressMock.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(recordEgressMock.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(recordEgressMock.args[0][2], 'third argument should be the total bytes').to.equal(expectedTotalBytes) }) it('should correctly track egress for delayed responses', async () => { @@ -227,7 +214,7 @@ describe('withEgressTracker', async () => { const totalBytes = Buffer.byteLength(content) const mockResponse = new Response(new ReadableStream({ - start (controller) { + start(controller) { setTimeout(() => { controller.enqueue(content) controller.close() @@ -237,61 +224,61 @@ describe('withEgressTracker', async () => { const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) const responseBody = await response.text() expect(response.status).to.equal(200) expect(responseBody).to.equal('Delayed response content') - expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) + expect(recordEgressMock.calledOnce, 'record should be called once').to.be.true + expect(recordEgressMock.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(recordEgressMock.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) + expect(recordEgressMock.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) }).timeout(5000) }) - describe('withEgressTracker -> Feature Flag', () => { + describe('-> Feature Flag', () => { it('should not track egress if the feature flag is disabled', async () => { const innerHandler = sinon.stub().resolves(new Response(null, { status: 200 })) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const envDisabled = { ...env, FF_EGRESS_TRACKER_ENABLED: 'false' } const response = await handler(request, envDisabled, ctx) expect(response.status).to.equal(200) - expect(accountingRecordMethodStub.notCalled, 'record should not be called').to.be.true + expect(recordEgressMock.notCalled, 'record should not be called').to.be.true }) }) - describe('withEgressTracker -> Non-OK Responses', () => { + describe('-> Non-OK Responses', () => { it('should not track egress for non-OK responses', async () => { const mockResponse = new Response(null, { status: 404 }) const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) expect(response.status).to.equal(404) - expect(accountingRecordMethodStub.called, 'record should not be called').to.be.false + expect(recordEgressMock.called, 'record should not be called').to.be.false }) it('should not track egress if the response has no body', async () => { const mockResponse = new Response(null, { status: 200 }) const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) expect(response.status).to.equal(200) - expect(accountingRecordMethodStub.called, 'record should not be called').to.be.false + expect(recordEgressMock.called, 'record should not be called').to.be.false }) }) - describe('withEgressTracker -> Concurrent Requests', () => { + describe('-> Concurrent Requests', () => { it('should correctly track egress for multiple concurrent requests', async () => { const content1 = new TextEncoder().encode('Hello, world!') const content2 = new TextEncoder().encode('Goodbye, world!') @@ -299,14 +286,14 @@ describe('withEgressTracker', async () => { const totalBytes2 = Buffer.byteLength(content2) const mockResponse1 = new Response(new ReadableStream({ - start (controller) { + start(controller) { controller.enqueue(content1) controller.close() } }), { status: 200 }) const mockResponse2 = new Response(new ReadableStream({ - start (controller) { + start(controller) { controller.enqueue(content2) controller.close() } @@ -318,8 +305,8 @@ describe('withEgressTracker', async () => { const handler1 = withEgressTracker(innerHandler1) const handler2 = withEgressTracker(innerHandler2) - const request1 = await createRequest() - const request2 = await createRequest() + const request1 = new Request('http://doesnt-matter.com/') + const request2 = new Request('http://doesnt-matter.com/') const [response1, response2] = await Promise.all([ handler1(request1, env, ctx), @@ -334,48 +321,21 @@ describe('withEgressTracker', async () => { expect(response2.status).to.equal(200) expect(responseBody2).to.equal('Goodbye, world!') - expect(accountingRecordMethodStub.calledTwice, 'record should be called twice').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid for first request').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes for first request').to.equal(totalBytes1) + expect(recordEgressMock.calledTwice, 'record should be called twice').to.be.true + expect(recordEgressMock.args[0][0], 'first argument should be the space').to.equal(ctx.space) + expect(recordEgressMock.args[0][1], 'second argument should be the cid for first request').to.equal(ctx.dataCid) + expect(recordEgressMock.args[0][2], 'third argument should be the total bytes for first request').to.equal(totalBytes1) - expect(accountingRecordMethodStub.args[1][0], 'first argument should be the space').to.equal(ctx.space) - expect(accountingRecordMethodStub.args[1][1], 'second argument should be the cid for second request').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[1][2], 'third argument should be the total bytes for second request').to.equal(totalBytes2) - }).timeout(10_000) - }) - - describe('withEgressTracker -> Different Content Types', () => { - it('should track egress for JSON content type', async () => { - const jsonContent = JSON.stringify({ message: 'Hello, JSON!' }) - const totalBytes = Buffer.byteLength(jsonContent) - const mockResponse = new Response(new ReadableStream({ - start (controller) { - controller.enqueue(new TextEncoder().encode(jsonContent)) - controller.close() - } - }), { status: 200, headers: { 'Content-Type': 'application/json' } }) - - const innerHandler = sinon.stub().resolves(mockResponse) - const handler = withEgressTracker(innerHandler) - const request = await createRequest() - - const response = await handler(request, env, ctx) - const responseBody = await response.json() - - expect(response.status).to.equal(200) - expect(responseBody).to.deep.equal({ message: 'Hello, JSON!' }) - expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true - expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space) - expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid) - expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes) - }).timeout(10_000) + expect(recordEgressMock.args[1][0], 'first argument should be the space').to.equal(ctx.space) + expect(recordEgressMock.args[1][1], 'second argument should be the cid for second request').to.equal(ctx.dataCid) + expect(recordEgressMock.args[1][2], 'third argument should be the total bytes for second request').to.equal(totalBytes2) + }) }) - describe('withEgressTracker -> Zero-byte Responses', () => { + describe('-> Zero-byte Responses', () => { it('should not record egress for zero-byte responses', async () => { const mockResponse = new Response(new ReadableStream({ - start (controller) { + start(controller) { // Do not enqueue any data, simulating a zero-byte response controller.close() } @@ -383,28 +343,28 @@ describe('withEgressTracker', async () => { const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) const responseBody = await response.text() expect(response.status).to.equal(200) expect(responseBody).to.equal('') - expect(accountingRecordMethodStub.called, 'record should not be called').to.be.false + expect(recordEgressMock.called, 'record should not be called').to.be.false }) }) - describe('withEgressTracker -> Interrupted Connection', () => { + describe('-> Interrupted Connection', () => { it('should not record egress if there is a stream error while downloading', async () => { const mockResponse = new Response(new ReadableStream({ - start (controller) { + start(controller) { controller.error(new Error('Stream error')) } }), { status: 200 }) const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) @@ -415,13 +375,13 @@ describe('withEgressTracker', async () => { } catch (/** @type {any} */ error) { expect(error.message).to.equal('Stream error') } - expect(accountingRecordMethodStub.called, 'record should not be called').to.be.false + expect(recordEgressMock.called, 'record should not be called').to.be.false }) it('should not record egress if the connection is interrupted during a large file download', async () => { const largeContent = new Uint8Array(100 * 1024 * 1024) // 100 MB const mockResponse = new Response(new ReadableStream({ - start (controller) { + start(controller) { // Stream a portion of the content controller.enqueue(largeContent.subarray(0, 10 * 1024 * 1024)) // 10 MB // Simulate connection interruption by raising an error @@ -431,7 +391,7 @@ describe('withEgressTracker', async () => { const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() + const request = new Request('http://doesnt-matter.com/') const response = await handler(request, env, ctx) try { @@ -442,15 +402,15 @@ describe('withEgressTracker', async () => { expect(error.message).to.equal('Connection interrupted') } - expect(accountingRecordMethodStub.called, 'record should not be called').to.be.false - }).timeout(10_000) + expect(recordEgressMock.called, 'record should not be called').to.be.false + }) }) - describe('withEgressTracker -> Accounting Service', () => { - it('should log an error and continue serving the response if the accounting service fails', async () => { + describe('-> Ucanto Client', () => { + it('should log an error and continue serving the response if the ucanto client fails', async () => { const content = new TextEncoder().encode('Hello, world!') const mockResponse = new Response(new ReadableStream({ - start (controller) { + start(controller) { controller.enqueue(content) controller.close() } @@ -458,17 +418,20 @@ describe('withEgressTracker', async () => { const innerHandler = sinon.stub().resolves(mockResponse) const handler = withEgressTracker(innerHandler) - const request = await createRequest() - - // Simulate an error in the accounting service record method - ctx.accountingService.record = sinon.stub().rejects(new Error('Accounting service error')) - - const response = await handler(request, env, ctx) + const request = new Request('http://doesnt-matter.com/') + const response = await handler(request, env, { + ...ctx, + ucantoClient: { + ...ctx.ucantoClient, + // Simulate an error in the ucanto client record method + record: async () => { throw new Error('ucanto client error') } + } + }) const responseBody = await response.text() expect(response.status).to.equal(200) expect(responseBody).to.equal('Hello, world!') - expect(accountingRecordMethodStub.called, 'record should not be called').to.be.false + expect(recordEgressMock.called, 'record should not be called').to.be.false }) }) }) diff --git a/test/unit/middleware/withRateLimit.spec.js b/test/unit/middleware/withRateLimit.spec.js index 291a06a..b52ffd2 100644 --- a/test/unit/middleware/withRateLimit.spec.js +++ b/test/unit/middleware/withRateLimit.spec.js @@ -66,7 +66,11 @@ const createContext = async ({ authToken } = {}) => ({ waitUntil: strictStub(sandbox, 'waitUntil').returns(undefined), path: '', searchParams: new URLSearchParams(), - authToken: authToken ?? null + authToken: authToken ?? null, + ucantoClient: { + record: strictStub(sandbox, 'record'), + getTokenMetadata: strictStub(sandbox, 'getTokenMetadata') + } }) describe('withRateLimits', async () => {