Skip to content

Commit 4debebc

Browse files
authored
fix: oom in snowflake worker [CM-1067] (#3948)
Signed-off-by: Mouad BANI <mouad-mb@outlook.com>
1 parent 8983a97 commit 4debebc

File tree

3 files changed

+28
-13
lines changed

3 files changed

+28
-13
lines changed

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ export class TransformerConsumer {
4747
log.info({ jobId: job.id, platform: job.platform, s3Path: job.s3Path }, 'Processing job')
4848
this.currentPollingIntervalMs = this.pollingIntervalMs
4949
await this.processJob(job)
50+
// yield to the event loop so GC can collect the previous batch before the next one starts
51+
await new Promise<void>((resolve) => setImmediate(resolve))
5052
continue
5153
}
5254
} catch (err) {
@@ -82,14 +84,11 @@ export class TransformerConsumer {
8284
const platform = job.platform as PlatformType
8385
const source = getDataSource(platform, job.sourceName)
8486

85-
const rows = await this.s3Service.readParquetRows(job.s3Path)
86-
8787
let transformedCount = 0
8888
let transformSkippedCount = 0
8989
let resolveSkippedCount = 0
9090

91-
for (let i = 0; i < rows.length; i++) {
92-
const row = rows[i]
91+
for await (const row of this.s3Service.streamParquetRows(job.s3Path)) {
9392
const result = source.transformer.safeTransformRow(row)
9493
if (!result) {
9594
transformSkippedCount++

services/apps/snowflake_connectors/src/core/s3Service.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,19 @@ export class S3Service {
4141
return Buffer.from(byteArray)
4242
}
4343

44-
async readParquetRows(s3Uri: string): Promise<Record<string, unknown>[]> {
45-
const buffer = await this.downloadFile(s3Uri)
44+
async *streamParquetRows(s3Uri: string): AsyncGenerator<Record<string, unknown>> {
45+
let buffer: Buffer | null = await this.downloadFile(s3Uri)
4646
const reader = await ParquetReader.openBuffer(buffer)
47+
buffer = null
4748
const cursor = reader.getCursor()
48-
const rows: Record<string, unknown>[] = []
49-
let row: Record<string, unknown> | null = null
50-
while ((row = (await cursor.next()) as Record<string, unknown> | null) !== null) {
51-
rows.push(row)
49+
try {
50+
let row: Record<string, unknown> | null = null
51+
while ((row = (await cursor.next()) as Record<string, unknown> | null) !== null) {
52+
yield row
53+
}
54+
} finally {
55+
await reader.close()
5256
}
53-
await reader.close()
54-
return rows
5557
}
5658

5759
async deleteFile(s3Uri: string): Promise<void> {

services/apps/snowflake_connectors/src/integrations/tnc/courses/buildSourceQuery.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,21 @@ export const buildSourceQuery = (sinceTimestamp?: string): string => {
3434
let select = `
3535
SELECT
3636
ca.*,
37-
co.*,
37+
co.title,
38+
co.course_group_id,
39+
co.client_id,
40+
co.sku,
41+
co.status,
42+
co.slug,
43+
co.created_ts,
44+
co.start_date,
45+
co.end_date,
46+
co.is_training,
47+
co.is_certification,
48+
co.instruction_type,
49+
co.product_type,
50+
co.location,
51+
co.is_test_or_archived,
3852
tu.user_email,
3953
tu.lfid,
4054
tu.learner_name,

0 commit comments

Comments
 (0)