Skip to content

Commit d408f29

Browse files
committed
feat: use @smithy/undici-http-handler for S3 client
1 parent 74a0f64 commit d408f29

9 files changed

Lines changed: 153 additions & 1867 deletions

File tree

package-lock.json

Lines changed: 75 additions & 1770 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@
8383
"@platformatic/node": "^3.52.0",
8484
"@platformatic/wattpm-pprof-capture": "^3.52.1",
8585
"@shopify/semaphore": "^3.0.2",
86-
"@smithy/node-http-handler": "^2.3.1",
86+
"@smithy/types": "^4.14.3",
87+
"@smithy/undici-http-handler": "^2.0.2",
8788
"@tus/file-store": "2.0.0",
8889
"@tus/s3-store": "2.0.3",
8990
"@tus/server": "2.2.1",
90-
"agentkeepalive": "^4.6.0",
9191
"ajv": "^8.18.0",
9292
"async-retry": "^1.3.3",
9393
"aws-sigv4-sign": "^1.2.1",

src/http/plugins/storage.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ export const storage = fastifyPlugin(
3838
request.storage = new Storage(storageBackend, database, location)
3939
request.cdnCache = new CdnCacheManager(request.storage)
4040
})
41-
42-
fastify.addHook('onClose', async () => {
43-
storageBackend.close()
44-
})
4541
},
4642
{ name: 'storage-init' }
4743
)

src/http/routes/tus/index.ts

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
import * as https from 'node:https'
21
import { S3Client } from '@aws-sdk/client-s3'
32
import { PubSub } from '@internal/database'
43
import { ERRORS } from '@internal/errors'
5-
import { createAgent } from '@internal/http'
4+
import { createAgent, InstrumentedAgent } from '@internal/http'
65
import { logSchema } from '@internal/monitoring'
7-
import { NodeHttpHandler } from '@smithy/node-http-handler'
6+
import { UndiciHttpHandler } from '@smithy/undici-http-handler'
87
import { getFileSizeLimit } from '@storage/limits'
98
import { AlsMemoryKV, FileStore, LockNotifier, PgLocker, UploadId } from '@storage/protocols/tus'
109
import { S3Locker } from '@storage/protocols/tus/s3-locker'
1110
import { S3Store } from '@tus/s3-store'
1211
import { DataStore, Server, ServerOptions } from '@tus/server'
1312
import { FastifyInstance } from 'fastify'
1413
import fastifyPlugin from 'fastify-plugin'
15-
import * as http from 'http'
1614
import type { ServerRequest as Request } from 'srvx'
1715
import { getConfig } from '../../../config'
1816
import { db, dbSuperUser, registerJwtAuth, storage } from '../../plugins'
@@ -47,7 +45,7 @@ const {
4745
storageFilePath,
4846
} = getConfig()
4947

50-
function createTusStore(agent: { httpsAgent: https.Agent; httpAgent: http.Agent }) {
48+
function createTusStore(agent: InstrumentedAgent) {
5149
if (storageBackendType === 's3') {
5250
return new S3Store({
5351
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
@@ -56,11 +54,7 @@ function createTusStore(agent: { httpsAgent: https.Agent; httpAgent: http.Agent
5654
maxConcurrentPartUploads: tusMaxConcurrentUploads,
5755
useTags: tusAllowS3Tags,
5856
s3ClientConfig: {
59-
requestHandler: new NodeHttpHandler({
60-
...agent,
61-
connectionTimeout: 5000,
62-
requestTimeout: storageS3ClientTimeout,
63-
}),
57+
requestHandler: new UndiciHttpHandler({ dispatcher: agent.dispatcher }),
6458
bucket: storageS3Bucket,
6559
region: storageS3Region,
6660
endpoint: storageS3Endpoint,
@@ -74,19 +68,12 @@ function createTusStore(agent: { httpsAgent: https.Agent; httpAgent: http.Agent
7468
})
7569
}
7670

77-
function createTusServer(
78-
lockNotifier: LockNotifier,
79-
agent: { httpsAgent: https.Agent; httpAgent: http.Agent }
80-
) {
71+
function createTusServer(lockNotifier: LockNotifier, agent: InstrumentedAgent) {
8172
const datastore = createTusStore(agent)
8273
const sharedS3Client =
8374
tusLockType === 's3'
8475
? new S3Client({
85-
requestHandler: new NodeHttpHandler({
86-
...agent,
87-
connectionTimeout: 5000,
88-
requestTimeout: storageS3ClientTimeout,
89-
}),
76+
requestHandler: new UndiciHttpHandler({ dispatcher: agent.dispatcher }),
9077
region: storageS3Region,
9178
endpoint: storageS3Endpoint,
9279
forcePathStyle: storageS3ForcePathStyle,
@@ -171,11 +158,12 @@ export default async function routes(fastify: FastifyInstance) {
171158

172159
const agent = createAgent('s3_tus', {
173160
maxSockets: storageS3MaxSockets,
161+
requestTimeoutMs: storageS3ClientTimeout,
174162
})
175163
agent.monitor()
176164

177165
fastify.addHook('onClose', async () => {
178-
agent.close()
166+
await agent.close()
179167

180168
await lockNotifier.stop().catch((e) => {
181169
logSchema.error(fastify.log, 'Failed to stop TUS lock notifier', {

src/internal/http/agent.ts

Lines changed: 57 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4,115 +4,112 @@ import {
44
httpPoolFreeSockets,
55
httpPoolPendingRequests,
66
} from '@internal/monitoring/metrics'
7-
import Agent, { HttpsAgent } from 'agentkeepalive'
7+
import { Agent } from 'undici'
88
import { getConfig } from '../../config'
99

1010
const { region } = getConfig()
1111

1212
export interface InstrumentedAgent {
13-
httpAgent: Agent
14-
httpsAgent: HttpsAgent
13+
dispatcher: Agent
1514
monitor: () => NodeJS.Timeout | undefined
16-
close: () => void
15+
close: () => Promise<void>
1716
}
1817

1918
export interface AgentStats {
2019
busySocketCount: number
2120
freeSocketCount: number
2221
pendingRequestCount: number
2322
errorSocketCount: number
24-
timeoutSocketCount: number
25-
createSocketErrorCount: number
23+
}
24+
25+
export interface AgentOptions {
26+
maxSockets: number
27+
connectTimeoutMs?: number
28+
requestTimeoutMs?: number
2629
}
2730

2831
/**
29-
* Creates an instrumented agent
30-
* Adding metrics to the agent
32+
* Creates an instrumented undici Agent.
33+
* Tracks connect errors via the Agent's `connectionError` event so the
34+
* `http_pool_errors` metric stays populated.
3135
*/
32-
export function createAgent(name: string, options: { maxSockets: number }): InstrumentedAgent {
33-
const agentOptions = {
34-
maxSockets: options.maxSockets,
35-
keepAlive: true,
36-
keepAliveMsecs: 1000,
37-
freeSocketTimeout: 1000 * 15,
38-
}
36+
export function createAgent(name: string, options: AgentOptions): InstrumentedAgent {
37+
const dispatcher = new Agent({
38+
connections: options.maxSockets,
39+
keepAliveTimeout: 15_000,
40+
pipelining: 1,
41+
headersTimeout: options.requestTimeoutMs ?? 0,
42+
bodyTimeout: options.requestTimeoutMs ?? 0,
43+
connect: {
44+
timeout: options.connectTimeoutMs ?? 5_000,
45+
keepAlive: true,
46+
keepAliveInitialDelay: 1_000,
47+
},
48+
})
3949

40-
const httpAgent = new Agent(agentOptions)
41-
const httpsAgent = new HttpsAgent(agentOptions)
42-
let watcher: NodeJS.Timeout | undefined = undefined
50+
let errorCount = 0
51+
dispatcher.on('connectionError', () => {
52+
errorCount++
53+
})
54+
55+
let watcher: NodeJS.Timeout | undefined
56+
let closing: Promise<void> | undefined
4357

4458
return {
45-
httpAgent,
46-
httpsAgent,
59+
dispatcher,
4760
monitor: () => {
48-
const agent = watchAgent(name, 'https', httpsAgent)
49-
watcher = agent
50-
return agent
61+
watcher = watchAgent(name, dispatcher, () => errorCount)
62+
return watcher
5163
},
5264
close: () => {
65+
if (closing) return closing
5366
if (watcher) {
5467
clearInterval(watcher)
68+
watcher = undefined
5569
}
70+
closing = dispatcher.close()
71+
return closing
5672
},
5773
}
5874
}
5975

60-
/**
61-
* Updates HTTP agent metrics
62-
*/
63-
function updateHttpAgentMetrics(name: string, protocol: string, stats: AgentStats) {
64-
const baseAttrs = { name, protocol }
76+
function updateHttpAgentMetrics(name: string, stats: AgentStats) {
77+
const baseAttrs = { name, protocol: 'https' }
6578

6679
httpPoolBusySockets.record(stats.busySocketCount, baseAttrs)
6780
httpPoolFreeSockets.record(stats.freeSocketCount, baseAttrs)
6881
httpPoolPendingRequests.record(stats.pendingRequestCount, { name, region })
69-
httpPoolErrors.record(stats.errorSocketCount, { ...baseAttrs, type: 'socket_error' })
70-
httpPoolErrors.record(stats.timeoutSocketCount, { ...baseAttrs, type: 'timeout_socket_error' })
71-
httpPoolErrors.record(stats.createSocketErrorCount, { ...baseAttrs, type: 'create_socket_error' })
82+
httpPoolErrors.record(stats.errorSocketCount, { ...baseAttrs, type: 'connect_error' })
7283
}
7384

74-
export function watchAgent(name: string, protocol: 'http' | 'https', agent: Agent | HttpsAgent) {
85+
export function watchAgent(name: string, dispatcher: Agent, getErrorCount: () => number) {
7586
return setInterval(() => {
76-
const httpStatus = agent.getCurrentStatus()
77-
78-
const httpStats = gatherHttpAgentStats(httpStatus)
79-
80-
updateHttpAgentMetrics(name, protocol, httpStats)
87+
const stats = gatherDispatcherStats(dispatcher, getErrorCount())
88+
updateHttpAgentMetrics(name, stats)
8189
}, 5000)
8290
}
8391

84-
// Function to update metrics based on the current status of the agent
85-
export function gatherHttpAgentStats(status: Agent.AgentStatus) {
86-
// Calculate the number of busy sockets by iterating over the `sockets` object
92+
export function gatherDispatcherStats(dispatcher: Agent, errorCount: number): AgentStats {
8793
let busySocketCount = 0
88-
for (const host in status.sockets) {
89-
if (status.sockets.hasOwnProperty(host)) {
90-
busySocketCount += status.sockets[host]
91-
}
92-
}
93-
94-
// Calculate the number of free sockets by iterating over the `freeSockets` object
9594
let freeSocketCount = 0
96-
for (const host in status.freeSockets) {
97-
if (status.freeSockets.hasOwnProperty(host)) {
98-
freeSocketCount += status.freeSockets[host]
99-
}
100-
}
101-
102-
// Calculate the number of pending requests by iterating over the `requests` object
10395
let pendingRequestCount = 0
104-
for (const host in status.requests) {
105-
if (status.requests.hasOwnProperty(host)) {
106-
pendingRequestCount += status.requests[host]
96+
97+
for (const origin of Object.keys(dispatcher.stats)) {
98+
const s = dispatcher.stats[origin] as {
99+
running: number
100+
free?: number
101+
pending: number
102+
queued?: number
107103
}
104+
busySocketCount += s.running
105+
freeSocketCount += s.free ?? 0
106+
pendingRequestCount += s.pending + (s.queued ?? 0)
108107
}
109108

110109
return {
111110
busySocketCount,
112111
freeSocketCount,
113112
pendingRequestCount,
114-
errorSocketCount: status.errorSocketCount,
115-
timeoutSocketCount: status.timeoutSocketCount,
116-
createSocketErrorCount: status.createSocketErrorCount,
113+
errorSocketCount: errorCount,
117114
}
118115
}

src/storage/backend/adapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ export abstract class StorageBackendAdapter {
236236
throw new Error('not implemented')
237237
}
238238

239-
close(): void {
239+
close(): void | Promise<void> {
240240
// do nothing
241241
}
242242
}

src/storage/backend/s3/adapter.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
2222
import { ERRORS, StorageBackendError } from '@internal/errors'
2323
import { createAgent, InstrumentedAgent } from '@internal/http'
2424
import { monitorStream } from '@internal/streams'
25-
import { NodeHttpHandler } from '@smithy/node-http-handler'
25+
import { UndiciHttpHandler } from '@smithy/undici-http-handler'
2626
import { BackupObjectInfo, ObjectBackup } from '@storage/backend/s3/backup'
2727
import { getConfig } from '../../../config'
2828
import {
@@ -71,9 +71,10 @@ export class S3Backend implements StorageBackendAdapter {
7171
options.httpAgent ??
7272
createAgent('s3_default', {
7373
maxSockets: storageS3MaxSockets,
74+
requestTimeoutMs: options.requestTimeout,
7475
})
7576

76-
if (this.agent.httpsAgent && tracingEnabled) {
77+
if (tracingEnabled) {
7778
this.agent.monitor()
7879
}
7980

@@ -683,19 +684,16 @@ export class S3Backend implements StorageBackendAdapter {
683684
}
684685

685686
close() {
686-
this.agent.close()
687+
return this.agent.close()
687688
}
688689

689690
protected createS3Client(options: S3ClientOptions & { name: string }) {
690691
const params: S3ClientConfig = {
691692
region: options.region,
692693
runtime: 'node',
693694
requestStreamBufferSize: 32 * 1024,
694-
requestHandler: new NodeHttpHandler({
695-
httpAgent: options.httpAgent?.httpAgent,
696-
httpsAgent: options.httpAgent?.httpsAgent,
697-
connectionTimeout: 5000,
698-
requestTimeout: options.requestTimeout,
695+
requestHandler: new UndiciHttpHandler({
696+
dispatcher: options.httpAgent?.dispatcher,
699697
}),
700698
}
701699
if (options.endpoint) {

src/storage/events/base-event.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import { createStorageBackend, StorageBackendAdapter } from '../backend'
88
import { StorageKnexDB } from '../database'
99
import { Storage } from '../storage'
1010

11-
const { storageS3Bucket, storageS3MaxSockets, storageBackendType, region } = getConfig()
11+
const { storageS3Bucket, storageS3MaxSockets, storageS3ClientTimeout, storageBackendType, region } =
12+
getConfig()
1213

1314
let storageBackend: StorageBackendAdapter | undefined = undefined
1415
let Webhook: Awaited<typeof import('./lifecycle/webhook')>['Webhook'] | undefined = undefined
@@ -95,6 +96,7 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends
9596

9697
const httpAgent = createAgent('s3_worker', {
9798
maxSockets: storageS3MaxSockets,
99+
requestTimeoutMs: storageS3ClientTimeout,
98100
})
99101

100102
storageBackend = createStorageBackend(storageBackendType, {

src/storage/renderer/renderer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ function isCallerAbort(error: unknown, signal: AbortSignal | undefined) {
229229
return true
230230
}
231231

232-
// AWS SDK via @smithy/node-http-handler creates a fresh AbortError without
232+
// AWS SDK via @smithy/undici-http-handler creates a fresh AbortError without
233233
// preserving the original reason. Once our signal is aborted, treat that as
234234
// caller-driven to surface 499 instead of 500.
235235
return isAbortError(error)

0 commit comments

Comments
 (0)