# tasks_interhost

| File Path | pipes/WDL/tasks/tasks_interhost.wdl |
|---|---|
| WDL Version | 1.0 |
| Type | tasks |
## 📋 Tasks in this document

- `subsample_by_cases`: Run subsampler to get a downsampled dataset and metadata proportional to epidemiological case counts.
- `multi_align_mafft_ref`
- `multi_align_mafft`
- `beast`: Execute GPU-accelerated BEAST. For tips on performance, see https://beast.community/performance#gpu
- `index_ref`
- `trimal_clean_msa`
- `merge_vcfs_bcftools`
- `merge_vcfs_gatk`
- `reconstructr`
## Tasks

### subsample_by_cases

Run subsampler to get a downsampled dataset and metadata proportional to epidemiological case counts.
#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| metadata | File | - | - |
| case_data | File | - | - |
| id_column | String | - | - |
| geo_column | String | - | - |
| keep_file | File? | - | - |
| remove_file | File? | - | - |
| filter_file | File? | - | - |
| seed_num | Int? | - | - |
| start_date | String? | - | - |
| end_date | String? | - | - |

*Plus 5 optional inputs with default values, not shown above.*
#### Command

```sh
set -e -o pipefail
mkdir -p data outputs

# decompress if compressed
echo "staging and decompressing input data files"
if [[ ~{metadata} == *.gz ]]; then
  cat "~{metadata}" | pigz -d > data/metadata.tsv
elif [[ ~{metadata} == *.zst ]]; then
  cat "~{metadata}" | zstd -d > data/metadata.tsv
else
  ln -s "~{metadata}" data/metadata.tsv
fi
if [[ ~{case_data} == *.gz ]]; then
  cat "~{case_data}" | pigz -d > data/case_data.tsv
elif [[ ~{case_data} == *.zst ]]; then
  cat "~{case_data}" | zstd -d > data/case_data.tsv
else
  ln -s "~{case_data}" data/case_data.tsv
fi

## replicate snakemake DAG manually
# rule genome_matrix
# Generate matrix of genome counts per day, for each element in column ~{geo_column}
echo "getting genome matrix"
python3 /opt/subsampler/scripts/get_genome_matrix.py \
  --metadata data/metadata.tsv \
  --index-column ~{geo_column} \
  --date-column ~{date_column} \
  ~{"--start-date " + start_date} \
  ~{"--end-date " + end_date} \
  --output outputs/genome_matrix_days.tsv
date;uptime;free

# rule unit_conversion
# Generate matrix of genome and case counts per epiweek
echo "converting matrices to epiweeks"
python3 /opt/subsampler/scripts/aggregator.py \
  --input outputs/genome_matrix_days.tsv \
  --unit ~{unit} \
  --format integer \
  --output outputs/matrix_genomes_unit.tsv
python3 /opt/subsampler/scripts/aggregator.py \
  --input data/case_data.tsv \
  --unit ~{unit} \
  --format integer \
  ~{"--start-date " + start_date} \
  ~{"--end-date " + end_date} \
  --output outputs/matrix_cases_unit.tsv
date;uptime;free

# rule correct_bias
# Correct under- and oversampling of genome counts based on epidemiological data
echo "create bias-correction matrix"
python3 /opt/subsampler/scripts/correct_bias.py \
  --genome-matrix outputs/matrix_genomes_unit.tsv \
  --case-matrix outputs/matrix_cases_unit.tsv \
  --index-column code \
  ~{"--baseline " + baseline} \
  --output1 outputs/weekly_sampling_proportions.tsv \
  --output2 outputs/weekly_sampling_bias.tsv \
  --output3 outputs/matrix_genomes_unit_corrected.tsv
date;uptime;free

# rule subsample
# Sample genomes and metadata according to the corrected genome matrix
echo "subsample data according to bias-correction"
# subsampler_timeseries says --keep is optional but actually fails if you don't specify one
cp /dev/null data/keep.txt
~{"cp " + keep_file + " data/keep.txt"}
python3 /opt/subsampler/scripts/subsampler_timeseries.py \
  --metadata data/metadata.tsv \
  --genome-matrix outputs/matrix_genomes_unit_corrected.tsv \
  --index-column ~{id_column} \
  --geo-column ~{geo_column} \
  --date-column ~{date_column} \
  --time-unit ~{unit} \
  --keep data/keep.txt \
  ~{"--remove " + remove_file} \
  ~{"--filter-file " + filter_file} \
  ~{"--seed " + seed_num} \
  ~{"--start-date " + start_date} \
  ~{"--end-date " + end_date} \
  --weekasdate no \
  --sampled-sequences outputs/selected_sequences.txt \
  --sampled-metadata outputs/selected_metadata.tsv \
  --report outputs/sampling_stats.txt
echo '# Sampling proportion: ~{baseline}' | cat - outputs/sampling_stats.txt > temp && mv temp outputs/sampling_stats.txt
date;uptime;free

# copy outputs from container's temp dir to host-accessible working dir for delocalization
echo "wrap up"
mv outputs/* .

# get counts
cat selected_sequences.txt | wc -l | tee NUM_OUT

# get hardware utilization
set +o pipefail
cat /proc/uptime | cut -f 1 -d ' ' > UPTIME_SEC
cat /proc/loadavg > CPU_LOAD
{ if [ -f /sys/fs/cgroup/memory.peak ]; then cat /sys/fs/cgroup/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.peak ]; then cat /sys/fs/cgroup/memory/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes; else echo "0"; fi } > MEM_BYTES
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| genome_matrix_days | File | `"genome_matrix_days.tsv"` |
| matrix_genomes_unit | File | `"matrix_genomes_unit.tsv"` |
| matrix_cases_unit | File | `"matrix_cases_unit.tsv"` |
| weekly_sampling_proportions | File | `"weekly_sampling_proportions.tsv"` |
| weekly_sampling_bias | File | `"weekly_sampling_bias.tsv"` |
| matrix_genomes_unit_corrected | File | `"matrix_genomes_unit_corrected.tsv"` |
| selected_sequences | File | `"selected_sequences.txt"` |
| selected_metadata | File | `"selected_metadata.tsv"` |
| sampling_stats | File | `"sampling_stats.txt"` |
| num_selected | Int | `read_int("NUM_OUT")` |
| max_ram_gb | Int | `ceil(read_float("MEM_BYTES") / 1000000000)` |
| runtime_sec | Int | `ceil(read_float("UPTIME_SEC"))` |
| cpu_load | String | `read_string("CPU_LOAD")` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| memory | `machine_mem_gb + " GB"` |
| cpu | 2 |
| disks | `"local-disk 200 HDD"` |
| disk | `"200 GB"` |
| dx_instance_type | `"mem3_ssd1_v2_x4"` |
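
A minimal sketch of calling this task from a workflow, assuming this file is imported under the alias `interhost`; the column names passed below (`strain`, `division`) are illustrative placeholders, not values mandated by the task:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow subsample_by_cases_example {
  input {
    File metadata   # sequence metadata TSV (.gz/.zst also accepted)
    File case_data  # epidemiological case counts TSV (.gz/.zst also accepted)
  }
  call interhost.subsample_by_cases {
    input:
      metadata   = metadata,
      case_data  = case_data,
      id_column  = "strain",    # hypothetical sequence-ID column name
      geo_column = "division"   # hypothetical geography column name
  }
  output {
    File selected_metadata = subsample_by_cases.selected_metadata
    Int  num_selected      = subsample_by_cases.num_selected
  }
}
```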
### multi_align_mafft_ref

#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| reference_fasta | File | - | - |
| assemblies_fasta | Array[File]+ | - | - |
| mafft_maxIters | Int? | - | - |
| mafft_ep | Float? | - | - |
| mafft_gapOpeningPenalty | Float? | - | - |
| machine_mem_gb | Int? | - | - |

*Plus 1 optional input with a default value, not shown above.*
#### Command

```sh
interhost.py --version | tee VERSION
interhost.py multichr_mafft \
  "~{reference_fasta}" ~{sep=" " assemblies_fasta} \
  . \
  ~{'--ep=' + mafft_ep} \
  ~{'--gapOpeningPenalty=' + mafft_gapOpeningPenalty} \
  ~{'--maxiters=' + mafft_maxIters} \
  --outFilePrefix "align_mafft-~{fasta_basename}" \
  --preservecase \
  --localpair \
  --sampleNameListFile "align_mafft-~{fasta_basename}-sample_names.txt" \
  --loglevel DEBUG
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| alignments_by_chr | Array[File]+ | `glob("align_mafft-~{fasta_basename}*.fasta")` |
| viralngs_version | String | `read_string("VERSION")` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| memory | `select_first([machine_mem_gb, 60]) + " GB"` |
| cpu | 8 |
| disks | `"local-disk " + disk_size + " HDD"` |
| disk | `disk_size + " GB"` |
| dx_instance_type | `"mem3_ssd1_v2_x8"` |
| maxRetries | 2 |
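
A minimal call sketch under the same assumed `interhost` import alias; the MAFFT tuning inputs are optional and the values shown are illustrative only:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow mafft_ref_example {
  input {
    File         reference_fasta   # reference genome FASTA
    Array[File]+ assemblies_fasta  # assemblies to align against the reference
  }
  call interhost.multi_align_mafft_ref {
    input:
      reference_fasta  = reference_fasta,
      assemblies_fasta = assemblies_fasta,
      mafft_ep         = 0.123,  # illustrative; omit to use the tool default
      mafft_maxIters   = 1000    # illustrative; omit to use the tool default
  }
  output {
    Array[File] alignments_by_chr = multi_align_mafft_ref.alignments_by_chr
  }
}
```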
### multi_align_mafft

#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| assemblies_fasta | Array[File]+ | - | - |
| mafft_maxIters | Int? | - | - |
| mafft_ep | Float? | - | - |
| mafft_gapOpeningPenalty | Float? | - | - |
| machine_mem_gb | Int? | - | - |

*Plus 2 optional inputs with default values, not shown above.*
#### Command

```sh
interhost.py --version | tee VERSION
interhost.py multichr_mafft \
  ~{sep=" " assemblies_fasta} \
  . \
  ~{'--ep=' + mafft_ep} \
  ~{'--gapOpeningPenalty=' + mafft_gapOpeningPenalty} \
  ~{'--maxiters=' + mafft_maxIters} \
  --outFilePrefix ~{out_prefix} \
  --preservecase \
  --localpair \
  --sampleNameListFile ~{out_prefix}-sample_names.txt \
  --loglevel DEBUG
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| sampleNamesFile | File | `"~{out_prefix}-sample_names.txt"` |
| alignments_by_chr | Array[File] | `glob("~{out_prefix}*.fasta")` |
| viralngs_version | String | `read_string("VERSION")` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| memory | `select_first([machine_mem_gb, 30]) + " GB"` |
| cpu | 8 |
| disks | `"local-disk " + disk_size + " HDD"` |
| disk | `disk_size + " GB"` |
| dx_instance_type | `"mem2_ssd1_v2_x8"` |
| maxRetries | 2 |
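
A minimal call sketch (same assumed `interhost` alias); only the assembly FASTAs are required:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow mafft_example {
  input {
    Array[File]+ assemblies_fasta  # genomes to align to each other
  }
  call interhost.multi_align_mafft {
    input:
      assemblies_fasta = assemblies_fasta
  }
  output {
    Array[File] alignments_by_chr = multi_align_mafft.alignments_by_chr
    File        sample_names      = multi_align_mafft.sampleNamesFile
  }
}
```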
### beast

Execute GPU-accelerated BEAST. For tips on performance, see https://beast.community/performance#gpu

#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| beauti_xml | File | - | - |
| beagle_order | String? | The order of CPU (0) and GPU (1+) resources used to process partitioned data. | - |
| accelerator_type | String? | [GCP] The model of GPU to use. For availability and pricing on GCP, see https://cloud.google.com/compute/gpus-pricing#gpus | - |
| accelerator_count | Int? | [GCP] The number of GPUs of the specified type to use. | - |
| gpu_type | String? | [Terra] The model of GPU to use. For availability and pricing, see https://support.terra.bio/hc/en-us/articles/4403006001947-Getting-started-with-GPUs-in-a-Cloud-Environment | - |
| gpu_count | Int? | [Terra] The number of GPUs of the specified type to use. | - |
| vm_size | String? | - | - |

*Plus 2 optional inputs with default values, not shown above.*
#### Command

```sh
set -e
beast -beagle_info
nvidia-smi
# if beagle_order is not specified by the user,
# create an appropriate string based on the gpu count
default_beagle_order="$(seq -s, ~{gpu_count_used})"
beagle_order=~{default="$default_beagle_order" beagle_order}
echo "beagle_order: $beagle_order"
bash -c "sleep 60; nvidia-smi" &
beast \
  -beagle_multipartition off \
  -beagle_GPU \
  -beagle_cuda \
  -beagle_SSE \
  ~{true="-beagle_double" false="-beagle_single" beagle_double_precision} \
  -beagle_scaling always \
  ~{'-beagle_order ' + beagle_order} \
  ~{beauti_xml}
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| beast_log | File | `glob("*.log")[0]` |
| trees | Array[File] | `glob("*.trees")` |
| ops | Array[File] | `glob("*.ops")` |
| rates | Array[File] | `glob("*.rates")` |
| root | Array[File] | `glob("*.root")` |
| beast_stdout | File | `stdout()` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| memory | `"7 GB"` |
| cpu | 4 |
| disks | `"local-disk " + disk_size + " HDD"` |
| disk | `disk_size_az + " GB"` |
| vm_size | `select_first([vm_size, "Standard_NC6"])` |
| maxRetries | 1 |
| bootDiskSizeGb | `boot_disk` |
| gpu | true |
| dx_timeout | `"40H"` |
| dx_instance_type | `"mem1_ssd1_gpu2_x8"` |
| acceleratorType | `select_first([accelerator_type, "nvidia-tesla-p4"])` |
| acceleratorCount | `select_first([accelerator_count, 1])` |
| gpuType | `select_first([gpu_type, "nvidia-tesla-p4"])` |
| gpuCount | `select_first([gpu_count, 1])` |
| nvidiaDriverVersion | `"410.79"` |
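
A minimal call sketch requesting GPUs, again assuming the `interhost` alias; the GPU model and count are illustrative and availability varies by backend:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow beast_example {
  input {
    File beauti_xml  # BEAUti-generated BEAST XML
  }
  call interhost.beast {
    input:
      beauti_xml = beauti_xml,
      gpu_type   = "nvidia-tesla-t4",  # illustrative [Terra] GPU model
      gpu_count  = 2                   # the command derives a default beagle_order from the GPU count
  }
  output {
    File        beast_log = beast.beast_log
    Array[File] trees     = beast.trees
  }
}
```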
### index_ref

#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| referenceGenome | File | - | - |
| novocraft_license | File? | - | - |
| machine_mem_gb | Int? | - | - |

*Plus 1 optional input with a default value, not shown above.*
#### Command

```sh
read_utils.py --version | tee VERSION
read_utils.py novoindex \
  "~{referenceGenome}" \
  ~{"--NOVOALIGN_LICENSE_PATH=" + novocraft_license}
read_utils.py index_fasta_samtools "~{referenceGenome}"
read_utils.py index_fasta_picard "~{referenceGenome}"
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| referenceNix | File | `"*.nix"` |
| referenceFai | File | `"*.fasta.fai"` |
| referenceDict | File | `"*.dict"` |
| viralngs_version | String | `read_string("VERSION")` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| cpu | 2 |
| memory | `select_first([machine_mem_gb, 4]) + " GB"` |
| disks | `"local-disk " + disk_size + " HDD"` |
| disk | `disk_size + " GB"` |
| maxRetries | 2 |
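
A minimal call sketch (assumed `interhost` alias); the Novoalign license file is optional and only relevant when using the licensed Novoalign:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow index_ref_example {
  input {
    File  referenceGenome
    File? novocraft_license  # optional Novoalign license
  }
  call interhost.index_ref {
    input:
      referenceGenome   = referenceGenome,
      novocraft_license = novocraft_license
  }
  output {
    File nix  = index_ref.referenceNix
    File fai  = index_ref.referenceFai
    File dict = index_ref.referenceDict
  }
}
```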
### trimal_clean_msa

#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| in_aligned_fasta | File | - | - |
| machine_mem_gb | Int? | - | - |

*Plus 2 optional inputs with default values, not shown above.*
#### Command

```sh
trimal -fasta -automated1 -in "~{in_aligned_fasta}" -out "~{input_basename}_trimal_cleaned.fasta"
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| trimal_cleaned_fasta | File | `"~{input_basename}_trimal_cleaned.fasta"` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| memory | `select_first([machine_mem_gb, 7]) + " GB"` |
| cpu | 4 |
| disks | `"local-disk " + disk_size + " HDD"` |
| disk | `disk_size + " GB"` |
| dx_instance_type | `"mem1_ssd1_v2_x8"` |
| maxRetries | 2 |
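
A minimal call sketch, assuming the `interhost` alias:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow trimal_example {
  input {
    File in_aligned_fasta  # multiple sequence alignment to clean
  }
  call interhost.trimal_clean_msa {
    input:
      in_aligned_fasta = in_aligned_fasta
  }
  output {
    File cleaned = trimal_clean_msa.trimal_cleaned_fasta
  }
}
```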
### merge_vcfs_bcftools

#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| in_vcfs_gz | Array[File] | VCF files to be merged; should be (b)gzipped. | - |
| machine_mem_gb | Int? | - | - |

*Plus 2 optional inputs with default values, not shown above.*
#### Command

```sh
# copy files to CWD (required for tabix indexing)
INFILES=~{write_lines(in_vcfs_gz)}
ln -s $(cat $INFILES) .
for FN in $(cat $INFILES); do basename $FN; done > vcf_filenames.txt
# tabix index input vcfs (must be gzipped)
for FN in $(cat vcf_filenames.txt); do
  tabix -p vcf $FN
done
# see: https://samtools.github.io/bcftools/bcftools.html#merge
# --merge snps allows snps to be merged to multi-allelic (multi-ALT) records; all other records are listed separately
bcftools merge \
  --missing-to-ref \
  --force-samples \
  --merge snps \
  --output "~{output_prefix}.vcf.gz" \
  --output-type z \
  --threads "$(nproc --all)" \
  --file-list vcf_filenames.txt
# tabix index the merged vcf to create the .tbi file
tabix -p vcf "~{output_prefix}.vcf.gz"
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| merged_vcf_gz | File | `"~{output_prefix}.vcf.gz"` |
| merged_vcf_gz_tbi | File | `"~{output_prefix}.vcf.gz.tbi"` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| memory | `select_first([machine_mem_gb, 3]) + " GB"` |
| cpu | 2 |
| dx_instance_type | `"mem1_ssd1_v2_x2"` |
| maxRetries | 2 |
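
A minimal call sketch (assumed `interhost` alias); note the inputs must already be (b)gzipped so tabix can index them:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow merge_bcftools_example {
  input {
    Array[File] in_vcfs_gz  # per-sample VCFs, (b)gzipped
  }
  call interhost.merge_vcfs_bcftools {
    input:
      in_vcfs_gz = in_vcfs_gz
  }
  output {
    File merged     = merge_vcfs_bcftools.merged_vcf_gz
    File merged_tbi = merge_vcfs_bcftools.merged_vcf_gz_tbi
  }
}
```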
### merge_vcfs_gatk

#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| in_vcfs_gz | Array[File] | VCF files to be merged; should be (b)gzipped. | - |
| ref_fasta | File | FASTA file of the reference genome relative to which the input VCF sites were called. | - |
| machine_mem_gb | Int? | - | - |

*Plus 2 optional inputs with default values, not shown above.*
#### Command

```sh
# tabix index input vcfs (must be gzipped)
parallel -I ,, \
  "tabix -p vcf ,," \
  ::: "~{sep=" " in_vcfs_gz}"
# index reference to create .fai and .dict indices
samtools faidx "~{ref_fasta}"
picard CreateSequenceDictionary R="~{ref_fasta}" O=$(basename $(basename "~{ref_fasta}" .fasta) .fa).dict
# store input vcf file paths in a list file, one per line
for invcf in $(echo "~{sep=" " in_vcfs_gz}"); do
  echo "$invcf" >> input_vcfs.list
done
# merge
gatk3 -T CombineVariants -R "~{ref_fasta}" -V input_vcfs.list -o "~{output_prefix}.vcf" -genotypeMergeOptions UNIQUIFY
# bgzip output
bgzip "~{output_prefix}.vcf"
# tabix index the vcf to create the .tbi file
tabix -p vcf "~{output_prefix}.vcf.gz"
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| merged_vcf_gz | File | `"~{output_prefix}.vcf.gz"` |
| merged_vcf_gz_tbi | File | `"~{output_prefix}.vcf.gz.tbi"` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| memory | `select_first([machine_mem_gb, 3]) + " GB"` |
| cpu | 2 |
| dx_instance_type | `"mem1_ssd1_v2_x2"` |
| maxRetries | 2 |
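
A minimal call sketch (assumed `interhost` alias); the VCFs must have been called against the supplied reference:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow merge_gatk_example {
  input {
    Array[File] in_vcfs_gz  # (b)gzipped VCFs called against ref_fasta
    File        ref_fasta   # reference the VCF sites were called against
  }
  call interhost.merge_vcfs_gatk {
    input:
      in_vcfs_gz = in_vcfs_gz,
      ref_fasta  = ref_fasta
  }
  output {
    File merged = merge_vcfs_gatk.merged_vcf_gz
  }
}
```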
### reconstructr

#### Inputs

| Name | Type | Description | Default |
|---|---|---|---|
| msa_fasta | File | - | - |
| ref_fasta | File | - | - |
| date_csv | File | - | - |
| depth_csv | File | - | - |
| lofreq_vcfs | Array[File]+ | - | - |
| n_iters | Int | - | - |

*Plus 5 optional inputs with default values, not shown above.*
#### Command

```sh
set -e -o pipefail
# stage input files
mkdir -p input_data input_data/vcf input_data/coverage
/opt/reconstructR/scripts/cp_and_decompress.sh "~{msa_fasta}" input_data/aligned.fasta
/opt/reconstructR/scripts/cp_and_decompress.sh "~{ref_fasta}" input_data/ref.fasta
/opt/reconstructR/scripts/cp_and_decompress.sh "~{date_csv}" input_data/date.csv
/opt/reconstructR/scripts/cp_and_decompress.sh "~{depth_csv}" input_data/depth.csv
/opt/reconstructR/scripts/mcp_and_decompress.sh "~{sep='" "' lofreq_vcfs}" input_data/vcf
# run reconstructR
R --no-save <<CODE
library(reconstructR)
results <- run_mcmc(~{n_iters})
p <- visualize(results)
write.table(tabulate(results), quote=FALSE, sep='\t', row.names=FALSE,
  file="~{out_basename}-tabulate.tsv")
write.table(decipher(results), quote=FALSE, sep='\t', row.names=FALSE,
  file="~{out_basename}-decipher.tsv")
save(results, file="~{out_basename}-states.Rdata")
CODE
# compress outputs
pigz -9 -p $(nproc) "~{out_basename}-tabulate.tsv" "~{out_basename}-decipher.tsv" "~{out_basename}-states.Rdata"
# profiling and stats
cat /proc/uptime | cut -f 1 -d ' ' > UPTIME_SEC
cat /proc/loadavg > CPU_LOAD
{ if [ -f /sys/fs/cgroup/memory.peak ]; then cat /sys/fs/cgroup/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.peak ]; then cat /sys/fs/cgroup/memory/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes; else echo "0"; fi } > MEM_BYTES
```
#### Outputs

| Name | Type | Expression |
|---|---|---|
| tabulated_tsv_gz | File | `"~{out_basename}-tabulate.tsv.gz"` |
| deciphered_tsv_gz | File | `"~{out_basename}-decipher.tsv.gz"` |
| mcmc_states_Rdata_gz | File | `"~{out_basename}-states.Rdata.gz"` |
| max_ram_gb | Int | `ceil(read_float("MEM_BYTES") / 1000000000)` |
| runtime_sec | Int | `ceil(read_float("UPTIME_SEC"))` |
| cpu_load | String | `read_string("CPU_LOAD")` |
#### Runtime

| Key | Value |
|---|---|
| docker | `docker` |
| memory | `machine_mem_gb + " GB"` |
| cpu | `cpus` |
| disks | `"local-disk " + disk_size + " HDD"` |
| disk | `disk_size + " GB"` |
| bootDiskSizeGb | 50 |
| dx_instance_type | `"mem1_ssd1_v2_x4"` |
| maxRetries | 1 |
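
A minimal call sketch (assumed `interhost` alias); the MCMC iteration count shown is illustrative:

```wdl
version 1.0

import "tasks_interhost.wdl" as interhost

workflow reconstructr_example {
  input {
    File         msa_fasta    # multiple sequence alignment
    File         ref_fasta    # reference genome
    File         date_csv     # per-sample collection dates
    File         depth_csv    # per-sample sequencing depth
    Array[File]+ lofreq_vcfs  # per-sample LoFreq variant calls
  }
  call interhost.reconstructr {
    input:
      msa_fasta   = msa_fasta,
      ref_fasta   = ref_fasta,
      date_csv    = date_csv,
      depth_csv   = depth_csv,
      lofreq_vcfs = lofreq_vcfs,
      n_iters     = 10000  # illustrative MCMC iteration count
  }
  output {
    File tabulated  = reconstructr.tabulated_tsv_gz
    File deciphered = reconstructr.deciphered_tsv_gz
  }
}
```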