
Commit 7eb8b29

chore: port variants migration from dev (#1216)

* chore: port variants migration from dev
* ruff
* set clickhouse up
* hrm
* also make the variants tables
* add variants
* write new variants table for migration task
* different tables for variants migration
* ruff
* Fix the bug!
* better
* ruff
* short term hack for key lookup
* old style still uses variants path, new style hack

1 parent 244db4c, commit 7eb8b29

18 files changed: +2690 -29 lines changed
Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+import argparse
+import uuid
+
+import luigi
+
+from v03_pipeline.lib.core import DatasetType, ReferenceGenome
+from v03_pipeline.lib.core.constants import VARIANTS_MIGRATION_RUN_ID
+from v03_pipeline.lib.tasks.variants_migration.load_clickhouse_variants_tables import (
+    LoadClickhouseVariantsTablesTask,
+)
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description='Run variants migration Luigi pipeline.',
+    )
+    parser.add_argument(
+        '--run_all_dataset_types',
+        action='store_true',
+        help='If set, runs the pipeline for all dataset types instead of just SNV_INDEL.',
+    )
+    return parser.parse_args()
+
+
+if __name__ == '__main__':
+    args = parse_args()
+    for reference_genome, dataset_type in [
+        (ReferenceGenome.GRCh37, DatasetType.SNV_INDEL),
+        (ReferenceGenome.GRCh38, DatasetType.SNV_INDEL),
+        (ReferenceGenome.GRCh38, DatasetType.MITO),
+        (ReferenceGenome.GRCh38, DatasetType.SV),
+        (ReferenceGenome.GRCh38, DatasetType.GCNV),
+    ]:
+        if dataset_type != DatasetType.SNV_INDEL and not args.run_all_dataset_types:
+            continue
+        run_id_prefix = (
+            VARIANTS_MIGRATION_RUN_ID + '-' + str(uuid.uuid1().int)[:4]
+        )  # Note: the randomness is a cache bust for the luigi local scheduler
+        luigi.build(
+            [
+                LoadClickhouseVariantsTablesTask(
+                    reference_genome,
+                    dataset_type,
+                    run_id=run_id_prefix,
+                    attempt_id=0,
+                ),
+            ],
+        )
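A note on the run_id_prefix above: the inline comment calls the random suffix a cache bust for the luigi local scheduler, meaning each pass through the loop schedules its tasks under a run id no previous invocation has used. A minimal, illustrative sketch of that derivation (the real VARIANTS_MIGRATION_RUN_ID constant lives in v03_pipeline.lib.core.constants; the value shown here is assumed):

# Illustrative only: how each luigi.build() invocation gets its own run id.
import uuid

VARIANTS_MIGRATION_RUN_ID = 'variants_migration'  # assumed value for this sketch

# uuid1() is timestamp-based, so its integer form differs between calls; the
# 4-character slice gives each loop iteration a distinct-enough run_id.
run_id_prefix = VARIANTS_MIGRATION_RUN_ID + '-' + str(uuid.uuid1().int)[:4]
print(run_id_prefix)  # e.g. 'variants_migration-3122'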

v03_pipeline/lib/misc/clickhouse.py

Lines changed: 87 additions & 18 deletions
@@ -18,6 +18,7 @@
 from v03_pipeline.lib.paths import (
     new_entries_parquet_path,
     new_transcripts_parquet_path,
+    new_variant_details_parquet_path,
     new_variants_parquet_path,
 )

@@ -36,6 +37,9 @@ class ClickHouseTable(StrEnum):
     ANNOTATIONS_MEMORY = 'annotations_memory'
     KEY_LOOKUP = 'key_lookup'
     TRANSCRIPTS = 'transcripts'
+    VARIANT_DETAILS = 'variants/details'
+    VARIANTS_DISK = 'variants_disk'
+    VARIANTS_MEMORY = 'variants_memory'
     ENTRIES = 'entries'
     PROJECT_GT_STATS = 'project_gt_stats'
     GT_STATS = 'gt_stats'

@@ -47,6 +51,9 @@ def src_path_fn(self) -> Callable:
             ClickHouseTable.ANNOTATIONS_MEMORY: new_variants_parquet_path,
             ClickHouseTable.KEY_LOOKUP: new_variants_parquet_path,
             ClickHouseTable.TRANSCRIPTS: new_transcripts_parquet_path,
+            ClickHouseTable.VARIANTS_DISK: new_variants_parquet_path,
+            ClickHouseTable.VARIANTS_MEMORY: new_variants_parquet_path,
+            ClickHouseTable.VARIANT_DETAILS: new_variant_details_parquet_path,
             ClickHouseTable.ENTRIES: new_entries_parquet_path,
         }[self]

@@ -72,6 +79,7 @@ def select_fields(self) -> str:
     def insert(self) -> Callable:
         return {
             ClickHouseTable.ANNOTATIONS_MEMORY: direct_insert_annotations,
+            ClickHouseTable.VARIANTS_MEMORY: direct_insert_variants,
             ClickHouseTable.ENTRIES: atomic_insert_entries,
             ClickHouseTable.KEY_LOOKUP: functools.partial(
                 direct_insert_all_keys,

@@ -81,6 +89,10 @@ def insert(self) -> Callable:
                 direct_insert_all_keys,
                 clickhouse_table=self,
             ),
+            ClickHouseTable.VARIANT_DETAILS: functools.partial(
+                direct_insert_all_keys,
+                clickhouse_table=self,
+            ),
         }[self]

     @classmethod

@@ -100,12 +112,22 @@ def for_dataset_type(cls, dataset_type: DatasetType) -> list['ClickHouseTable']:
         ]

     @classmethod
-    def for_dataset_type_disk_backed_annotations_tables(
+    def for_dataset_type_variants(
         cls,
-        _dataset_type: DatasetType,
+        dataset_type: DatasetType,
     ) -> list['ClickHouseTable']:
+        tables = [
+            ClickHouseTable.VARIANTS_MEMORY,
+            ClickHouseTable.KEY_LOOKUP,
+        ]
+        if dataset_type.should_write_new_transcripts:
+            tables = [
+                *tables,
+                ClickHouseTable.VARIANT_DETAILS,
+            ]
         return [
-            ClickHouseTable.ANNOTATIONS_DISK,
+            *tables,
+            ClickHouseTable.ENTRIES,
         ]

     @classmethod

@@ -254,7 +276,7 @@ class ClickhouseReferenceDataset(StrEnum):
     @property
     def all_variants_mv_timeout(self):
         return {
-            ClickhouseReferenceDataset.SPLICE_AI: WAIT_VIEW_TIMEOUT_S * 7,
+            ClickhouseReferenceDataset.SPLICE_AI: WAIT_VIEW_TIMEOUT_S * 10,
         }.get(self, WAIT_VIEW_TIMEOUT_S)

     @property

@@ -855,6 +877,49 @@ def exchange_tables(
     )


+def direct_insert_variants(
+    table_name_builder: TableNameBuilder,
+    **_,
+) -> None:
+    dst_table = table_name_builder.dst_table(ClickHouseTable.VARIANTS_MEMORY)
+    src_table = table_name_builder.src_table(ClickHouseTable.VARIANTS_MEMORY)
+    drop_staging_db()
+    logged_query(
+        f"""
+        CREATE DATABASE {STAGING_CLICKHOUSE_DATABASE}
+        """,
+    )
+    # NB: Unfortunately there's a bug(?) or inaccuracy if this is attempted without an intermediate
+    # temporary table, likely due to writing to a table and joining against it at the same time.
+    logged_query(
+        f"""
+        CREATE TABLE {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys` ENGINE = Set AS (
+            SELECT {ClickHouseTable.VARIANTS_MEMORY.key_field}
+            FROM {src_table} src
+            LEFT ANTI JOIN {dst_table} dst
+            ON {ClickHouseTable.VARIANTS_MEMORY.join_condition}
+        )
+        """,
+    )
+    disk_backed_dst_table = table_name_builder.dst_table(ClickHouseTable.VARIANTS_DISK)
+    disk_backed_src_table = table_name_builder.src_table(ClickHouseTable.VARIANTS_DISK)
+    logged_query(
+        f"""
+        INSERT INTO {disk_backed_dst_table}
+        SELECT {ClickHouseTable.VARIANTS_DISK.select_fields}
+        FROM {disk_backed_src_table} WHERE {ClickHouseTable.VARIANTS_DISK.key_field} IN {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys`
+        """,
+    )
+    logged_query(
+        f"""
+        INSERT INTO {dst_table}
+        SELECT {ClickHouseTable.VARIANTS_MEMORY.select_fields}
+        FROM {src_table} WHERE {ClickHouseTable.VARIANTS_MEMORY.key_field} IN {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys`
+        """,
+    )
+    drop_staging_db()
+
+
 def direct_insert_annotations(
     table_name_builder: TableNameBuilder,
     **_,
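The NB comment inside direct_insert_variants describes the core pattern: first materialize the keys that exist in the source run but not in the destination into a Set-engine table, then filter both INSERT ... SELECT statements with IN against that set, instead of joining against a table that is being written to at the same time. A standalone sketch of that pattern, using clickhouse-driver directly and hypothetical table names (the real code builds names via TableNameBuilder and issues queries through logged_query):

# Illustrative sketch only: insert rows whose keys are not yet present in dst.
from clickhouse_driver import Client  # assumes the clickhouse-driver package

client = Client('localhost')

# 1. Collect the keys present in src but absent from dst into a Set-engine
#    table; LEFT ANTI JOIN keeps only the unmatched src rows.
client.execute("""
    CREATE TABLE _tmp_loadable_keys ENGINE = Set AS (
        SELECT src.key
        FROM src_variants src
        LEFT ANTI JOIN dst_variants dst ON src.key = dst.key
    )
""")

# 2. Insert only those keys; IN against a Set table is a cheap membership
#    check and avoids writing to dst while joining against it.
client.execute("""
    INSERT INTO dst_variants
    SELECT * FROM src_variants
    WHERE key IN _tmp_loadable_keys
""")

# 3. Clean up (the pipeline drops the whole staging database instead).
client.execute('DROP TABLE _tmp_loadable_keys')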
@@ -879,20 +944,19 @@ def direct_insert_annotations(
         )
         """,
     )
-    for (
-        clickhouse_table
-    ) in ClickHouseTable.for_dataset_type_disk_backed_annotations_tables(
-        table_name_builder.dataset_type,
-    ):
-        disk_backed_dst_table = table_name_builder.dst_table(clickhouse_table)
-        disk_backed_src_table = table_name_builder.src_table(clickhouse_table)
-        logged_query(
-            f"""
-            INSERT INTO {disk_backed_dst_table}
-            SELECT {clickhouse_table.select_fields}
-            FROM {disk_backed_src_table} WHERE {clickhouse_table.key_field} IN {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys`
-            """,
-        )
+    disk_backed_dst_table = table_name_builder.dst_table(
+        ClickHouseTable.ANNOTATIONS_DISK,
+    )
+    disk_backed_src_table = table_name_builder.src_table(
+        ClickHouseTable.ANNOTATIONS_DISK,
+    )
+    logged_query(
+        f"""
+        INSERT INTO {disk_backed_dst_table}
+        SELECT {ClickHouseTable.ANNOTATIONS_DISK.select_fields}
+        FROM {disk_backed_src_table} WHERE {ClickHouseTable.ANNOTATIONS_DISK.key_field} IN {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys`
+        """,
+    )
     logged_query(
         f"""
         INSERT INTO {dst_table}

@@ -910,11 +974,16 @@ def direct_insert_all_keys(
 ) -> None:
     dst_table = table_name_builder.dst_table(clickhouse_table)
     src_table = table_name_builder.src_table(clickhouse_table)
+    settings = ''
+    # Large variant details inserts may OOM
+    if clickhouse_table == ClickHouseTable.VARIANT_DETAILS:
+        settings = 'SETTINGS max_insert_threads = 2'
     logged_query(
         f"""
         INSERT INTO {dst_table}
         SELECT {clickhouse_table.select_fields}
         FROM {src_table}
+        {settings}
         """,
     )
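The SETTINGS clause threaded into direct_insert_all_keys uses ClickHouse's per-query settings to bound a heavy INSERT ... SELECT: max_insert_threads caps how many threads execute the insert, which limits how many blocks of wide variant-details rows sit in memory at once. A short, standalone illustration with hypothetical table names:

# Illustrative only: trade insert throughput for a lower peak memory footprint.
from clickhouse_driver import Client  # assumes the clickhouse-driver package

client = Client('localhost')
client.execute("""
    INSERT INTO dst_variant_details
    SELECT *
    FROM src_variant_details
    SETTINGS max_insert_threads = 2
""")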

v03_pipeline/lib/misc/clickhouse_test.py

Lines changed: 60 additions & 1 deletion
@@ -35,6 +35,7 @@
 from v03_pipeline.lib.paths import (
     new_entries_parquet_path,
     new_transcripts_parquet_path,
+    new_variant_details_parquet_path,
     new_variants_parquet_path,
     runs_path,
 )

@@ -44,7 +45,7 @@


 class ClickhouseTest(MockedDatarootTestCase):
-    def setUp(self):
+    def setUp(self):  # noqa: PLR0915
         super().setUp()
         client = get_clickhouse_client()
         client.execute(

@@ -228,6 +229,24 @@ def setUp(self):
             PRIMARY KEY `key`
             """,
         )
+        client.execute(
+            f"""
+            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/variants_memory` (
+                key UInt32,
+                variantId String,
+            ) ENGINE = EmbeddedRocksDB()
+            PRIMARY KEY `key`
+            """,
+        )
+        client.execute(
+            f"""
+            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/variants_disk` (
+                key UInt32,
+                variantId String,
+            ) ENGINE = EmbeddedRocksDB()
+            PRIMARY KEY `key`
+            """,
+        )
         client.execute(
             f"""
             CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/key_lookup` (

@@ -335,6 +354,24 @@ def setUp(self):
             PRIMARY KEY `key`
             """,
         )
+        client.execute(
+            f"""
+            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/GCNV/variants_memory` (
+                key UInt32,
+                variantId String,
+            ) ENGINE = EmbeddedRocksDB()
+            PRIMARY KEY `key`
+            """,
+        )
+        client.execute(
+            f"""
+            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/GCNV/variants_disk` (
+                key UInt32,
+                variantId String,
+            ) ENGINE = EmbeddedRocksDB()
+            PRIMARY KEY `key`
+            """,
+        )
         client.execute(
             f"""
             CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/GCNV/key_lookup` (

@@ -364,6 +401,28 @@ def write_test_parquet(df: pd.DataFrame, parquet_path: str, schema=None):
             ),
         )

+        # Variant Details Parquet
+        df = pd.DataFrame(
+            {
+                'key': [1, 2, 3, 4],
+                'variantId': [
+                    '1-13-A-C',
+                    '2-14-A-T',
+                    'Y-19-A-C',
+                    'M-12-C-G',
+                ],
+                'transcripts': ['a', 'b', 'c', 'd'],
+            },
+        )
+        write_test_parquet(
+            df,
+            new_variant_details_parquet_path(
+                ReferenceGenome.GRCh38,
+                DatasetType.SNV_INDEL,
+                TEST_RUN_ID,
+            ),
+        )
+
         # Transcripts Parquet
         df = pd.DataFrame({'key': [1, 2, 3, 4], 'transcripts': ['a', 'b', 'c', 'd']})
         write_test_parquet(
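The fixtures above all go through the module's write_test_parquet helper, whose body is outside this diff; only its signature appears in the hunk header. Purely as an assumption about what it does (persist the fixture DataFrame as parquet, optionally under an explicit schema), a plausible minimal version might look like:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def write_test_parquet(df: pd.DataFrame, parquet_path: str, schema=None) -> None:
    # Convert the fixture DataFrame to an Arrow table (honouring an explicit
    # schema when one is supplied) and write it where the loading tasks expect
    # to find a run's exported parquet.
    table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
    pq.write_table(table, parquet_path)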

v03_pipeline/lib/tasks/__init__.py

Lines changed: 8 additions & 0 deletions
@@ -8,12 +8,20 @@
     UpdatedReferenceDatasetParquetTask,
 )
 from v03_pipeline.lib.tasks.run_pipeline import RunPipelineTask
+from v03_pipeline.lib.tasks.variants_migration.migrate_variant_details_parquet import (
+    MigrateVariantDetailsParquetTask,
+)
+from v03_pipeline.lib.tasks.variants_migration.migrate_variants_parquet import (
+    MigrateVariantsParquetTask,
+)
 from v03_pipeline.lib.tasks.write_metadata_for_run import WriteMetadataForRunTask
 from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask

 __all__ = [
     'MigrateAllProjectsToClickHouseOnDataprocTask',
     'MigrateAllProjectsToClickHouseTask',
+    'MigrateVariantDetailsParquetTask',
+    'MigrateVariantsParquetTask',
     'RunPipelineTask',
     'UpdatedReferenceDatasetParquetTask',
     'WriteMetadataForRunTask',

v03_pipeline/lib/tasks/clickhouse_migration/migrate_project_variants_to_clickhouse.py

Lines changed: 8 additions & 8 deletions
@@ -12,13 +12,13 @@
     BaseLoadingPipelineParams,
 )
 from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask
-from v03_pipeline.lib.tasks.exports.write_new_transcripts_parquet import (
-    WriteNewTranscriptsParquetTask,
+from v03_pipeline.lib.tasks.files import GCSorLocalTarget, HailTableTask
+from v03_pipeline.lib.tasks.variants_migration.write_new_variant_details_parquet import (
+    WriteNewVariantDetailsParquetTask,
 )
-from v03_pipeline.lib.tasks.exports.write_new_variants_parquet import (
-    WriteNewVariantsParquetTask,
+from v03_pipeline.lib.tasks.variants_migration.write_new_variants_parquet import (
+    WriteNewVariantsParquetForMigrationTask,
 )
-from v03_pipeline.lib.tasks.files import GCSorLocalTarget, HailTableTask

 MAX_SNV_INDEL_ALLELE_LENGTH = 500

@@ -94,7 +94,7 @@ def run(self):
                 *(
                     [
                         self.clone(
-                            WriteNewTranscriptsParquetTask,
+                            WriteNewVariantDetailsParquetTask,
                             # Callset Path being required
                             # here is byproduct of the "place all variants"
                             # in the variants path" hack. In theory

@@ -107,11 +107,11 @@ def run(self):
                             callset_path=None,
                         ),
                     ]
-                    if self.dataset_type.should_write_new_transcripts
+                    if self.dataset_type.should_write_new_variant_details
                     else []
                 ),
                 self.clone(
-                    WriteNewVariantsParquetTask,
+                    WriteNewVariantsParquetForMigrationTask,
                     callset_path=None,
                 ),
             ],
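The requires()-style wiring above leans on luigi's Task.clone, which re-instantiates the target class with the parent's matching parameters while allowing explicit overrides; that is how callset_path gets forced to None for the migration's dependencies. A small, self-contained sketch of that mechanism with hypothetical task names:

import luigi


class WriteParquetTask(luigi.Task):
    run_id = luigi.Parameter()
    callset_path = luigi.OptionalParameter(default=None)


class MigrateTask(luigi.Task):
    run_id = luigi.Parameter()
    callset_path = luigi.OptionalParameter(default=None)

    def requires(self):
        # clone() copies the parameters the two tasks share (run_id,
        # callset_path) and applies explicit overrides, so the dependency is
        # scheduled for the same run but without a callset.
        return self.clone(WriteParquetTask, callset_path=None)


parent = MigrateTask(run_id='variants_migration-0001', callset_path='gs://bucket/callset.vcf')
child = parent.requires()
assert child.run_id == 'variants_migration-0001'
assert child.callset_path is None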
