VS-1780 - Set sample_info.is_loaded for parquet ingest #9320
base: VS-1736
```
@@ -46,6 +46,7 @@ workflow GvsImportGenomes {
```

The workflow inputs in this region after the change (the hunk adds one line here):

```wdl
    String? billing_project_id

    # Dump these parquet files to a bucket
    Boolean use_parquet_ingest = true # currently only in limited use.
    String output_gcs_dir
    Boolean configure_parquet_bucket_lifecycle = false
```
```diff
@@ -187,12 +188,14 @@ workflow GvsImportGenomes {
   }
 
-  if (load_vet_and_ref_ranges) {
-    call SetIsLoadedColumn {
-      input:
-        load_done = LoadData.done,
-        project_id = project_id,
-        dataset_name = dataset_name,
-        cloud_sdk_docker = effective_cloud_sdk_docker,
+  if (!use_parquet_ingest) {
+    call SetIsLoadedColumn {
+      input:
+        load_done = LoadData.done,
+        project_id = project_id,
+        dataset_name = dataset_name,
+        cloud_sdk_docker = effective_cloud_sdk_docker,
+    }
+  }
 
   if (configure_parquet_bucket_lifecycle) {
```
```diff
@@ -211,7 +214,7 @@ workflow GvsImportGenomes {
     input:
       project_id = project_id,
       dataset_name = dataset_name,
-      set_is_loaded_done = SetIsLoadedColumn.done,
+      load_done = LoadData.done,
       lifecycle_configured = select_first([ConfigureParquetLifecycle.done, "done"]),
       variants_docker = effective_variants_docker,
   }
```
```diff
@@ -245,6 +248,17 @@ workflow GvsImportGenomes {
         load_outputs = LoadParquetFilesToBQ.completion_status,
         variants_docker = effective_variants_docker,
     }
+
+    if (use_parquet_ingest) {
+      # Update sample_info.is_loaded once parquet loading has been verified
+      call SetIsLoadedColumnForParquetIngest {
+        input:
+          go = VerifyParquetLoading.done,
+          project_id = project_id,
+          dataset_name = dataset_name,
+          cloud_sdk_docker = effective_cloud_sdk_docker,
+      }
+    }
   }
 
   output {
```
```diff
@@ -459,7 +473,7 @@ task LoadData {
     vet_parquet_file=`ls vet_*.parquet`
     ref_parquet_file=`ls ref_*.parquet`
 
-    # parse the table partition out of the file name
+    # parse the table superpartition out of the file name
     table_number=$(echo "$vet_parquet_file" | cut -d'_' -f2)
 
     # copy the vet and ref parquet files to the gcs bucket in the right place
```
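For context on that parse: the superpartition number is the second underscore-delimited field of the parquet file name. A quick sanity check with a hypothetical file name (the sample and numbering below are made up for illustration):

```bash
# Hypothetical parquet file name following the vet_<superpartition>_... naming scheme
vet_parquet_file="vet_001_input_vcf_42_NA12878.vcf.gz.parquet"

# Same parse as in LoadData: take the second '_'-delimited field
table_number=$(echo "$vet_parquet_file" | cut -d'_' -f2)
echo "$table_number"   # prints: 001
```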
```diff
@@ -569,6 +583,67 @@ task SetIsLoadedColumn {
   }
 }
```

The `SetIsLoadedColumnForParquetIngest` task below is added in its entirety; the review comments interleaved here attach to specific lines of its query.

```wdl
task SetIsLoadedColumnForParquetIngest {
  input {
    String dataset_name
    String project_id

    Boolean go
    String cloud_sdk_docker
  }
  meta {
    # This is doing some tricky stuff with `INFORMATION_SCHEMA` so just punt and let it be `volatile`.
    volatile: true
  }

  # add labels for DSP Cloud Cost Control Labeling and Reporting
  String bq_labels = "--label service:gvs --label team:variants --label managedby:import_genomes"

  command <<<
    # Prepend date, time and pwd to xtrace log entries.
    PS4='\D{+%F %T} \w $ '
    set -o errexit -o nounset -o pipefail -o xtrace

    echo "project_id = ~{project_id}" > ~/.bigqueryrc

    # Set is_loaded to true if there is a corresponding vet table partition with rows for that sample_id.

    # Note that we tried modifying CreateVariantIngestFiles to UPDATE sample_info.is_loaded on a
    # per-sample basis. The major issue we found is that BigQuery allows only 20 such concurrent
    # DML statements. We considered an exponential backoff, but at the number of samples being
    # loaded this would introduce significant delays in workflow processing. So this task sets
    # *all* of the sample_info.is_loaded flags at one time.

    # bq query --max_rows check: ok update
    bq --apilog=false --project_id=~{project_id} query --format=csv --use_legacy_sql=false ~{bq_labels} \
      'UPDATE `~{dataset_name}.sample_info` SET is_loaded = true
       WHERE sample_id IN (SELECT CAST(partition_id AS INT64)
```
Collaborator: Ah ok... I tinkered with this query in the console and I think I see how this works. But I'm wondering if this is still going to return correct results for vet tables > 001? Wouldn't the partitions in vet_002 start at 1 again?
Author: I actually copied the bit about the partition from the 'normal' SetIsLoadedColumn method - I had thought it had been put in there to avoid some of the weirdly named vet and ref_ranges tables that were created during foxtrot? Very possible I misunderstood that.
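One way to probe the reviewer's question directly is to list which partition ids actually exist per vet superpartition table. This is an exploratory sketch, not part of the PR; `my-project` and `my_dataset` are placeholder names:

```bash
# List non-empty partition ids for every vet_* superpartition table.
# If vet_002 reused low partition ids, the same ids would repeat across tables here.
bq --apilog=false --project_id=my-project query --format=csv --use_legacy_sql=false \
  'SELECT table_name, partition_id
   FROM `my_dataset.INFORMATION_SCHEMA.PARTITIONS`
   WHERE partition_id NOT LIKE "__%"
     AND total_logical_bytes > 0
     AND REGEXP_CONTAINS(table_name, "^vet_[0-9]+$")
   ORDER BY table_name, partition_id'
```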
The first condition of the `WHERE` clause continues:

```wdl
       FROM `~{dataset_name}.INFORMATION_SCHEMA.PARTITIONS`
       WHERE partition_id NOT LIKE "__%" AND total_logical_bytes > 0 AND REGEXP_CONTAINS(table_name, "^vet_[0-9]+$")) OR
```
Collaborator: not sure I understand why this big

Collaborator: shouldn't there be a ref_ranges version of the AND logic here?

Author: The big OR logic I added (starting here at line 622) is trying to confirm that there is a sample_name (as extracted from the file_path) in parquet_load_status for a vet parquet file creation and also one for a ref_ranges parquet file creation.
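To sanity-check the extraction the author describes, the same regex can be run against a made-up file path; the bucket and sample name below are illustrative only:

```bash
# Apply the UPDATE statement's REGEXP_EXTRACT to a hypothetical file_path.
# Expected output: NA12878
bq --apilog=false query --format=csv --use_legacy_sql=false \
  'SELECT REGEXP_EXTRACT(
     "gs://my-bucket/vet_001_input_vcf_42_NA12878.vcf.gz.parquet",
     r".*input_vcf_\d+_(.*).vcf.gz.parquet$") AS sample_name'
```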
The query concludes with the `OR` branch discussed in the thread above, followed by the task's runtime and output sections (the unchanged `GetUningestedSampleIds` task follows immediately after):

```wdl
       sample_name IN (
         SELECT sample_name FROM
         (
           SELECT REGEXP_EXTRACT(file_path, r".*input_vcf_\d+_(.*).vcf.gz.parquet$") AS sample_name,
           FROM `~{dataset_name}.parquet_load_status`
           WHERE REGEXP_CONTAINS(file_path, ".*vet_[0-9]+_input_vcf_[0-9]+_.*$")
           INTERSECT DISTINCT
           SELECT REGEXP_EXTRACT(file_path, r".*input_vcf_\d+_(.*).vcf.gz.parquet$") AS sample_name,
           FROM `~{dataset_name}.parquet_load_status`
           WHERE REGEXP_CONTAINS(file_path, ".*ref_ranges_[0-9]+_input_vcf_[0-9]+_.*$")
         )
       )'
  >>>
  runtime {
    docker: cloud_sdk_docker
    memory: "1 GB"
    disks: "local-disk 10 HDD"
    cpu: 1
  }

  output {
    Boolean done = true
  }
}
```
```diff
@@ -796,7 +871,7 @@ task CreateParquetTrackingTable {
   input {
     String project_id
     String dataset_name
-    String set_is_loaded_done
+    Array[String] load_done
     String lifecycle_configured
     String variants_docker
   }
```
```diff
@@ -955,5 +1030,6 @@ task VerifyParquetLoading {
     Int loaded_files = read_json(results_json)["loaded_files"]
     Int missing_files = read_json(results_json)["missing_files"]
     File? missing_files_list = "verification_output/missing_files.txt"
+    Boolean done = true
   }
 }
```
Corrected spelling of 'saple_info' to 'sample_info'.
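After the workflow completes, a quick way to confirm the flag was actually set is a count over `sample_info`; again, `my-project` and `my_dataset` are placeholders:

```bash
# Compare how many sample_info rows have is_loaded = true against the total row count.
bq --apilog=false --project_id=my-project query --format=csv --use_legacy_sql=false \
  'SELECT COUNTIF(is_loaded) AS loaded_samples, COUNT(*) AS total_samples
   FROM `my_dataset.sample_info`'
```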