Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions piece-retriever/bin/piece-retriever.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export default {
{
status: 502,
headers: new Headers({
'X-Data-Set-ID': retrievalAttempts
'FB-Data-Set-ID': retrievalAttempts
Comment thread
akronim26 marked this conversation as resolved.
Outdated
.map((a) => a.dataSetId)
.join(','),
}),
Expand Down Expand Up @@ -200,15 +200,22 @@ export default {
retrievalResult.response,
)
setContentSecurityPolicy(response)
response.headers.set('X-Data-Set-ID', retrievalCandidate.dataSetId)
response.headers.set('FB-Data-Set-ID', retrievalCandidate.dataSetId)
response.headers.set(
'Cache-Control',
`public, max-age=${env.CLIENT_CACHE_TTL}`,
)
response.headers.set(
'FB-Cdn-Egress-Remaining',
String(retrievalCandidate.cdnEgressQuota),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we confident it's good enough to respond with the quota for this one retrieval candidate? This means that if you have stored the data with multiple SPs, you will get different header values from one request to the next, depending on which candidate happens to be selected.

What about instead summing up the quotas of all retrieval candidates, since that's effectively how much you have left?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, got it. There can be multiple retrieval candidates, so we should sum up the quotas from all of them. I'll make the change.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose returning two sets of headers: one with the remaining quotas for the selected data-set (as indicated in the X-Data-Set-ID header), and another for the total remaining quota.

If we feel the per-data-set quotas are not needed, then I am okay with not implementing such response headers.

Either way, let's rename the headers returning the sum of all quotas to clearly indicate that the value is a sum across all datasets.

)
response.headers.set(
'FB-Cache-Miss-Egress-Remaining',
String(retrievalCandidate.cacheMissEgressQuota),
)
return response
}

// Stream, count bytes and validate (a cache miss)
Comment thread
akronim26 marked this conversation as resolved.
let egressBytes = 0
/** @type {number | null} */
let firstByteAt = null
Expand Down Expand Up @@ -320,18 +327,25 @@ export default {
})(),
)

// Return immediately, proxying the transformed response
Comment thread
akronim26 marked this conversation as resolved.
const response = new Response(returnedStream.readable, {
status: retrievalResult.response.status,
statusText: retrievalResult.response.statusText,
headers: retrievalResult.response.headers,
})
setContentSecurityPolicy(response)
response.headers.set('X-Data-Set-ID', retrievalCandidate.dataSetId)
response.headers.set('FB-Data-Set-ID', retrievalCandidate.dataSetId)
response.headers.set(
'Cache-Control',
`public, max-age=${env.CLIENT_CACHE_TTL}`,
)
response.headers.set(
'FB-Cdn-Egress-Remaining',
String(retrievalCandidate.cdnEgressQuota),
)
response.headers.set(
'FB-Cache-Miss-Egress-Remaining',
String(retrievalCandidate.cacheMissEgressQuota),
)
Comment thread
akronim26 marked this conversation as resolved.
Outdated
return response
} catch (error) {
const { status } = getErrorHttpStatusMessage(error)
Expand Down
106 changes: 100 additions & 6 deletions piece-retriever/test/retriever.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ describe('piece-retriever.fetch', () => {
)
})

it('returns data set ID in the X-Data-Set-ID response header', async () => {
it('returns data set ID in the FB-Data-Set-ID response header', async () => {
const { pieceCid, dataSetId } = CONTENT_STORED_ON_CALIBRATION[0]
const mockRetrieveFile = vi.fn().mockResolvedValue({
response: new Response('hello'),
Expand All @@ -598,7 +598,7 @@ describe('piece-retriever.fetch', () => {
retrieveFile: mockRetrieveFile,
})
expect(await res.text()).toBe('hello')
expect(res.headers.get('X-Data-Set-ID')).toBe(String(dataSetId))
expect(res.headers.get('FB-Data-Set-ID')).toBe(String(dataSetId))
await waitOnExecutionContext(ctx)
})

Expand Down Expand Up @@ -633,7 +633,7 @@ describe('piece-retriever.fetch', () => {
])
})

it('returns data set ID in the X-Data-Set-ID response header when the response body is empty', async () => {
it('returns data set ID in the FB-Data-Set-ID response header when the response body is empty', async () => {
const { pieceCid, dataSetId } = CONTENT_STORED_ON_CALIBRATION[0]
const mockRetrieveFile = vi.fn().mockResolvedValue({
response: new Response(null, { status: 404 }),
Expand All @@ -646,7 +646,7 @@ describe('piece-retriever.fetch', () => {
})
await waitOnExecutionContext(ctx)
expect(res.body).toBeNull()
expect(res.headers.get('X-Data-Set-ID')).toBe(String(dataSetId))
expect(res.headers.get('FB-Data-Set-ID')).toBe(String(dataSetId))
})

it('supports HEAD requests', async () => {
Expand Down Expand Up @@ -981,7 +981,7 @@ describe('piece-retriever.fetch', () => {
expect(await res.text()).toMatch(
/^No available service provider found. Attempted: ID=/,
)
expect(res.headers.get('X-Data-Set-ID')).toBe(String(dataSetId))
expect(res.headers.get('FB-Data-Set-ID')).toBe(String(dataSetId))

const result = await env.DB.prepare(
'SELECT * FROM retrieval_logs WHERE data_set_id = ?',
Expand Down Expand Up @@ -1043,6 +1043,100 @@ describe('piece-retriever.fetch', () => {
expect(result).toMatchObject({ egress_bytes: 0, bot_name: botName })
})

// Checks that a successful retrieval whose upstream response has no body
// (status 200, null body) still carries both egress-remaining headers, and
// that their values equal the quotas registered for the data set.
it('sets egress remaining headers for empty response body', async () => {
const dataSetId = 'snapshot-empty'
const pieceCid = 'bagaSnapshotEmpty'
// Two different quota values, so that swapped headers would fail the
// assertions at the end of the test.
const cdnEgressQuota = 123
const cacheMissEgressQuota = 456

// Register the data set / piece fixture with the quotas under test.
// NOTE(review): withDataSetPieces is defined elsewhere; presumably it seeds
// the test database — confirm against the helper.
await withDataSetPieces(env, {
dataSetId,
serviceProviderId: 'svc-snap-empty',
payerAddress: defaultPayerAddress,
withCDN: true,
cdnEgressQuota,
cacheMissEgressQuota,
pieceId: 'p-snap-empty',
pieceCid,
Comment thread
akronim26 marked this conversation as resolved.
})
// The provider id matches the serviceProviderId used in the fixture above.
await withApprovedProvider(env, {
id: 'svc-snap-empty',
serviceUrl: 'https://snap-empty.example/',
})

// Stub the retrieval dependency: a 200 response with a null body, flagged
// as a cache miss.
const mockRetrieveFile = vi.fn().mockResolvedValue({
response: new Response(null, { status: 200 }),
cacheMiss: true,
})

const ctx = createExecutionContext()
const req = withRequest(defaultPayerAddress, pieceCid)
// The stub is injected through the fourth argument of worker.fetch.
const res = await worker.fetch(req, env, ctx, {
retrieveFile: mockRetrieveFile,
})
await waitOnExecutionContext(ctx)

expect(res.status).toBe(200)
// Header values are strings, hence the String(...) conversion of the
// numeric quotas before comparing.
expect(res.headers.get('FB-Cdn-Egress-Remaining')).toBe(
String(cdnEgressQuota),
)
expect(res.headers.get('FB-Cache-Miss-Egress-Remaining')).toBe(
String(cacheMissEgressQuota),
)
})

// Checks that a retrieval served with a non-empty body (four bytes, flagged
// as a cache miss) exposes both egress-remaining headers, and that their
// values equal the quotas registered for the data set.
it('sets egress remaining headers for streaming responses', async () => {
  const providerId = 'svc-snap-stream'
  const cid = 'bagaSnapshotStream'
  const expectedCdnQuota = 789
  const expectedCacheMissQuota = 1000

  // Register the data set / piece fixture carrying the quotas under test.
  await withDataSetPieces(env, {
    dataSetId: 'snapshot-stream',
    serviceProviderId: providerId,
    payerAddress: defaultPayerAddress,
    withCDN: true,
    cdnEgressQuota: expectedCdnQuota,
    cacheMissEgressQuota: expectedCacheMissQuota,
    pieceId: 'p-snap-stream',
    pieceCid: cid,
  })
  await withApprovedProvider(env, {
    id: providerId,
    serviceUrl: 'https://snap-stream.example/',
  })

  // Upstream response stub: a 200 with a four-byte payload and a matching
  // Content-Length header.
  const payload = new Uint8Array([1, 2, 3, 4])
  const upstream = new Response(payload, {
    status: 200,
    headers: { 'Content-Length': String(payload.length) },
  })
  const retrieveFile = vi.fn().mockResolvedValue({
    response: upstream,
    cacheMiss: true,
    validate: () => true,
  })

  const ctx = createExecutionContext()
  const res = await worker.fetch(
    withRequest(defaultPayerAddress, cid),
    env,
    ctx,
    { retrieveFile },
  )

  // Read the whole body first; only afterwards wait on the execution context.
  const received = await res.arrayBuffer()
  expect(received.byteLength).toBe(payload.length)
  await waitOnExecutionContext(ctx)

  expect(res.status).toBe(200)

  // Header values are strings, so the numeric quotas are stringified before
  // comparison.
  const expectedHeaders = [
    ['FB-Cdn-Egress-Remaining', String(expectedCdnQuota)],
    ['FB-Cache-Miss-Egress-Remaining', String(expectedCacheMissQuota)],
  ]
  for (const [headerName, headerValue] of expectedHeaders) {
    expect(res.headers.get(headerName)).toBe(headerValue)
  }
})

it('stores bot name in retrieval logs when SP returns 502', async () => {
const { pieceCid, dataSetId } = CONTENT_STORED_ON_CALIBRATION[0]
const url = 'https://example.com/piece/502test'
Expand Down Expand Up @@ -1114,6 +1208,6 @@ describe('piece-retriever.fetch', () => {
expect(await res.text()).toMatch(
/^No available service provider found. Attempted: ID=/,
)
expect(res.headers.get('X-Data-Set-ID')).toBe(String(dataSetId))
expect(res.headers.get('FB-Data-Set-ID')).toBe(String(dataSetId))
})
})