WORKFLOW: assemble_denovo_metagenomic
File Path: pipes/WDL/workflows/assemble_denovo_metagenomic.wdl
WDL Version: 1.0
Type: workflow

Imports

| Namespace | Path |
| --- | --- |
| assembly | ../tasks/tasks_assembly.wdl |
| metagenomics | ../tasks/tasks_metagenomics.wdl |
| ncbi | ../tasks/tasks_ncbi.wdl |
| read_utils | ../tasks/tasks_read_utils.wdl |
| reports | ../tasks/tasks_reports.wdl |
| utils | ../tasks/tasks_utils.wdl |
| assemble_refbased | assemble_refbased.wdl |
Workflow: assemble_denovo_metagenomic
Performs viral de novo assembly on metagenomic reads against a large range of possible reference genomes. Runs raw reads through taxonomic classification (Kraken2), human read depletion (based on Kraken2), de novo assembly (SPAdes), and FastQC/MultiQC reporting of reads. De novo contigs are then scaffolded against a set of possible references and polished with the reads. This workflow accepts a very large set of input reference genomes: it subsets the reference genomes to those with ANI hits to the provided contigs/MAGs and clusters the reference hits by any ANI similarity to each other. It chooses the top reference from each cluster and produces one assembly per cluster. This allows for the presence of multiple diverse viral taxa (coinfections) while forcing a choice of the best assembly within each group of related reference genomes.
Author: Broad Viral Genomics
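For orientation before the input tables, here is a minimal sketch of a wrapper WDL that imports this workflow and supplies only its required inputs. The import path and the workflow's input/output names are taken from this page; the wrapper name, the sample/batch literals, and the caller-side variable names are illustrative placeholders, not part of the pipeline.

version 1.0

import "pipes/WDL/workflows/assemble_denovo_metagenomic.wdl" as denovo

workflow assemble_denovo_metagenomic_example {
    input {
        Array[File]+ bams       # one or more BAMs (mapped and/or unmapped) for a single sample
        File taxdump_tgz        # NCBI taxdump.tar.gz
        File trim_db            # trimmomatic adapter fasta
        File kraken2_db         # pre-built Kraken2 database tarball
        File krona_db           # Krona taxonomy.tab (possibly compressed)
        File ref_accessions_tsv # taxid-to-reference-accessions TSV
    }
    # Only the required inputs are wired here; all 98 optional inputs keep their defaults.
    call denovo.assemble_denovo_metagenomic {
        input:
            sample_id                     = "sample01",
            reads_bams                    = bams,
            batch_id_list                 = ["batch01"],
            ncbi_taxdump_tgz              = taxdump_tgz,
            trim_clip_db                  = trim_db,
            kraken2_db_tgz                = kraken2_db,
            krona_taxonomy_db_kraken2_tgz = krona_db,
            taxid_to_ref_accessions_tsv   = ref_accessions_tsv
    }
    output {
        File contigs               = assemble_denovo_metagenomic.contigs_fasta
        Array[File] all_assemblies = assemble_denovo_metagenomic.assembly_all_fastas
    }
}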
Inputs

Note: input names that appear more than once are nested inputs exposed by different task or subworkflow calls (the workflow sets allowNestedInputs: true).

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sample_id | String | - | - |
| sample_name | String? | - | - |
| biosample_accession | String? | - | - |
| reads_bams | Array[File]+ | Reads to classify. May be unmapped or mapped or both, paired-end or single-end. Multiple input files will be merged first. | - |
| batch_id_list | Array[String] | - | - |
| ncbi_taxdump_tgz | File | An NCBI taxdump.tar.gz file that contains, at the minimum, a nodes.dmp and names.dmp file. | - |
| spikein_db | File? | ERCC spike-in sequences | - |
| trim_clip_db | File | Adapter sequences to remove via trimmomatic prior to SPAdes assembly | - |
| kraken2_db_tgz | File | Pre-built Kraken database tarball containing three files: hash.k2d, opts.k2d, and taxo.k2d. | - |
| krona_taxonomy_db_kraken2_tgz | File | Krona taxonomy database containing a single file: taxonomy.tab, or possibly just a compressed taxonomy.tab | - |
| taxid_to_ref_accessions_tsv | File | TSV of candidate reference genomes; per the tax_lookup call below, its columns are tax_id, isolate_prefix, taxname, and accessions (colon-delimited for multi-segment genomes). | - |
| sample_name | String? | - | - |
| reheader_table | File? | - | - |
| machine_mem_gb | Int? | - | - |
| min_base_qual | Int? | - | - |
| taxonomic_ids | Array[Int]? | - | - |
| minimum_hit_groups | Int? | - | - |
| taxonomic_ids | Array[Int]? | - | - |
| minimum_hit_groups | Int? | - | - |
| spades_min_contig_len | Int? | - | - |
| spades_options | String? | - | - |
| machine_mem_gb | Int? | - | - |
| emailAddress | String | - | - |
| skani_m | Int? | - | - |
| skani_s | Int? | - | - |
| skani_c | Int? | - | - |
| skani_n | Int? | - | - |
| skani_m | Int? | - | - |
| skani_s | Int? | - | - |
| skani_c | Int? | - | - |
| nucmer_max_gap | Int? | - | - |
| nucmer_min_match | Int? | - | - |
| nucmer_min_cluster | Int? | - | - |
| scaffold_min_contig_len | Int? | - | - |
| scaffold_min_pct_contig_aligned | Float? | - | - |
| machine_mem_gb | Int? | - | - |
| sample_original_name | String? | - | - |
| novocraft_license | File? | - | - |
| trim_coords_bed | File? | - | - |
| machine_mem_gb | Int? | - | - |
| min_keep_length | Int? | - | - |
| sliding_window | Int? | - | - |
| primer_offset | Int? | - | - |
| machine_mem_gb | Int? | - | - |
| reheader_table | File? | - | - |
| amplicon_set | String? | - | - |
| max_coverage_depth | Int? | - | - |
| base_q_threshold | Int? | - | - |
| mapping_q_threshold | Int? | - | - |
| read_length_threshold | Int? | - | - |
| plotXLimits | String? | - | - |
| plotYLimits | String? | - | - |
| machine_mem_gb | Int? | - | - |
| reheader_table | File? | - | - |
| max_coverage_depth | Int? | - | - |
| base_q_threshold | Int? | - | - |
| mapping_q_threshold | Int? | - | - |
| read_length_threshold | Int? | - | - |
| plotXLimits | String? | - | - |
| plotYLimits | String? | - | - |
98 optional inputs with default values

| Name | Type | Default |
| --- | --- | --- |
| taxa_to_dehost | Array[String] | ["Vertebrata"] |
| taxa_to_avoid_assembly | Array[String] | ["Vertebrata", "other sequences", "Bacteria"] |
| table_name | String | "sample" |
| separator | String | "," |
| out_basename | String | basename(in_bams[0],".bam") |
| run_fastqc | Boolean | false |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| disk_size | Int | 750 |
| machine_mem_gb | Int | 4 |
| topNHits | Int | 3 |
| filter_bam_to_proper_primary_mapped_reads | Boolean | true |
| do_not_require_proper_mapped_pairs_when_filtering | Boolean | false |
| keep_singletons_when_filtering | Boolean | false |
| keep_duplicates_when_filtering | Boolean | false |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| confidence_threshold | Float? | 0.05 |
| machine_mem_gb | Int | 90 |
| docker | String | "quay.io/broadinstitute/viral-classify:2.5.1.0" |
| withoutChildren | Boolean | false |
| run_fastqc | Boolean | false |
| machine_mem_gb | Int | 26 + 10 * ceil(size(classified_reads_txt_gz,"GB")) |
| docker | String | "quay.io/broadinstitute/viral-classify:2.5.1.0" |
| withoutChildren | Boolean | false |
| run_fastqc | Boolean | false |
| machine_mem_gb | Int | 26 + 10 * ceil(size(classified_reads_txt_gz,"GB")) |
| docker | String | "quay.io/broadinstitute/viral-classify:2.5.1.0" |
| method | String | "mvicuna" |
| machine_mem_gb | Int | 7 |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| spades_n_reads | Int | 10000000 |
| sample_name | String | basename(basename(reads_unmapped_bam,".bam"),".taxfilt") |
| docker | String | "quay.io/broadinstitute/viral-assemble:2.5.1.0" |
| focal_taxon | String | "Viruses" |
| docker | String | "quay.io/broadinstitute/viral-classify:2.5.1.0" |
| docker | String | "quay.io/broadinstitute/viral-phylo:2.5.1.0" |
| docker | String | "quay.io/broadinstitute/viral-assemble:2.5.1.0" |
| machine_mem_gb | Int | 4 |
| cpu | Int | 2 |
| disk_size | Int | 100 |
| disk_size | Int | 375 |
| tar_opts | String | "-z" |
| aligner | String | "muscle" |
| replace_length | Int | 55 |
| docker | String | "quay.io/broadinstitute/viral-assemble:2.5.1.0" |
| sample_name | String | basename(basename(contigs_fasta,".fasta"),".assembly1-spades") |
| set_default_keys | Array[String] | [] |
| aligner | String | "minimap2" |
| min_coverage | Int | 3 |
| major_cutoff | Float | 0.75 |
| skip_mark_dupes | Boolean | false |
| align_to_ref_options | Map[String,String] | {"novoalign": "-r Random -l 40 -g 40 -x 20 -t 501 -k", "bwa": "-k 12 -B 1", "minimap2": ""} |
| align_to_self_options | Map[String,String] | {"novoalign": "-r Random -l 40 -g 40 -x 20 -t 100", "bwa": "", "minimap2": ""} |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| sample_name | String | basename(basename(basename(reads_unmapped_bam,".bam"),".taxfilt"),".clean") |
| min_quality | Int? | 1 |
| docker | String | "andersenlabapps/ivar:1.3.1" |
| bam_basename | String | basename(aligned_bam,".bam") |
| disk_size | Int | 375 |
| run_fastqc | Boolean | false |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| disk_size | Int | 750 |
| machine_mem_gb | Int | 4 |
| out_basename | String | basename(aligned_bam,'.bam') |
| docker | String | "quay.io/broadinstitute/viral-phylo:2.5.1.0" |
| max_amp_len | Int | 5000 |
| max_amplicons | Int | 500 |
| machine_mem_gb | Int | 32 |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| skip_mark_dupes | Boolean | false |
| plot_only_non_duplicates | Boolean | false |
| bin_large_plots | Boolean | false |
| binning_summary_statistic | String? | "max" |
| plot_width_pixels | Int? | 1100 |
| plot_height_pixels | Int? | 850 |
| plot_pixels_per_inch | Int? | 100 |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| mark_duplicates | Boolean | false |
| machine_mem_gb | Int | 15 |
| docker | String | "quay.io/broadinstitute/viral-assemble:2.5.1.0" |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| sample_name | String | basename(basename(basename(reads_unmapped_bam,".bam"),".taxfilt"),".clean") |
| run_fastqc | Boolean | false |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| disk_size | Int | 750 |
| machine_mem_gb | Int | 4 |
| out_basename | String | basename(aligned_bam,'.bam') |
| docker | String | "quay.io/broadinstitute/viral-phylo:2.5.1.0" |
| skip_mark_dupes | Boolean | false |
| plot_only_non_duplicates | Boolean | false |
| bin_large_plots | Boolean | false |
| binning_summary_statistic | String? | "max" |
| plot_width_pixels | Int? | 1100 |
| plot_height_pixels | Int? | 850 |
| plot_pixels_per_inch | Int? | 100 |
| docker | String | "quay.io/broadinstitute/viral-core:2.5.1" |
| cpus | Int | 4 |
| cpus | Int | 4 |
Outputs

| Name | Type | Expression |
| --- | --- | --- |
| cleaned_reads_unaligned_bam | File | deplete.bam_filtered_to_taxa |
| deduplicated_reads_unaligned | File | rmdup_ubam.dedup_bam |
| contigs_fasta | File | spades.contigs_fasta |
| read_counts_raw | Int | deplete.classified_taxonomic_filter_read_count_pre |
| read_counts_depleted | Int | deplete.classified_taxonomic_filter_read_count_post |
| read_counts_dedup | Int | rmdup_ubam.dedup_read_count_post |
| read_counts_prespades_subsample | Int | spades.subsample_read_count |
| kraken2_summary_report | File | kraken2.kraken2_summary_report |
| kraken2_reads_report | File | kraken2.kraken2_reads_report |
| kraken2_krona_plot | File | kraken2.krona_report_html |
| kraken2_top_taxa_report | File | report_primary_kraken_taxa.ranked_focal_report |
| kraken2_focal_taxon_name | String | report_primary_kraken_taxa.focal_tax_name |
| kraken2_focal_total_reads | Int | report_primary_kraken_taxa.total_focal_reads |
| kraken2_top_taxon_id | String | report_primary_kraken_taxa.tax_id |
| kraken2_top_taxon_name | String | report_primary_kraken_taxa.tax_name |
| kraken2_top_taxon_rank | String | report_primary_kraken_taxa.tax_rank |
| kraken2_top_taxon_num_reads | Int | report_primary_kraken_taxa.num_reads |
| kraken2_top_taxon_pct_of_focal | Float | report_primary_kraken_taxa.percent_of_focal |
| raw_fastqc | File? | merge_raw_reads.fastqc |
| cleaned_fastqc | File? | deplete.fastqc_html_report |
| spikein_report | File? | spikein.report |
| spikein_tophit | String? | spikein.top_hit_id |
| spikein_pct_of_total_reads | String? | spikein.pct_total_reads_mapped |
| spikein_pct_lesser_hits | String? | spikein.pct_lesser_hits_of_mapped |
| viral_classify_version | String | kraken2.viralngs_version |
| viral_assemble_version | String | spades.viralngs_version |
| assembly_stats_by_taxon | Array[Map[String,String]] | stats_by_taxon |
| assembly_stats_by_taxon_tsv | File | select_first([assembly_stats_non_empty.combined, assembly_stats_empty.combined]) |
| assembly_method | String | "viral-ngs/scaffold_and_refine_multitaxa" |
| skani_num_ref_clusters | Int | length(select_references.matched_reference_clusters_fastas_tars) |
| skani_contigs_to_refs_dist_tsv | File | select_references.skani_dist_full_tsv |
| assembly_all_taxids | Array[String] | taxid |
| assembly_all_taxnames | Array[String] | tax_name |
| assembly_all_lengths_unambig | Array[Int] | assembly_length_unambiguous |
| assembly_all_pct_ref_cov | Array[Float] | percent_reference_covered |
| assembly_all_fastas | Array[File] | assembly_fasta |
| batch_ids | String | unique_batch_ids.sorted_unique_joined |

The assembly_all_* arrays are parallel, with one element per matched reference cluster (their expressions are values gathered from the scatter over select_references.matched_reference_clusters_fastas_tars).
Calls
This workflow calls the following tasks and subworkflows (call alias → imported task or workflow, with explicit input mappings):

unique_batch_ids → utils.unique_strings (task)
- strings = batch_id_list

merge_raw_reads → read_utils.merge_and_reheader_bams (task)
- in_bams = reads_bams

spikein → reports.align_and_count (task; only if defined(spikein_db))
- reads_bam = reads_bam
- ref_db = select_first([spikein_db])

kraken2 → metagenomics.kraken2 (task)
- reads_bam = reads_bam
- kraken2_db_tgz = kraken2_db_tgz
- krona_taxonomy_db_tgz = krona_taxonomy_db_kraken2_tgz

deplete → metagenomics.filter_bam_to_taxa (task)
- classified_bam = reads_bam
- classified_reads_txt_gz = kraken2.kraken2_reads_report
- ncbi_taxonomy_db_tgz = ncbi_taxdump_tgz
- exclude_taxa = true
- taxonomic_names = taxa_to_dehost
- out_filename_suffix = "hs_depleted"

filter_acellular → metagenomics.filter_bam_to_taxa (task)
- classified_bam = reads_bam
- classified_reads_txt_gz = kraken2.kraken2_reads_report
- ncbi_taxonomy_db_tgz = ncbi_taxdump_tgz
- exclude_taxa = true
- taxonomic_names = taxa_to_avoid_assembly
- out_filename_suffix = "acellular"

rmdup_ubam → read_utils.rmdup_ubam (task)
- reads_unmapped_bam = filter_acellular.bam_filtered_to_taxa

spades → assembly.assemble (task)
- reads_unmapped_bam = rmdup_ubam.dedup_bam
- trim_clip_db = trim_clip_db
- always_succeed = true

report_primary_kraken_taxa → metagenomics.report_primary_kraken_taxa (task)
- kraken_summary_report = kraken2.kraken2_summary_report

download_ref_genomes_from_tsv → ncbi.download_ref_genomes_from_tsv (task)
- ref_genomes_tsv = taxid_to_ref_accessions_tsv

select_references → assembly.select_references (task)
- reference_genomes_fastas = download_ref_genomes_from_tsv.ref_genomes_fastas
- contigs_fasta = spades.contigs_fasta

tar_extract → utils.tar_extract (task; scattered per reference cluster)
- tar_file = ref_cluster_tar

scaffold → assembly.scaffold (task; scattered per reference cluster)
- reads_bam = deplete.bam_filtered_to_taxa
- contigs_fasta = spades.contigs_fasta
- reference_genome_fasta = tar_extract.files
- min_length_fraction = 0
- min_unambig = 0
- allow_incomplete_output = true

tax_lookup → utils.fetch_row_from_tsv (task; scattered per reference cluster)
- tsv = taxid_to_ref_accessions_tsv
- idx_col = "accessions"
- idx_val = sub(scaffold.scaffolding_chosen_ref_basename, "-", ":")
- add_header = ["tax_id", "isolate_prefix", "taxname", "accessions"]

refine → assemble_refbased (subworkflow; scattered, only if the pre-impute scaffold exceeds min_scaffold_unambig)
- reads_unmapped_bams = [deplete.bam_filtered_to_taxa]
- reference_fasta = scaffold.scaffold_fasta
- sample_name = sample_id + "-" + taxid

assembly_stats_non_empty → utils.concatenate (task; only if at least one assembly row exists)
- infiles = [write_tsv([assembly_header]), write_tsv(select_all(stat_by_taxon))]
- output_name = "assembly_metadata-~{sample_id}.tsv"

assembly_stats_empty → utils.concatenate (task; only if no assembly rows exist)
- infiles = [write_tsv([assembly_header])]
- output_name = "assembly_metadata-~{sample_id}.tsv"
Images
Container images used by tasks in this workflow:

| Image | Used by |
| --- | --- |
| python:slim | unique_batch_ids, tax_lookup |
| parameterized via the docker input | merge_raw_reads, rmdup_ubam |
| parameterized via the docker input | kraken2, deplete, filter_acellular, report_primary_kraken_taxa |
| parameterized via the docker input | spades, select_references, scaffold |
| parameterized via the docker input (quay.io/broadinstitute/viral-baseimage:0.3.0) | download_ref_genomes_from_tsv |
| ubuntu | assembly_stats_non_empty, assembly_stats_empty |

Default values for the parameterized docker inputs appear in the optional-inputs table above.
flowchart TD
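%% Node labels show the call alias with the underlying task name in parentheses;
%% the parallelogram node (refine) is a subworkflow call, and subgraphs mark
%% conditional (if) and scatter blocks.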
Start([assemble_denovo_metagenomic])
N1["unique_batch_idsunique_strings "]
N2["merge_raw_readsmerge_and_reheader_bams "]
subgraph C1 ["↔️ if defined(spikein_db)"]
direction TB
N3["spikeinalign_and_count "]
end
N4["kraken2"]
N5["depletefilter_bam_to_taxa "]
N6["filter_acellularfilter_bam_to_taxa "]
N7["rmdup_ubam"]
N8["spadesassemble "]
N9["report_primary_kraken_taxa"]
N10["download_ref_genomes_from_tsv"]
N11["select_references"]
subgraph S1 ["🔃 scatter ref_cluster_tar in select_references.matched_reference_clusters_fastas_tars"]
direction TB
N12["tar_extract"]
N13["scaffold"]
N14["tax_lookupfetch_row_from_tsv "]
subgraph C2 ["↔️ if scaffold.assembly_preimpute_length_unambiguous > min_scaffold_unambig"]
direction TB
N15[/"refineassemble_refbased "/]
end
end
subgraph C3 ["↔️ if length(select_all(stat_by_taxon)) > 0"]
direction TB
N16["assembly_stats_non_emptyconcatenate "]
end
subgraph C4 ["↔️ if length(select_all(stat_by_taxon)) == 0"]
direction TB
N17["assembly_stats_emptyconcatenate "]
end
N2 --> N3
N2 --> N4
N2 --> N5
N4 --> N5
N2 --> N6
N4 --> N6
N6 --> N7
N7 --> N8
N4 --> N9
N8 --> N11
N10 --> N11
N11 --> N12
N8 --> N13
N12 --> N13
N5 --> N13
N11 --> N13
N13 --> N14
N11 --> N14
N13 --> N15
N14 --> N15
N11 --> N15
N5 --> N15
Start --> N1
Start --> N2
Start --> N10
Start --> N16
Start --> N17
N15 --> End([End])
N1 --> End([End])
N9 --> End([End])
N17 --> End([End])
N3 --> End([End])
N16 --> End([End])
classDef taskNode fill:#a371f7,stroke:#8b5cf6,stroke-width:2px,color:#fff
classDef workflowNode fill:#58a6ff,stroke:#1f6feb,stroke-width:2px,color:#fff
WDL Source

version 1.0
import "../tasks/tasks_assembly.wdl" as assembly
import "../tasks/tasks_metagenomics.wdl" as metagenomics
import "../tasks/tasks_ncbi.wdl" as ncbi
import "../tasks/tasks_read_utils.wdl" as read_utils
import "../tasks/tasks_reports.wdl" as reports
import "../tasks/tasks_utils.wdl" as utils
import "assemble_refbased.wdl" as assemble_refbased
workflow assemble_denovo_metagenomic {
meta {
description: "Performs viral de novo assembly on metagenomic reads against a large range of possible reference genomes. Runs raw reads through taxonomic classification (Kraken2), human read depletion (based on Kraken2), de novo assembly (SPAdes), and FASTQC/multiQC of reads. Scaffold de novo contigs against a set of possible references and subsequently polish with reads. This workflow accepts a very large set of input reference genomes. It will subset the reference genomes to those with ANI hits to the provided contigs/MAGs and cluster the reference hits by any ANI similarity to each other. It will choose the top reference from each cluster and produce one assembly for each cluster. This is intended to allow for the presence of multiple diverse viral taxa (coinfections) while forcing a choice of the best assembly from groups of related reference genomes."
author: "Broad Viral Genomics"
email: "viral-ngs@broadinstitute.org"
allowNestedInputs: true
}
input {
String sample_id
String? sample_name
String? biosample_accession
Array[File]+ reads_bams
Array[String] batch_id_list
File ncbi_taxdump_tgz
File? spikein_db
File trim_clip_db
File kraken2_db_tgz
File krona_taxonomy_db_kraken2_tgz
File taxid_to_ref_accessions_tsv
Array[String] taxa_to_dehost = ["Vertebrata"]
Array[String] taxa_to_avoid_assembly = ["Vertebrata", "other sequences", "Bacteria"]
String table_name = "sample"
}
Int min_scaffold_unambig = 300 # in base-pairs; any scaffolded assembly < this length will not be refined/polished
String sample_original_name = select_first([sample_name, sample_id])
parameter_meta {
reads_bams: {
description: "Reads to classify. May be unmapped or mapped or both, paired-end or single-end. Multiple input files will be merged first.",
patterns: ["*.bam"]
}
spikein_db: {
description: "ERCC spike-in sequences",
patterns: ["*.fasta", "*.fasta.gz", "*.fasta.zst"]
}
trim_clip_db: {
description: "Adapter sequences to remove via trimmomatic prior to SPAdes assembly",
patterns: ["*.fasta", "*.fasta.gz", "*.fasta.zst"]
}
kraken2_db_tgz: {
description: "Pre-built Kraken database tarball containing three files: hash.k2d, opts.k2d, and taxo.k2d.",
patterns: ["*.tar.gz", "*.tar.lz4", "*.tar.bz2", "*.tar.zst"]
}
krona_taxonomy_db_kraken2_tgz: {
description: "Krona taxonomy database containing a single file: taxonomy.tab, or possibly just a compressed taxonomy.tab",
patterns: ["*.tab.zst", "*.tab.gz", "*.tab", "*.tar.gz", "*.tar.lz4", "*.tar.bz2", "*.tar.zst"]
}
ncbi_taxdump_tgz: {
description: "An NCBI taxdump.tar.gz file that contains, at the minimum, a nodes.dmp and names.dmp file.",
patterns: ["*.tar.gz", "*.tar.lz4", "*.tar.bz2", "*.tar.zst"]
}
cleaned_fastqc: {
description: "FastQC report (HTML) for the cleaned reads.",
category: "other"
}
deduplicated_reads_unaligned: {
description: "Deduplicated unaligned reads in BAM format, produced with mvicuna or cdhit.",
category: "other"
}
kraken2_krona_plot: {
description: "Krona visualization of the Kraken2 results, which displays the taxonomic hierarchy as a multi-layered pie chart.",
category: "other"
}
kraken2_summary_report: {
description: "Kraken report output file.",
category: "other"
}
raw_fastqc: {
description: "FastQC report for the merged raw reads.",
category: "other"
}
}
call utils.unique_strings as unique_batch_ids {
input:
strings = batch_id_list
}
call read_utils.merge_and_reheader_bams as merge_raw_reads {
input:
in_bams = reads_bams
}
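# all downstream classification, depletion, and assembly steps consume this single merged BAM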
File reads_bam = merge_raw_reads.out_bam
if(defined(spikein_db)) {
call reports.align_and_count as spikein {
input:
reads_bam = reads_bam,
ref_db = select_first([spikein_db])
}
}
call metagenomics.kraken2 as kraken2 {
input:
reads_bam = reads_bam,
kraken2_db_tgz = kraken2_db_tgz,
krona_taxonomy_db_tgz = krona_taxonomy_db_kraken2_tgz
}
call metagenomics.filter_bam_to_taxa as deplete {
input:
classified_bam = reads_bam,
classified_reads_txt_gz = kraken2.kraken2_reads_report,
ncbi_taxonomy_db_tgz = ncbi_taxdump_tgz,
exclude_taxa = true,
taxonomic_names = taxa_to_dehost,
out_filename_suffix = "hs_depleted"
}
call metagenomics.filter_bam_to_taxa as filter_acellular {
input:
classified_bam = reads_bam,
classified_reads_txt_gz = kraken2.kraken2_reads_report,
ncbi_taxonomy_db_tgz = ncbi_taxdump_tgz,
exclude_taxa = true,
taxonomic_names = taxa_to_avoid_assembly,
out_filename_suffix = "acellular"
}
call read_utils.rmdup_ubam {
input:
reads_unmapped_bam = filter_acellular.bam_filtered_to_taxa
}
call assembly.assemble as spades {
input:
reads_unmapped_bam = rmdup_ubam.dedup_bam,
trim_clip_db = trim_clip_db,
always_succeed = true
}
call metagenomics.report_primary_kraken_taxa {
input:
kraken_summary_report = kraken2.kraken2_summary_report
}
# download (multi-segment) genomes for each reference, fasta filename = dash-concatenated accession list
call ncbi.download_ref_genomes_from_tsv {
input:
ref_genomes_tsv = taxid_to_ref_accessions_tsv
}
# subset reference genomes to those with ANI hits to contigs and cluster reference hits by any ANI similarity to each other
call assembly.select_references {
input:
reference_genomes_fastas = download_ref_genomes_from_tsv.ref_genomes_fastas,
contigs_fasta = spades.contigs_fasta
}
# assemble and produce stats for every reference cluster
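# assembly_header fixes the column order of the per-taxon metadata TSV; the final
# "~{table_name}" column carries a Terra-style entity reference for the sample row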
Array[String] assembly_header = ["entity:assembly_id", "assembly_name", "sample_id", "sample_name", "taxid", "tax_name", "tax_shortname", "assembly_fasta", "aligned_only_reads_bam", "coverage_plot", "assembly_length", "assembly_length_unambiguous", "reads_aligned", "mean_coverage", "percent_reference_covered", "scaffolding_num_segments_recovered", "reference_num_segments_required", "reference_length", "reference_accessions", "skani_num_ref_clusters", "skani_this_cluster_num_refs", "skani_dist_tsv", "scaffolding_ani", "scaffolding_pct_ref_cov", "intermediate_gapfill_fasta", "assembly_preimpute_length_unambiguous", "replicate_concordant_sites", "replicate_discordant_snps", "replicate_discordant_indels", "replicate_discordant_vcf", "isnvsFile", "aligned_bam", "coverage_tsv", "read_pairs_aligned", "bases_aligned", "assembly_method", "assembly_method_version", "biosample_accession", "batch_ids", "~{table_name}"]
scatter(ref_cluster_tar in select_references.matched_reference_clusters_fastas_tars) {
call utils.tar_extract {
input:
tar_file = ref_cluster_tar
}
# assemble (scaffold-and-refine) genome against this reference cluster
call assembly.scaffold {
input:
reads_bam = deplete.bam_filtered_to_taxa,
contigs_fasta = spades.contigs_fasta,
reference_genome_fasta = tar_extract.files,
min_length_fraction = 0,
min_unambig = 0,
allow_incomplete_output = true
}
# get taxid and taxname from taxid_to_ref_accessions_tsv
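# the chosen reference's fasta basename is its dash-concatenated accession list
# (see download_ref_genomes_from_tsv above); sub() turns the dashes back into the
# colon-delimited form used in the tsv's "accessions" column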
call utils.fetch_row_from_tsv as tax_lookup {
input:
tsv = taxid_to_ref_accessions_tsv,
idx_col = "accessions",
idx_val = sub(scaffold.scaffolding_chosen_ref_basename, "-", ":"),
add_header = ["tax_id", "isolate_prefix", "taxname", "accessions"]
}
String taxid = tax_lookup.map["tax_id"]
String tax_name = tax_lookup.map["taxname"]
String isolate_prefix = tax_lookup.map["isolate_prefix"]
# polish de novo assembly with reads
if (scaffold.assembly_preimpute_length_unambiguous > min_scaffold_unambig) {
call assemble_refbased.assemble_refbased as refine {
input:
reads_unmapped_bams = [deplete.bam_filtered_to_taxa],
reference_fasta = scaffold.scaffold_fasta,
sample_name = sample_id + "-" + taxid
}
}
# build output tsv row
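# if refinement was skipped (scaffold did not exceed min_scaffold_unambig), these
# select_first() calls fall back to 0 and to the pre-polish gapfill fasta;
# the 1.0 * factor coerces the division below to Float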
Int assembly_length_unambiguous = select_first([refine.assembly_length_unambiguous, 0])
Float percent_reference_covered = 1.0 * assembly_length_unambiguous / scaffold.reference_length
File assembly_fasta = select_first([refine.assembly_fasta, scaffold.intermediate_gapfill_fasta])
Map[String, String] stats_by_taxon = {
"entity:assembly_id" : sample_id + "-" + taxid,
"assembly_name" : tax_name + ": " + sample_original_name,
"sample_id" : sample_id,
"sample_name" : sample_original_name,
"taxid" : taxid,
"tax_name" : tax_name,
"tax_shortname" : isolate_prefix,
"assembly_fasta" : assembly_fasta,
"aligned_only_reads_bam" : select_first([refine.align_to_self_merged_aligned_only_bam, ""]),
"coverage_plot" : select_first([refine.align_to_self_merged_coverage_plot, ""]),
"assembly_length" : select_first([refine.assembly_length, "0"]),
"assembly_length_unambiguous" : assembly_length_unambiguous,
"reads_aligned" : select_first([refine.align_to_self_merged_reads_aligned, "0"]),
"mean_coverage" : select_first([refine.align_to_self_merged_mean_coverage, "0"]),
"percent_reference_covered" : percent_reference_covered,
"scaffolding_num_segments_recovered" : scaffold.assembly_num_segments_recovered,
"reference_num_segments_required" : scaffold.reference_num_segments_required,
"reference_length" : scaffold.reference_length,
"reference_accessions" : tax_lookup.map["accessions"],
"skani_num_ref_clusters" : length(select_references.matched_reference_clusters_fastas_tars),
"skani_this_cluster_num_refs" : length(tar_extract.files),
"skani_dist_tsv" : scaffold.scaffolding_stats,
"scaffolding_ani" : scaffold.skani_ani,
"scaffolding_pct_ref_cov" : scaffold.skani_ref_aligned_frac,
"intermediate_gapfill_fasta" : scaffold.intermediate_gapfill_fasta,
"assembly_preimpute_length_unambiguous" : scaffold.assembly_preimpute_length_unambiguous,
"replicate_concordant_sites" : select_first([refine.replicate_concordant_sites, "0"]),
"replicate_discordant_snps" : select_first([refine.replicate_discordant_snps, "0"]),
"replicate_discordant_indels" : select_first([refine.replicate_discordant_indels, "0"]),
"replicate_discordant_vcf" : select_first([refine.replicate_discordant_vcf, ""]),
"isnvsFile" : select_first([refine.align_to_self_isnvs_vcf, ""]),
"aligned_bam" : select_first([refine.align_to_self_merged_aligned_only_bam, ""]),
"coverage_tsv" : select_first([refine.align_to_self_merged_coverage_tsv, ""]),
"read_pairs_aligned" : select_first([refine.align_to_self_merged_read_pairs_aligned, "0"]),
"bases_aligned" : select_first([refine.align_to_self_merged_bases_aligned, "0"]),
"assembly_method" : "viral-ngs/assemble_denovo",
"assembly_method_version" : scaffold.viralngs_version,
"biosample_accession" : select_first([biosample_accession, ""]),
"batch_ids" : unique_batch_ids.sorted_unique_joined,
"~{table_name}": '{"entityType":"~{table_name}","entityName":"' + sample_id + '"}'
}
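# emit this row as header-ordered strings, but only for assemblies that passed the
# length threshold; select_all() below drops the rows for taxa that did not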
if(assembly_length_unambiguous > min_scaffold_unambig) {
scatter(h in assembly_header) {
String stat_by_taxon = stats_by_taxon[h]
}
}
}
### summary stats
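# WDL 1.0 has no if/else, so two complementary conditionals cover both cases:
# header plus data rows when any assembly passed, header alone when none did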
if (length(select_all(stat_by_taxon)) > 0) {
call utils.concatenate as assembly_stats_non_empty {
input:
infiles = [write_tsv([assembly_header]), write_tsv(select_all(stat_by_taxon))],
output_name = "assembly_metadata-~{sample_id}.tsv"
}
}
if (length(select_all(stat_by_taxon)) == 0) {
call utils.concatenate as assembly_stats_empty {
input:
infiles = [write_tsv([assembly_header])],
output_name = "assembly_metadata-~{sample_id}.tsv"
}
}
output {
File cleaned_reads_unaligned_bam = deplete.bam_filtered_to_taxa
File deduplicated_reads_unaligned = rmdup_ubam.dedup_bam
File contigs_fasta = spades.contigs_fasta
Int read_counts_raw = deplete.classified_taxonomic_filter_read_count_pre
Int read_counts_depleted = deplete.classified_taxonomic_filter_read_count_post
Int read_counts_dedup = rmdup_ubam.dedup_read_count_post
Int read_counts_prespades_subsample = spades.subsample_read_count
File kraken2_summary_report = kraken2.kraken2_summary_report
File kraken2_reads_report = kraken2.kraken2_reads_report
File kraken2_krona_plot = kraken2.krona_report_html
File kraken2_top_taxa_report = report_primary_kraken_taxa.ranked_focal_report
String kraken2_focal_taxon_name = report_primary_kraken_taxa.focal_tax_name
Int kraken2_focal_total_reads = report_primary_kraken_taxa.total_focal_reads
String kraken2_top_taxon_id = report_primary_kraken_taxa.tax_id
String kraken2_top_taxon_name = report_primary_kraken_taxa.tax_name
String kraken2_top_taxon_rank = report_primary_kraken_taxa.tax_rank
Int kraken2_top_taxon_num_reads = report_primary_kraken_taxa.num_reads
Float kraken2_top_taxon_pct_of_focal = report_primary_kraken_taxa.percent_of_focal
File? raw_fastqc = merge_raw_reads.fastqc
File? cleaned_fastqc = deplete.fastqc_html_report
File? spikein_report = spikein.report
String? spikein_tophit = spikein.top_hit_id
String? spikein_pct_of_total_reads = spikein.pct_total_reads_mapped
String? spikein_pct_lesser_hits = spikein.pct_lesser_hits_of_mapped
String viral_classify_version = kraken2.viralngs_version
String viral_assemble_version = spades.viralngs_version
Array[Map[String,String]] assembly_stats_by_taxon = stats_by_taxon
File assembly_stats_by_taxon_tsv = select_first([assembly_stats_non_empty.combined, assembly_stats_empty.combined])
String assembly_method = "viral-ngs/scaffold_and_refine_multitaxa"
Int skani_num_ref_clusters = length(select_references.matched_reference_clusters_fastas_tars)
File skani_contigs_to_refs_dist_tsv = select_references.skani_dist_full_tsv
Array[String] assembly_all_taxids = taxid
Array[String] assembly_all_taxnames = tax_name
Array[Int] assembly_all_lengths_unambig = assembly_length_unambiguous
Array[Float] assembly_all_pct_ref_cov = percent_reference_covered
Array[File] assembly_all_fastas = assembly_fasta
String batch_ids = unique_batch_ids.sorted_unique_joined
}
}