Conversation
|
Remember to squash merge! |
🔍Version Validation Results: |
🔍Changelog Validation Results: |
…broadinstitute/warp into lk_local_ancestry_subsetting
|
Remember to squash merge! |
🔍Changelog Validation Results: |
🔍Version Validation Results: |
There was a problem hiding this comment.
Pull request overview
This PR adds initial local ancestry subsetting functionality to the WARP repository under the all_of_us/local_ancestry directory. The PR introduces a new WDL workflow for subsetting phased VCF files using Hail on Google Cloud Dataproc, along with supporting Python scripts.
Changes:
- Added SubsetPhasedVcfsForFLARE workflow for subsetting All of Us phased VCF files to specific sample lists using Hail
- Added run_relatedness_only_workflow.py script for identifying related samples without computing maximal independent sets
- Registered the new SubsetPhasedVcfsForFLARE workflow in .dockstore.yml
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| all_of_us/local_ancestry/SubsetPhasedVcfsForFLARE.wdl | New WDL workflow that orchestrates Dataproc cluster creation, Hail-based VCF subsetting, and output management |
| all_of_us/local_ancestry/subset_aou_phased.py | Python script for subsetting per-chromosome phased VCFs using Hail, with support for checkpointing and GCS paths |
| all_of_us/local_ancestry/SubsetPhasedVcfsForFLARE.changelog.md | Initial changelog entry documenting the first version of the pipeline |
| all_of_us/ancestry/run_relatedness_only_workflow.py | Python script for relatedness analysis using pc_relate without maximal independent set computation |
| .dockstore.yml | Added registry entry for the new SubsetPhasedVcfsForFLARE workflow |
| def vcf_path_for_chrom(phased_vcf_gcs_dir: str, chrom: str) -> str: | ||
| phased_vcf_gcs_dir = phased_vcf_gcs_dir.rstrip("/") | ||
| return f"{phased_vcf_gcs_dir}/{chrom}.aou.v9.phased.vcf.gz" |
There was a problem hiding this comment.
The vcf_path_for_chrom function hardcodes the filename pattern as "{chrom}.aou.v9.phased.vcf.gz", which includes a specific version "v9". If the phased VCF files have a different version or naming convention, this function will fail to locate them. Consider making the version number or full filename pattern configurable through an argument.
| default_reference=reference_genome, | ||
| idempotent=True, | ||
| spark_conf=spark_conf, | ||
| # tmp_dir=tmp_dir, |
There was a problem hiding this comment.
The tmp_dir parameter is commented out in the hl.init call but is accepted as an argument and passed to hail_init. This parameter should either be uncommented in the hl.init call or removed from the function signature and argument parsing if it's not intended to be used.
| # tmp_dir=tmp_dir, | |
| tmp_dir=tmp_dir, |
|
|
||
| RuntimeAttr? runtime_attr_override | ||
| } | ||
| String pipeline_version = "aou_10.0.0" |
There was a problem hiding this comment.
The indentation for the pipeline_version line uses tabs instead of spaces. According to WDL style conventions, indentation should use spaces (typically 2 or 4 spaces). This line should be indented with spaces to match the surrounding code style.
| String pipeline_version = "aou_10.0.0" | |
| String pipeline_version = "aou_10.0.1" |
| command <<< | ||
| set -euxo pipefail | ||
|
|
||
| # Comfigure gcloud account | ||
| gcloud config list account --format "value(core.account)" 1> account.txt | ||
|
|
||
|
|
||
| # Verify Python3 availability in the Docker image | ||
| if which python3 > /dev/null 2>&1; then | ||
| pt3="$(which python3)" | ||
| echo "** python3 located at $pt3" | ||
| echo "** Version info: $(python3 -V)" | ||
| # Quick test to ensure Python3 is working | ||
| python3 -c "print('hello world')" | ||
| else | ||
| echo "!! No 'python3' in path." | ||
| exit 1 | ||
| fi | ||
| # Inline Python script to start a Hail Dataproc cluster and submit a job | ||
| python3 <<EOF | ||
| print("Running python code...") | ||
| import hail as hl | ||
| import os | ||
| import sys | ||
| import uuid | ||
| import re | ||
| from google.cloud import dataproc_v1 as dataproc | ||
| from datetime import datetime | ||
|
|
||
| def popen_read_checked(cmd): | ||
| with os.popen(cmd) as stream: | ||
| output = stream.read() | ||
| status = stream.close() # returns None on success, exit code << 8 on failure | ||
| if status is not None: # means command failed | ||
| raise RuntimeError(f"Command failed with exit code {status >> 8}: {cmd}\n{output}") | ||
| return output | ||
|
|
||
| # Function to replace unacceptable characters with '-' | ||
| def sanitize_label(label): | ||
| return re.sub(r'[^a-z0-9-]', '-', label.lower()) | ||
|
|
||
| def truncate_cluster_name(name): | ||
| # Ensure the name starts with a lowercase letter and convert to lowercase | ||
| name = name.lower() | ||
| if not name[0].isalpha(): | ||
| raise ValueError("Cluster name must start with a letter.") | ||
|
|
||
| # Truncate to a maximum of 10 characters | ||
| truncated = name[:10] | ||
|
|
||
| # Ensure the name does not end with a hyphen | ||
| if truncated[-1] == '-': | ||
| truncated = truncated[:-1] + '0' | ||
|
|
||
| return truncated | ||
|
|
||
| # Sanitize task_identifier | ||
| sanitized_task_label = sanitize_label("~{task_identifier}") | ||
| sanitized_task_label = truncate_cluster_name(sanitized_task_label) | ||
|
|
||
| # Generate a unique cluster name | ||
| # Must match pattern (?:[a-z](?:[-a-z0-9]{0,49}[a-z0-9])?) | ||
| cluster_name = f'{sanitized_task_label}-hail-{str(uuid.uuid4())[0:13]}' | ||
| print(f"Cluster Name: {cluster_name}") | ||
| script_path = "~{submission_script}" | ||
|
|
||
| # Read the Google Cloud account name | ||
| with open("account.txt", "r") as account_file: | ||
| account = account_file.readline().strip() | ||
| print("account: " + account) | ||
|
|
||
| try: | ||
| # Construct the command to start the Hail Dataproc cluster using a multi-line f-string | ||
| # Change master to #n1-highmem-32 when done testing | ||
| cluster_start_cmd = f""" | ||
| hailctl dataproc start --num-workers ~{num_workers} | ||
| --region ~{region} --project ~{gcs_project} --service-account {account} | ||
| --worker-machine-type n1-standard-4 | ||
| --master-machine-type n1-standard-4 | ||
| --max-idle=~{max_idle}m --max-age=~{max_age}m | ||
| --subnet=projects/~{gcs_project}/regions/~{region}/subnetworks/~{gcs_subnetwork_name} | ||
| --enable-component-gateway | ||
| {cluster_name} | ||
| """ | ||
| print("Starting cluster...") | ||
| # Replace newline characters with spaces and remove extra spaces | ||
| cluster_start_cmd = ' '.join(cluster_start_cmd.split()) | ||
|
|
||
| print(os.popen(cluster_start_cmd).read()) | ||
|
|
||
| # Create a Dataproc client and identify the cluster's staging bucket | ||
| cluster_client = dataproc.ClusterControllerClient( | ||
| client_options={"api_endpoint": f"~{region}-dataproc.googleapis.com:443"} | ||
| ) | ||
|
|
||
| for cluster in cluster_client.list_clusters(request={"project_id": "~{gcs_project}", "region": "~{region}"}): | ||
| if cluster.cluster_name == cluster_name: | ||
| cluster_staging_bucket = cluster.config.temp_bucket | ||
|
|
||
| # Submit the Hail job to the Dataproc cluster using a multi-line f-string | ||
| # Submit the job | ||
| ckpt_args = "" | ||
| if "~{do_checkpoint}" == "true": | ||
| ckpt_path = "~{select_first([checkpoint_path, ''])}" | ||
| if ckpt_path == "": | ||
| raise ValueError("do_checkpoint=true but checkpoint_path was not provided") | ||
| ckpt_args = f"--checkpoint_path {ckpt_path} " | ||
| if "~{checkpoint_overwrite}" == "true": | ||
| ckpt_args += "--checkpoint_overwrite " | ||
| submit_cmd = f""" | ||
| gcloud dataproc jobs submit pyspark {script_path} | ||
| --cluster={cluster_name} --project ~{gcs_project} --region=~{region} --account {account} | ||
| --driver-log-levels root=WARN -- | ||
| --executor_memory ~{executor_memory} --executor_cores ~{executor_cores} | ||
| --driver_memory ~{driver_memory} --driver_cores ~{driver_cores} | ||
| --reference_genome ~{reference_genome} | ||
| --min_partitions ~{min_partitions} | ||
| --phased_vcf_gcs_dir ~{phased_vcf_gcs_dir} | ||
| --chrom ~{chrom} | ||
| --samples_tsv ~{samples_tsv} | ||
| --sample_id_column ~{sample_id_column} | ||
| --entry_fields "~{entry_fields}" | ||
| {"--force_bgz" if "~{force_bgz}" == "true" else ""} | ||
| {ckpt_args} | ||
| --output_vcf ~{output_vcf} | ||
| --log_level INFO | ||
| """ | ||
| print("Running: " + submit_cmd) | ||
| submit_cmd = " ".join(submit_cmd.split()) | ||
| print("Running:", submit_cmd) | ||
| print(popen_read_checked(submit_cmd)) | ||
|
|
||
| # Copy outputs back locally for WDL outputs | ||
| # output_vcf is a gs:// path; Cromwell needs local files as task outputs | ||
| out_vcf = "~{task_identifier}.subset.vcf.bgz" | ||
|
|
||
| copy_vcf_cmd = f"gsutil -m cp ~{output_vcf} ./{out_vcf}" | ||
| print(popen_read_checked(copy_vcf_cmd)) | ||
|
|
||
| break | ||
|
|
||
| except Exception as e: | ||
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | ||
| print(timestamp, "Exception raised!") | ||
| print(e) | ||
| raise | ||
| finally: | ||
| print(f"Stopping cluster: {cluster_name}") | ||
| os.popen( | ||
| "gcloud dataproc clusters delete --quiet --project {} --region {} --account {} {}".format( | ||
| "~{gcs_project}", "~{region}", account, cluster_name | ||
| ) | ||
| ).read() | ||
| EOF | ||
|
|
||
| echo "Complete" | ||
| >>> |
There was a problem hiding this comment.
The command block and its contents use tabs for indentation instead of spaces. According to WDL style conventions, indentation should consistently use spaces. Please convert tabs to spaces throughout the command block.
| # Assuming the first member is the directory you want | ||
| # This will get the name of the first member in the tar file | ||
| folder_name = members[0].name.split('/')[0] | ||
| tar.extractall() |
There was a problem hiding this comment.
The tar.extractall() call on line 95 does not specify a path argument, which means files will be extracted to the current directory. This could potentially lead to directory traversal security vulnerabilities if the tar file contains malicious paths. Consider using tar.extractall(path=local_ht_path) or validating member paths before extraction.
| for cluster in cluster_client.list_clusters(request={"project_id": "~{gcs_project}", "region": "~{region}"}): | ||
| if cluster.cluster_name == cluster_name: | ||
| cluster_staging_bucket = cluster.config.temp_bucket | ||
|
|
||
| # Submit the Hail job to the Dataproc cluster using a multi-line f-string | ||
| # Submit the job | ||
| ckpt_args = "" | ||
| if "~{do_checkpoint}" == "true": | ||
| ckpt_path = "~{select_first([checkpoint_path, ''])}" | ||
| if ckpt_path == "": | ||
| raise ValueError("do_checkpoint=true but checkpoint_path was not provided") | ||
| ckpt_args = f"--checkpoint_path {ckpt_path} " | ||
| if "~{checkpoint_overwrite}" == "true": | ||
| ckpt_args += "--checkpoint_overwrite " | ||
| submit_cmd = f""" | ||
| gcloud dataproc jobs submit pyspark {script_path} | ||
| --cluster={cluster_name} --project ~{gcs_project} --region=~{region} --account {account} | ||
| --driver-log-levels root=WARN -- | ||
| --executor_memory ~{executor_memory} --executor_cores ~{executor_cores} | ||
| --driver_memory ~{driver_memory} --driver_cores ~{driver_cores} | ||
| --reference_genome ~{reference_genome} | ||
| --min_partitions ~{min_partitions} | ||
| --phased_vcf_gcs_dir ~{phased_vcf_gcs_dir} | ||
| --chrom ~{chrom} | ||
| --samples_tsv ~{samples_tsv} | ||
| --sample_id_column ~{sample_id_column} | ||
| --entry_fields "~{entry_fields}" | ||
| {"--force_bgz" if "~{force_bgz}" == "true" else ""} | ||
| {ckpt_args} | ||
| --output_vcf ~{output_vcf} | ||
| --log_level INFO | ||
| """ | ||
| print("Running: " + submit_cmd) | ||
| submit_cmd = " ".join(submit_cmd.split()) | ||
| print("Running:", submit_cmd) | ||
| print(popen_read_checked(submit_cmd)) | ||
|
|
||
| # Copy outputs back locally for WDL outputs | ||
| # output_vcf is a gs:// path; Cromwell needs local files as task outputs | ||
| out_vcf = "~{task_identifier}.subset.vcf.bgz" | ||
|
|
||
| copy_vcf_cmd = f"gsutil -m cp ~{output_vcf} ./{out_vcf}" | ||
| print(popen_read_checked(copy_vcf_cmd)) | ||
|
|
||
| break |
There was a problem hiding this comment.
If the cluster is not found in the list of clusters (the for loop doesn't find a matching cluster_name), the code inside the if block will never execute, but no error will be raised. This could lead to silent failures where the task appears to complete successfully but no job was actually submitted. Consider adding an else clause after the for loop to raise an error if the cluster is not found.
| task TabixIndexBgzVcf { | ||
| input { | ||
| File vcf_bgz | ||
| Int cpu = 1 | ||
| Int memory_gb = 2 | ||
| Int disk_gb = 20 | ||
| String? docker | ||
| } | ||
|
|
||
| command <<< | ||
| set -euo pipefail | ||
|
|
||
| # Localize input VCF into the working directory with a stable name | ||
| cp "~{vcf_bgz}" input.vcf.gz | ||
|
|
||
| # Create tabix index (writes input.vcf.gz.tbi) | ||
| tabix -p vcf input.vcf.gz | ||
|
|
||
| # Sanity check | ||
| test -s input.vcf.gz.tbi | ||
| >>> | ||
|
|
||
| output { | ||
| File vcf_local = "input.vcf.gz" | ||
| File tbi = "input.vcf.gz.tbi" | ||
| } | ||
|
|
||
| runtime { | ||
| cpu: cpu | ||
| memory: "~{memory_gb} GB" | ||
| disks: "local-disk ~{disk_gb} HDD" | ||
| docker: select_first([docker, "quay.io/biocontainers/htslib:1.20--h81da01d_0"]) | ||
| } | ||
| } |
There was a problem hiding this comment.
The TabixIndexBgzVcf task is defined but never called in the workflow. If this task is not intended to be used, it should be removed. If it is intended to be used in future versions or as a utility task, consider adding a comment explaining its purpose and why it's not currently called.
| command <<< | ||
| set -euxo pipefail | ||
|
|
||
| # Comfigure gcloud account |
There was a problem hiding this comment.
The word "Comfigure" is misspelled. It should be "Configure".
| Int? max_retries | ||
| } | ||
|
|
||
| workflow SubsetPhasedVcfsForFlare { |
There was a problem hiding this comment.
The workflow name "SubsetPhasedVcfsForFlare" has inconsistent capitalization with the file name "SubsetPhasedVcfsForFLARE.wdl". The workflow name should be "SubsetPhasedVcfsForFLARE" to match the file naming convention (all caps for "FLARE").
| # Create a Dataproc client and identify the cluster's staging bucket | ||
| cluster_client = dataproc.ClusterControllerClient( | ||
| client_options={"api_endpoint": f"~{region}-dataproc.googleapis.com:443"} | ||
| ) | ||
|
|
||
| for cluster in cluster_client.list_clusters(request={"project_id": "~{gcs_project}", "region": "~{region}"}): | ||
| if cluster.cluster_name == cluster_name: | ||
| cluster_staging_bucket = cluster.config.temp_bucket |
There was a problem hiding this comment.
The variable cluster_staging_bucket is assigned inside the for loop (line 236) but is never used. If this variable is intended to be used elsewhere in the code, it should be utilized; otherwise, the assignment and the variable should be removed to avoid confusion.
| # Create a Dataproc client and identify the cluster's staging bucket | |
| cluster_client = dataproc.ClusterControllerClient( | |
| client_options={"api_endpoint": f"~{region}-dataproc.googleapis.com:443"} | |
| ) | |
| for cluster in cluster_client.list_clusters(request={"project_id": "~{gcs_project}", "region": "~{region}"}): | |
| if cluster.cluster_name == cluster_name: | |
| cluster_staging_bucket = cluster.config.temp_bucket | |
| # Create a Dataproc client | |
| cluster_client = dataproc.ClusterControllerClient( | |
| client_options={"api_endpoint": f"~{region}-dataproc.googleapis.com:443"} | |
| ) | |
| for cluster in cluster_client.list_clusters(request={"project_id": "~{gcs_project}", "region": "~{region}"}): | |
| if cluster.cluster_name == cluster_name: |
Adding initial local ancestry scripts for subsetting