Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"author": "Alan Shaw",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@ucanto/client": "^9.0.1",
"@ucanto/principal": "^8.1.0",
"@ucanto/transport": "^9.1.1",
"@web3-storage/blob-fetcher": "^2.2.0",
"@web3-storage/gateway-lib": "^5.1.2",
"dagula": "^8.0.0",
Expand All @@ -48,7 +51,6 @@
"@types/mocha": "^10.0.9",
"@types/node-fetch": "^2.6.11",
"@types/sinon": "^17.0.3",
"@ucanto/principal": "^8.1.0",
"@web3-storage/content-claims": "^5.0.0",
"@web3-storage/public-bucket": "^1.1.0",
"@web3-storage/upload-client": "^16.1.1",
Expand Down
12 changes: 2 additions & 10 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,13 @@ import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHan
import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts'
import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts'
import { UnknownLink } from 'multiformats'
import { DIDKey } from '@ucanto/principal/ed25519'

export interface Environment
extends CarBlockEnvironment,
RateLimiterEnvironment,
ContentClaimsDagulaEnvironment,
EgressTrackerEnvironment {
VERSION: string
CONTENT_CLAIMS_SERVICE_URL?: string
ACCOUNTING_SERVICE_URL: string
}

export interface AccountingService {
record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
getTokenMetadata: (token: string) => Promise<TokenMetadata | null>
}

export interface Accounting {
create: ({ serviceURL }: { serviceURL: string }) => AccountingService
}
25 changes: 8 additions & 17 deletions src/middleware/withEgressTracker.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { Accounting } from '../services/accounting.js'

/**
* @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
* @import { Middleware } from '@web3-storage/gateway-lib'
* @import { Environment } from './withEgressTracker.types.js'
* @import { AccountingService } from '../bindings.js'
* @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: AccountingService }} EgressTrackerContext
* @typedef {import('./withEgressTracker.types.js').Context} EgressTrackerContext
*/

/**
Expand All @@ -26,17 +23,12 @@ export function withEgressTracker (handler) {
return response
}

const { dataCid } = ctx
const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({
serviceURL: env.ACCOUNTING_SERVICE_URL
})

const responseBody = response.body.pipeThrough(
createByteCountStream((totalBytesServed) => {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
ctx.waitUntil(
accounting.record(dataCid, totalBytesServed, new Date().toISOString())
ctx.ucantoClient.record(ctx.space, ctx.dataCid, totalBytesServed, new Date())
)
}
})
Expand All @@ -51,15 +43,14 @@ export function withEgressTracker (handler) {
}

/**
* Creates a TransformStream to count bytes served to the client.
* It records egress when the stream is finalized without an error.
* Creates a TransformStream to count bytes in the response body.
*
* @param {(totalBytesServed: number) => void} onClose
* @param {(totalBytes: number) => void} onClose
* @template {Uint8Array} T
* @returns {TransformStream<T, T>} - The created TransformStream.
*/
function createByteCountStream (onClose) {
let totalBytesServed = 0
let totalBytes = 0

return new TransformStream({
/**
Expand All @@ -71,7 +62,7 @@ function createByteCountStream (onClose) {
async transform (chunk, controller) {
try {
controller.enqueue(chunk)
totalBytesServed += chunk.byteLength
totalBytes += chunk.byteLength
} catch (error) {
console.error('Error while counting bytes:', error)
controller.error(error)
Expand All @@ -86,7 +77,7 @@ function createByteCountStream (onClose) {
* NOTE: The flush function is NOT called in case of a stream error.
*/
async flush () {
onClose(totalBytesServed)
onClose(totalBytes)
}
})
}
10 changes: 8 additions & 2 deletions src/middleware/withEgressTracker.types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
import { UCantoClient } from './withUcantoClient.types.js'
import { DIDKey } from '@ucanto/principal/ed25519'

export interface Environment extends MiddlewareEnvironment {
ACCOUNTING_SERVICE_URL: string
FF_EGRESS_TRACKER_ENABLED: string
}

export interface Context extends IpfsUrlContext {
space: DIDKey
ucantoClient: UCantoClient
}
13 changes: 5 additions & 8 deletions src/middleware/withRateLimit.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import { HttpError } from '@web3-storage/gateway-lib/util'
import { RATE_LIMIT_EXCEEDED } from '../constants.js'
import { Accounting } from '../services/accounting.js'

/**
* @import { Middleware } from '@web3-storage/gateway-lib'
* @import { R2Bucket, KVNamespace, RateLimit } from '@cloudflare/workers-types'
* @import {
* Environment,
* Context,
* TokenMetadata,
* RateLimitService,
* RateLimitExceeded
* } from './withRateLimit.types.js'
* @typedef {Context & { ACCOUNTING_SERVICE?: import('../bindings.js').AccountingService }} RateLimiterContext
* @typedef {Context} RateLimiterContext
*/

/**
Expand Down Expand Up @@ -101,7 +99,7 @@ async function isRateLimited (rateLimitAPI, cid) {
* @param {Environment} env
* @param {string} authToken
* @param {RateLimiterContext} ctx
* @returns {Promise<TokenMetadata | null>}
* @returns {Promise<import('./withUcantoClient.types.ts').TokenMetadata | null>}
*/
async function getTokenMetadata (env, authToken, ctx) {
const cachedValue = await env.AUTH_TOKEN_METADATA.get(authToken)
Expand All @@ -111,8 +109,7 @@ async function getTokenMetadata (env, authToken, ctx) {
return decode(cachedValue)
}

const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ serviceURL: env.ACCOUNTING_SERVICE_URL })
const tokenMetadata = await accounting.getTokenMetadata(authToken)
const tokenMetadata = await ctx.ucantoClient.getTokenMetadata(authToken)
if (tokenMetadata) {
// NOTE: non-blocking call to the auth token metadata cache
ctx.waitUntil(env.AUTH_TOKEN_METADATA.put(authToken, encode(tokenMetadata)))
Expand All @@ -124,15 +121,15 @@ async function getTokenMetadata (env, authToken, ctx) {

/**
* @param {string} s
* @returns {TokenMetadata}
* @returns {import('./withUcantoClient.types.ts').TokenMetadata}
*/
function decode (s) {
// TODO should this be dag-json?
return JSON.parse(s)
}

/**
* @param {TokenMetadata} m
* @param {import('./withUcantoClient.types.ts').TokenMetadata} m
* @returns {string}
*/
function encode (m) {
Expand Down
8 changes: 2 additions & 6 deletions src/middleware/withRateLimit.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@ import { CID } from '@web3-storage/gateway-lib/handlers'
import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
import { KVNamespace, RateLimit } from '@cloudflare/workers-types'
import { RATE_LIMIT_EXCEEDED } from '../constants.js'
import { UCantoClient } from './withUcantoClient.types.js'

export interface Environment extends MiddlewareEnvironment {
ACCOUNTING_SERVICE_URL: string
RATE_LIMITER: RateLimit
AUTH_TOKEN_METADATA: KVNamespace
FF_RATE_LIMITER_ENABLED: string
}

export interface Context extends IpfsUrlContext {
authToken: string | null
}

export interface TokenMetadata {
locationClaim?: unknown // TODO: figure out the right type to use for this - we probably need it for the private data case to verify auth
invalid?: boolean
ucantoClient: UCantoClient
}

export type RateLimitExceeded = typeof RATE_LIMIT_EXCEEDED[keyof typeof RATE_LIMIT_EXCEEDED]
Expand Down
21 changes: 21 additions & 0 deletions src/middleware/withUcantoClient.capabilities.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { SpaceDID, equalWith } from '@web3-storage/capabilities/utils'
import { capability, Schema } from '@ucanto/validator'

export const Usage = {
/**
* Capability can be invoked by an agent to record egress data for a given resource.
*/
record: capability({
can: 'usage/record',
with: SpaceDID,
nb: Schema.struct({
/** CID of the resource that was served. */
resource: Schema.link(),
/** Amount of bytes served. */
bytes: Schema.integer().greaterThan(0),
/** Timestamp of the event in seconds after Unix epoch. */
servedAt: Schema.integer().greaterThan(-1)
}),
derives: equalWith
})
}
92 changes: 92 additions & 0 deletions src/middleware/withUcantoClient.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import * as UCantoClient from '@ucanto/client'
import * as CAR from '@ucanto/transport/car'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import { ed25519 } from '@ucanto/principal'
import { HTTP } from '@ucanto/transport'
import { Usage } from './withUcantoClient.capabilities.js'

/**
* @import { Middleware } from '@web3-storage/gateway-lib'
* @typedef {import('./withUcantoClient.types.ts').UcantoClientContext} UcantoClientContext
* @typedef {import('./withUcantoClient.types.ts').Environment} Environment
*/

/**
* The UCantoClient handler exposes the methods to invoke capabilities on the Upload API.
*
* @type {Middleware<UcantoClientContext, UcantoClientContext, Environment>}
*/
export function withUcantoClient (handler) {
return async (req, env, ctx) => {
const ucantoClient = await create(env)

return handler(req, env, { ...ctx, ucantoClient })
}
}

/**
* Creates a UCantoClient instance with the given environment.
*
* @param {Environment} env
* @returns {Promise<import('./withUcantoClient.types.ts').UCantoClient>}
*/
async function create (env) {
const service = ed25519.Verifier.parse(env.SERVICE_ID)
const principal = ed25519.Signer.parse(env.SIGNER_PRINCIPAL_KEY)

const { connection } = await connectUcantoClient(env.UPLOAD_API_URL, principal)

return {
/**
* @param {import('@ucanto/principal/ed25519').DIDKey} space - The Space DID where the content was served
* @param {import('@ucanto/principal/ed25519').UnknownLink} resource - The link to the resource that was served
* @param {number} bytes - The number of bytes served
* @param {Date} servedAt - The timestamp of when the content was served
*/
record: async (space, resource, bytes, servedAt) => {
const res = await Usage.record.invoke({
issuer: principal,
audience: service,
with: SpaceDID.from(space),
nb: {
resource,
bytes,
servedAt: Math.floor(servedAt.getTime() / 1000)
}
}).execute(connection)

if (res.out.error) {
console.error('Failed to record egress', res.out.error)
}
},

/**
* TODO: implement this function
*
* @param {string} authToken
* @returns {Promise<import('./withUcantoClient.types.ts').TokenMetadata | undefined>}
*/
getTokenMetadata: async (authToken) => {
// TODO I think this needs to check the content claims service (?) for any claims relevant to this token
// TODO do we have a plan for this? need to ask Hannah if the indexing service covers this?
return undefined
}
}
}

/**
* Creates a connection with the UCanto Server at the provided server URL.
*
* @param {string} serverUrl
* @param {import('@ucanto/principal/ed25519').EdSigner} principal
*
*/
async function connectUcantoClient (serverUrl, principal) {
const connection = await UCantoClient.connect({
id: principal,
codec: CAR.outbound,
channel: HTTP.open({ url: new URL(serverUrl) })
})

return { connection }
}
24 changes: 24 additions & 0 deletions src/middleware/withUcantoClient.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Environment as MiddlewareEnvironment, Context as MiddlewareContext } from '@web3-storage/gateway-lib'
import { DIDKey, UnknownLink } from '@ucanto/principal/ed25519'

export interface Environment extends MiddlewareEnvironment {
SERVICE_ID: string
SIGNER_PRINCIPAL_KEY: string
UPLOAD_API_URL: string
}

export interface UcantoClientContext extends MiddlewareContext {
ucantoClient?: UCantoClient
}

export interface TokenMetadata {
locationClaim?: unknown // TODO: figure out the right type to use for this - we probably need it for the private data case to verify auth
invalid?: boolean
}

export interface UCantoClient {
record: (space: DIDKey, resource: UnknownLink, bytes: number, servedAt: Date) => Promise<void>
getTokenMetadata: (token: string) => Promise<TokenMetadata | undefined>
}


Loading