-
Notifications
You must be signed in to change notification settings - Fork 732
Expand file tree
/
Copy pathactivities.ts
More file actions
124 lines (100 loc) · 3.53 KB
/
activities.ts
File metadata and controls
124 lines (100 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import { parse } from 'csv-parse'
import { bulkUpsertProjectCatalog } from '@crowd/data-access-layer'
import { IDbProjectCatalogCreate } from '@crowd/data-access-layer/src/project-catalog/types'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceLogger } from '@crowd/logging'
import { svc } from '../main'
import { getAvailableSourceNames, getSource } from '../sources/registry'
import { IDatasetDescriptor } from '../sources/types'
// Module-level logger shared by every activity in this file.
const log = getServiceLogger()
// Number of parsed rows accumulated before each bulk upsert to Postgres.
const BATCH_SIZE = 5000
/**
 * Lists the names of all catalog sources registered in the source registry.
 *
 * @returns the available source names.
 */
export async function listSources(): Promise<string[]> {
  const names = getAvailableSourceNames()
  return names
}
export async function listDatasets(sourceName: string): Promise<IDatasetDescriptor[]> {
const source = getSource(sourceName)
const datasets = await source.listAvailableDatasets()
log.info({ sourceName, count: datasets.length, newest: datasets[0]?.id }, 'Datasets listed.')
return datasets
}
export async function processDataset(
sourceName: string,
dataset: IDatasetDescriptor,
): Promise<void> {
const qx = pgpQx(svc.postgres.writer.connection())
const startTime = Date.now()
log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...')
const source = getSource(sourceName)
const stream = await source.fetchDatasetStream(dataset)
// For CSV sources: pipe through csv-parse to get Record<string, string> objects.
// For JSON sources: the stream already emits pre-parsed objects in object mode.
const records =
source.format === 'json'
? stream
: stream.pipe(
parse({
columns: true,
skip_empty_lines: true,
trim: true,
}),
)
// pipe() does not forward source errors to the destination automatically, so we
// destroy records explicitly — this surfaces the error in the for-await loop and
// lets Temporal mark the activity as failed and retry it.
stream.on('error', (err: Error) => {
log.error({ datasetId: dataset.id, error: err.message }, 'Stream error.')
records.destroy(err)
})
if (source.format !== 'json') {
const csvRecords = records as ReturnType<typeof parse>
csvRecords.on('error', (err) => {
log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.')
})
}
let batch: IDbProjectCatalogCreate[] = []
let totalProcessed = 0
let totalSkipped = 0
let batchNumber = 0
let totalRows = 0
for await (const rawRow of records) {
totalRows++
const parsed = source.parseRow(rawRow as Record<string, unknown>)
if (!parsed) {
totalSkipped++
continue
}
batch.push({
projectSlug: parsed.projectSlug,
repoName: parsed.repoName,
repoUrl: parsed.repoUrl,
ossfCriticalityScore: parsed.ossfCriticalityScore,
lfCriticalityScore: parsed.lfCriticalityScore,
})
if (batch.length >= BATCH_SIZE) {
batchNumber++
await bulkUpsertProjectCatalog(qx, batch)
totalProcessed += batch.length
batch = []
log.info({ totalProcessed, batchNumber, datasetId: dataset.id }, 'Batch upserted.')
}
}
// Flush remaining rows that didn't fill a complete batch
if (batch.length > 0) {
batchNumber++
await bulkUpsertProjectCatalog(qx, batch)
totalProcessed += batch.length
}
const elapsedSeconds = ((Date.now() - startTime) / 1000).toFixed(1)
log.info(
{
sourceName,
datasetId: dataset.id,
totalRows,
totalProcessed,
totalSkipped,
totalBatches: batchNumber,
elapsedSeconds,
},
'Dataset processing complete.',
)
}