Handle preexisting merge tmp gracefully [VS-901] (#8723)
mcovarr authored Mar 8, 2024
1 parent 5533f55 commit 21d60be
Showing 2 changed files with 20 additions and 9 deletions.
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsUtils.wdl
@@ -72,7 +72,7 @@ task GetToolVersions {
# GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
# there are a handful of tasks that require the larger GNU libc-based `slim`.
String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
String variants_docker = "us.gcr.io/broad-dsde-methods/variantstore:2024-03-06-alpine-0511c645f"
String variants_docker = "us.gcr.io/broad-dsde-methods/variantstore:2024-03-08-alpine-8e28f1924"
String gatk_docker = "us.gcr.io/broad-dsde-methods/broad-gatk-snapshots:varstore_2024_02_16_78c53a6"
String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"
27 changes: 19 additions & 8 deletions scripts/variantstore/wdl/extract/import_gvs.py
@@ -312,19 +312,30 @@ def convert_array_with_id_keys_to_dense_array(arr, ids, drop=[]):
    info(f'import_gvs: using target_records (records per partition) of {target_records} for VDS merge')

    interval_tmp = os.path.join(tmp_dir, 'interval_checkpoint.ht')
    if hl.hadoop_exists(interval_tmp):
        info(f'import_gvs: interval checkpoint table "{interval_tmp}" already exists, deleting')
        hl.current_backend().fs.rmtree(interval_tmp)

    target_final_intervals, _ = calculate_new_intervals(first_ref_mt, target_records, interval_tmp)

    with hl._with_flags(no_whole_stage_codegen='1'):

        merge_tmp = os.path.join(tmp_dir, 'merge_tmp.vds')
        info(f'import_gvs: calling Hail VDS combiner for merging {len(vds_paths)} intermediates')
        combiner = hl.vds.new_combiner(output_path=merge_tmp,
                                       vds_paths=vds_paths,
                                       target_records=target_records,
                                       branch_factor=52, # Echo has 104 intermediate VDSes so 2 equally sized groups
                                       temp_path=tmp_dir,
                                       use_genome_default_intervals=True)
        combiner.run()
        from hail.vds import VariantDataset
        ref_success_path = os.path.join(VariantDataset._reference_path(merge_tmp), '_SUCCESS')
        var_success_path = os.path.join(VariantDataset._variants_path(merge_tmp), '_SUCCESS')
        if hl.hadoop_exists(ref_success_path) and hl.hadoop_exists(var_success_path):
            info(f'import_gvs: Hail VDS combiner is done. Skipping it')
        else:
            info(f'import_gvs: calling Hail VDS combiner for merging {len(vds_paths)} intermediates')
            combiner = hl.vds.new_combiner(output_path=merge_tmp,
                                           vds_paths=vds_paths,
                                           target_records=target_records,
                                           branch_factor=52, # Echo has 104 intermediate VDSes so 2 equal sized groups
                                           temp_path=tmp_dir,
                                           use_genome_default_intervals=True)
            combiner.run()

        combined = hl.vds.read_vds(merge_tmp, intervals=target_final_intervals)

        rd = combined.reference_data
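The new code makes the merge step resumable: the expensive Hail VDS combiner is only re-run when the _SUCCESS markers Hail writes under the merge output are missing, while the smaller interval checkpoint is simply deleted and recomputed. Below is a minimal sketch of that skip-if-markers-exist pattern, assuming Hail is installed; the helper run_if_markers_missing and the commented usage are hypothetical and only mirror the shape of the change.

import os
import hail as hl
from hail.vds import VariantDataset


def run_if_markers_missing(success_paths, step):
    """Hypothetical helper: run `step()` only when at least one expected _SUCCESS marker is absent."""
    if all(hl.hadoop_exists(p) for p in success_paths):
        # Every marker exists, so a previous attempt finished; reuse its output.
        return False
    step()
    return True


# Usage mirroring the diff (tmp_dir, vds_paths and target_records assumed to be defined elsewhere):
# merge_tmp = os.path.join(tmp_dir, 'merge_tmp.vds')
# markers = [os.path.join(VariantDataset._reference_path(merge_tmp), '_SUCCESS'),
#            os.path.join(VariantDataset._variants_path(merge_tmp), '_SUCCESS')]
# run_if_markers_missing(markers,
#                        lambda: hl.vds.new_combiner(output_path=merge_tmp,
#                                                    vds_paths=vds_paths,
#                                                    target_records=target_records,
#                                                    temp_path=tmp_dir,
#                                                    use_genome_default_intervals=True).run())

Reusing a partially present merge output is avoided by requiring both the reference and variants _SUCCESS markers, whereas the interval checkpoint is cheap to rebuild and is therefore just removed when found.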
