Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion biome.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
},
"nursery": {
"noFloatingPromises": "error",
"noUnnecessaryConditions": "warn"
"noUnnecessaryConditions": "error"
}
}
},
Expand Down
14 changes: 3 additions & 11 deletions src/internal/database/migrations/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,7 @@ export async function tenantHasMigrations(tenantId: string, migration: keyof typ
export async function* listTenantsToMigrate(signal: AbortSignal) {
let lastCursor = 0

while (true) {
if (signal.aborted) {
break
}

while (!signal.aborted) {
const migrationVersion = await lastLocalMigrationName()

const data = await multitenantKnex
Expand Down Expand Up @@ -140,11 +136,7 @@ export async function* listTenantsToResetMigrations(
) {
let lastCursor = 0

while (true) {
if (signal.aborted) {
break
}

while (!signal.aborted) {
const afterMigrations = Object.keys(DBMigration).filter((migrationName) => {
return DBMigration[migrationName as keyof typeof DBMigration] > DBMigration[migration]
})
Expand Down Expand Up @@ -675,7 +667,7 @@ function runMigrations({
const icebergDefaultShard = icebergShards.length > 0 ? icebergShards[0] : ''

if (migrationsToRun.length > 0) {
await client.query(SQL`SELECT
await client.query(SQL`SELECT
set_config('storage.install_roles', ${dbInstallRoles}, false),
set_config('storage.multitenant', ${isMultitenant ? 'true' : 'false'}, false),
set_config('storage.anon_role', ${dbAnonRole}, false),
Expand Down
12 changes: 1 addition & 11 deletions src/internal/database/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,7 @@ class TenantPool implements PoolStrategy {
}

protected async drainPool(pool: Knex) {
if (!pool?.client?.pool) {
if (pool) return pool.destroy()
return
}

while (true) {
if (!pool?.client?.pool) {
if (pool) return pool.destroy()
return
}

for (; pool?.client?.pool; ) {
let waiting = 0
waiting += pool.client.pool.numPendingAcquires()
waiting += pool.client.pool.numPendingValidations()
Expand Down
24 changes: 6 additions & 18 deletions src/storage/protocols/iceberg/catalog/reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,41 +241,29 @@ export class IcebergCatalogReconciler {
}

private async *listNamespaces(shardKey: string) {
let restToken: string | undefined = undefined
while (true) {
let restToken: string | undefined
do {
const resp = await this.restCatalog.listNamespaces({
warehouse: shardKey,
pageSize: 1000,
pageToken: restToken,
})

yield resp.namespaces

if (!resp['next-page-token']) {
break
}

restToken = resp['next-page-token']
}
} while (restToken)
}

private async *listTables(namespaceName: string, shardKey: string) {
let restToken: string | undefined = undefined
while (true) {
let restToken: string | undefined
do {
const resp = await this.restCatalog.listTables({
warehouse: shardKey,
namespace: namespaceName,
pageSize: 1000,
pageToken: restToken,
})

yield resp.identifiers

if (!resp['next-page-token']) {
break
}

restToken = resp['next-page-token']
}
} while (restToken)
}
}
20 changes: 9 additions & 11 deletions src/storage/protocols/s3/s3-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1374,18 +1374,16 @@ function toAwsMeatadataHeaders(records: Record<string, unknown>) {
const metadataHeaders: Record<string, unknown> = {}
let missingCount = 0

if (records) {
Object.keys(records).forEach((key) => {
const value = records[key]
if (value && typeof value === 'string' && isUSASCII(value) && isValidHeader(key, value)) {
metadataHeaders['x-amz-meta-' + key.toLowerCase()] = value
} else {
missingCount++
}
})
}
Object.keys(records).forEach((key) => {
const value = records[key]
if (value && typeof value === 'string' && isUSASCII(value) && isValidHeader(key, value)) {
metadataHeaders['x-amz-meta-' + key.toLowerCase()] = value
} else {
missingCount++
}
})

if (missingCount) {
if (missingCount > 0) {
metadataHeaders['x-amz-missing-meta'] = missingCount
}

Expand Down
17 changes: 3 additions & 14 deletions src/storage/scanner/scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,7 @@ export class ObjectScanner {
) {
let nextToken: string | undefined = undefined

while (true) {
if (options.signal.aborted) {
break
}

for (; !options.signal.aborted; ) {
const storageObjects = await this.storage.db.listObjects(
bucket,
'id,name,version,metadata',
Expand Down Expand Up @@ -195,10 +191,7 @@ export class ObjectScanner {
}

protected async *listAllCacheS3Keys(tableName: string, nextItem: string, signal: AbortSignal) {
while (true) {
if (signal.aborted) {
break
}
for (; !signal.aborted; ) {
const query = this.storage.db.connection.pool
.acquire()
.table(tableName)
Expand Down Expand Up @@ -299,11 +292,7 @@ export class ObjectScanner {
) {
let nextToken: string | undefined = undefined

while (true) {
if (options.signal.aborted) {
break
}

for (; !options.signal.aborted; ) {
const result = await this.storage.backend.list(storageS3Bucket, {
prefix: prefix + '/',
nextToken,
Expand Down
16 changes: 6 additions & 10 deletions src/storage/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,14 @@ export class Storage {
* @param type
*/
renderer(type: 'asset' | 'head' | 'image' | 'info') {
switch (type) {
case 'asset':
return new AssetRenderer(this.backend)
case 'head':
return new HeadRenderer()
case 'image':
return new ImageRenderer(this.backend)
case 'info':
return new InfoRenderer()
const renderers = {
asset: () => new AssetRenderer(this.backend),
head: () => new HeadRenderer(),
image: () => new ImageRenderer(this.backend),
info: () => new InfoRenderer(),
}

throw new Error(`renderer of type "${type}" not supported`)
return renderers[type]()
}

/**
Expand Down
17 changes: 9 additions & 8 deletions src/test/object-list-v2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,14 @@ describe('objects - list v2 sorting tests', () => {
for (const { desc, options, expected } of TEST_CASES) {
test(desc + ' in correct order with pagination', async () => {
const limit = 5
let cursor: string | undefined = undefined
let cursor: string | undefined
let pageCount = 0
let lastObjectIdx = -1
let lastFolderIdx = -1
let hasNext = false

// Paginate through all results
while (true) {
do {
const response = await appInstance.inject({
method: 'POST',
url: '/object/list-v2/' + LIST_V2_BUCKET,
Expand Down Expand Up @@ -560,14 +561,14 @@ describe('objects - list v2 sorting tests', () => {
})
pageCount++

if (!data.hasNext) {
hasNext = data.hasNext ?? false
if (!hasNext) {
expect(data.nextCursor).toBeUndefined()
break
} else {
cursor = data.nextCursor as string
expect(cursor).toBeDefined()
}

cursor = data.nextCursor as string
expect(cursor).toBeDefined()
}
} while (hasNext)

// Verify we processed all expected items
expect(lastObjectIdx).toBe(expected.objects.length - 1)
Expand Down
9 changes: 3 additions & 6 deletions src/test/scanner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,14 @@ describe('ObjectScanner', () => {
const s3ObjectAll = []
let nextToken = ''

while (true) {
do {
const s3Objects = await storage.adapter.list(storageS3Bucket, {
prefix: `${tenantId}/${bucket.id}`,
nextToken,
})
s3ObjectAll.push(...s3Objects.keys)
if (!s3Objects.nextToken) {
break
}
nextToken = s3Objects.nextToken
}
nextToken = s3Objects.nextToken ?? ''
} while (nextToken)

// Check s3 files are deleted
expect(s3ObjectAll).toHaveLength(maxUploads - numToDelete)
Expand Down