-
-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy paths3.js
More file actions
142 lines (132 loc) · 4.34 KB
/
s3.js
File metadata and controls
142 lines (132 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import * as t from '../types.js'
import * as buffer from 'lib0/buffer'
import { Client as S3Client } from 'minio'
import { Readable } from 'stream'
import http from 'http'
import https from 'https'
import { logger } from '../logger.js'
const logBindings = { module: 's3' }
const log = logger.child(logBindings)
/**
 * Connection settings accepted by `S3PersistenceV1`. The whole object is spread into the minio
 * `Client` options; in addition, `bucket` names the bucket that holds every stored asset, and
 * `useSSL` also selects whether an `https` or an `http` keep-alive agent is created.
 *
 * @typedef {object} S3Conf
 * @property {string} bucket
 * @property {string} endPoint
 * @property {number} port
 * @property {boolean} useSSL
 * @property {string} accessKey
 * @property {string} secretKey
 */
/**
 * Error `code`s that denote a temporary failure: socket/DNS errors, plus the S3 error codes that
 * S3 documents as retryable (`SlowDown` is how S3 signals throttling).
 */
const TRANSIENT_CODES = new Set([
  // socket / DNS level
  'ECONNRESET', 'ECONNREFUSED', 'ETIMEDOUT', 'ESOCKETTIMEDOUT', 'EPIPE', 'EAI_AGAIN',
  // S3 error response codes
  'SlowDown', 'ServiceUnavailable', 'RequestTimeout', 'InternalError'
])
/** Fallback for errors whose code is missing or non-standard but whose text names a dropped connection. */
const TRANSIENT_RE = /ECONNRESET|socket hang up|EPIPE/i
/** HTTP statuses meaning throttling (429) or a temporary server/gateway failure (5xx). */
const TRANSIENT_STATUSES = new Set([429, 500, 502, 503, 504])
/** Part size (5 MiB) handed to the minio client for multipart uploads. */
const S3_PART_SIZE = 5 * 1024 * 1024
/**
 * Transient errors are temporary network failures where retrying the same request is expected to
 * succeed (e.g. a keepalive connection dropped by the server, a momentary timeout, or throttling).
 *
 * The error is classified by its `code` (Node network error code or S3 error code), by its message,
 * and — for clients that attach one — by its HTTP `statusCode`.
 *
 * @param {unknown} err
 * @return {boolean} `true` if retrying the request is worthwhile
 */
const isTransient = (err) => {
  if (!(err instanceof Error)) return false
  const { code, statusCode } = /** @type {any} */ (err)
  if (TRANSIENT_CODES.has(code) || TRANSIENT_STATUSES.has(statusCode)) return true
  // codes may be absent or non-string; only pattern-match real strings
  return (typeof code === 'string' && TRANSIENT_RE.test(code)) || TRANSIENT_RE.test(err.message)
}
/**
 * Runs `fn`; if it fails with a transient error (see `isTransient`), logs a warning and runs it
 * exactly once more. Non-transient errors, and any error from the second attempt, propagate.
 *
 * `fn` must issue a fresh request on every call (e.g. build a new body stream), because a failed
 * first attempt may already have consumed part of its inputs.
 *
 * @template T
 * @param {() => Promise<T>} fn
 * @param {string} path object key, included in the warning log
 * @param {string} msg warning message logged before the retry
 * @return {Promise<T>}
 */
const retryOnceIfTransient = async (fn, path, msg) => {
  try {
    return await fn()
  } catch (err) {
    if (!isTransient(err)) throw err
    log.warn({ err, path }, msg)
    return fn()
  }
}

/**
 * Persistence plugin that stores assets of the `main` branch as objects in an S3-compatible
 * bucket. Each object is keyed by `t.assetIdToString(assetId)` and holds the `lib0/buffer`
 * `encodeAny` encoding of the asset.
 *
 * @implements {t.PersistencePlugin}
 */
export class S3PersistenceV1 {
  /**
   * Creates the minio client. All requests share one keep-alive agent (https or http depending on
   * `useSSL`); since the server may drop idle keep-alive connections, every request is retried
   * once on transient errors.
   *
   * @param {S3Conf} s3conf
   */
  constructor (s3conf) {
    this.bucket = s3conf.bucket
    const Agent = s3conf.useSSL ? https.Agent : http.Agent
    this._agent = new Agent({ keepAlive: true, keepAliveMsecs: 30_000 })
    this.s3client = new S3Client({ ...s3conf, transportAgent: this._agent, partSize: S3_PART_SIZE })
  }

  get pluginid () {
    return 'S3Persistence:v1'
  }

  /**
   * Ensures the configured bucket exists, creating it if necessary.
   *
   * Checking and creating are two separate requests, so another process (e.g. a second server
   * instance starting at the same time) may create the bucket in between. S3 then rejects our
   * `makeBucket` with `BucketAlreadyOwnedByYou`, which is treated as success.
   */
  async init () {
    log.info({ bucket: this.bucket }, 'checking if S3 bucket exists')
    if (await this.s3client.bucketExists(this.bucket)) {
      log.info({ bucket: this.bucket }, 'S3 bucket already exists')
      return
    }
    log.info({ bucket: this.bucket }, 'creating S3 bucket')
    try {
      await this.s3client.makeBucket(this.bucket)
    } catch (err) {
      if (/** @type {any} */ (err)?.code !== 'BucketAlreadyOwnedByYou') throw err
      log.info({ bucket: this.bucket }, 'S3 bucket was created concurrently')
      return
    }
    log.info({ bucket: this.bucket }, 'S3 bucket created')
  }

  /**
   * Uploads the asset if it belongs to the `main` branch; assets of other branches are not
   * handled by this plugin.
   *
   * @param {t.AssetId} assetId
   * @param {t.Asset} asset
   * @return {Promise<t.RetrievableAsset?>} a pointer to this plugin, or `null` if nothing was stored
   */
  async store (assetId, asset) {
    if (assetId.branch !== 'main') return null
    const path = t.assetIdToString(assetId)
    const file = Buffer.from(buffer.encodeAny(asset))
    // build a new stream per attempt: a failed first attempt may have consumed the previous one
    const put = () => this.s3client.putObject(this.bucket, path, Readable.from(file), file.length)
    await retryOnceIfTransient(put, path, 'transient error storing object, retrying')
    return {
      type: 'asset:retrievable:v1',
      plugin: this.pluginid
    }
  }

  /**
   * Downloads and decodes an asset previously stored by this plugin.
   *
   * @param {t.AssetId} assetId
   * @param {t.Asset} assetInfo
   * @return {Promise<t.Asset?>} the decoded asset, or `null` if `assetInfo` does not point to this
   *   plugin or the object does not exist
   */
  async retrieve (assetId, assetInfo) {
    if (assetInfo.type !== 'asset:retrievable:v1' || assetInfo.plugin !== this.pluginid) {
      return null
    }
    const path = t.assetIdToString(assetId)
    /** @return {Promise<Buffer?>} the object contents, or `null` if there is no such object */
    const get = async () => {
      try {
        const stream = await this.s3client.getObject(this.bucket, path)
        const chunks = []
        for await (const chunk of stream) chunks.push(chunk)
        return Buffer.concat(chunks)
      } catch (e) {
        if (/** @type {any} */ (e)?.code === 'NoSuchKey') return null
        throw e
      }
    }
    const data = await retryOnceIfTransient(get, path, 'transient error retrieving object, retrying')
    return data === null ? null : t.$asset.expect(buffer.decodeAny(data))
  }

  /**
   * Schedules removal of the object backing the asset, if this plugin stored it.
   *
   * The removal runs 10 seconds later instead of immediately, to avoid problems for clients that
   * may still be pulling the now-stale data. Like `store` and `retrieve`, it is retried once on
   * transient errors; a final failure is only logged, because nothing awaits the deferred removal.
   *
   * @param {t.AssetId} assetId
   * @param {t.Asset} assetInfo
   * @return {Promise<boolean>} `true` if removal was scheduled, `false` if the asset is not ours
   */
  async delete (assetId, assetInfo) {
    if (assetInfo.type !== 'asset:retrievable:v1' || assetInfo.plugin !== this.pluginid) {
      return false
    }
    const path = t.assetIdToString(assetId)
    const remove = () => this.s3client.removeObject(this.bucket, path)
    setTimeout(() => {
      // delete at some point later, avoiding issues of clients pulling from stale data
      // @todo it would be nice to implement a worker that finds unused s3 docs and deletes them
      retryOnceIfTransient(remove, path, 'transient error deleting object, retrying')
        .catch(err => log.error({ err, path }, 'error deleting object'))
    }, 10_000)
    return true
  }
}