scaffold_and_refine_multitaxa
pipes/WDL/workflows/scaffold_and_refine_multitaxa.wdl

WORKFLOW scaffold_and_refine_multitaxa

File Path pipes/WDL/workflows/scaffold_and_refine_multitaxa.wdl
WDL Version 1.0
Type workflow

Imports

Namespace Path
assembly ../tasks/tasks_assembly.wdl
ncbi ../tasks/tasks_ncbi.wdl
utils ../tasks/tasks_utils.wdl
assemble_refbased assemble_refbased.wdl

Workflow: scaffold_and_refine_multitaxa

Scaffold de novo contigs against a set of possible references and subsequently polish with reads. This workflow accepts a very large set of input reference genomes. It will subset the reference genomes to those with ANI hits to the provided contigs/MAGs and cluster the reference hits by any ANI similarity to each other. It will choose the top reference from each cluster and produce one assembly for each cluster. This is intended to allow for the presence of multiple diverse viral taxa (coinfections) while forcing a choice of the best assembly from groups of related reference genomes.

Author: Broad Viral Genomics
viral-ngs@broadinstitute.org

Inputs

Name Type Description Default
sample_id String - -
sample_name String? - -
reads_unmapped_bam File - -
contigs_fasta File - -
taxid_to_ref_accessions_tsv File - -
biosample_accession String? - -
emailAddress String - -
skani_m Int? - -
skani_s Int? - -
skani_c Int? - -
skani_n Int? - -
skani_m Int? - -
skani_s Int? - -
skani_c Int? - -
nucmer_max_gap Int? - -
nucmer_min_match Int? - -
nucmer_min_cluster Int? - -
scaffold_min_contig_len Int? - -
scaffold_min_pct_contig_aligned Float? - -
machine_mem_gb Int? - -
sample_original_name String? - -
novocraft_license File? - -
trim_coords_bed File? - -
machine_mem_gb Int? - -
min_keep_length Int? - -
sliding_window Int? - -
primer_offset Int? - -
machine_mem_gb Int? - -
reheader_table File? - -
amplicon_set String? - -
max_coverage_depth Int? - -
base_q_threshold Int? - -
mapping_q_threshold Int? - -
read_length_threshold Int? - -
plotXLimits String? - -
plotYLimits String? - -
machine_mem_gb Int? - -
reheader_table File? - -
max_coverage_depth Int? - -
base_q_threshold Int? - -
mapping_q_threshold Int? - -
read_length_threshold Int? - -
plotXLimits String? - -
plotYLimits String? - -
65 optional inputs with default values

Outputs

Name Type Expression
assembly_stats_by_taxon Array[Map[String,String]] stats_by_taxon
assembly_stats_by_taxon_tsv File select_first([assembly_stats_non_empty.combined, assembly_stats_empty.combined])
assembly_method String "viral-ngs/scaffold_and_refine_multitaxa"
skani_num_ref_clusters Int length(select_references.matched_reference_clusters_fastas_tars)
skani_contigs_to_refs_dist_tsv File select_references.skani_dist_full_tsv
assembly_all_taxids Array[String] taxid
assembly_all_taxnames Array[String] tax_name
assembly_all_lengths_unambig Array[Int] assembly_length_unambiguous
assembly_all_pct_ref_cov Array[Float] percent_reference_covered
assembly_all_fastas Array[File] assembly_fasta

Calls

This workflow calls the following tasks or subworkflows:

CALL TASKS download_ref_genomes_from_tsv

Input Mappings (1)
Input Value
ref_genomes_tsv taxid_to_ref_accessions_tsv

CALL TASKS select_references

Input Mappings (2)
Input Value
reference_genomes_fastas download_ref_genomes_from_tsv.ref_genomes_fastas
contigs_fasta contigs_fasta

CALL TASKS tar_extract

Input Mappings (1)
Input Value
tar_file ref_cluster_tar

CALL TASKS scaffold

Input Mappings (6)
Input Value
reads_bam reads_unmapped_bam
contigs_fasta contigs_fasta
reference_genome_fasta tar_extract.files
min_length_fraction 0
min_unambig 0
allow_incomplete_output true

CALL TASKS tax_lookup → fetch_row_from_tsv

Input Mappings (4)
Input Value
tsv taxid_to_ref_accessions_tsv
idx_col "accessions"
idx_val sub(scaffold.scaffolding_chosen_ref_basename,"-",":")
add_header ["taxid", "isolate_prefix", "taxname", "accessions"]

CALL WORKFLOW refine → assemble_refbased

Input Mappings (3)
Input Value
reads_unmapped_bams [reads_unmapped_bam]
reference_fasta scaffold.scaffold_fasta
sample_name sample_id + "-" + taxid

CALL TASKS assembly_stats_non_empty → concatenate

Input Mappings (2)
Input Value
infiles [write_tsv([assembly_header]), write_tsv(select_all(stat_by_taxon))]
output_name "assembly_metadata-~{sample_id}.tsv"

CALL TASKS assembly_stats_empty → concatenate

Input Mappings (2)
Input Value
infiles [write_tsv([assembly_header])]
output_name "assembly_metadata-~{sample_id}.tsv"

Images

Container images used by tasks in this workflow:

🐳 Parameterized Image
⚙️ Parameterized

Configured via input:
docker

Used by 1 task:
  • download_ref_genomes_from_tsv
🐳 Parameterized Image
⚙️ Parameterized

Configured via input:
docker

Used by 2 tasks:
  • select_references
  • scaffold
🐳 viral-baseimage

quay.io/broadinstitute/viral-baseimage:0.3.0

Used by 1 task:
  • tar_extract
🐳 python

python:slim

Used by 1 task:
  • tax_lookup
🐳 ubuntu

ubuntu

Used by 2 tasks:
  • assembly_stats_non_empty
  • assembly_stats_empty
← Back to Index

scaffold_and_refine_multitaxa - Workflow Graph

🖱️ Scroll to zoom • Drag to pan • Double-click to reset • ESC to close

scaffold_and_refine_multitaxa - WDL Source Code

version 1.0

import "../tasks/tasks_assembly.wdl" as assembly
import "../tasks/tasks_ncbi.wdl" as ncbi
import "../tasks/tasks_utils.wdl" as utils
import "assemble_refbased.wdl" as assemble_refbased

workflow scaffold_and_refine_multitaxa {
    meta {
        description: "Scaffold de novo contigs against a set of possible references and subsequently polish with reads. This workflow accepts a very large set of input reference genomes. It will subset the reference genomes to those with ANI hits to the provided contigs/MAGs and cluster the reference hits by any ANI similarity to each other. It will choose the top reference from each cluster and produce one assembly for each cluster. This is intended to allow for the presence of multiple diverse viral taxa (coinfections) while forcing a choice of the best assembly from groups of related reference genomes."
        author: "Broad Viral Genomics"
        email:  "viral-ngs@broadinstitute.org"
        allowNestedInputs: true
    }

    input {
        String  sample_id
        String? sample_name
        File    reads_unmapped_bam
        File    contigs_fasta

        File    taxid_to_ref_accessions_tsv

        String? biosample_accession

        String  table_name = "sample"
    }

    Int    min_scaffold_unambig = 300 # in base-pairs; any scaffolded assembly < this length will not be refined/polished
    String sample_original_name = select_first([sample_name, sample_id])

    # download (multi-segment) genomes for each reference, fasta filename = dash-concatenated accession list
    call ncbi.download_ref_genomes_from_tsv {
        input:
            ref_genomes_tsv = taxid_to_ref_accessions_tsv
    }

    # subset reference genomes to those with ANI hits to contigs and cluster reference hits by any ANI similarity to each other
    call assembly.select_references {
        input:
            reference_genomes_fastas = download_ref_genomes_from_tsv.ref_genomes_fastas,
            contigs_fasta = contigs_fasta
    }

    # assemble and produce stats for every reference cluster
    Array[String] assembly_header = ["entity:assembly_id", "assembly_name", "sample_id", "sample_name", "taxid", "tax_name", "tax_shortname", "assembly_fasta", "aligned_only_reads_bam", "coverage_plot", "assembly_length", "assembly_length_unambiguous", "reads_aligned", "mean_coverage", "percent_reference_covered", "scaffolding_num_segments_recovered", "reference_num_segments_required", "reference_length", "reference_accessions", "skani_num_ref_clusters", "skani_this_cluster_num_refs", "skani_dist_tsv", "scaffolding_ani", "scaffolding_pct_ref_cov", "intermediate_gapfill_fasta", "assembly_preimpute_length_unambiguous", "replicate_concordant_sites", "replicate_discordant_snps", "replicate_discordant_indels", "replicate_discordant_vcf", "isnvsFile", "aligned_bam", "coverage_tsv", "read_pairs_aligned", "bases_aligned", "assembly_method", "assembly_method_version", "biosample_accession", "~{table_name}"]
    scatter(ref_cluster_tar in select_references.matched_reference_clusters_fastas_tars) {

        call utils.tar_extract {
            input:
                tar_file = ref_cluster_tar
        }

        # assemble (scaffold-and-refine) genome against this reference cluster
        call assembly.scaffold {
            input:
                reads_bam = reads_unmapped_bam,
                contigs_fasta = contigs_fasta,
                reference_genome_fasta = tar_extract.files,
                min_length_fraction = 0,
                min_unambig = 0,
                allow_incomplete_output = true
        }

        # get taxid and taxname from taxid_to_ref_accessions_tsv
        call utils.fetch_row_from_tsv as tax_lookup {
            input:
                tsv = taxid_to_ref_accessions_tsv,
                idx_col = "accessions",
                idx_val = sub(scaffold.scaffolding_chosen_ref_basename, "-", ":"),
                add_header = ["taxid", "isolate_prefix", "taxname", "accessions"]
        }
        String taxid = tax_lookup.map["taxid"]
        String tax_name = tax_lookup.map["taxname"]
        String isolate_prefix = tax_lookup.map["isolate_prefix"]

        # polish de novo assembly with reads
        if (scaffold.assembly_preimpute_length_unambiguous > min_scaffold_unambig) {
            call assemble_refbased.assemble_refbased as refine {
                input:
                    reads_unmapped_bams  = [reads_unmapped_bam],
                    reference_fasta      = scaffold.scaffold_fasta,
                    sample_name          = sample_id + "-" + taxid
            }
        }

        # build output tsv row
        Int    assembly_length_unambiguous = select_first([refine.assembly_length_unambiguous, 0])
        Float  percent_reference_covered = 1.0 * assembly_length_unambiguous / scaffold.reference_length
        File   assembly_fasta = select_first([refine.assembly_fasta, scaffold.intermediate_gapfill_fasta])
        Map[String, String] stats_by_taxon = {
            "entity:assembly_id" : sample_id + "-" + taxid,
            "assembly_name" :      tax_name + ": " + sample_original_name,
            "sample_id" :          sample_id,
            "sample_name" :        sample_original_name,
            "taxid" :              taxid,
            "tax_name" :           tax_name,
            "tax_shortname" :      isolate_prefix,

            "assembly_fasta" :              assembly_fasta,
            "aligned_only_reads_bam" :      select_first([refine.align_to_self_merged_aligned_only_bam, ""]),
            "coverage_plot" :               select_first([refine.align_to_self_merged_coverage_plot, ""]),
            "assembly_length" :             select_first([refine.assembly_length, "0"]),
            "assembly_length_unambiguous" : assembly_length_unambiguous,
            "reads_aligned" :               select_first([refine.align_to_self_merged_reads_aligned, "0"]),
            "mean_coverage" :               select_first([refine.align_to_self_merged_mean_coverage, "0"]),
            "percent_reference_covered" :   percent_reference_covered,
            "scaffolding_num_segments_recovered" : scaffold.assembly_num_segments_recovered,
            "reference_num_segments_required" : scaffold.reference_num_segments_required,
            "reference_length" :            scaffold.reference_length,
            "reference_accessions" :        tax_lookup.map["accessions"],

            "skani_num_ref_clusters" :      length(select_references.matched_reference_clusters_fastas_tars),
            "skani_this_cluster_num_refs" : length(tar_extract.files),
            "skani_dist_tsv" :              scaffold.scaffolding_stats,
            "scaffolding_ani" :             scaffold.skani_ani,
            "scaffolding_pct_ref_cov" :     scaffold.skani_ref_aligned_frac,

            "intermediate_gapfill_fasta" :            scaffold.intermediate_gapfill_fasta,
            "assembly_preimpute_length_unambiguous" : scaffold.assembly_preimpute_length_unambiguous,

            "replicate_concordant_sites" :  select_first([refine.replicate_concordant_sites, "0"]),
            "replicate_discordant_snps" :   select_first([refine.replicate_discordant_snps, "0"]),
            "replicate_discordant_indels" : select_first([refine.replicate_discordant_indels, "0"]),
            "replicate_discordant_vcf" :    select_first([refine.replicate_discordant_vcf, ""]),

            "isnvsFile" :          select_first([refine.align_to_self_isnvs_vcf, ""]),
            "aligned_bam" :        select_first([refine.align_to_self_merged_aligned_only_bam, ""]),
            "coverage_tsv" :       select_first([refine.align_to_self_merged_coverage_tsv, ""]),
            "read_pairs_aligned" : select_first([refine.align_to_self_merged_read_pairs_aligned, "0"]),
            "bases_aligned" :      select_first([refine.align_to_self_merged_bases_aligned, "0"]),

            "assembly_method" :         "viral-ngs/assemble_denovo",
            "assembly_method_version" : scaffold.viralngs_version,

            "biosample_accession" :     select_first([biosample_accession, ""]),

            "~{table_name}":            '{"entityType":"~{table_name}","entityName":"' + sample_id + '"}'
        }

        if(assembly_length_unambiguous > min_scaffold_unambig) {
            scatter(h in assembly_header) {
                String stat_by_taxon = stats_by_taxon[h]
            }
        }
    }

    ### summary stats
    if (length(select_all(stat_by_taxon)) > 0) {
        call utils.concatenate as assembly_stats_non_empty {
            input:
                infiles     = [write_tsv([assembly_header]), write_tsv(select_all(stat_by_taxon))],
                output_name = "assembly_metadata-~{sample_id}.tsv"
        }
    }
    if (length(select_all(stat_by_taxon)) == 0) {
        call utils.concatenate as assembly_stats_empty {
            input:
                infiles     = [write_tsv([assembly_header])],
                output_name = "assembly_metadata-~{sample_id}.tsv"
        }
    }

    output {
        Array[Map[String,String]] assembly_stats_by_taxon  = stats_by_taxon
        File   assembly_stats_by_taxon_tsv                 = select_first([assembly_stats_non_empty.combined, assembly_stats_empty.combined])
        String assembly_method                             = "viral-ngs/scaffold_and_refine_multitaxa"

        Int    skani_num_ref_clusters              = length(select_references.matched_reference_clusters_fastas_tars)
        File   skani_contigs_to_refs_dist_tsv      = select_references.skani_dist_full_tsv

        Array[String] assembly_all_taxids          = taxid
        Array[String] assembly_all_taxnames        = tax_name
        Array[Int]    assembly_all_lengths_unambig = assembly_length_unambiguous
        Array[Float]  assembly_all_pct_ref_cov     = percent_reference_covered
        Array[File]   assembly_all_fastas          = assembly_fasta
    }
}