Skip to content

Commit 7fc7a4e

Browse files
committed
fix: allow GC to run in between batches
Signed-off-by: Mouad BANI <mouad-mb@outlook.com>
1 parent 8c8ab13 commit 7fc7a4e

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ export class TransformerConsumer {
4747
log.info({ jobId: job.id, platform: job.platform, s3Path: job.s3Path }, 'Processing job')
4848
this.currentPollingIntervalMs = this.pollingIntervalMs
4949
await this.processJob(job)
50+
// yield to the event loop so GC can collect the previous batch before the next one starts
51+
await new Promise<void>((resolve) => setImmediate(resolve))
5052
continue
5153
}
5254
} catch (err) {
@@ -82,7 +84,7 @@ export class TransformerConsumer {
8284
const platform = job.platform as PlatformType
8385
const source = getDataSource(platform, job.sourceName)
8486

85-
const rows = await this.s3Service.readParquetRows(job.s3Path)
87+
let rows: Record<string, unknown>[] | null = await this.s3Service.readParquetRows(job.s3Path)
8688

8789
let transformedCount = 0
8890
let transformSkippedCount = 0
@@ -110,6 +112,8 @@ export class TransformerConsumer {
110112
transformedCount++
111113
}
112114

115+
rows = null
116+
113117
const skippedCount = transformSkippedCount + resolveSkippedCount
114118
const processingMetrics = {
115119
transformedCount,

services/apps/snowflake_connectors/src/core/s3Service.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ export class S3Service {
4242
}
4343

4444
async readParquetRows(s3Uri: string): Promise<Record<string, unknown>[]> {
45-
const buffer = await this.downloadFile(s3Uri)
45+
let buffer: Buffer | null = await this.downloadFile(s3Uri)
4646
const reader = await ParquetReader.openBuffer(buffer)
47+
buffer = null
4748
const cursor = reader.getCursor()
4849
const rows: Record<string, unknown>[] = []
4950
let row: Record<string, unknown> | null = null

0 commit comments

Comments
 (0)