49 changes: 49 additions & 0 deletions v03_pipeline/bin/migrate_variants_tables.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+import argparse
+import uuid
+
+import luigi
+
+from v03_pipeline.lib.core import DatasetType, ReferenceGenome
+from v03_pipeline.lib.core.constants import VARIANTS_MIGRATION_RUN_ID
+from v03_pipeline.lib.tasks.variants_migration.load_clickhouse_variants_tables import (
+    LoadClickhouseVariantsTablesTask,
+)
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description='Run variants migration Luigi pipeline.',
+    )
+    parser.add_argument(
+        '--run_all_dataset_types',
+        action='store_true',
+        help='If set, runs the pipeline for all dataset types instead of just SNV_INDEL.',
+    )
+    return parser.parse_args()
+
+
+if __name__ == '__main__':
+    args = parse_args()
+    for reference_genome, dataset_type in [
+        (ReferenceGenome.GRCh37, DatasetType.SNV_INDEL),
+        (ReferenceGenome.GRCh38, DatasetType.SNV_INDEL),
+        (ReferenceGenome.GRCh38, DatasetType.MITO),
+        (ReferenceGenome.GRCh38, DatasetType.SV),
+        (ReferenceGenome.GRCh38, DatasetType.GCNV),
+    ]:
+        if dataset_type != DatasetType.SNV_INDEL and not args.run_all_dataset_types:
+            continue
+        run_id_prefix = (
+            VARIANTS_MIGRATION_RUN_ID + '-' + str(uuid.uuid1().int)[:4]
+        )  # Note: the randomness is a cache bust for the luigi local scheduler
+        luigi.build(
+            [
+                LoadClickhouseVariantsTablesTask(
+                    reference_genome,
+                    dataset_type,
+                    run_id=run_id_prefix,
+                    attempt_id=0,
+                ),
+            ],
+        )
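The script is invoked as `python v03_pipeline/bin/migrate_variants_tables.py [--run_all_dataset_types]`. Per the inline note, the uuid suffix forces the luigi local scheduler to treat each invocation as a fresh task rather than replaying cached task state, since luigi identifies a task instance by its parameter values. A minimal sketch of the cache-bust pattern in isolation (the constant value below is a stand-in, not the real `VARIANTS_MIGRATION_RUN_ID`):

```python
import uuid

# Stand-in value; the real constant lives in v03_pipeline.lib.core.constants.
VARIANTS_MIGRATION_RUN_ID = 'variants_migration'


def fresh_run_id() -> str:
    # A few digits of a uuid1 make each invocation a distinct luigi task.
    return VARIANTS_MIGRATION_RUN_ID + '-' + str(uuid.uuid1().int)[:4]


print(fresh_run_id())  # e.g. 'variants_migration-2890'
```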
105 changes: 87 additions & 18 deletions v03_pipeline/lib/misc/clickhouse.py
@@ -18,6 +18,7 @@
 from v03_pipeline.lib.paths import (
     new_entries_parquet_path,
     new_transcripts_parquet_path,
+    new_variant_details_parquet_path,
     new_variants_parquet_path,
 )
 
@@ -36,6 +37,9 @@ class ClickHouseTable(StrEnum):
     ANNOTATIONS_MEMORY = 'annotations_memory'
     KEY_LOOKUP = 'key_lookup'
     TRANSCRIPTS = 'transcripts'
+    VARIANT_DETAILS = 'variants/details'
+    VARIANTS_DISK = 'variants_disk'
+    VARIANTS_MEMORY = 'variants_memory'
     ENTRIES = 'entries'
     PROJECT_GT_STATS = 'project_gt_stats'
     GT_STATS = 'gt_stats'
@@ -47,6 +51,9 @@ def src_path_fn(self) -> Callable:
             ClickHouseTable.ANNOTATIONS_MEMORY: new_variants_parquet_path,
             ClickHouseTable.KEY_LOOKUP: new_variants_parquet_path,
             ClickHouseTable.TRANSCRIPTS: new_transcripts_parquet_path,
+            ClickHouseTable.VARIANTS_DISK: new_variants_parquet_path,
+            ClickHouseTable.VARIANTS_MEMORY: new_variants_parquet_path,
+            ClickHouseTable.VARIANT_DETAILS: new_variant_details_parquet_path,
             ClickHouseTable.ENTRIES: new_entries_parquet_path,
         }[self]
 
@@ -72,6 +79,7 @@ def select_fields(self) -> str:
     def insert(self) -> Callable:
         return {
             ClickHouseTable.ANNOTATIONS_MEMORY: direct_insert_annotations,
+            ClickHouseTable.VARIANTS_MEMORY: direct_insert_variants,
             ClickHouseTable.ENTRIES: atomic_insert_entries,
             ClickHouseTable.KEY_LOOKUP: functools.partial(
                 direct_insert_all_keys,
@@ -81,6 +89,10 @@ def insert(self) -> Callable:
                 direct_insert_all_keys,
                 clickhouse_table=self,
             ),
+            ClickHouseTable.VARIANT_DETAILS: functools.partial(
+                direct_insert_all_keys,
+                clickhouse_table=self,
+            ),
         }[self]
 
     @classmethod
@@ -100,12 +112,22 @@ def for_dataset_type(cls, dataset_type: DatasetType) -> list['ClickHouseTable']:
         ]
 
     @classmethod
-    def for_dataset_type_disk_backed_annotations_tables(
+    def for_dataset_type_variants(
         cls,
-        _dataset_type: DatasetType,
+        dataset_type: DatasetType,
     ) -> list['ClickHouseTable']:
+        tables = [
+            ClickHouseTable.VARIANTS_MEMORY,
+            ClickHouseTable.KEY_LOOKUP,
+        ]
+        if dataset_type.should_write_new_transcripts:
+            tables = [
+                *tables,
+                ClickHouseTable.VARIANT_DETAILS,
+            ]
         return [
-            ClickHouseTable.ANNOTATIONS_DISK,
+            *tables,
             ClickHouseTable.ENTRIES,
         ]
 
     @classmethod
@@ -254,7 +276,7 @@ class ClickhouseReferenceDataset(StrEnum):
     @property
     def all_variants_mv_timeout(self):
         return {
-            ClickhouseReferenceDataset.SPLICE_AI: WAIT_VIEW_TIMEOUT_S * 7,
+            ClickhouseReferenceDataset.SPLICE_AI: WAIT_VIEW_TIMEOUT_S * 10,
         }.get(self, WAIT_VIEW_TIMEOUT_S)
 
     @property
@@ -855,6 +877,49 @@ def exchange_tables(
     )
 
 
+def direct_insert_variants(
+    table_name_builder: TableNameBuilder,
+    **_,
+) -> None:
+    dst_table = table_name_builder.dst_table(ClickHouseTable.VARIANTS_MEMORY)
+    src_table = table_name_builder.src_table(ClickHouseTable.VARIANTS_MEMORY)
+    drop_staging_db()
+    logged_query(
+        f"""
+        CREATE DATABASE {STAGING_CLICKHOUSE_DATABASE}
+        """,
+    )
+    # NB: Unfortunately there's a bug(?) or inaccuracy if this is attempted without an intermediate
+    # temporary table, likely due to writing to a table and joining against it at the same time.
+    logged_query(
+        f"""
+        CREATE TABLE {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys` ENGINE = Set AS (
+            SELECT {ClickHouseTable.VARIANTS_MEMORY.key_field}
+            FROM {src_table} src
+            LEFT ANTI JOIN {dst_table} dst
+            ON {ClickHouseTable.VARIANTS_MEMORY.join_condition}
+        )
+        """,
+    )
+    disk_backed_dst_table = table_name_builder.dst_table(ClickHouseTable.VARIANTS_DISK)
+    disk_backed_src_table = table_name_builder.src_table(ClickHouseTable.VARIANTS_DISK)
+    logged_query(
+        f"""
+        INSERT INTO {disk_backed_dst_table}
+        SELECT {ClickHouseTable.VARIANTS_DISK.select_fields}
+        FROM {disk_backed_src_table} WHERE {ClickHouseTable.VARIANTS_DISK.key_field} IN {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys`
+        """,
+    )
+    logged_query(
+        f"""
+        INSERT INTO {dst_table}
+        SELECT {ClickHouseTable.VARIANTS_MEMORY.select_fields}
+        FROM {src_table} WHERE {ClickHouseTable.VARIANTS_MEMORY.key_field} IN {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys`
+        """,
+    )
+    drop_staging_db()
+
+
 def direct_insert_annotations(
     table_name_builder: TableNameBuilder,
     **_,
@@ -879,20 +944,19 @@ def direct_insert_annotations(
         )
         """,
     )
-    for (
-        clickhouse_table
-    ) in ClickHouseTable.for_dataset_type_disk_backed_annotations_tables(
-        table_name_builder.dataset_type,
-    ):
-        disk_backed_dst_table = table_name_builder.dst_table(clickhouse_table)
-        disk_backed_src_table = table_name_builder.src_table(clickhouse_table)
-        logged_query(
-            f"""
-            INSERT INTO {disk_backed_dst_table}
-            SELECT {clickhouse_table.select_fields}
-            FROM {disk_backed_src_table} WHERE {clickhouse_table.key_field} IN {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys`
-            """,
-        )
+    disk_backed_dst_table = table_name_builder.dst_table(
+        ClickHouseTable.ANNOTATIONS_DISK,
+    )
+    disk_backed_src_table = table_name_builder.src_table(
+        ClickHouseTable.ANNOTATIONS_DISK,
+    )
+    logged_query(
+        f"""
+        INSERT INTO {disk_backed_dst_table}
+        SELECT {ClickHouseTable.ANNOTATIONS_DISK.select_fields}
+        FROM {disk_backed_src_table} WHERE {ClickHouseTable.ANNOTATIONS_DISK.key_field} IN {table_name_builder.staging_dst_prefix}/_tmp_loadable_keys`
+        """,
+    )
     logged_query(
         f"""
         INSERT INTO {dst_table}
@@ -910,11 +974,16 @@ def direct_insert_all_keys(
 ) -> None:
     dst_table = table_name_builder.dst_table(clickhouse_table)
     src_table = table_name_builder.src_table(clickhouse_table)
+    settings = ''
+    # Large variant details inserts may OOM
+    if clickhouse_table == ClickHouseTable.VARIANT_DETAILS:
+        settings = 'SETTINGS max_insert_threads = 2'
     logged_query(
         f"""
         INSERT INTO {dst_table}
         SELECT {clickhouse_table.select_fields}
         FROM {src_table}
+        {settings}
         """,
     )

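The `NB` comment in `direct_insert_variants` above is the crux of both insert paths: the set of not-yet-loaded keys is first materialized into a staging `Set`-engine table, because computing that set with a join while simultaneously inserting into the joined table proved unreliable. A distilled sketch of the two query shapes, with illustrative database/table/column names standing in for what `TableNameBuilder` actually renders:

```python
# Step 1: stage the keys present in the source but absent from the
# destination (LEFT ANTI JOIN) into a Set-engine table.
STAGE_LOADABLE_KEYS = """
CREATE TABLE staging.`_tmp_loadable_keys` ENGINE = Set AS (
    SELECT src.key
    FROM source_db.`variants_memory` src
    LEFT ANTI JOIN dest_db.`variants_memory` dst ON src.key = dst.key
)
"""

# Step 2: copy only those keys into each destination table, so a re-run
# never duplicates rows that already landed.
FILTERED_INSERT = """
INSERT INTO dest_db.`variants_disk`
SELECT key, variantId
FROM source_db.`variants_disk`
WHERE key IN staging.`_tmp_loadable_keys`
"""
```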
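The OOM guard in `direct_insert_all_keys` works by capping insert parallelism: `max_insert_threads` is a standard ClickHouse setting, and fewer insert threads means fewer concurrently buffered blocks, at some cost in throughput. Roughly what the rendered query looks like for the details table (database names and columns here are illustrative):

```python
query = """
INSERT INTO dest_db.`GRCh38/SNV_INDEL/variants/details`
SELECT key, variantId, transcripts
FROM source_db.`GRCh38/SNV_INDEL/variants/details`
SETTINGS max_insert_threads = 2
"""
```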
61 changes: 60 additions & 1 deletion v03_pipeline/lib/misc/clickhouse_test.py
@@ -35,6 +35,7 @@
 from v03_pipeline.lib.paths import (
     new_entries_parquet_path,
     new_transcripts_parquet_path,
+    new_variant_details_parquet_path,
     new_variants_parquet_path,
     runs_path,
 )
@@ -44,7 +45,7 @@


 class ClickhouseTest(MockedDatarootTestCase):
-    def setUp(self):
+    def setUp(self):  # noqa: PLR0915
         super().setUp()
         client = get_clickhouse_client()
         client.execute(
@@ -228,6 +229,24 @@ def setUp(self):
             PRIMARY KEY `key`
             """,
         )
+        client.execute(
+            f"""
+            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/variants_memory` (
+                key UInt32,
+                variantId String,
+            ) ENGINE = EmbeddedRocksDB()
+            PRIMARY KEY `key`
+            """,
+        )
+        client.execute(
+            f"""
+            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/variants_disk` (
+                key UInt32,
+                variantId String,
+            ) ENGINE = EmbeddedRocksDB()
+            PRIMARY KEY `key`
+            """,
+        )
         client.execute(
             f"""
             CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/key_lookup` (
@@ -335,6 +354,24 @@ def setUp(self):
             PRIMARY KEY `key`
             """,
         )
+        client.execute(
+            f"""
+            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/GCNV/variants_memory` (
+                key UInt32,
+                variantId String,
+            ) ENGINE = EmbeddedRocksDB()
+            PRIMARY KEY `key`
+            """,
+        )
+        client.execute(
+            f"""
+            CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/GCNV/variants_disk` (
+                key UInt32,
+                variantId String,
+            ) ENGINE = EmbeddedRocksDB()
+            PRIMARY KEY `key`
+            """,
+        )
         client.execute(
             f"""
             CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/GCNV/key_lookup` (
@@ -364,6 +401,28 @@ def write_test_parquet(df: pd.DataFrame, parquet_path: str, schema=None):
             ),
         )
 
+        # Variant Details Parquet
+        df = pd.DataFrame(
+            {
+                'key': [1, 2, 3, 4],
+                'variantId': [
+                    '1-13-A-C',
+                    '2-14-A-T',
+                    'Y-19-A-C',
+                    'M-12-C-G',
+                ],
+                'transcripts': ['a', 'b', 'c', 'd'],
+            },
+        )
+        write_test_parquet(
+            df,
+            new_variant_details_parquet_path(
+                ReferenceGenome.GRCh38,
+                DatasetType.SNV_INDEL,
+                TEST_RUN_ID,
+            ),
+        )
+
         # Transcripts Parquet
         df = pd.DataFrame({'key': [1, 2, 3, 4], 'transcripts': ['a', 'b', 'c', 'd']})
         write_test_parquet(
8 changes: 8 additions & 0 deletions v03_pipeline/lib/tasks/__init__.py
@@ -8,12 +8,20 @@
     UpdatedReferenceDatasetParquetTask,
 )
 from v03_pipeline.lib.tasks.run_pipeline import RunPipelineTask
+from v03_pipeline.lib.tasks.variants_migration.migrate_variant_details_parquet import (
+    MigrateVariantDetailsParquetTask,
+)
+from v03_pipeline.lib.tasks.variants_migration.migrate_variants_parquet import (
+    MigrateVariantsParquetTask,
+)
 from v03_pipeline.lib.tasks.write_metadata_for_run import WriteMetadataForRunTask
 from v03_pipeline.lib.tasks.write_success_file import WriteSuccessFileTask
 
 __all__ = [
     'MigrateAllProjectsToClickHouseOnDataprocTask',
     'MigrateAllProjectsToClickHouseTask',
+    'MigrateVariantDetailsParquetTask',
+    'MigrateVariantsParquetTask',
     'RunPipelineTask',
     'UpdatedReferenceDatasetParquetTask',
     'WriteMetadataForRunTask',
Changes to one further file (its path is not captured in this page):
@@ -12,13 +12,13 @@
     BaseLoadingPipelineParams,
 )
 from v03_pipeline.lib.tasks.base.base_write import BaseWriteTask
-from v03_pipeline.lib.tasks.exports.write_new_transcripts_parquet import (
-    WriteNewTranscriptsParquetTask,
+from v03_pipeline.lib.tasks.files import GCSorLocalTarget, HailTableTask
+from v03_pipeline.lib.tasks.variants_migration.write_new_variant_details_parquet import (
+    WriteNewVariantDetailsParquetTask,
 )
-from v03_pipeline.lib.tasks.exports.write_new_variants_parquet import (
-    WriteNewVariantsParquetTask,
+from v03_pipeline.lib.tasks.variants_migration.write_new_variants_parquet import (
+    WriteNewVariantsParquetForMigrationTask,
 )
-from v03_pipeline.lib.tasks.files import GCSorLocalTarget, HailTableTask
 
 MAX_SNV_INDEL_ALLELE_LENGTH = 500
 
@@ -94,7 +94,7 @@ def run(self):
             *(
                 [
                     self.clone(
-                        WriteNewTranscriptsParquetTask,
+                        WriteNewVariantDetailsParquetTask,
                         # Callset Path being required
                         # here is a byproduct of the "place all
                        # variants in the variants path" hack. In theory
@@ -107,11 +107,11 @@ def run(self):
                         callset_path=None,
                     ),
                 ]
-                if self.dataset_type.should_write_new_transcripts
+                if self.dataset_type.should_write_new_variant_details
                 else []
             ),
             self.clone(
-                WriteNewVariantsParquetTask,
+                WriteNewVariantsParquetForMigrationTask,
                 callset_path=None,
             ),
         ],