Skip to content

Commit a1c4c1f

Browse files
authored
@s3/store: allow providing custom cache implementation (#552)
* feat: allow providing custom cache implementation * fix: reuse configstores as the s3 cache * rename: configstores to kvstores
1 parent 5ee803f commit a1c4c1f

File tree

10 files changed

+171
-21
lines changed

10 files changed

+171
-21
lines changed

packages/s3-store/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ Once the expiration period has passed, the upload url will return a 410 Gone sta
8282
Some S3 providers don't support tagging objects.
8383
If you are using certain features like the expiration extension and your provider doesn't support tagging, you can set this option to `false` to disable tagging.
8484

85+
#### `options.cache`
86+
87+
An optional cache implementation. If not provided, the store will use an in-memory cache (`MemoryConfigStore`).
88+
When running multiple instances of the server, you need to provide a cache implementation that is shared between all instances like the `RedisConfigStore`.
89+
8590
## Extensions
8691

8792
The tus protocol supports optional [extensions][]. Below is a table of the supported extensions in `@tus/s3-store`.

packages/s3-store/index.ts

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,25 @@ import http from 'node:http'
77
import AWS, {NoSuchKey, NotFound, S3, S3ClientConfig} from '@aws-sdk/client-s3'
88
import debug from 'debug'
99

10-
import {DataStore, StreamSplitter, Upload} from '@tus/server'
11-
import {ERRORS, TUS_RESUMABLE} from '@tus/server'
10+
import {
11+
DataStore,
12+
StreamSplitter,
13+
Upload,
14+
ERRORS,
15+
TUS_RESUMABLE,
16+
KvStore,
17+
MemoryKvStore,
18+
} from '@tus/server'
1219

1320
const log = debug('tus-node-server:stores:s3store')
1421

15-
function calcOffsetFromParts(parts?: Array<AWS.Part>) {
16-
// @ts-expect-error not undefined
17-
return parts && parts.length > 0 ? parts.reduce((a, b) => a + b.Size, 0) : 0
18-
}
19-
2022
type Options = {
2123
// The preferred part size for parts send to S3. Can not be lower than 5MiB or more than 5GiB.
2224
// The server calculates the optimal part size, which takes this size into account,
2325
// but may increase it to not exceed the S3 10K parts limit.
2426
partSize?: number
2527
useTags?: boolean
28+
cache?: KvStore<MetadataValue>
2629
expirationPeriodInMilliseconds?: number
2730
// Options to pass to the AWS S3 SDK.
2831
s3ClientConfig: S3ClientConfig & {bucket: string}
@@ -33,6 +36,12 @@ type MetadataValue = {
3336
'upload-id': string
3437
'tus-version': string
3538
}
39+
40+
function calcOffsetFromParts(parts?: Array<AWS.Part>) {
41+
// @ts-expect-error not undefined
42+
return parts && parts.length > 0 ? parts.reduce((a, b) => a + b.Size, 0) : 0
43+
}
44+
3645
// Implementation (based on https://github.com/tus/tusd/blob/master/s3store/s3store.go)
3746
//
3847
// Once a new tus upload is initiated, multiple objects in S3 are created:
@@ -68,7 +77,7 @@ type MetadataValue = {
6877
// to S3.
6978
export class S3Store extends DataStore {
7079
private bucket: string
71-
private cache: Map<string, MetadataValue> = new Map()
80+
private cache: KvStore<MetadataValue>
7281
private client: S3
7382
private preferredPartSize: number
7483
private expirationPeriodInMilliseconds = 0
@@ -93,6 +102,7 @@ export class S3Store extends DataStore {
93102
this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
94103
this.useTags = options.useTags ?? true
95104
this.client = new S3(restS3ClientConfig)
105+
this.cache = options.cache ?? new MemoryKvStore<MetadataValue>()
96106
}
97107

98108
protected shouldUseExpirationTags() {
@@ -152,8 +162,8 @@ export class S3Store extends DataStore {
152162
* HTTP calls to S3.
153163
*/
154164
private async getMetadata(id: string): Promise<MetadataValue> {
155-
const cached = this.cache.get(id)
156-
if (cached?.file) {
165+
const cached = await this.cache.get(id)
166+
if (cached) {
157167
return cached
158168
}
159169

@@ -162,7 +172,7 @@ export class S3Store extends DataStore {
162172
Key: this.infoKey(id),
163173
})
164174
const file = JSON.parse((await Body?.transformToString()) as string)
165-
this.cache.set(id, {
175+
const metadata: MetadataValue = {
166176
'tus-version': Metadata?.['tus-version'] as string,
167177
'upload-id': Metadata?.['upload-id'] as string,
168178
file: new Upload({
@@ -172,8 +182,9 @@ export class S3Store extends DataStore {
172182
metadata: file.metadata,
173183
creation_date: file.creation_date,
174184
}),
175-
})
176-
return this.cache.get(id) as MetadataValue
185+
}
186+
await this.cache.set(id, metadata)
187+
return metadata
177188
}
178189

179190
private infoKey(id: string) {
@@ -423,10 +434,12 @@ export class S3Store extends DataStore {
423434
id: string,
424435
partNumberMarker?: string
425436
): Promise<Array<AWS.Part>> {
437+
const metadata = await this.getMetadata(id)
438+
426439
const params: AWS.ListPartsCommandInput = {
427440
Bucket: this.bucket,
428441
Key: id,
429-
UploadId: this.cache.get(id)?.['upload-id'],
442+
UploadId: metadata['upload-id'],
430443
PartNumberMarker: partNumberMarker,
431444
}
432445

@@ -450,9 +463,9 @@ export class S3Store extends DataStore {
450463
/**
451464
* Removes cached data for a given file.
452465
*/
453-
private clearCache(id: string) {
466+
private async clearCache(id: string) {
454467
log(`[${id}] removing cached data`)
455-
this.cache.delete(id)
468+
await this.cache.delete(id)
456469
}
457470

458471
private calcOptimalPartSize(size?: number): number {
@@ -546,7 +559,7 @@ export class S3Store extends DataStore {
546559
const parts = await this.retrieveParts(id)
547560
await this.finishMultipartUpload(metadata, parts)
548561
await this.completeMetadata(metadata.file)
549-
this.clearCache(id)
562+
await this.clearCache(id)
550563
} catch (error) {
551564
log(`[${id}] failed to finish upload`, error)
552565
throw error
@@ -579,8 +592,7 @@ export class S3Store extends DataStore {
579592
// Spaces, can also return NoSuchKey.
580593
if (error.Code === 'NoSuchUpload' || error.Code === 'NoSuchKey') {
581594
return new Upload({
582-
id,
583-
...this.cache.get(id)?.file,
595+
...metadata.file,
584596
offset: metadata.file.size as number,
585597
size: metadata.file.size,
586598
metadata: metadata.file.metadata,
@@ -594,8 +606,7 @@ export class S3Store extends DataStore {
594606
const incompletePartSize = await this.getIncompletePartSize(id)
595607

596608
return new Upload({
597-
id,
598-
...this.cache.get(id)?.file,
609+
...metadata.file,
599610
offset: offset + (incompletePartSize ?? 0),
600611
size: metadata.file.size,
601612
})

packages/server/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
"tsconfig": "*",
4141
"typescript": "^5.3.3"
4242
},
43+
"optionalDependencies": {
44+
"@redis/client": "^1.5.13"
45+
},
4346
"engines": {
4447
"node": ">=16"
4548
}

packages/server/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export * from './types'
33
export * from './models'
44
export * from './lockers'
55
export * from './constants'
6+
export * from './kvstores'
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import fs from 'node:fs/promises'
2+
import path from 'node:path'
3+
4+
import {KvStore} from './Types'
5+
import {Upload} from '../models'
6+
7+
/**
8+
* FileConfigstore writes the `Upload` JSON metadata to disk next the uploaded file itself.
9+
* It uses a queue which only processes one operation at a time to prevent unsafe concurrent access.
10+
*/
11+
export class FileKvStore<T = Upload> implements KvStore<T> {
12+
directory: string
13+
14+
constructor(path: string) {
15+
this.directory = path
16+
}
17+
18+
async get(key: string): Promise<T | undefined> {
19+
try {
20+
const buffer = await fs.readFile(this.resolve(key), 'utf8')
21+
return JSON.parse(buffer as string)
22+
} catch {
23+
return undefined
24+
}
25+
}
26+
27+
async set(key: string, value: T): Promise<void> {
28+
await fs.writeFile(this.resolve(key), JSON.stringify(value))
29+
}
30+
31+
async delete(key: string): Promise<void> {
32+
await fs.rm(this.resolve(key))
33+
}
34+
35+
async list(): Promise<Array<string>> {
36+
const files = await fs.readdir(this.directory)
37+
const sorted = files.sort((a, b) => a.localeCompare(b))
38+
const name = (file: string) => path.basename(file, '.json')
39+
// To only return tus file IDs we check if the file has a corresponding JSON info file
40+
return sorted.filter(
41+
(file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1])
42+
)
43+
}
44+
45+
private resolve(key: string): string {
46+
return path.resolve(this.directory, `${key}.json`)
47+
}
48+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import {Upload} from '../models'
2+
import {KvStore} from './Types'
3+
4+
/**
5+
* Memory based configstore.
6+
* Used mostly for unit tests.
7+
*/
8+
export class MemoryKvStore<T = Upload> implements KvStore<T> {
9+
data: Map<string, T> = new Map()
10+
11+
async get(key: string): Promise<T | undefined> {
12+
return this.data.get(key)
13+
}
14+
15+
async set(key: string, value: T): Promise<void> {
16+
this.data.set(key, value)
17+
}
18+
19+
async delete(key: string): Promise<void> {
20+
this.data.delete(key)
21+
}
22+
23+
async list(): Promise<Array<string>> {
24+
return [...this.data.keys()]
25+
}
26+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import {RedisClientType} from '@redis/client'
2+
import {KvStore} from './Types'
3+
import {Upload} from '../models'
4+
5+
/**
6+
* Redis based configstore.
7+
*
8+
* @author Mitja Puzigaća <[email protected]>
9+
*/
10+
export class RedisKvStore<T = Upload> implements KvStore<T> {
11+
constructor(private redis: RedisClientType, private prefix: string = '') {
12+
this.redis = redis
13+
this.prefix = prefix
14+
}
15+
16+
async get(key: string): Promise<T | undefined> {
17+
return this.deserializeValue(await this.redis.get(this.prefix + key))
18+
}
19+
20+
async set(key: string, value: T): Promise<void> {
21+
await this.redis.set(this.prefix + key, this.serializeValue(value))
22+
}
23+
24+
async delete(key: string): Promise<void> {
25+
await this.redis.del(this.prefix + key)
26+
}
27+
28+
async list(): Promise<Array<string>> {
29+
return this.redis.keys(this.prefix + '*')
30+
}
31+
32+
private serializeValue(value: T): string {
33+
return JSON.stringify(value)
34+
}
35+
36+
private deserializeValue(buffer: string | null): T | undefined {
37+
return buffer ? JSON.parse(buffer) : undefined
38+
}
39+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import {Upload} from '../models'
2+
3+
export interface KvStore<T = Upload> {
4+
get(key: string): Promise<T | undefined>
5+
set(key: string, value: T): Promise<void>
6+
delete(key: string): Promise<void>
7+
8+
list?(): Promise<Array<string>>
9+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export {FileKvStore} from './FileKvStore'
2+
export {MemoryKvStore} from './MemoryKvStore'
3+
export {RedisKvStore} from './RedisKvStore'
4+
export {KvStore} from './Types'

yarn.lock

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1735,6 +1735,7 @@ __metadata:
17351735
version: 0.0.0-use.local
17361736
resolution: "@tus/server@workspace:packages/server"
17371737
dependencies:
1738+
"@redis/client": "npm:^1.5.13"
17381739
"@types/debug": "npm:^4.1.12"
17391740
"@types/mocha": "npm:^10.0.6"
17401741
"@types/node": "npm:^20.11.5"
@@ -1751,6 +1752,9 @@ __metadata:
17511752
ts-node: "npm:^10.9.2"
17521753
tsconfig: "npm:*"
17531754
typescript: "npm:^5.3.3"
1755+
dependenciesMeta:
1756+
"@redis/client":
1757+
optional: true
17541758
languageName: unknown
17551759
linkType: soft
17561760

0 commit comments

Comments
 (0)