Skip to content

Commit 9e07f17

Browse files
authored
fix: orphan object client detector (#672)
1 parent 5fe27e5 commit 9e07f17

File tree

3 files changed

+325
-0
lines changed

3 files changed

+325
-0
lines changed

src/internal/streams/ndjson.ts

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { Transform, TransformCallback } from 'stream'
2+
import { StringDecoder } from 'string_decoder'
3+
4+
/**
 * Transform stream that turns a byte stream of newline-delimited JSON (NDJSON)
 * into a stream of parsed values (the readable side is in object mode).
 *
 * - Input is decoded as UTF-8 through a `StringDecoder`, so a multi-byte
 *   character split across two chunks is reassembled before parsing.
 * - Empty and whitespace-only lines are skipped.
 * - A final line without a trailing newline is parsed when the stream is flushed.
 * - A line holding the JSON literal `null` is skipped: pushing `null` into an
 *   object-mode readable signals end-of-stream and would truncate the output.
 * - A line that is not valid JSON fails the stream with an `Error`.
 */
export class NdJsonTransform extends Transform {
  private decoder = new StringDecoder('utf8')
  /** Decoded text received so far that is not yet terminated by a newline. */
  private buffer = ''

  constructor() {
    super({ readableObjectMode: true })
  }

  /**
   * Decodes the chunk, then parses and pushes every complete line it finishes.
   * @param chunk raw bytes from the writable side
   * @param _encoding unused; chunks are always decoded as UTF-8
   * @param callback invoked with an `Error` on the first line that fails to parse
   */
  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback) {
    // decode safely across chunk boundaries
    this.buffer += this.decoder.write(chunk)

    const lastNewlineIdx = this.buffer.lastIndexOf('\n')
    if (lastNewlineIdx === -1) {
      // no complete line yet, keep accumulating
      return callback()
    }

    const completeLines = this.buffer.slice(0, lastNewlineIdx).split('\n')
    this.buffer = this.buffer.slice(lastNewlineIdx + 1)

    for (const line of completeLines) {
      const failure = this.parseAndPush(line)
      if (failure) {
        return callback(failure)
      }
    }

    callback()
  }

  /**
   * Parses whatever is left in the buffer as the final, unterminated line.
   * @param callback invoked with an `Error` when the remainder is not valid JSON
   */
  _flush(callback: TransformCallback) {
    const lastLine = this.buffer + this.decoder.end()
    this.buffer = ''

    const failure = this.parseAndPush(lastLine)
    if (failure) {
      return callback(failure)
    }
    callback()
  }

  /**
   * Parses one line and pushes the resulting value downstream.
   * Blank lines and the literal `null` produce no output.
   * @returns the error to report, or `undefined` when the line was handled
   */
  private parseAndPush(line: string): Error | undefined {
    if (!line.trim()) {
      return undefined
    }

    let value: unknown
    try {
      value = JSON.parse(line)
    } catch (err) {
      // The "on flush" prefix is used for both code paths; the test suite matches on it.
      return err instanceof Error
        ? new Error(`Invalid JSON on flush: ${err.message}`)
        : new Error(String(err))
    }

    // .push() participates in backpressure automatically; null would end the stream
    if (value !== null) {
      this.push(value)
    }
    return undefined
  }
}

src/scripts/orphan-client.ts

+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import axios from 'axios'
2+
import { NdJsonTransform } from '@internal/streams/ndjson'
3+
import fs from 'fs'
4+
import path from 'path'
5+
6+
// Admin API endpoint and credentials, read from the environment.
const ADMIN_URL = process.env.ADMIN_URL
const ADMIN_API_KEY = process.env.ADMIN_API_KEY
// Tenant and bucket interpolated into the orphan-objects request path.
const TENANT_ID = process.env.TENANT_ID
const BUCKET_ID = process.env.BUCKET_ID

// Optional `before` value forwarded to the admin API; unset by default.
// To enable it, assign a timestamp string such as new Date().toISOString().
// NOTE(review): presumably restricts the scan to objects older than this date — confirm against the API.
const BEFORE: string | undefined = undefined

/**
 * Builds the output file path for an operation, relative to this script's directory.
 * The name embeds the operation, the tenant ID and the current timestamp.
 * @param operation label placed at the start of the file name (e.g. `list`, `delete`)
 */
const FILE_PATH = (operation: string) =>
  `../../dist/${operation}-${TENANT_ID}-${Date.now()}-orphan-objects.json`

// HTTP client for the admin API; sends the API key in the `ApiKey` header on every request.
const client = axios.create({
  baseURL: ADMIN_URL,
  headers: {
    ApiKey: ADMIN_API_KEY,
  },
})
22+
23+
/**
 * A `data` message from the orphan-objects response stream: one batch of
 * orphaned objects.
 */
interface OrphanObject {
  event: 'data'
  type: 's3Orphans'
  /** Objects in this batch. */
  value: {
    name: string
    version: string
    // NOTE(review): presumably the object size in bytes — confirm against the API
    size: number
  }[]
}
32+
33+
/**
 * A `ping` message from the orphan-objects response stream. It carries no
 * payload and is ignored when the output file is written.
 */
interface PingObject {
  event: 'ping'
}
36+
37+
/**
 * CLI entry point.
 *
 * Reads the action (`list` or `delete`) from the first command-line argument and
 * the tenant/bucket from the `TENANT_ID` / `BUCKET_ID` environment variables, then
 * runs the matching operation.
 *
 * Any other action is rejected instead of falling through to the destructive
 * delete, so a typo can never trigger a deletion. Usage errors set a non-zero
 * exit code.
 */
async function main() {
  const action = process.argv[2]

  if (action !== 'list' && action !== 'delete') {
    console.error('Please provide an action: list or delete')
    process.exitCode = 1
    return
  }

  if (!TENANT_ID) {
    console.error('Please provide a tenant ID')
    process.exitCode = 1
    return
  }

  if (!BUCKET_ID) {
    console.error('Please provide a bucket ID')
    process.exitCode = 1
    return
  }

  if (action === 'list') {
    await listOrphans(TENANT_ID, BUCKET_ID)
    return
  }

  await deleteS3Orphans(TENANT_ID, BUCKET_ID)
}
62+
63+
/**
 * List Orphan objects in a bucket
 *
 * Requests the orphan-objects endpoint as a stream, parses the NDJSON response
 * and writes the reported objects to a `list-…` JSON file.
 *
 * @param tenantId tenant owning the bucket
 * @param bucketId bucket to inspect
 */
async function listOrphans(tenantId: string, bucketId: string) {
  // Encode the IDs so characters such as `/` or `?` cannot alter the request path
  const url = `/tenants/${encodeURIComponent(tenantId)}/buckets/${encodeURIComponent(
    bucketId
  )}/orphan-objects`

  const response = await client.get(url, {
    responseType: 'stream',
    params: {
      before: BEFORE,
    },
  })

  const transformStream = new NdJsonTransform()
  // pipe() does not forward source errors; destroy the transform so it emits
  // 'error' and is torn down instead of staying half-open
  response.data.on('error', (err: Error) => {
    transformStream.destroy(err)
  })

  const jsonStream = response.data.pipe(transformStream)

  await writeStreamToJsonArray(jsonStream, FILE_PATH('list'))
}
85+
86+
/**
 * Deletes S3 orphan objects in a bucket
 *
 * Sends a DELETE to the orphan-objects endpoint with `deleteS3Keys: true`, parses
 * the streamed NDJSON response and writes the reported objects to a `delete-…`
 * JSON file.
 *
 * @param tenantId tenant owning the bucket
 * @param bucketId bucket to clean up
 */
async function deleteS3Orphans(tenantId: string, bucketId: string) {
  // Encode the IDs so characters such as `/` or `?` cannot alter the request path
  const url = `/tenants/${encodeURIComponent(tenantId)}/buckets/${encodeURIComponent(
    bucketId
  )}/orphan-objects`

  const response = await client.delete(url, {
    responseType: 'stream',
    data: {
      deleteS3Keys: true,
      before: BEFORE,
    },
  })

  const transformStream = new NdJsonTransform()
  // pipe() does not forward source errors; destroy the transform so it emits
  // 'error' and is torn down instead of staying half-open
  response.data.on('error', (err: Error) => {
    transformStream.destroy(err)
  })

  const jsonStream = response.data.pipe(transformStream)

  await writeStreamToJsonArray(jsonStream, FILE_PATH('delete'))
}
109+
110+
/**
 * Writes the output to a JSON array
 *
 * Consumes a stream of parsed messages and writes every item of every `data`
 * message into one pretty-printed JSON array on disk. `ping` messages are
 * ignored; messages of any other shape are logged and skipped. The output
 * directory is created when missing, and the source is paused while the file
 * stream is saturated.
 *
 * @param stream object stream emitting `OrphanObject | PingObject` messages
 * @param relativePath output path, resolved against this script's directory
 * @returns resolves once the file has been written and closed; rejects when the
 *   source stream or the output file fails
 */
async function writeStreamToJsonArray(
  stream: NodeJS.ReadableStream,
  relativePath: string
): Promise<void> {
  const filePath = path.resolve(__dirname, relativePath)
  // createWriteStream fails with ENOENT when the target directory does not exist
  fs.mkdirSync(path.dirname(filePath), { recursive: true })
  const localFile = fs.createWriteStream(filePath)

  return new Promise<void>((resolve, reject) => {
    let isFirstItem = true
    let receivedAnyData = false
    let fileFailed = false
    let waitingForDrain = false

    // Without this listener a write failure would be an unhandled 'error' event
    // and the promise would never settle
    localFile.on('error', (err) => {
      fileFailed = true
      console.error('File error:', err)
      reject(err)
    })

    // Writes to the file and pauses the source until 'drain' when the file
    // stream's internal buffer is full
    const writeToFile = (text: string) => {
      if (fileFailed) {
        return
      }
      if (!localFile.write(text) && !waitingForDrain) {
        waitingForDrain = true
        stream.pause()
        localFile.once('drain', () => {
          waitingForDrain = false
          stream.resume()
        })
      }
    }

    // Start with an empty array
    writeToFile('[\n')

    stream.on('data', (data: OrphanObject | PingObject) => {
      if (data.event === 'ping') {
        console.log('Received ping event, ignoring')
        return
      }

      if (data.event === 'data' && data.value && Array.isArray(data.value)) {
        receivedAnyData = true
        console.log(`Processing ${data.value.length} objects`)

        for (const item of data.value) {
          if (!isFirstItem) {
            writeToFile(',\n')
          } else {
            isFirstItem = false
          }

          writeToFile(JSON.stringify(item, null, 2))
        }
      } else {
        console.warn(
          'Received data with invalid format:',
          JSON.stringify(data).substring(0, 100) + '...'
        )
      }
    })

    stream.on('error', (err) => {
      console.error('Stream error:', err)
      if (fileFailed) {
        reject(err)
        return
      }
      // Close the array so the partial file is still valid JSON
      localFile.end('\n]', () => {
        reject(err)
      })
    })

    stream.on('end', () => {
      if (!fileFailed) {
        localFile.end('\n]', () => {
          resolve()
        })
      }

      if (!receivedAnyData) {
        console.warn(`No data was received! File might be empty: ${filePath}`)
      } else {
        console.log(`Finished writing data to ${filePath}. Data was received and saved.`)
      }
    })
  })
}
178+
179+
// Run the CLI. "Done" is printed only on success; a failure is logged and
// reported through a non-zero exit code.
main()
  .then(() => {
    console.log('Done')
  })
  .catch((e) => {
    console.error('Error:', e)
    process.exitCode = 1
  })

src/test/ndjson.test.ts

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Tests for NdJsonTransform (src/test/ndjson.test.ts)
2+
3+
import { Buffer } from 'buffer'
4+
import { NdJsonTransform } from '@internal/streams/ndjson'
5+
6+
/**
 * Helper that writes the given chunks into the transform,
 * collects all the parsed objects (in order), and resolves
 * them—or rejects on error.
 *
 * @param transform stream under test; it is written to and then ended
 * @param chunks raw input chunks, written in order
 * @returns every value emitted through `data` before `end`
 */
function collect(transform: NdJsonTransform, chunks: Buffer[]): Promise<unknown[]> {
  return new Promise((resolve, reject) => {
    const out: unknown[] = []
    transform.on('data', (obj: unknown) => out.push(obj))
    transform.on('error', (err) => reject(err))
    transform.on('end', () => resolve(out))
    for (const c of chunks) transform.write(c)
    transform.end()
  })
}
21+
22+
// Behavioural tests for NdJsonTransform: line splitting, blank-line handling,
// flush of an unterminated last line, error propagation and UTF-8 decoding
// across chunk boundaries.
describe('NdJsonTransform', () => {
  it('parses a single JSON object terminated by newline', async () => {
    const t = new NdJsonTransform()
    const result = await collect(t, [Buffer.from('{"foo":123}\n')])
    expect(result).toEqual([{ foo: 123 }])
  })

  it('parses multiple JSON objects in one chunk', async () => {
    const t = new NdJsonTransform()
    const chunk = Buffer.from('{"a":1}\n{"b":2}\n')
    const result = await collect(t, [chunk])
    expect(result).toEqual([{ a: 1 }, { b: 2 }])
  })

  it('skips empty and whitespace-only lines', async () => {
    const t = new NdJsonTransform()
    const chunk = Buffer.from('\n \n{"x":10}\n \n')
    const result = await collect(t, [chunk])
    expect(result).toEqual([{ x: 10 }])
  })

  it('parses JSON split across multiple chunks', async () => {
    const t = new NdJsonTransform()
    const chunks = [Buffer.from('{"split":'), Buffer.from('true}\n')]
    const result = await collect(t, chunks)
    expect(result).toEqual([{ split: true }])
  })

  it('parses final line without trailing newline on flush', async () => {
    const t = new NdJsonTransform()
    const chunks = [Buffer.from('{"end":"last"}')]
    const result = await collect(t, chunks)
    expect(result).toEqual([{ end: 'last' }])
  })

  // The transform reports the same "on flush" message prefix from both code paths.
  it('propagates parse errors in _transform (invalid JSON with newline)', async () => {
    const t = new NdJsonTransform()
    const bad = Buffer.from('{"foo": bad}\n')
    await expect(collect(t, [bad])).rejects.toThrow(/Invalid JSON on flush:/)
  })

  it('propagates parse errors in _flush (invalid final JSON)', async () => {
    const t = new NdJsonTransform()
    const bad = Buffer.from('{"incomplete":123')
    await expect(collect(t, [bad])).rejects.toThrow(/Invalid JSON on flush:/)
  })

  it('handles multi-byte UTF-8 characters split across chunk boundary', async () => {
    const t = new NdJsonTransform()
    const full = Buffer.from('{"emoji":"💩"}\n', 'utf8')
    // Split in the middle of the 4-byte 💩 codepoint (it starts at byte 10).
    // subarray() replaces the deprecated Buffer#slice.
    const chunk1 = full.subarray(0, 12) // up through two bytes of the emoji
    const chunk2 = full.subarray(12) // remainder of emoji + '}' + '\n'
    const result = await collect(t, [chunk1, chunk2])
    expect(result).toEqual([{ emoji: '💩' }])
  })

  it('emits no data for completely empty input', async () => {
    const t = new NdJsonTransform()
    const result = await collect(t, [])
    expect(result).toEqual([])
  })
})

0 commit comments

Comments
 (0)