Skip to content

Commit 216f183

Browse files
authored
chore: add support for multi-source platforms in snowflake connectors [CM-1028] (#3891)
1 parent 3e15c35 commit 216f183

File tree

11 files changed

+131
-55
lines changed

11 files changed

+131
-55
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
-- Migration: add per-source tracking to snowflakeExportJobs so a single
-- platform can export multiple independent data sources.
1+
ALTER TABLE integration."snowflakeExportJobs" ADD COLUMN "sourceName" VARCHAR(100);
2+
3+
-- Backfill: every pre-existing job row belonged to cvent's only source.
-- NOTE(review): assumes 'cvent' is the only platform with rows today; a row
-- from any other platform would keep a NULL sourceName and make the
-- SET NOT NULL below fail — confirm before running in production.
UPDATE integration."snowflakeExportJobs" SET "sourceName" = 'event-registrations' WHERE platform = 'cvent';
4+
5+
ALTER TABLE integration."snowflakeExportJobs" ALTER COLUMN "sourceName" SET NOT NULL;
6+
7+
-- Composite index supports lookups filtered by (platform, "sourceName"),
-- e.g. the latest-successful-export watermark query in metadataStore.
CREATE INDEX "idx_snowflakeExportJobs_platform_source" ON integration."snowflakeExportJobs" (platform, "sourceName");

services/apps/snowflake_connectors/src/activities/exportActivity.ts

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,24 @@ import { PlatformType } from '@crowd/types'
1010

1111
import { MetadataStore } from '../core/metadataStore'
1212
import { SnowflakeExporter } from '../core/snowflakeExporter'
13-
import { getEnabledPlatforms as _getEnabledPlatforms, getPlatform } from '../integrations'
13+
import {
14+
getDataSourceNames as _getDataSourceNames,
15+
getEnabledPlatforms as _getEnabledPlatforms,
16+
getDataSource,
17+
} from '../integrations'
18+
import type { DataSourceName } from '../integrations/types'
1419

1520
/**
 * Activity wrapper around the synchronous platform-registry lookup.
 * Declared async so workflows can register and invoke it as an activity.
 */
export async function getEnabledPlatforms(): Promise<PlatformType[]> {
  const platforms = _getEnabledPlatforms()
  return platforms
}
1823

24+
export async function getDataSourceNamesForPlatform(platform: PlatformType): Promise<string[]> {
25+
return _getDataSourceNames(platform)
26+
}
27+
1928
const log = getServiceChildLogger('exportActivity')
2029

21-
function buildS3FilenamePrefix(platform: string): string {
30+
function buildS3FilenamePrefix(platform: string, sourceName: string): string {
2231
const now = new Date()
2332
const year = now.getFullYear()
2433
const month = String(now.getMonth() + 1).padStart(2, '0')
@@ -27,37 +36,50 @@ function buildS3FilenamePrefix(platform: string): string {
2736
if (!s3BucketPath) {
2837
throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH')
2938
}
30-
return `${s3BucketPath}/${platform}/${year}/${month}/${day}`
39+
return `${s3BucketPath}/${platform}/${sourceName}/${year}/${month}/${day}`
3140
}
3241

33-
export async function executeExport(platform: PlatformType): Promise<void> {
34-
log.info({ platform }, 'Starting export')
42+
export async function executeExport(
43+
platform: PlatformType,
44+
sourceName: DataSourceName,
45+
): Promise<void> {
46+
log.info({ platform, sourceName }, 'Starting export')
3547

3648
const exporter = new SnowflakeExporter()
3749
const db = await getDbConnection(WRITE_DB_CONFIG())
3850

3951
try {
4052
const metadataStore = new MetadataStore(db)
41-
const platformDef = getPlatform(platform)
53+
const source = getDataSource(platform, sourceName)
4254

43-
const lastSuccessfulExportTimestamp = await metadataStore.getLatestExportStartedAt(platform)
55+
const lastSuccessfulExportTimestamp = await metadataStore.getLatestExportStartedAt(
56+
platform,
57+
sourceName,
58+
)
4459
const sinceTimestamp = lastSuccessfulExportTimestamp
4560
? new Date(lastSuccessfulExportTimestamp).toISOString()
4661
: undefined
47-
const sourceQuery = platformDef.buildSourceQuery(sinceTimestamp)
48-
const s3FilenamePrefix = buildS3FilenamePrefix(platform)
62+
const sourceQuery = source.buildSourceQuery(sinceTimestamp)
63+
const s3FilenamePrefix = buildS3FilenamePrefix(platform, sourceName)
4964

5065
const exportStartedAt = new Date()
5166

5267
const onBatchComplete = async (s3Path: string, totalRows: number, totalBytes: number) => {
53-
await metadataStore.insertExportJob(platform, s3Path, totalRows, totalBytes, exportStartedAt)
68+
await metadataStore.insertExportJob(
69+
platform,
70+
sourceName,
71+
s3Path,
72+
totalRows,
73+
totalBytes,
74+
exportStartedAt,
75+
)
5476
}
5577

5678
await exporter.executeBatchedCopyInto(sourceQuery, s3FilenamePrefix, onBatchComplete)
5779

58-
log.info({ platform }, 'Export completed')
80+
log.info({ platform, sourceName }, 'Export completed')
5981
} catch (err) {
60-
log.error({ platform, err }, 'Export failed')
82+
log.error({ platform, sourceName, err }, 'Export failed')
6183
throw err
6284
} finally {
6385
await exporter
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
export { getEnabledPlatforms, executeExport } from './exportActivity'
1+
export { getEnabledPlatforms, getDataSourceNamesForPlatform, executeExport } from './exportActivity'
22
export { executeCleanup } from './cleanupActivity'

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { PlatformType } from '@crowd/types'
1414
import { IntegrationResolver } from '../core/integrationResolver'
1515
import { MetadataStore, SnowflakeExportJob } from '../core/metadataStore'
1616
import { S3Service } from '../core/s3Service'
17-
import { getEnabledPlatforms, getPlatform } from '../integrations'
17+
import { getDataSource, getEnabledPlatforms } from '../integrations'
1818

1919
const log = getServiceChildLogger('transformerConsumer')
2020

@@ -79,7 +79,8 @@ export class TransformerConsumer {
7979
const startTime = Date.now()
8080

8181
try {
82-
const platformDef = getPlatform(job.platform as PlatformType)
82+
const platform = job.platform as PlatformType
83+
const source = getDataSource(platform, job.sourceName)
8384

8485
const rows = await this.s3Service.readParquetRows(job.s3Path)
8586

@@ -89,16 +90,13 @@ export class TransformerConsumer {
8990

9091
for (let i = 0; i < rows.length; i++) {
9192
const row = rows[i]
92-
const result = platformDef.transformer.safeTransformRow(row)
93+
const result = source.transformer.safeTransformRow(row)
9394
if (!result) {
9495
transformSkippedCount++
9596
continue
9697
}
9798

98-
const resolved = await this.integrationResolver.resolve(
99-
job.platform as PlatformType,
100-
result.segment,
101-
)
99+
const resolved = await this.integrationResolver.resolve(platform, result.segment)
102100
if (!resolved) {
103101
resolveSkippedCount++
104102
continue

services/apps/snowflake_connectors/src/core/metadataStore.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export interface JobMetrics {
1919
export interface SnowflakeExportJob {
2020
id: number
2121
platform: string
22+
sourceName: string
2223
s3Path: string
2324
exportStartedAt: Date | null
2425
createdAt: Date
@@ -35,15 +36,16 @@ export class MetadataStore {
3536

3637
async insertExportJob(
3738
platform: string,
39+
sourceName: string,
3840
s3Path: string,
3941
totalRows: number,
4042
totalBytes: number,
4143
exportStartedAt: Date,
4244
): Promise<void> {
4345
const metrics: JobMetrics = { exportedRows: totalRows, exportedBytes: totalBytes }
4446
await this.db.none(
45-
`INSERT INTO integration."snowflakeExportJobs" (platform, s3_path, "exportStartedAt", metrics)
46-
VALUES ($1, $2, $3, $4::jsonb)
47+
`INSERT INTO integration."snowflakeExportJobs" (platform, "sourceName", s3_path, "exportStartedAt", metrics)
48+
VALUES ($1, $2, $3, $4, $5::jsonb)
4749
ON CONFLICT (s3_path) DO UPDATE SET
4850
"exportStartedAt" = EXCLUDED."exportStartedAt",
4951
"processingStartedAt" = NULL,
@@ -52,7 +54,7 @@ export class MetadataStore {
5254
error = NULL,
5355
metrics = EXCLUDED.metrics,
5456
"updatedAt" = NOW()`,
55-
[platform, s3Path, exportStartedAt, JSON.stringify(metrics)],
57+
[platform, sourceName, s3Path, exportStartedAt, JSON.stringify(metrics)],
5658
)
5759
}
5860

@@ -64,6 +66,7 @@ export class MetadataStore {
6466
const row = await this.db.oneOrNone<{
6567
id: number
6668
platform: string
69+
sourceName: string
6770
s3_path: string
6871
exportStartedAt: Date | null
6972
createdAt: Date
@@ -83,7 +86,7 @@ export class MetadataStore {
8386
LIMIT 1
8487
FOR UPDATE SKIP LOCKED
8588
)
86-
RETURNING id, platform, s3_path, "exportStartedAt",
89+
RETURNING id, platform, "sourceName", s3_path, "exportStartedAt",
8790
"createdAt", "updatedAt", "processingStartedAt", "completedAt", "cleanedAt", error, metrics`,
8891
)
8992
return row ? mapRowToJob(row) : null
@@ -137,14 +140,15 @@ export class MetadataStore {
137140
)
138141
}
139142

140-
async getLatestExportStartedAt(platform: string): Promise<Date | null> {
143+
// Watermark query: returns the start time of the most recent successful export
// for this (platform, sourceName) pair, or null when none has completed yet.
// "Successful" means "completedAt" is set and no error was recorded.
// Callers use this as the incremental "since" cutoff for the next export run.
async getLatestExportStartedAt(platform: string, sourceName: string): Promise<Date | null> {
141144
const row = await this.db.oneOrNone<{ max: Date | null }>(
142145
`SELECT MAX("exportStartedAt") AS max
143146
FROM integration."snowflakeExportJobs"
144147
WHERE platform = $1
148+
AND "sourceName" = $2
145149
AND "completedAt" IS NOT NULL
146150
AND error IS NULL`,
147-
[platform],
151+
[platform, sourceName],
148152
)
149153
// MAX() over zero rows still yields one row with max = NULL, so oneOrNone
// returns a row whose max is null rather than returning null itself.
return row?.max ?? null
150154
}
@@ -153,6 +157,7 @@ export class MetadataStore {
153157
function mapRowToJob(row: {
154158
id: number
155159
platform: string
160+
sourceName: string
156161
s3_path: string
157162
exportStartedAt: Date | null
158163
createdAt: Date
@@ -166,6 +171,7 @@ function mapRowToJob(row: {
166171
return {
167172
id: row.id,
168173
platform: row.platform,
174+
sourceName: row.sourceName,
169175
s3Path: row.s3_path,
170176
exportStartedAt: row.exportStartedAt,
171177
createdAt: row.createdAt,

services/apps/snowflake_connectors/src/core/snowflakeExporter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ export class SnowflakeExporter {
8383

8484
log.info({ batch, batchRows, batchBytes, files: results.length }, 'COPY INTO batch completed')
8585

86-
if (onBatchComplete) {
86+
if (onBatchComplete && batchRows > 0) {
8787
await onBatchComplete(s3Path, batchRows, batchBytes)
8888
}
8989
if (batchRows < limit) {

services/apps/snowflake_connectors/src/integrations/cvent/buildSourceQuery.ts renamed to services/apps/snowflake_connectors/src/integrations/cvent/event-registrations/buildSourceQuery.ts

File renamed without changes.

services/apps/snowflake_connectors/src/integrations/cvent/transformer.ts renamed to services/apps/snowflake_connectors/src/integrations/cvent/event-registrations/transformer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
PlatformType,
1212
} from '@crowd/types'
1313

14-
import { TransformedActivity, TransformerBase } from '../../core/transformerBase'
14+
import { TransformedActivity, TransformerBase } from '../../../core/transformerBase'
1515

1616
const log = getServiceChildLogger('cventTransformer')
1717

services/apps/snowflake_connectors/src/integrations/index.ts

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,22 @@
66
*/
77
import { PlatformType } from '@crowd/types'
88

9-
import { TransformerBase } from '../core/transformerBase'
9+
import { buildSourceQuery as cventBuildSourceQuery } from './cvent/event-registrations/buildSourceQuery'
10+
import { CventTransformer } from './cvent/event-registrations/transformer'
11+
import { DataSource, DataSourceName, PlatformDefinition } from './types'
1012

11-
import { buildSourceQuery as cventBuildSourceQuery } from './cvent/buildSourceQuery'
12-
import { CventTransformer } from './cvent/transformer'
13-
14-
export type BuildSourceQuery = (sinceTimestamp?: string) => string
15-
16-
export interface PlatformDefinition {
17-
transformer: TransformerBase
18-
buildSourceQuery: BuildSourceQuery
19-
}
13+
export type { BuildSourceQuery, DataSource, PlatformDefinition } from './types'
14+
export { DataSourceName } from './types'
2015

2116
// Registry of supported platforms. Each platform now declares a list of data
// sources — one (query builder, transformer) pair per exportable stream —
// instead of a single platform-wide transformer/query pair.
const supported: Partial<Record<PlatformType, PlatformDefinition>> = {
2217
[PlatformType.CVENT]: {
23-
transformer: new CventTransformer(),
24-
buildSourceQuery: cventBuildSourceQuery,
18+
sources: [
19+
{
20+
name: DataSourceName.CVENT_EVENT_REGISTRATIONS,
21+
buildSourceQuery: cventBuildSourceQuery,
22+
transformer: new CventTransformer(),
23+
},
24+
],
2525
},
2626
}
2727

@@ -40,6 +40,19 @@ export function getPlatform(platform: PlatformType): PlatformDefinition {
4040
return supported[platform]
4141
}
4242

43+
export function getDataSourceNames(platform: PlatformType): string[] {
44+
return getPlatform(platform).sources.map((s) => s.name)
45+
}
46+
47+
export function getDataSource(platform: PlatformType, sourceName: string): DataSource {
48+
const def = getPlatform(platform)
49+
const source = def.sources.find((s) => s.name === sourceName)
50+
if (!source) {
51+
throw new Error(`Unknown data source '${sourceName}' for platform '${platform}'`)
52+
}
53+
return source
54+
}
55+
4356
// Returns the platforms currently enabled for export. `enabled` is a
// module-level value computed elsewhere in this file — presumably derived
// from the `supported` registry and/or env config; confirm upstream.
export function getEnabledPlatforms(): PlatformType[] {
4457
return enabled
4558
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { TransformerBase } from '../core/transformerBase'
2+
3+
export type BuildSourceQuery = (sinceTimestamp?: string) => string
4+
5+
// Each data source maps to a distinct Snowflake table (or joined set of tables) that is exported and transformed independently.
6+
export enum DataSourceName {
7+
CVENT_EVENT_REGISTRATIONS = 'event-registrations',
8+
}
9+
10+
export interface DataSource {
11+
name: DataSourceName
12+
buildSourceQuery: BuildSourceQuery
13+
transformer: TransformerBase
14+
}
15+
16+
export interface PlatformDefinition {
17+
sources: DataSource[]
18+
}

0 commit comments

Comments
 (0)