Amplicon data processing workflows for diversity analysis
This software makes use of the Snakemake workflow management system to build and integrate different amplicon workflows, allowing flexible processing of data from different markers.
Features
- Clustering/denoising of raw amplicon sequencing data, taxonomic assignments and further sequence comparisons and marker-specific processing (such as ITS extraction)
- Comparison of different pipelines or variations of the same pipeline using different sets of parameters; workflow output is presented in a common file structure
- Simultaneous processing of multi-marker amplicons generated using different primer sets
- Multiple taxonomic assignment methods can be applied to each dataset using different marker-specific reference databases
Note: This is a work in progress and will be extended further (see below).
Non-features
The software does not assist with comprehensive statistical analyses, even though the integrated pipelines may offer them. Since the output files are in commonly used formats (such as BIOM), they can still serve as input for many analysis toolkits. There is also a dedicated import script for R.
Integrated pipelines
- USEARCH/VSEARCH-based amplicon pipeline using UNOISE3 for obtaining ASVs (paired-end implemented)
- QIIME2 (currently with DADA2 denoising)
- Amptk (UNOISE3 and DADA2, paired-end only)
- ... (more to follow)
Validation
Validation is done using amplicon data from a fungal mock community (details in the test directory), and a basic comparison of the different workflows can be done using a script.
No snakes 🐍 were harmed in the process of creating this software.
Installing
The software makes use of the Conda package manager; the installation is thus pretty straightforward (see instructions here).
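For example, a Snakemake Conda environment matching the conda activate snakemake command used further below can be created roughly as follows (a minimal sketch only; the channels and the unpinned package version are assumptions, so follow the linked instructions for the exact steps):

conda create -n snakemake -c conda-forge -c bioconda snakemake
conda activate snakemake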
Configuring
The easiest approach is to copy the contents of the config or test/config directory into a new analysis directory and then modify the files according to your needs (a short sketch follows the list below). There are two YAML files:

- config.yaml: Main configuration file containing input and workflow definitions. See here for a description and examples (incomplete). All available options are documented in the file itself.
- taxonomy.yaml: Defines all available taxonomic databases. See here for details.
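For illustration, assuming the repository has been cloned and my_analysis is the new analysis directory (both names are placeholders; the config subdirectory layout follows the results tree shown further below), setting this up could look like:

mkdir -p my_analysis/config
cp config/config.yaml config/taxonomy.yaml my_analysis/config/
# then edit my_analysis/config/config.yaml and my_analysis/config/taxonomy.yaml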
Running
The snakecharmer script is used as follows:

./snakecharmer <outdir> <rule1> <rule2>...

Before running for the first time, the command conda activate snakemake is necessary if Conda was used.
The target rules (or "commands") define which output should be generated. One or several rules can be specified, and some depend on the output of other rules. For instance, the taxonomy rule requires the clustering/denoising to have been done first (denoise command). A complete list of commands is found here.
The most important ones are listed below (a short usage sketch follows the list):

- quality: Reports sequencing read statistics in results/<workflow>/validation, which may help in deciding on the workflow parameter settings.
- denoise: Does the clustering/denoising for all workflows defined in config.yaml.
- taxonomy: Applies all taxonomy assignment methods defined in config.yaml to the output of all clustering workflows.
- clean: Removes working directories that are not strictly needed (retaining the results dir).
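To illustrate how these rules build on each other, a minimal session might look like this (the analysis directory name and the core count are placeholders):

conda activate snakemake
# check read statistics before deciding on workflow parameters
./snakecharmer my_analysis quality --cores 6
# cluster/denoise, then assign taxonomy (which requires the denoising output)
./snakecharmer my_analysis denoise taxonomy --cores 6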
Example
The following command processes a test dataset (fungal mock communities in the test directory) using 6 cores on a local computer. The target rules to be run are denoise, cmp and taxonomy.
conda activate snakemake
./snakecharmer test denoise cmp taxonomy --cores 6
On a computer cluster, the command may look different (see documentation here). The snakecharmer script is just a simple wrapper for Snakemake, but otherwise accepts all arguments that Snakemake does. Example for running with SLURM:
outdir=~/path/to/analysis
./snakecharmer --cores 20 --jobs 20 --slurm $outdir denoise cmp taxonomy
Output
After running, a few additional directories will have appeared next to config. The most important one is the results directory, which roughly has the following structure (more details here):
<my_analysis>
├── config
│   ├── config.yaml
│   ├── taxonomy.yaml
│   └── (...)
├── results
│   └── <workflow name>
│       └── data
│           ├── denoised.fasta
│           ├── denoised_otutab.txt.gz
│           ├── denoised.biom
│           ├── denoised.hdf5.biom
│           ├── denoised_search.txt.gz
│           ├── taxonomy
│           │   ├── <database>-<method>-<name>.txt.gz
│           │   ├── <database>-<method>-<name>.biom.gz
│           │   ├── <database>-<method>-<name>.hdf5.biom.gz
│           │   └── (...)
│           ├── cmp
│           │   ├── <my_seq_comparison>.txt
│           │   ├── <my_seq_comparison>_notmatched.fasta.gz
│           │   ├── <my_seq_comparison>_denoised_notmatched.fasta.gz
│           │   └── (...)
│           └── [ITSx]
│               ├── out.positions.txt
│               └── (...)
(...)
Whether the output consists of ASVs/ESVs or of OTUs from a fixed-threshold clustering (not yet implemented), the resulting FASTA file is always called denoised.fasta. The sample/OTU count matrix is returned both in the traditional tabular format (denoised_otutab.txt.gz) and in the BIOM format. The taxonomic annotations are named by taxonomy database and assignment method (multiple combinations are possible) and returned in a QIIME-style tabular format as well as the BIOM format. Furthermore, there can be results of sequence comparisons (cmp) or marker-specific data such as ITSx results.
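For a quick first look at these files, standard command-line tools can be used; a small sketch (the workflow name is a placeholder, and the biom command assumes the biom-format package is available in the active environment):

# first lines of the tab-delimited OTU table
zcat results/<workflow>/data/denoised_otutab.txt.gz | head
# summary of the JSON BIOM table
biom summarize-table -i results/<workflow>/data/denoised.biom -o table_summary.txt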
In the simplest scenario (one run/layout and one primer combination), the relevant results directory is <my_analysis>/results/<workflow_name>/data. With multi-workflow/marker setups, the data directory will not be present, and the individual workflow results are placed in nested directories (more here).
Analyzing in R
The R source file R/read_amplicon.R provides code for reading all data from a results directory. See also the small example analysis.
Comparison of denoising/clustering pipelines
There is a separate bash script, scripts/compare_results.sh, which creates an Excel file comparing the number of reads assigned by each pipeline to 98% clusters of the already denoised sequences. A separate workbook is created for each sample. The script requires VSEARCH, as well as R with the following packages: ggplot2, tidyverse, data.table and openxlsx.
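If these dependencies are not installed yet, one possible way to obtain them is through Conda (the environment name is arbitrary and the package/channel names are an assumption, not part of this project's own instructions):

conda create -n compare -c conda-forge -c bioconda vsearch r-ggplot2 r-tidyverse r-data.table r-openxlsx
conda activate compare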
Further steps...
A list of possible next steps includes:

- Integrate more pipelines / clustering methods and taxonomy databases
- Integrate platforms other than Illumina and allow simultaneous analysis of multi-platform data
- Allow for run result merging
- Offer more ways of comparing and validating pipelines and generally improve the user experience
- Test deployment on different systems
- Improve the configuration of job resources (memory, CPUs)
- Possibly move the USEARCH pipeline into a separate repository so it can be used independently
- ...
Code Snippets
script: "../scripts/amptk_trim_paired.py"

script: "../scripts/amptk_cluster_paired.py"

shell:
    """
    exec 1> "{output}"
    echo "Read merging and primer trimming"
    echo "================================"
    cat {input.merge_trim:q}
    printf "\n\n\n"
    echo "Denoising"
    echo "=========="
    cat {input.cluster:q}
    """

shell:
    """
    gzip -dc {input.otutab} > {output.tmp_tab}
    biom convert -i {input.otutab} \
        -o {output.biom} \
        --table-type 'OTU table' --to-json &> {log}
    """

shell:
    """
    biom convert -i {input.biom} \
        -o {output.biom_hdf5} \
        --table-type "OTU table" --to-hdf5 &> {log}
    """

shell:
    """
    exec &> {log}
    set -xeuo pipefail
    bam="{output.bam}"
    sam="${{bam%.*}}.sam"
    notmatched="{output.notmatched}"
    notmatched="${{notmatched%.gz}}"
    clusters_notmatched="{output.clusters_notmatched}"
    clusters_notmatched="${{clusters_notmatched%.gz}}"
    vsearch -usearch_global "{input.otus}" -db "{params.db}" \
        -userout "{output.map}" \
        -samout "$sam" \
        -userfields 'query+target+id' \
        -notmatched "$clusters_notmatched" \
        -dbnotmatched "$notmatched" \
        -threads {threads} \
        -maxaccepts {params.maxaccepts} \
        -maxrejects {params.maxrejects} \
        -maxhits {params.maxhits} \
        -id {params.threshold}
    # compress not-matched files
    gzip -nf "$notmatched" "$clusters_notmatched"
    if [ -s "$sam" ]; then
        # make BAM file
        rm -f "{params.db}.fai" "$bam.bai"
        samtools view -T "{params.db}" -b "$sam" | samtools sort -@ {threads} > "$bam"
        samtools index "$bam"
    else
        echo -n > "$bam"
    fi
    rm -f "$sam" "{params.db}.fai"
    """
shell: "rm -Rf results/*/workflow_*/*/*/cmp"

script: "../scripts/taxonomy/idtaxa_train.R"

script: "../scripts/taxonomy/idtaxa_assign.R"
shell:
    """
    pos="{output.pos}"
    ITSx -i {input} -o "${{pos%.positions.txt}}" \
        -t {params.par[organism_groups]} \
        -E {params.par[e-value]} \
        --allow_single_domain {params.par[allow_single_domain]} \
        --complement {params.par[complement]} \
        --heuristics {params.par[heuristics]} \
        --graphical {params.par[graphical]} \
        --fasta {params.par[fasta]} \
        --preserve {params.par[preserve]} \
        --save_regions {params.par[save_regions]} \
        --partial {params.par[partial]} \
        --cpu {threads} \
        2> {log}
    """

shell: "rm -Rf results/*/workflow_*/*/*/ITSx"

script: "../scripts/combine_sample_reports.py"
shell:
    """
    exec &> {log}
    set -xeuo pipefail
    if [ "{wildcards.layout}" = "single.rev" ]; then
        vsearch --fastx_revcomp "{input.otus}" --fastaout "{output.otus}"
    else
        ln -srf "{input.otus}" "{output.otus}"
    fi
    """

script: "../scripts/collect_sample_lists.py"

script: "../scripts/dump_config.py"

script: "../scripts/prepare_primers.py"

script: "../scripts/collect_sample_files.py"

script: "../scripts/make_pooling_list.py"

script: "../scripts/pool_raw.py"

shell:
    """
    mkdir -p {output}
    ln -sr {input.run_dir} {output}/nested
    """

script: "../scripts/make_new_sample_tab.py"

script: "../scripts/list_samples_yaml.py"

script: "../scripts/collect_unique_files.py"

script: "../scripts/link_data_dir.py"
shell:
    """
    mkdir -p {output.qc_dir}
    fastqc -q -f fastq -t {threads} -o {output.qc_dir} {input.sample_dir}/*.fastq.gz 2> {log}
    """

shell:
    """
    outdir="$(dirname "{output}")"
    multiqc -fm fastqc -o "$outdir" "{input.fastqc}" &> {log}
    (cd "$outdir" && zip -FSqr -rm multiqc_data.zip multiqc_data) 2> {log}
    """

shell:
    """
    ln -srf {input.existing} {output} 2> {log}
    """

shell:
    """
    outdir="$(dirname "{output}")"
    multiqc -fm fastqc -o "$outdir" "{input.fastqc}" &> {log}
    (cd "$outdir" && zip -FSqr -rm multiqc_data.zip multiqc_data) 2> {log}
    """

script: "../scripts/make_new_sample_tab.py"

shell:
    """
    qiime tools import \
        --type 'SampleData[{params.type}]' \
        --input-path {input.manifest} \
        --output-path {output} \
        --input-format {params.format} &> {log}
    """
script: "../scripts/qiime_trim_single.py"

script: "../scripts/qiime_trim_paired.py"

script: "../scripts/qiime_dada2.py"

shell:
    """
    exec &> {log}
    # export table
    mkdir -p {output.tmp}
    tab={output.tab}
    tab=${{tab%.gz}}
    qiime tools export \
        --input-path {input.tab} \
        --output-path {output.tmp}
    mv {output.tmp}/feature-table.biom {output.biom_hdf5}
    biom convert -i {output.biom_hdf5} \
        -o {output.biom_json} \
        --to-json --table-type "OTU table"
    biom convert -i {output.biom_hdf5} \
        -o $tab \
        --to-tsv --table-type "OTU table"
    sed -i '1,1d' $tab
    gzip -nf $tab
    # export seqs
    qiime tools export \
        --input-path {input.clustered} \
        --output-path {output.tmp}
    cat {output.tmp}/dna-sequences.fasta > {output.clustered}
    # export stats
    qiime metadata tabulate \
        --m-input-file {input.stats} \
        --o-visualization {output.tmp}/stats.qzv
    qiime tools export \
        --input-path {output.tmp}/stats.qzv \
        --output-path {output.tmp}/stats
    statfile={output.tmp}/stats/metadata.tsv
    head -n1 $statfile > {output.stats}
    tail -n+3 $statfile >> {output.stats}
    """

script: "../scripts/combine_sample_reports.py"

shell:
    """
    exec 1> "{output}"
    echo "Primer trimming"
    echo "==============="
    cat {input.trim:q}
    printf "\n\n\n"
    echo "Denoising"
    echo "=========="
    cat {input.cluster:q}
    """

shell:
    """
    exec &> {log}
    par=$(cat {input.params})
    if [ "$par" != "{}" ]; then
        echo "Unknown QIIME naive-bayes classifier config: $par"
        exit 1
    fi
    mkdir -p {output.tmp}
    zstd -dcqf {input.seq} > {output.tmp}/seq.fasta 2> {log}
    # extract taxonomic lineages from FASTA for import in QIIME
    zstd -dcqf {input} | st . --to-tsv id,desc > {output.tmp}/tax.txt
    qiime tools import \
        --type 'FeatureData[Sequence]' \
        --input-path {output.tmp}/seq.fasta \
        --output-path {output.tmp}/seq.qza
    qiime tools import \
        --type 'FeatureData[Taxonomy]' \
        --input-format HeaderlessTSVTaxonomyFormat \
        --input-path {output.tmp}/tax.txt \
        --output-path {output.tmp}/tax.qza
    qiime feature-classifier fit-classifier-naive-bayes \
        --i-reference-reads {output.tmp}/seq.qza \
        --i-reference-taxonomy {output.tmp}/tax.qza \
        --o-classifier {output.tmp}/trained.qza
    zstd -dcq {output.tmp}/trained.qza > {output.trained}
    """

shell:
    """
    exec &> {log}
    mkdir -p {output.tmp}
    # lowercase letters cause problems -> convert to uppercase
    # (cannot use 'st upper' because seqtool is not in conda environment,
    # therefore using awk)
    awk '/^>/ {{print($0)}}; /^[^>]/ {{print(toupper($0))}}' {input.seq} > {output.tmp}/input.fasta
    qiime tools import \
        --input-path {output.tmp}/input.fasta \
        --type 'FeatureData[Sequence]' \
        --input-format DNAFASTAFormat \
        --output-path {output.tmp}/seqs.qza
    qiime feature-classifier classify-sklearn \
        --i-classifier {input.db} \
        --i-reads {output.tmp}/seqs.qza \
        --o-classification {output.tmp}/classified.qza \
        --p-reads-per-batch 1000 \
        --p-confidence {params.par[confidence]} \
        --p-n-jobs {threads} \
        --verbose
    qiime tools export \
        --input-path {output.tmp}/classified.qza \
        --output-path {output.tmp}
    gzip -nc {output.tmp}/taxonomy.tsv > {output.tax}
    """
script: "../scripts/taxonomy/write_db_config.py"

script: "../scripts/taxonomy/obtain_taxdb.py"

script: "../scripts/taxonomy/write_db_config.py"

script: "../scripts/taxonomy/filter_taxdb.py"

script: "../scripts/taxonomy/write_db_config.py"

script: "../scripts/taxonomy/obtain_taxdb_formatted.py"

shell:
    """
    zstd -dcqf "{input.db}" > "{output.db}"
    """

script: "../scripts/taxonomy/rank_propagate.py"

shell:
    """
    tax={input.tax}
    st set -ul <(gzip -dc "$tax") -d {{l:2}} "{input.fa}" |
        st replace -d '__' ':' |
        st replace -dr ' *; *' ' ' |
        gzip -nc > {output}
    """

shell:
    """
    exec &> {log}
    set -xeuo pipefail
    mkdir -p $(dirname {output.tax_tmp})
    gzip -dc {input.tax} |
        sed 's/Taxon/taxonomy/g' |
        sed 's/Feature ID/# Feature ID/g' > {output.tax_tmp}
    if [[ $(wc -l < "{output.tax_tmp}") -ge 2 ]]; then
        biom add-metadata -i {input.biom} \
            -o /dev/stdout \
            --observation-metadata-fp {output.tax_tmp} \
            --sc-separated taxonomy --float-fields Confidence --output-as-json |
            gzip -nc > {output.biom}
        biom add-metadata -i {input.biom_hdf5} \
            -o {output.biom_tmp} \
            --observation-metadata-fp {output.tax_tmp} \
            --sc-separated taxonomy --float-fields Confidence
        gzip -nc {output.biom_tmp} > {output.biom_hdf5}
    else
        # no taxa
        echo -n | gzip -nc > {output.biom}
        echo -n | gzip -nc > {output.biom_hdf5}
    fi
    """

shell: "rm -Rf results/*/workflow_*/*/*/taxonomy"
script: "../scripts/uvsnake_gen_config.py"

script: "../scripts/uvsnake_run.py"

script: "../scripts/uvsnake_run.py"

shell:
    """
    indir="$(dirname "{input.results[0]}")"
    outdir="$(dirname "{output.results[0]}")"
    cp -f "$indir/{wildcards.cluster_method}.fasta" "$outdir/clusters.fasta"
    cp -f "$indir/{wildcards.cluster_method}_otutab.txt.gz" "$outdir/otutab.txt.gz"
    cp -f "$indir/{wildcards.cluster_method}.biom" "$outdir/otutab.biom"
    cp -f "{input.stats}" "{output.stats}"
    cp -f "{input.log}" "{output.log}"
    """

shell:
    """
    outdir="$(dirname "{output}")"
    multiqc -f -m fastqc -m cutadapt -o "$outdir" "{input.fastqc}" "{input.cutadapt_dir}" &> {log}
    """

script: "../scripts/taxonomy/convert_utax.py"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | import gzip import os import shutil import sys from subprocess import check_call from utils import file_logging def cluster_paired(method, trimmed_in, clustered_out, otutab_out, usearch_par, dada2_par, unoise_program=None, usearch_bin=None, threads=1): command = "cluster" if method == "uparse" else method cmd = [ "amptk", command, "-i", os.path.basename(trimmed_in), "-o", method, "--cpus", str(threads) ] # add method specific arguments # TODO: inconsistent to have these settings under usearch even though used for DADA2 as well # (on the other hand, Amptk uses an USEACH-like procedure for DADA2 as well) maxee = usearch_par["merge"]["expected_length"] * usearch_par["filter"]["max_error_rate"] if method in ("unoise3", "uparse"): cmd += [ "--usearch", usearch_bin, "--maxee", str(maxee), ] if method == "unoise3": cmd += [ "--method", unoise_program, "--minsize", str(usearch_par["unoise3"]["min_size"]), ] if unoise_program == "usearch": assert usearch_bin is not None cmd += ["--usearch", usearch_bin] cluster_file = method + '.ASVs.fa' elif method == "uparse": assert usearch_bin is not None cmd += [ "--minsize", str(usearch_par["uparse"]["min_size"]), "--usearch", usearch_bin, ] cluster_file = method + '.cluster.otus.fa' else: assert method == "dada2", "Unknown / unimplemented Amptk command" cmd += [ "--maxee", str(maxee), "--chimera_method", dada2_par["chimera_method"], ] p = dada2_par["pooling_method"] if p == "pooled": cmd.append("--pool") elif p == "pseudo": cmd.append("--pseudopool") cluster_file = method + '.ASVs.fa' outdir = os.path.dirname(trimmed_in) print("Call: " + " ".join(cmd), file=sys.stderr) check_call(cmd, cwd=outdir, stdout=sys.stdout, stderr=sys.stderr) # copy files results_dir = os.path.dirname(clustered_out) if not os.path.exists(results_dir): os.makedirs(results_dir) shutil.copy2(os.path.join(outdir, cluster_file), clustered_out) with open(os.path.join(outdir, method + '.otu_table.txt'), 'rb') as i: with gzip.open(otutab_out, 'wb') as o: shutil.copyfileobj(i, o) with file_logging(snakemake.log[0]): cluster_paired( trimmed_in=snakemake.input.demux, clustered_out=snakemake.output.clustered, otutab_out=snakemake.output.tab, threads=snakemake.threads, **snakemake.params ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | import os import sys import yaml from subprocess import check_call from utils import file_logging def anchored(seq_d): if seq_d['anchor']: print(f"WARNING: The primer: {seq_d['seq']} should be anchored, but " "this is not possible with the Amptk workflow", file=sys.stderr) return seq_d['seq'] def trim_paired(input_files, demux_out, primer_file, f_primer_name, r_primer_name, program=None, usearch_bin=None, err_rate=None, min_len=None, threads=1): # read primers with open(primer_file) as f: primers = yaml.safe_load(f) f_primer = anchored(primers["forward_consensus"][f_primer_name]) r_primer = anchored(primers["reverse_consensus"][r_primer_name]) # determine mismatches (depending on average of forward/reverse primer lengths) # and subsequent rounding primer_mismatch = round((len(f_primer) + len(r_primer)) / 2 * err_rate) # NOTE: we limit the max. number of primer mismatches to 2. # The reason is that Amptk apparently does **another** primer trimming after # merging the already trimmed reads. # Too liberal mismatch thresholds lead to many unspecific primer matches # and consequently to unwanted trimming of reads. if primer_mismatch > 2: print(f"WARNING: The maximum primer mismatches were limited to 2 with " "the Amptk workflow (would be {primer_mismatch} with current " "max_error_rate setting)", file=sys.stderr) primer_mismatch = 2 # prepare input/output dirs # TODO: depends on this exact naming scheme prefix = demux_out.replace('.demux.fq.gz', '') outdir = os.path.dirname(prefix) if not os.path.exists(outdir): os.makedirs(outdir) # Since we need to change the directory while executing, get the correct # relative input path indir = os.path.abspath(os.path.dirname(input_files[0])) # All files in the input dir are parsed by Amptk, therefore make sure that # there are no other files f1 = set(os.path.basename(f) for f in input_files) f2 = set(os.path.basename(f) for f in os.listdir(indir) if f.endswith('.fastq.gz')) assert f1 == f2, \ "Amptk input dir has unknown files: {}".format(', '.join(f2.difference(f1))) # run cmd = [ "amptk", "illumina", "-i", indir, "-o", os.path.basename(prefix), "-f", f_primer, "-r", r_primer, "--min_len", str(min_len), "--trim_len", "10000000000", # high enough to never be longer "--cpus", str(threads), "--cleanup", "--require_primer=on", "--rescue_forward=off", "--primer_mismatch", str(primer_mismatch), "--merge_method", program ] if program == "usearch": assert usearch_bin is not None cmd += ["--usearch", usearch_bin] print("Call: " + " ".join(cmd), file=sys.stderr) check_call(cmd, cwd=outdir, stdout=sys.stdout, stderr=sys.stderr) with file_logging(snakemake.log[0]): trim_paired( snakemake.input.fq, snakemake.output.demux, primer_file=snakemake.input.primers_yaml, f_primer_name=snakemake.wildcards.f_primer, r_primer_name=snakemake.wildcards.r_primer, program=snakemake.params.program, usearch_bin=snakemake.params.usearch_bin, err_rate=snakemake.params.err_rate, min_len=snakemake.params.min_len, threads=snakemake.threads, ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | import os from os.path import exists, abspath, relpath import shutil import sys from typing import * from utils import file_logging from utils.sample_list import SampleList def link_samples(sample_tab, outdir): # delete output dir to make sure there are no orphan files # if exists(outdir): # shutil.rmtree(outdir) os.makedirs(outdir, exist_ok=True) # link sample_list = SampleList(sample_tab) for sample, read_paths in sample_list.samples(): # R1 and R2 files for i, source in enumerate(read_paths): target = os.path.join(outdir, f"{sample}_R{i+1}.fastq.gz") os.symlink(abspath(source), abspath(target)) # report source = relpath(source, ".") target = relpath(target, ".") print("{} > {}".format(source, target), file=sys.stdout) with file_logging(snakemake.log[0]): link_samples(snakemake.input.sample_tab, snakemake.output.sample_dir) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | from utils import file_logging from utils.sample_list import SampleList def collect_sample_lists(run_meta, reserved_chars, path_template): for d in run_meta: tsv_out = path_template.format(ext="tsv", **d) l = SampleList(d["sample_file"], reserved_chars=reserved_chars) with open(tsv_out, "w") as o: l.write(o, sort_by_sample=True) yaml_out = path_template.format(ext="yaml", **d) with open(yaml_out, "w") as o: l.write_yaml(o) with file_logging(snakemake.log[0]): collect_sample_lists(**snakemake.params) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 | from collections import defaultdict import csv import hashlib import os from os.path import exists, abspath, relpath import shutil import sys from typing import * from utils import file_logging from utils.sample_list import SampleList def get_samples(run_meta, path_template): # get all necessary metadata _run_meta = [ ( (d["technology"], d["layout"], d["run"]), list(SampleList(path_template.format(**d).samples())) ) for d in run_meta ] # get list of runs per sample sample2run = defaultdict(set) for run, samples in _run_meta: for sample in samples: sample2run[sample].add(run) # Which layout/run/technology combinations have duplicate samples? # Runs/layouts without duplicates are set to None dupes = sorted(set( run for runs in sample2run.values() if len(runs) > 1 for run in runs )) # Define unique suffixes for run/layout combination that have duplicate samples. # For consistency, all sample names in these runs will receive a suffix, # even if some of them are not duplicated. suffixes = {r: dupes.index(dupes[r]) + 1 for r in dupes} sample_dict = {} for run, samples in _run_meta: try: suffix = suffixes[run] except KeyError: suffix = None for sample, reads in samples: unique_sample = sample if suffix is None else f"{sample}_{suffix}" assert not unique_sample in sample_dict paths = [(path, f"{unique_sample}_R{i+1}.fastq.gz") for i, path in enumerate(reads)] sample_dict[unique_sample] = (sample, run, paths) # convert to flat sorted list samples = [(unique_sample, *other) for unique_sample, other in sample_dict.items()] return sorted(samples) def file_md5(filename): md5 = hashlib.md5() with open(filename, "rb") as f: # Read and update hash in chunks of 4K for chunk in iter(lambda: f.read(4096), b""): md5.update(chunk) return md5.hexdigest() def do_link(path_iter, outdir): for paths in path_iter: for source, target in paths: target = os.path.join(outdir, target) assert not os.path.exists(target) os.symlink(abspath(source), abspath(target)) # report source = relpath(source, ".") target = relpath(target, ".") print("{} > {}".format(source, target), file=sys.stdout) def dump_tsv(samples, outfile): with open(outfile, "w") as o: w = csv.writer(o, delimiter="\t") header = ["technology", "run", "layout", "sample", "unique_sample", "source_read_1", "source_read_2", "read_1", "read_2", "read_1_md5", "read_2_md5"] w.writerow(header) for unique_sample, sample, meta, paths in samples: orig_reads, target_files = zip(*paths) orig_reads = list(orig_reads) target_files = list(target_files) md5 = [file_md5(p) for p in orig_reads] if len(orig_reads) == 1: orig_reads.append("") target_files.append("") md5.append("") row = list(meta) + [sample, unique_sample] + orig_reads + target_files + md5 w.writerow(row) def link_samples(run_meta, read_dir, sample_file): # delete output dir to make sure there are no orphan files if exists(read_dir): shutil.rmtree(read_dir) if not exists(read_dir): os.makedirs(read_dir) # get the sample paths samples = get_samples(run_meta) # do the work do_link((paths for _, _, _, paths in samples), read_dir) dump_tsv(samples, sample_file) with file_logging(snakemake.log[0]): link_samples(snakemake.params.run_meta, snakemake.input.path_template, 
snakemake.output.read_dir, snakemake.output.tsv) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | from collections import OrderedDict from copy import copy import csv import re from utils import file_logging class SampleReport(object): def __init__(self, path, pattern): # read data params = pattern.parse(path) self.group_header = list(params) self.groups = list(params.values()) with open(path) as f: rdr = csv.reader(f, delimiter="\t") try: self.header = next(rdr) except StopIteration: self.header = [] self.rows = [row for row in rdr if row] class PathPattern(object): def __init__(self, pattern): self._pattern = re.compile(pattern) def parse(self, path): out = OrderedDict() for m in re.finditer(self._pattern, path): out.update(m.groupdict()) return out def combine_reports(sample_reports, path_pattern, outfile): pattern = PathPattern(path_pattern) sample_files = [SampleReport(path, pattern) for path in sample_reports] assert all(f.group_header == sample_files[0].group_header for f in sample_files[1:]) # obtain header header = [] for f in sample_files: f.field_idx = [] for field in f.header: try: f.field_idx.append(header.index(field)) except ValueError: f.field_idx.append(len(header)) header.append(field) # write data with open(outfile, "w") as o: w = csv.writer(o, delimiter="\t") if sample_files: w.writerow(sample_files[0].group_header + header) empty_row = [""] * len(header) for f in sample_files: for row in f.rows: out = copy(empty_row) for i, field in zip(f.field_idx, row): out[i] = field w.writerow(f.groups + out) with file_logging(snakemake.log[0]): combine_reports( snakemake.input.reports, snakemake.params.path_pattern, snakemake.output.report ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | import yaml from copy import deepcopy from utils import file_logging def dump_config(config, outfile): with open(outfile, "w") as o: c = deepcopy(config) # make copy since we modify it del c["settings"]["input"] del c["settings"]["primers"] del c["settings"]["taxonomy_db_sources"] del c["settings"]["taxonomy_dbs"] del c["settings"]["taxonomy_methods"] c["taxonomy"] = { marker: {"-".join(name): config for name, config in tax.items()} for marker, tax in c["taxonomy"].items() } yaml.safe_dump(c, o) with file_logging(snakemake.log[0]): dump_config(snakemake.params.config, snakemake.output[0]) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | import os from os.path import dirname from utils import file_logging def link_data_dir(outdir_list, clust_files, data_dir): # write output dirs outdirs = [dirname(f) for f in clust_files] with open(outdir_list, "w") as o: o.writelines((l + "\n" for l in outdirs)) if len(outdirs) == 1: source_dir = outdirs[0] if os.path.exists(data_dir): os.remove(data_dir) os.symlink(os.path.relpath(source_dir, dirname(data_dir)), data_dir) with file_logging(snakemake.log[0]): link_data_dir(outdir_list=snakemake.output.out_list, clust_files=snakemake.input.clust, data_dir=snakemake.params.data_dir) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | from collections import defaultdict import os import yaml from utils import file_logging def list_samples(sample_files, outfile): out = defaultdict(dict) for sample_file in sample_files: p = sample_file.split(os.sep) layout = p[-3] run = p[-2] with open(sample_file) as f: out[run][layout] = yaml.safe_load(f) with open(outfile, "w") as o: yaml.safe_dump(dict(out), o) with file_logging(snakemake.log[0]): list_samples(sample_files=snakemake.input.sample_files, outfile=snakemake.output.yml) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | from os.path import join from utils import file_logging from utils.sample_list import SampleList def make_sample_tab(sample_file, sample_file_out, sample_dir, subdir=None, qiime_style=False): if subdir is not None: sample_dir = join(sample_dir, subdir) sl = SampleList(sample_file) with open(sample_file_out, "w") as f: sl.write(f, absolute_paths=True, qiime_style=qiime_style, out_pattern=sample_dir+"/{sample}_R{read}.fastq.gz") with file_logging(snakemake.log[0]): make_sample_tab(sample_file=snakemake.input.tab, sample_file_out=snakemake.output.tab, sample_dir=snakemake.input.sample_dir, **snakemake.params) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | import yaml from utils import file_logging from utils.sample_list import SampleList def make_pooling_list(sample_files, sample_file_out, info_file_out): # read sample files # TODO: indexes.tsv for demultiplexing sample_lists = [SampleList(f) for f in sample_files] # there should be only one layout present (paired or single) unique_layout = set(l.layout for l in sample_lists) assert len(unique_layout) == 1 layout = next(iter(unique_layout)) # get a map of sample -> read files sample_dicts = [dict(l.samples()) for l in sample_lists] unique_names = sorted(set(s for d in sample_dicts for s in d)) # write sample lists sample_list_out = SampleList(layout=layout) sample_pools = {} for sample in unique_names: # obtain a list of all run files for that sample run_files = [d[sample] for d in sample_dicts if sample in d] # sort the runs by path to have a consistent output run_files.sort() # group by R1/R2 run_files = list(zip(*run_files)) out_reads = [f"{sample}_R{i+1}.fastq.gz" for i in range(sample_list_out.n_reads)] sample_list_out.add(sample, out_reads) sample_pools[sample] = {f"R{i+1}": f for i, f in enumerate(run_files)} # write new sample list to files # TODO: we only list file names, not paths. This sample list is mostly needed # for the pipeline to work, but the paths are unimportant since already known with open(sample_file_out, "w") as f: sample_list_out.write(f) # write YAML file with all pooling info # note: sample keys are sorted (unless sort_keys=False), but the # but the order of read pooling is consistent (files are in a sorted list) with open(info_file_out, "w") as f: yaml.safe_dump(sample_pools, f) with file_logging(snakemake.log[0]): make_pooling_list(sample_files=snakemake.input.sample_files, sample_file_out=snakemake.output.sample_file, info_file_out=snakemake.output.yml) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | import os from os.path import abspath from multiprocessing.dummy import Pool # only threaded version necessary from subprocess import check_call import sys # import gzip # import shutil import yaml from utils import file_logging def do_pooling(data): source_paths, target_path = data # make sure that all file names are the same # TODO: this assertion fails with 'single.rev' layout # assert len(set(os.path.basename(p) for p in list(source_paths) + [target_path])) == 1 try: if len(source_paths[0]) == 1: # we only need to link assert len(source_paths) == 1 os.symlink(abspath(source_paths[0]), abspath(target_path)) else: check_call(["zcat {} | gzip -c > {}".format(" ".join(source_paths), target_path)], shell=True) # with gzip.open(target_path, 'w') as out: # for path in source_paths: # print("source", path, "to", target_path) # with gzip.open(path) as f: # shutil.copyfileobj(f, out) return data except Exception as e: return e def pool_raw(pooling_info, outdir, processes=1): os.makedirs(outdir, exist_ok=True) # list of input -> output paths with open(pooling_info) as f: pool_files = yaml.safe_load(f) # send these lists to the workers p = Pool(processes) # Create a flat sequence of read files to combine # and the corresponding output file name args = ((f, os.path.join(outdir, f"{sample}_{read}.fastq.gz")) for sample, sample_files in pool_files.items() for read, f in sample_files.items()) for res in p.imap_unordered(do_pooling, args): if isinstance(res, Exception): raise res source_paths, target_path = res print("{} > {}".format(" ".join(source_paths), target_path), file=sys.stderr) with file_logging(snakemake.log[0]): pool_raw(pooling_info=snakemake.input.yml, outdir=snakemake.output.fq, processes=snakemake.threads) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | import yaml from seq_consensus import consensus from utils import file_logging __complements = bytes.maketrans( b'ATCGRYKMBVDHNSWatcgrykmbvdhnsw -', b'TAGCYRMKVBHDNSWtagcyrmkvbhdnsw -' ) def reverse_complement(seq): return seq.translate(__complements)[::-1] def parse_combine_primers(primers_by_marker): """ Combines primers from different markers (in a nested dictionary structure) together (still grouped by forward/reverse), making sure that if the same primer name occurs in different markers, the primer sequences are the same (even though this may be a rare case). """ out = {"forward": {}, "reverse": {}} for _marker, primers_by_dir in primers_by_marker.items(): for dir_, primers in primers_by_dir.items(): for name, seqs in primers.items(): # remove anchors and split comma-delimited oligo lists anchor = False if seqs.startswith('^'): anchor = True seqs = seqs[1:] seqs = [s.strip() for s in seqs.split(',')] # check if seq already present if name in out[dir_]: assert (seqs == out[dir_][name]['seq']), \ "Sequences of primer {} differ between different markers".format(name) else: out[dir_][name] = {'seq': seqs, 'anchor': anchor} return out def oligo_consensus(seqs, threshold=0.8): seqs = list(seqs) # we allow for different lengths by adding 3' terminal gaps # (oligos are expected to be aligned to 5', global alignment is a requirement # for many clustering algorithms) maxlen = max(len(s) for s in seqs) return consensus((s + '.' * (maxlen - len(s)) for s in seqs), threshold=threshold, free_endgaps=True, end_gap_char='.') def process_primers(primers_by_marker, single_method='consensus:50'): out = parse_combine_primers(primers_by_marker) # reverse complement of reverse sequence # (often needed for trimming reverse primer of merged amplicon) for dir_, seq_dict in list(out.items()): out[dir_ + '_rev'] = { name: { 'seq': [reverse_complement(s) for s in d['seq']], 'anchor': d['anchor'] } for name, d in seq_dict.items() } # consensus # TODO: warn/fail if sequences are too divergent if single_method.startswith('consensus:'): t = float(single_method.replace('consensus:', '').strip()) / 100. fn = lambda seqs: oligo_consensus(seqs, threshold=t) if len(seqs) > 1 else seqs[0] else: raise Exception('Currently no method other than "consensus:<threshold>" available to obtain a single oligo') for key, seq_dict in list(out.items()): out[key + '_consensus'] = { name: { 'seq': fn(d['seq']), 'anchor': d['anchor'] } for name, d in seq_dict.items() } return out with file_logging(snakemake.log[0]): out = process_primers(snakemake.params.primers) with open(snakemake.output.yaml, 'w') as f: yaml.safe_dump(out, f, sort_keys=False) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | import re import sys from subprocess import check_call from utils import file_logging def denoise_paired( input, layout, seq_out, tab_out, stats_out, trunc_qual=None, trunc_len=None, max_err=None, # the following settings have defaults (can be undefined without error) merge_maxdiffs=0, chimera_method=None, pooling_method=None, threads=1, **unused ): cmd = ["qiime", "dada2"] # https://docs.qiime2.org/2023.7/plugins/available/dada2/denoise-paired/ if layout == "paired": cmd += [ "denoise-paired", "--p-trunc-len-f", "{:.0f}".format(trunc_len["fwd"]), "--p-trunc-len-r", "{:.0f}".format(trunc_len["rev"]), "--p-max-ee-f", str(max_err["fwd"]), "--p-max-ee-r", str(max_err["rev"]), ] else: dir_ = "fwd" if layout.endswith(".rev"): layout = re.sub(r"\.rev$", "", layout) dir_ = "rev" assert layout == "single" # https://docs.qiime2.org/2023.7/plugins/available/dada2/denoise-single cmd += [ "denoise-single", "--p-trunc-len", "{:.0f}".format(trunc_len[dir_]), "--p-max-ee", str(max_err[dir_]), ] # common options if chimera_method is None: chimera_method = "consensus" elif chimera_method == "per-sample": chimera_method = "none" if pooling_method is None: pooling_method = "independent" elif pooling_method == "pooled": print('Warning: "pooling_method = pooled" not possible for QIIME2', file=sys.stderr) cmd += [ "--i-demultiplexed-seqs", input, "--p-trunc-q", "{:.0f}".format(trunc_qual), "--p-n-reads-learn", "1000000", # TODO: not configurable "--p-chimera-method", chimera_method, "--verbose", "--p-n-threads", str(threads), "--o-representative-sequences", seq_out, "--o-table", tab_out, "--o-denoising-stats", stats_out ] if merge_maxdiffs > 0: if merge_maxdiffs > 1: print("WARNING: 'merge_maxdiffs' is > 1, but QIIME only allows up to one difference.", file=sys.stderr) cmd.append("--p-allow-one-off") print("Call: " + " ".join(cmd), file=sys.stderr) check_call(cmd, stdout=sys.stdout, stderr=sys.stderr) with file_logging(snakemake.log[0]): denoise_paired( snakemake.input.trim, layout=snakemake.wildcards.layout, seq_out=snakemake.output.asvs, tab_out=snakemake.output.tab, stats_out=snakemake.output.stats, threads=snakemake.threads, **snakemake.params.par ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | import sys import yaml from subprocess import check_call from utils import file_logging def trim_paired(primer_file, input, output, f_primer, r_primer, err_rate=None, threads=None, min_length=None): with open(primer_file) as f: primers = yaml.safe_load(f) anchored = lambda s: '^' + s['seq'] if s['anchor'] else s['seq'] f_seq = anchored(primers['forward_consensus'][f_primer]) r_rev = primers['reverse_rev_consensus'][r_primer]['seq'] r_seq = anchored(primers['reverse_consensus'][r_primer]) f_rev = primers['forward_rev_consensus'][f_primer]['seq'] # see https://docs.qiime2.org/2023.7/plugins/available/cutadapt/trim-paired/ cmd = [ "qiime", "cutadapt", "trim-paired", "--i-demultiplexed-sequences", input, "--p-cores", str(threads), "--p-adapter-f", f"{f_seq}...{r_rev};optional", "--p-adapter-r", f"{r_seq}...{f_rev};optional", "--p-error-rate", str(err_rate), "--p-overlap", "10", # TODO: configure "--p-minimum-length", str(min_length), "--p-discard-untrimmed", "--verbose", "--o-trimmed-sequences", output ] print("Call: " + " ".join(cmd), file=sys.stderr) check_call(cmd, stdout=sys.stdout, stderr=sys.stderr) with file_logging(snakemake.log[0]): trim_paired( snakemake.input.yaml, snakemake.input.demux, snakemake.output.qza, f_primer=snakemake.wildcards.f_primer, r_primer=snakemake.wildcards.r_primer, err_rate=snakemake.params.err_rate, min_length=snakemake.params.min_length, threads=snakemake.threads, ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | import sys import yaml from subprocess import check_call from utils import file_logging def trim_paired(primer_file, input, output, f_primer, r_primer, rev_read=False, err_rate=None, threads=None, min_length=None): with open(primer_file) as f: primers = yaml.safe_load(f) anchored = lambda s: '^' + s['seq'] if s['anchor'] else s['seq'] if not rev_read: f_seq = anchored(primers['forward_consensus'][f_primer]) r_rev = primers['reverse_rev_consensus'][r_primer]['seq'] else: f_seq = anchored(primers['reverse_consensus'][r_primer]) r_rev = primers['forward_rev_consensus'][f_primer]['seq'] # see https://docs.qiime2.org/2023.7/plugins/available/cutadapt/trim-single/ cmd = [ "qiime", "cutadapt", "trim-single", "--i-demultiplexed-sequences", input, "--p-cores", str(threads), "--p-adapter", f"{f_seq}...{r_rev};optional", "--p-error-rate", str(err_rate), "--p-overlap", "10", # TODO: configure "--p-minimum-length", str(min_length), "--p-discard-untrimmed", "--verbose", "--o-trimmed-sequences", output ] print("Call: " + " ".join(cmd), file=sys.stderr) check_call(cmd, stdout=sys.stdout, stderr=sys.stderr) with file_logging(snakemake.log[0]): trim_paired( snakemake.input.yaml, snakemake.input.demux, snakemake.output.qza, f_primer=snakemake.wildcards.f_primer, r_primer=snakemake.wildcards.r_primer, err_rate=snakemake.params.err_rate, min_length=snakemake.params.min_length, rev_read=snakemake.params.rev_read, threads=snakemake.threads, ) |
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | import re import yaml from tax_helpers import zstd_fasta_reader, zstd_fasta_writer, fail_on_invalid from utils import file_logging def convert_taxdb_utax(input, param_file, output): # we don't allow any parameters with open(param_file) as f: params = yaml.safe_load(f) fail_on_invalid(params) reserved_chars = re.compile("[ ,:]") rank_pat = re.compile('\s*?([a-z]+)__(.*?)\s*') with zstd_fasta_reader(input) as records, zstd_fasta_writer(output) as out: for rec in records: lineage = rec.description # some characters have a special meaning in the UTAX format, convert to '_' # (including spaces in names) lineage = reserved_chars.sub('_', lineage) # split into components try: lineage = [rank_pat.fullmatch(r).groups() for r in lineage.split(';')] except AttributeError: raise Exception("Not a valid QIIME-formatted lineage: {}".format(lineage)) # remove empty ranks, since the UTAX format does not require every rank # in every lineage lineage_out = [(rank, name) for rank, name in lineage if name.strip() != ""] # if lineage != lineage_out: # print("before", lineage) # print("filter", lineage_out) # final format rec.id = "{id};tax={tax};".format( id=rec.id, tax=",".join(rank + ':' + name for rank, name in lineage_out) ) rec.description = "" out.write_record(rec) with file_logging(snakemake.log[0]): convert_taxdb_utax(snakemake.input.db, snakemake.input.params, snakemake.output.db) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 | import os import re import sys from tempfile import mkstemp import unittest import yaml from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord from tax_helpers import * from utils import file_logging class Filters(object): """ Class handling the filters """ # QIIME-style rank prefixes _prefix_re = re.compile('([a-z]+)__') # for ambiguous filtering DNA = set(b"ACGT") Ambigs = set(b"MRWSYKVHDBN") # list of filter keywords keywords = ['defined_rank', 'min_len', 'max_len', 'max_n', 'max_ambig'] def __init__( self, defined_rank = None, min_len = None, max_len = None, max_n = None, max_ambig = None ): self.defined_rank = defined_rank self.min_len = min_len self.max_len = max_len self.max_n = max_n self.max_ambig = max_ambig self.initialized = False def init(self, lineage): """ Set up filters given the first record """ # any ambiguity filter present? self._ambig_filter = self.max_n is not None or self.max_ambig is not None self._no_ambigs = self._ambig_filter and self.max_n in (None, 0) and self.max_ambig in (None, 0) # validate defined_rank self.defined_rank_prefix = None if self.defined_rank is not None: try: ranks_short = [self._prefix_re.match(r.strip()).group(1) for r in lineage.split(';')] except AttributeError: raise Exception("Not a valid QIIME-formatted lineage: {}".format(lineage)) assert self.defined_rank in RANK_TRANS, "Unknown rank name for filtering: {}".format(self.defined_rank) rank_short = RANK_TRANS[self.defined_rank] self.defined_rank_prefix = rank_short + '__' if rank_short not in ranks_short: rev_dict = dict(zip(RANK_TRANS.values(), RANK_TRANS.keys())) raise Exception(( "Minimum unambiguous rank name not found in lineage: {}. " "Even though this is a valid rank, it should actually exist in the database." 
"Available ranks: {}").format( self.defined_rank, ", ".join(rev_dict[r] for r in ranks_short))) # ensure ranks are in order possible_ranks = list(RANK_TRANS.values()) try: rank_i = [possible_ranks.index(r) for r in ranks_short] except IndexError: raise Exception("Invalid rank found among the ranks of the first lineage: {}".format(", ".join(ranks_short))) assert rank_i == sorted(rank_i), "Ranks in the first lineage are not in the correct order" def check(self, id: str, lineage: str, seq: Seq) -> bool: # initialize if necessary if not self.initialized: self.init(lineage) self.initialized = True # ambiguous rank filter if self.defined_rank_prefix is not None: lineage = [l.strip() for l in lineage.split(";")] try: # if the empty prefix is found, this means that the rank # is not defined. undef_i = lineage.index(self.defined_rank_prefix) # This hit could be an intermediate "blank" rank, therefore we have to further # check if all lower ranks are empty. This procedure should still be faster # than iterating backwards on every lineage, intermediate blank # ranks are usually not so frequent. if all(self._prefix_re.fullmatch(r) is not None for r in lineage[(undef_i+1):]): return False except ValueError: pass # length filters if self.min_len is not None and len(seq) < self.min_len: return False if self.max_len is not None and len(seq) > self.max_len: return False # sequence ambiguity filters if self._ambig_filter: # first, check if there are any ambiguities at all seq_bytes = bytes(seq) bases = set(seq_bytes) ambigs = bases.difference(self.DNA) # validate invalid = ambigs.difference(self.Ambigs) assert len(invalid) == 0,\ "Invalid characters found in sequence: {}".format( ", ".join(chr(b) for b in sorted(invalid)) ) if len(ambigs) > 0: # found some ambigs: in case we don't allow any, we can return if self._no_ambigs: return False # otherwise, we count DNA bases to obtain the number of ambigs # This solution seems to be fastest (faster than using collections.Counter) dna_count = sum(seq_bytes.count(b) for b in self.DNA) ambig_count = len(seq_bytes) - dna_count if self.max_ambig is not None and ambig_count > self.max_ambig: return False if self.max_n is not None and seq_bytes.count(b"N") > self.max_n: return False # if none of the checks failed, the sequence may be included return True def filter_taxdb(input, filtered_out, cfg_file): with open(cfg_file) as f: cfg = yaml.safe_load(f) invalid = [k for k in cfg if not k in Filters.keywords] assert len(invalid) == 0, \ "Invalid taxonomy database filter keyword(s) found: " + " ".join(invalid) if len(cfg) > 0: # there seems to be something to filter with zstd_fasta_reader(input) as records, zstd_fasta_writer(filtered_out) as out: filters = Filters(**cfg) written = 0 i = -1 for i, rec in enumerate(records): if filters.check(rec.id, rec.description, rec.seq) is True: out.write_record(rec) written += 1 assert i is not None, "No records found" print(f"{written} of {i+1} records written to output", file=sys.stderr) else: # nothing to do, simply link to the output file if os.path.exists(filtered_out): os.remove(filtered_out) print(f"No filtering to be done, linking {input} to {filtered_out}", file=sys.stderr) os.symlink(os.path.relpath(input, os.path.dirname(filtered_out)), filtered_out) ### Tests #### def _do_filter(records, **cfg): infile = mkstemp()[1] outfile = mkstemp()[1] cfg_file = mkstemp()[1] with open(cfg_file, "w") as f: yaml.safe_dump(cfg, f) with zstd_fasta_writer(infile) as w: for i, r in enumerate(records): w.write_record(SeqRecord(Seq(r[1]), 
id=str(i), description=r[0])) filter_taxdb(infile, outfile, cfg_file) with zstd_fasta_reader(outfile) as recs: out = [(rec.description, str(rec.seq)) for rec in recs] os.remove(infile) os.remove(outfile) os.remove(cfg_file) return out class Tester(unittest.TestCase): tax_records = [ ("k__Kingdom;f__Family;g__Genus;s__Genus species", "TGTATAGCAATAG"), ("k__Kingdom;f__Family;g__;s__", "TGTATAGCAATAG"), ("k__Kingdom;f__;g__;s__", "TGTATAGCAATAG"), # lineages with intermediate blanks ("k__;f__Family;g__;s__", "A"), ("k__;f__;g__Genus;s__Genus_species", "A"), ] tax_with_blanks = [ ] invalid_tax = [ ("f__Family;k__;g__;s__", ""), ("", ""), ("k_", ""), ("k__;s__;", ""), ] ambig_records = [ ("k__Kingdom;s__Species", "TGTATAGCAATAG"), ("k__Kingdom;s__Species", "NNAGCAA"), # length = 7 ("k__Kingdom;s__Species", "BRYAGCAATAG"), ("k__Kingdom;s__Species", "RNNAGCAAT"), # length = 9 ] invalid_seq = [ ("k__", "ATGZX") ] def test_no_filter(self): rec = self.tax_records + self.invalid_tax + self.ambig_records + self.invalid_seq flt = _do_filter(rec) self.assertEqual(flt, rec) def test_rank_filter(self): flt = _do_filter(self.tax_records, defined_rank="species") self.assertEqual(flt, [self.tax_records[i] for i in [0, 4]]) flt = _do_filter(self.tax_records, defined_rank="genus") self.assertEqual(flt, [self.tax_records[i] for i in [0, 4]]) flt = _do_filter(self.tax_records, defined_rank="family") self.assertEqual(flt, [self.tax_records[i] for i in [0, 1, 3, 4]]) with self.assertRaises(Exception) as ctx: _do_filter(self.tax_records, defined_rank="subfamily") self.assertTrue('Minimum unambiguous rank name not found in lineage: subfamily.' in str(ctx.exception)) with self.assertRaises(Exception) as ctx: _do_filter(self.tax_records, defined_rank="noname") self.assertTrue('Unknown rank name for filtering: noname' in str(ctx.exception)) def test_ambig(self): flt = _do_filter(self.ambig_records, max_ambig=3) self.assertEqual(flt, [self.ambig_records[i] for i in [0, 1, 2, 3]]) flt = _do_filter(self.ambig_records, max_ambig=2) self.assertEqual(flt, [self.ambig_records[i] for i in [0, 1]]) flt = _do_filter(self.ambig_records, max_ambig=2, max_n=1) self.assertEqual(flt, [self.ambig_records[i] for i in [0]]) flt = _do_filter(self.ambig_records, max_n=2) self.assertEqual(flt, [self.ambig_records[i] for i in [0, 1, 2, 3]]) flt = _do_filter(self.ambig_records, max_n=1) self.assertEqual(flt, [self.ambig_records[i] for i in [0, 2]]) flt = _do_filter(self.ambig_records, max_n=0) self.assertEqual(flt, [self.ambig_records[i] for i in [0]]) def test_lenfilter(self): flt = _do_filter(self.ambig_records, min_len=7) self.assertEqual(flt, [self.ambig_records[i] for i in [0, 1, 2, 3]]) flt = _do_filter(self.ambig_records, min_len=8) self.assertEqual(flt, [self.ambig_records[i] for i in [0, 2, 3]]) flt = _do_filter(self.ambig_records, min_len=10) self.assertEqual(flt, [self.ambig_records[i] for i in [0, 2]]) flt = _do_filter(self.ambig_records, min_len=8, max_len=9) self.assertEqual(flt, [self.ambig_records[i] for i in [3]]) def test_comb(self): rec = self.tax_records + self.ambig_records flt = _do_filter(rec, defined_rank="genus", max_ambig=3, max_n=0) self.assertEqual(flt, [rec[i] for i in [0, 4, 5, 7]]) flt = _do_filter(rec, defined_rank="kingdom", max_ambig=2, max_n=1) self.assertEqual(flt, [rec[i] for i in [0, 1, 2, 3, 4, 5]]) flt = _do_filter(rec, defined_rank="species", max_ambig=2, min_len=8) self.assertEqual(flt, [rec[i] for i in [0, 5]]) def test_invalid_tax(self): # errors are only checked with rank filter with 
self.assertRaises(Exception) as ctx: _do_filter([self.invalid_tax[0]], defined_rank="kingdom") self.assertTrue('Ranks in the first lineage are not in the correct order' in str(ctx.exception)) for rec in self.invalid_tax[1:]: with self.assertRaises(Exception) as ctx: _do_filter([rec], defined_rank="kingdom") self.assertTrue('Not a valid QIIME-formatted lineage' in str(ctx.exception)) def test_invalid_seq(self): # errors are only checked with ambig. filter with self.assertRaises(Exception) as ctx: _do_filter(self.invalid_seq, max_ambig=1) self.assertTrue('Invalid characters found in sequence: X, Z' in str(ctx.exception)) #### Entry point #### if __name__ == '__main__': try: with file_logging(snakemake.log[0]): filter_taxdb(snakemake.input.all, snakemake.output.filtered, snakemake.input.cfg) except NameError: unittest.main() |
```r
RANK_TRANS <- c(
  'domain' = 'd',
  'superkingdom' = 'sk',
  'kingdom' = 'k',
  'subkingdom' = 'ks',
  'superphylum' = 'sp',
  'phylum' = 'p',
  'subphylum' = 'ps',
  'infraphylum' = 'pi',
  'superclass' = 'sc',
  'class' = 'c',
  'subclass' = 'cs',
  'infraclass' = 'ci',
  'cohort' = 'co',
  'superorder' = 'so',
  'order' = 'o',
  'suborder' = 'os',
  'infraorder' = 'oi',
  'parvorder' = 'op',
  'superfamily' = 'sf',
  'family' = 'f',
  'subfamily' = 'fs',
  'tribe' = 't',
  'subtribe' = 'ts',
  'genus' = 'g',
  'subgenus' = 'gs',
  'species group' = 'ss',
  'species subgroup' = 'sgs',
  'species' = 's',
  'subspecies' = 'ssb',
  'forma' = 'for'
)

# Convert an IdTaxa result into named confidence values, where the names are
# QIIME-style lineages (rank code + '__' + taxon name, joined by ';')
make_lineage <- function(ids, total_ranks) {
  artificial_ranks = NULL
  if (is.null(ids[[1]]$rank)) {
    artificial_ranks <- paste0("r", 0:(total_ranks-1))
  }
  sapply(seq_along(ids), function(i) {
    x = ids[[i]]
    n_ranks = length(x$taxon)
    lineage = x$taxon[2:n_ranks]
    if (!is.null(artificial_ranks)) {
      ranks_short = artificial_ranks[2:n_ranks]
    } else {
      ranks = x$rank[2:n_ranks]
      ranks_short <- unname(RANK_TRANS[ranks])
      is_na <- is.na(ranks_short)
      ranks_short[is_na] <- ranks[is_na]
    }
    setNames(
      x$confidence[n_ranks],
      paste(paste(ranks_short, lineage, sep="__"), collapse=";")
    )
  })
}

get_default <- function(list, item, default=NULL) {
  value <- list[[item]]
  if (is.null(value)) default else value
}

idtaxa_assign <- function(seqfile, db, tax_out, threshold,
                          processors=1, rand_seed=NULL, ...) {
  seqs <- Biostrings::readDNAStringSet(seqfile)
  # TODO: 'trainingSet' name is expected, cannot vary
  load(db)
  cat("\nClassifying", length(seqs), "sequences from", seqfile,
      "against", length(trainingSet$taxonomy), "reference sequences",
      "at confidence >=", threshold,
      "using", processors, "processors.\n",
      file=stderr())
  if (!is.null(rand_seed)) {
    cat("Setting random seed:", rand_seed, "\n", file=stderr())
    set.seed(rand_seed)
  }
  ids <- DECIPHER::IdTaxa(
    seqs,
    trainingSet,
    strand="both",
    # forward these explicitly so that the reported settings are actually used
    threshold=threshold,
    processors=processors,
    ...
  )
  l = make_lineage(ids, total_ranks=max(trainingSet$levels))
  out = data.frame(
    `Feature ID`=names(ids),
    Taxon=names(l),
    Confidence=round(unname(l), 1),
    check.names=F,
    stringsAsFactors=F
  )
  write.table(out, gzfile(tax_out), sep="\t", na="", quote=F, row.names=F)
}

log = file(snakemake@log[[1]])
sink(log, type="output")
sink(log, type="message")

params = snakemake@params$par

idtaxa_assign(
  snakemake@input$seq,
  snakemake@input$db,
  snakemake@output$tax,
  processors=snakemake@threads,
  threshold=params$confidence,  # 60 (cautious) or 50 (sensible)
  bootstraps=get_default(params, "bootstraps", 100),
  rand_seed=get_default(params, "rand_seed", NULL)
)
```
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | library(stringr) library(yaml) get_default <- function(list, item, default=NULL) { value <- list[[item]] if (is.null(value)) default else value } make_lineages <- function(headers) { desc <- str_split_fixed(headers, " ", 2)[,2] nranks <- max(str_count(desc, ";")) + 1 tax <- str_split_fixed(desc, ";", nranks) stopifnot(!any(is.na(tax))) # no rank should be undefined in input # separate ranks from names and ensure that the ranks are the same across # columns rank_names <- apply(tax, 2, function(x) str_split_fixed(x, "__", 2), simplify=F) stopifnot(sapply(rank_names, function(x) length(unique(x[,1]))) == 1) ranks <- sapply(rank_names, '[', 1) taxa_out <- sapply(rank_names, function(x) x[,2]) # replace empty ranks with NAs: since internal undefined ranks should not be # empty (rank propagated earlier in pipeline), # we should not obtain any NAs in output lineages taxa_out[taxa_out == ""] <- NA # add root taxa_out <- cbind("Root", taxa_out) ranks <- c("rootrank", ranks) # get defined rank def_rank <- apply(taxa_out, 1, function(x) max(which(!is.na(x)))) # assemble lineages lineages <- sapply(seq_along(def_rank), function(i) { d <- def_rank[i] x <- taxa_out[i,] paste(x[1:d], collapse="; ") }) # ensure that there were no NAs # TODO: there can in theory be false positives here stopifnot(!grepl("; NA(;|$)", lineages, perl=T)) # return all the info list( ranks = ranks, def_rank = def_rank, taxa = lineages ) } idtaxa_train <- function(taxdb_file, trained_out, processors=1, max_group_size=NULL, max_iterations=3, allow_group_removal=F, rand_seed=NULL) { # read data and make sure it's all in the same orientation cat("Reading input sequences from", "taxdb_file", "...\n", file=stderr()) seqs <- Biostrings::readDNAStringSet(taxdb_file) # subsample: only for testing!! 
# seqs <- seqs[sample(length(seqs), 2000)] cat("Running 'OrientNucleotides'...\n", file=stderr()) seqs <- DECIPHER::OrientNucleotides(seqs, processors=processors) # obtain lineages formatted as expected by IDTAXA seq_headers <- names(seqs) tax <- make_lineages(seq_headers) names(seqs) <- tax$taxa # vector for removed sequences (used at several places) remove <- logical(length(seqs)) # taxa groups groups <- names(seqs) group_counts <- table(groups) u_groups <- names(group_counts) if (!is.null(rand_seed)) { cat("Setting random seed for pruning + LearnTaxa:", rand_seed, "\n", file=stderr()) set.seed(rand_seed) } # prune the training set if (!is.null(max_group_size)) { for (i in which(group_counts > max_group_size)) { index <- which(groups == u_groups[i]) keep <- sample(length(index), max_group_size) remove[index[-keep]] <- TRUE } rm_freqs <- sort(table(groups[remove]), decreasing=T) cat(sprintf( "Removed %d of %d sequences from large groups with > %d sequences:\n%s\n\n", sum(remove), length(groups), max_group_size, paste(paste(names(rm_freqs), rm_freqs, sep=': '), collapse='\n') ), file=stderr()) } # iterative training # (this code is taken unmodified from the tutorial # http://www2.decipher.codes/Documentation/Documentation-ClassifySequences.html) probSeqsPrev <- integer() # suspected problem sequences from prior iteration for (i in seq_len(max_iterations)) { cat("Training iteration: ", i, "\n", sep="") # train the classifier trainingSet <- DECIPHER::LearnTaxa( seqs[!remove], names(seqs)[!remove] ) # look for problem sequences probSeqs <- trainingSet$problemSequences$Index if (length(probSeqs) == 0) { cat("No problem sequences remaining.\n") break } else if (length(probSeqs) == length(probSeqsPrev) && all(probSeqsPrev==probSeqs)) { cat("Iterations converged.\n") break } cat("Problematic sequences in iteration", i, ":\n", paste(seq_headers[!remove][probSeqs], collapse="\n"), "\n\n", file=stderr()) if (i == max_iterations) break probSeqsPrev <- probSeqs # remove any problem sequences index <- which(!remove)[probSeqs] remove[index] <- TRUE # remove all problem sequences if (!allow_group_removal) { # replace any removed groups missing <- !(u_groups %in% groups[!remove]) missing <- u_groups[missing] if (length(missing) > 0) { index <- index[groups[index] %in% missing] remove[index] <- FALSE # don't remove } } } # # info plot # pdf(info_out, width=9, height=7.4) # plot(trainingSet) # dev.off() # we did not supply the 'rank' argument, instead we just set the ranks manually here trainingSet$ranks <- tax$ranks[trainingSet$levels] # finally save the databaes save(trainingSet, file=trained_out) } log = file(snakemake@log[[1]]) sink(log, type="output") sink(log, type="message") params <- read_yaml(snakemake@input$params) idtaxa_train( snakemake@input$seq, snakemake@output$db, #processors=snakemake@threads, max_group_size=get_default(params, "max_group_size", NULL), max_iterations=get_default(params, "max_iterations", 3), allow_group_removal=get_default(params, "allow_group_removal", F), rand_seed=get_default(params, "rand_seed", NULL) ) |
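The IDTAXA training script above reads its tuning options (`max_group_size`, `max_iterations`, `allow_group_removal`, `rand_seed`) from a small YAML file via `read_yaml`. As a rough, non-authoritative sketch of what such a parameter file could contain, written here with Python/`yaml` to stay consistent with the pipeline's other helpers (key names mirror the `idtaxa_train()` arguments; values and the file name are placeholders):

```python
# Hedged sketch: write a parameter file like the one idtaxa_train() reads.
# 'idtaxa_params.yaml' is a made-up file name; the values are examples only.
import yaml

params = {
    "max_group_size": 5000,        # subsample overly large taxon groups
    "max_iterations": 3,           # LearnTaxa re-training rounds
    "allow_group_removal": False,  # keep at least one sequence per group
    "rand_seed": 42,               # reproducible pruning/training
}
with open("idtaxa_params.yaml", "w") as f:
    yaml.safe_dump(params, f)
```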
```python
import os
import shutil

import yaml

from tax_helpers import bin_file, zstd_bin_writer, fail_on_invalid
from utils import file_logging


def obtain_formatted(param_file, outfile):
    outdir = os.path.dirname(outfile)
    if not os.path.exists(outdir):
        os.makedirs(outdir)
    with open(param_file) as f:
        par = yaml.safe_load(f)
    fmt = par.pop("format")
    try:
        file = par.pop("file")
    except KeyError:
        raise Exception(f"The 'file' parameter must be present with databases of type '{fmt}'")
    fail_on_invalid(par)
    with bin_file(file, gz=False) as f, zstd_bin_writer(outfile) as o:
        shutil.copyfileobj(f, o)


with file_logging(snakemake.log[0]):
    obtain_formatted(
        snakemake.input.params,
        snakemake.output.db
    )
```
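For pre-formatted ("flat") reference databases, the script above only needs two keys in its parameter file. A minimal sketch with placeholder values; note that the `format` value is only echoed in the error message by this particular script, and any additional key is rejected by `fail_on_invalid()`:

```python
# Hedged sketch of the minimal parameter file consumed by obtain_formatted().
# Both values are placeholders, not part of the actual pipeline configuration.
import yaml

params = {
    "format": "qiime",                         # only reported in error messages here
    "file": "/path/to/reference_db.fasta.gz",  # local path or URL of the database
}
with open("refdb_params.yaml", "w") as f:
    yaml.safe_dump(params, f)
```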
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 | from copy import copy import csv from io import TextIOWrapper from itertools import zip_longest import json import os from os.path import join, dirname, basename import re import sys from tempfile import mkstemp import unittest from urllib.request import urlopen import yaml from tax_helpers import * from utils import file_logging def report_problem(permissive, assertion, text): """ Warns or raises AssertionError upon problem depending on on permissive setting """ if not assertion: if permissive: print(text, file=sys.stderr) else: assert assertion, text class IdMismatch(Exception): pass def read_aligned(fa_handle, tax_handle, full_desc=False): with fasta_reader(fa_handle, full_desc=full_desc) as seqs: lineages = csv.reader(tax_handle, delimiter="\t") for rec, tax in zip_longest(seqs, lineages): if rec is None or tax is None or rec.id != tax[0]: raise IdMismatch() yield rec, tax[1] def read_inconsistent(fa_handle, tax_handle, full_desc=False): lineages = 
{l[0]: l[1] for l in csv.reader(tax_handle, delimiter="\t") if l} with fasta_reader(fa_handle, full_desc=full_desc) as seqs: for rec in seqs: try: yield rec, lineages[rec.id] except IndexError: raise Exception("Sequence ID '{}' not found in taxonomy".format(rec.id)) def report_count(recs): i = -1 for i, r in enumerate(recs): yield r print("Obtained {} records".format(i + 1), file=sys.stderr) class AmbigMatcher(object): """ Helper class for matching ambiguous keywords/patterns (custom supplied) in files from unknown sources. """ def __init__(self, patterns, sp_patterns=None): self.patterns = self.parse_config(patterns) self._sp_patterns_raw = sp_patterns self.sp_rank_i = None self.initialized = False def initialize(self, lineage): ranks = [r.strip().split("__", 1)[0] for r in lineage] self.empty_lineage = [rank + "__" for rank in ranks] self.sp_patterns = None if self._sp_patterns_raw is not None: self.sp_patterns = self.parse_config(self._sp_patterns_raw) try: self.sp_rank_i = ranks.index(RANK_TRANS['species']) except ValueError: print("Species ambiguity patterns specified, but no species in lineages", file=sys.stderr) self.has_patterns = len(self.patterns) > 0 or self.sp_rank_i is not None @staticmethod def parse_config(cfg): assert isinstance(cfg, list) fixed = [] regex = [] for patt in cfg: if isinstance(patt, dict): assert len(patt) == 1 and next(iter(patt.keys())) == 'regex',\ "Invalid ambiguity pattern supplied" regex.append(re.compile(next(iter(patt.values())))) else: fixed.append(patt) return fixed, regex def is_ambig(self, rank_name, fixed, regex): # TODO: should the rank be removed from the name? only relevant for regex patterns matching '^' for pat in fixed: if pat in rank_name: return True for pat in regex: if pat.search(rank_name) is not None: return True return False def clean(self, lineage): # we need the first lineage to know which ranks to expect # and being able to fully initialize if not self.initialized: self.initialize(lineage) self.initialized = True # clean lineage if self.has_patterns: # regular patterns assert len(lineage) == len(self.empty_lineage), "lineage length mismatch" for i in range(len(lineage)): empty = self.empty_lineage[i] rank_name = lineage[i] assert rank_name.startswith(empty), "rank mismatch" if self.is_ambig(rank_name, *self.patterns): lineage[i] = empty # species patterns if self.sp_rank_i is not None \ and self.is_ambig(lineage[self.sp_rank_i], *self.sp_patterns): lineage[self.sp_rank_i] = self.empty_lineage[self.sp_rank_i] return lineage def obtain_qiime_qza(target, sequences=None, taxonomy=None, ambig=[], ambig_sp=None, **other): """ This simple implementation opens QIIME artifacts without requiring the QIIME2 software (which would have to be loaded only for that specific method). 
see https://dev.qiime2.org/latest/storing-data/archive """ fail_on_invalid(other) @contextmanager def _open_qza(path, type, filename, encoding="ascii", errors="replace"): with archive(path, "zip") as a: uuid = os.path.split(a.namelist()[0])[0] with a.open(uuid + '/metadata.yaml') as f, TextIOWrapper(f, encoding="ascii") as f: meta = yaml.safe_load(f) f = meta.get("format") assert f == type, "Unexpected format for {}: {}".format(path, f) assert meta["uuid"] == uuid with a.open("{}/data/{}".format(uuid, filename)) as f: yield TextIOWrapper(f, encoding=encoding, errors=errors) # usually, sequences and taxonomy should be aligned, but # we fall back to unordered parsing if the other fails with local_file(sequences) as seqfile, local_file(taxonomy) as taxfile: ambig_matcher = AmbigMatcher(ambig, ambig_sp) for i, read_fn in enumerate([read_aligned, read_inconsistent]): with _open_qza(seqfile, "DNASequencesDirectoryFormat", "dna-sequences.fasta") as seqs,\ _open_qza(taxfile, "TSVTaxonomyDirectoryFormat", "taxonomy.tsv") as tax: try: with zstd_fasta_writer(target) as out: for rec, lineage in report_count(read_fn(seqs, tax)): lineage = [r.strip() for r in lineage.split(";")] ambig_matcher.clean(lineage) rec.description = ";".join(lineage) out.write_record(rec) except IdMismatch: if i == 0: print("Sequences and taxonomy not aligned, retrying using taxonomy dictionary...", file=sys.stderr) else: raise Exception("Sequence ID mismatch") def obtain_qiime(target, file=None, ambig=[], ambig_sp=None, **other): """ Converter for the flat-file FASTA format with QIIME-style lineages in the header. This essentially only cleans the lineages (if any ambiguity keywords were supplied) """ fail_on_invalid(other) with zstd_fasta_writer(target) as out, fasta_reader(file) as recs: filter_ambig = ambig or ambig_sp ambig_matcher = AmbigMatcher(ambig, ambig_sp) for rec in report_count(recs): if filter_ambig: lineage = [r.strip() for r in rec.description.split(";")] ambig_matcher.clean(lineage) rec.description = ";".join(lineage) # now we can collapse and write the output out.write_record(rec) def obtain_utax(target, file=None, ambig=[], ambig_sp=None, ignore_problems=False, **other): """ UTAX format converter Since the UTAX format allows for missing ranks in part of the lineages, we have to infer these first, which makes the code somewhat more complicated. 
""" fail_on_invalid(other) # helper functions def set_remove(s, key): try: s.remove(key) except KeyError: return False return True def split_rank(rank): s = rank.strip().split(':', 1) assert len(s) == 2, "Invalid rank: {}".format(rank) return s lineage_pat = re.compile(r"(.+?);tax=(.+?)\s*;?\s*") def get_id_lineage(lineage): try: return lineage_pat.fullmatch(lineage).groups() except AttributeError: raise Exception("Not an UTAX-formatted lineage: {}".format(lineage)) with local_file(file) as path: # obtain a list of ranks print("Looking for taxonomic ranks...", file=sys.stderr) with fasta_reader(path, full_desc=True) as recs: ranks = set(r.strip().split(':', 1)[0] for rec in recs for r in get_id_lineage(rec.description)[1].split(',')) ranks.discard('') assert len(ranks) > 0, "The UTAX lineages are empty" all_ranks = [r for r in RANK_TRANS.values() if set_remove(ranks, r)] assert len(ranks) == 0, \ ("Some ranks are not known and this method is not smart enough " "to know where to put them: {}").format(", ".join(ranks)) # now we can read the file again, converting the lineages print("Converting lineages...", file=sys.stderr) ambig_matcher = AmbigMatcher(ambig, ambig_sp) with zstd_fasta_writer(target) as out, fasta_reader(path, full_desc=True) as recs: reserved = re.compile(r"(__|;)") empty_lineage = [r + '__' for r in all_ranks] for rec in report_count(recs): rec.id, lineage = get_id_lineage(rec.description) # replace reserved chars lineage = reserved.sub('_', lineage) # fill in ranks lineage_out = copy(empty_lineage) rank, name = None, None rank_iter = iter(lineage.split(',')) item = next(rank_iter) rank, name = split_rank(item) for i, exp_rank in enumerate(all_ranks): if rank == exp_rank: # if rank matches -> insert it in the lineage and # proceed to next lineage_out[i] = '{}__{}'.format(rank, name) try: item = next(rank_iter) rank, name = split_rank(item) except StopIteration: rank, name = None, None report_problem(ignore_problems, rank is None, ( "The order of ranks was not as expected: {} " "(found {}:{})").format(rec.description, rank, name)) # then we clean ambiguous names (if configured so) ambig_matcher.clean(lineage_out) # now we can collapse and write the output rec.description = ";".join(lineage_out) out.write_record(rec) def obtain_gtdb(target, file=None, **other): fail_on_invalid(other) assert file is not None and isinstance(file, (str, list)),\ "'file' in GTDB config should be list or single string" if isinstance(file, str): file = [file] with zstd_fasta_writer(target) as out: desc_pat = re.compile(r" \[\w+=.+?\].*") for f in file: print(f"Obtaining {file}", file=sys.stderr) with archive(f, format="tar") as t: # parse the content and write to output f = t.getnames() assert len(f) == 1 with TextIOWrapper(t.extractfile(f[0]), "ascii", errors="replace") as fa,\ fasta_reader(fa) as seqs: for rec in report_count(seqs): # remove [...=...] 
annotations from end rec.description = desc_pat.sub('', rec.description) out.write_record(rec) def obtain_unite_otus(target, doi=None, file=None, date=None, threshold="dynamic", kind="regular", **other): fail_on_invalid(other) if file is None: # Query the UNITE API using the DOI in order to obtain the file URL assert doi is not None, "Reference databases of format 'unite' need a 'doi' or an 'url' specified" assert not doi.startswith("http"), "Please specify the UNITE DOI without preceding https://doi.org/" res = urlopen("https://api.plutof.ut.ee/v1/public/dois/?format=vnd.api%2Bjson&identifier=" + doi).read() d = json.loads(res) files = [(f["name"], f["url"]) for f in d["data"][0]["attributes"]["media"]] if date is not None: date = str(date) filtered = [f for f in files if date in f[0]] assert len(filtered) == 1, \ "UNITE file 'date' setting does not identify one file. Available: {}".format(", ".join(f for f, _ in files)) file = filtered[0][1] else: if len(files) > 1: print("WARNING: selected the last file in the list: {}. Available are: {}. Set 'date' to select another file.".format( files[-1][0], ", ".join(f for f, _ in files) ), file=sys.stderr) file = files[-1][1] # obtain the file print(f"Obtaining {file}", file=sys.stderr) with archive(file, format="tar") as t: # select the correct files if kind == "regular": dname = "" else: assert kind == "developer", "Invalid database 'kind' (set to 'regular' or 'developer')" dname = "developer" files = [f for f in t.getnames() if dirname(f) == dname] file_pat = re.compile(r"sh_[a-z]+_qiime_ver\d+_([a-z0-9]+)_.+?\.(fasta|txt)") matches = ((file_pat.fullmatch(basename(f)), f) for f in files) file_info = {m.groups(): f for m, f in matches if m is not None} threshold = str(threshold) try: fa = file_info[(threshold, "fasta")] tax = file_info[(threshold, "txt")] except KeyError: raise Exception(( "Correct UNITE FASTA/taxonomy files not found, check values for 'threshold' and 'kind'. " "If correct, there may be a bug to report. 
Available files are: {}".format(", ".join(files)))) print(f"Reading from {fa} and {tax}", file=sys.stderr) # precompile regex patterns to replace sp_pattern = re.compile(r"s__\w+?[_ ]sp\.?") unknown_pattern = re.compile(r"([a-z])__(\w+?_[a-z]{3}_Incertae_sedis|unidentified|)") # parse FASTA and add lineages with zstd_fasta_writer(target) as out: with TextIOWrapper(t.extractfile(tax), "ascii", errors="replace") as lineages, \ TextIOWrapper(t.extractfile(fa), "ascii", errors="replace") as seqs: for rec, lineage in report_count(read_inconsistent(seqs, lineages)): lineage = [n.strip() for n in lineage.split(";")] # make undefined ranks empty if sp_pattern.fullmatch(lineage[-1]) is not None: lineage[-1] = "s__" for i in range(len(lineage)): m = unknown_pattern.fullmatch(lineage[i]) if m is not None: lineage[i] = "{}__".format(m.group(1)) rec.description = ";".join(lineage) out.write_record(rec) def obtain_midori(target, prefix=None, version=None, marker=None, kind=None, include_ambig=None, remove_num=None, **other): fail_on_invalid(other) if prefix is None: assert version is not None and marker is not None and kind is not None, "Reference databases of format 'midori' need 'version', 'marker' and 'kind' defined, or alternatively an 'url_prefix'" assert kind in {'longest', 'uniq'}, "Reference databases of format 'midori' need kind=uniq or kind=longest" base_url = "https://www.reference-midori.info/forceDownload.php?fName=download/Databases/GenBank{}/".format(version) if include_ambig is True: prefix = base_url + "QIIME_sp/{}/MIDORI2_{}_NUC_SP_GB{}_{}_QIIME".format(kind, kind.upper(), version, marker) else: prefix = base_url + "QIIME/{}/MIDORI2_{}_NUC_GB{}_{}_QIIME".format(kind, kind.upper(), version, marker) # obtain the files print(f"Obtaining from: {prefix}", file=sys.stderr) seqfile = prefix + ".fasta.gz" taxfile = prefix + ".taxon.gz" # compile regex patterns num_pat = re.compile(r"_\d+(;|$)") undef_pat = { name[0]: re.compile(r"{}__{}_.+".format(name[0], name)) for name in ['kingdom', 'phylum', 'class', 'order', 'family', 'genus'] } # Non-filtered version including sp. and others defined only at higher ranks # -> we have to convert undefined names to empty strings. # The patterns follow information from https://doi.org/10.1002/edn3.303 and comparisons with # filtered files from Midori. Still, it was not possible to obtain the exact # same result, differences are however very small (see validation in test_data/taxonomy/midori). sp_pattern=r""" [_\s] ( ( cf\.|aff\.|sp\.|environment|undescribed|uncultured|complex|unclassified |nom\.|_nomen\s.*|_nom\.\s.*|nud\.|unidentif\.|indet\.|gen\.|nr\. |taxon\s\w+ ) [_\s\d] |(sp|cf)[\._][A-Z0-9] |sp\.$ ) """ sp_pattern = re.compile(sp_pattern, re.VERBOSE) with textfile(seqfile, gz=True) as fa, textfile(taxfile, gz=True) as tax: # then filter and write to output with zstd_fasta_writer(target) as out: for rec, lineage in report_count(read_aligned(fa, tax)): if remove_num is True: lineage = num_pat.sub(r"\1", lineage) lineage = lineage.split(";") # Set undefined names to empty strings # Set species names matching one of the above patterns to undefined # The species identified as undefined here vs. species filtered # from the QIIME_SP file (=QIIME version) do not match perfectly, # but almost (see validation code above) assert lineage[-1].startswith("s__") if sp_pattern.search(lineage[-1]) is not None: lineage[-1] = "s__" # remove _family_..., _phylum_..., etc. 
for i in range(len(lineage)-1): taxon = lineage[i] rank_char = taxon[0] if undef_pat[rank_char].fullmatch(taxon) is not None: lineage[i] = taxon[:3] if lineage[-2] == "g__" and lineage[-1] != "s__" and not lineage[-1].startswith("s__["): print(f"WARNING: species was not recognized as ambiguous, " "but the genus is: {lineage[-1]}. " "The species was set to undefined.", file=sys.stderr) lineage[-1] = "s__" rec.description = ";".join(lineage) out.write_record(rec) def obtain_taxdb(param_file, outfile): outdir = os.path.dirname(outfile) if not os.path.exists(outdir): os.makedirs(outdir) with open(param_file) as f: d = yaml.safe_load(f) try: fmt = d.pop("format") except: raise Exception("The database format ('format' keyword) must be defined for {}".format(d["name"])) if fmt in ("unite_otus", "midori", "gtdb", "utax", "qiime", "qiime_qza"): func = getattr(sys.modules[__name__], "obtain_" + fmt) func(target=outfile, **d) else: raise Exception("Unknown taxonomy database format: {}".format(fmt)) #### Tests #### def _obtain_fasta(expected_file=None, **cfg): outfile = mkstemp()[1] cfg_file = mkstemp()[1] with open(cfg_file, "w") as f: yaml.safe_dump(cfg, f) obtain_taxdb(cfg_file, outfile) expected = None if expected_file: with open(expected_file) as f: expected = f.read().strip('\n') with zstd_reader(outfile) as f: found = f.read().strip('\n') os.remove(outfile) os.remove(cfg_file) return expected, found class Tester(unittest.TestCase): data = join("test_data", "taxonomy") def test_unite(self): """ The file contains a small subset of sequences downloaded here: https://unite.ut.ee/repository.php (licensed CC BY-SA 4.0) """ unite = join(self.data, "unite") exp, found = _obtain_fasta( join(unite, "v97.fasta"), file=join(unite, "unite_test.tgz"), format="unite_otus", threshold=97 ) self.assertEqual(exp, found) unite = join(self.data, "unite") exp, found = _obtain_fasta( join(unite, "v97_dev.fasta"), file=join(unite, "unite_test.tgz"), format="unite_otus", threshold=97, kind="developer" ) self.assertEqual(exp, found) # code for checking download functionality: # exp, found = _obtain_fasta( # doi="10.15156/BIO/2483918", # format="unite_otus", # date="29.11.2022", # threshold=97 # ) def test_gtdb(self): """ The file contains a small subset of sequences downloaded here: https://data.gtdb.ecogenomic.org/releases (licensed CC BY-SA 4.0) """ path = join(self.data, "gtdb") exp, found = _obtain_fasta( join(path, "gtdb.fna"), file=join(path, "gtdb.tar.gz"), format="gtdb", ) self.assertEqual(exp, found) def test_midori(self): """ The files contain a small subset of sequences downloaded here: https://www.reference-midori.info """ path = join(self.data, "midori") exp, found = _obtain_fasta( join(path, "clean.fasta"), prefix=join(path, "MIDORI2_LONGEST_NUC_SP_GB256_lrRNA_QIIME"), format="midori", ) self.assertEqual(exp, found) exp, found = _obtain_fasta( join(path, "clean_nonum.fasta"), prefix=join(path, "MIDORI2_LONGEST_NUC_SP_GB256_lrRNA_QIIME"), format="midori", remove_num=True ) self.assertEqual(exp, found) # _obtain_fasta( # format="midori", # version=256, # marker="lrRNA", # kind="longest", # remove_num=True, # include_ambig=True # ) def test_utax(self): """ The file contains a small subset of sequences downloaded here: https://www.drive5.com/usearch/manual/sintax_downloads.html """ path = join(self.data, "utax") exp, found = _obtain_fasta( join(path, "qiime_formatted.fasta"), file=join(path, "rdp_16s_v18.fa.gz"), format="utax", ) self.assertEqual(exp, found) # _obtain_fasta( # 
file="https://www.drive5.com/sintax/silva_18s_v123.fa.gz", # format="utax", # ignore_problems=True, # ambig=["uncultured"] # ) def test_qiime_qza(self): """ The file contains a small subset of sequences downloaded here: https://docs.qiime2.org/2023.5/data-resources/#data-resources The QZA archives are stripped down and don't contain provenience/md5 hashes """ path = join(self.data, "qiime_qza") exp, found = _obtain_fasta( join(path, "qiime_flat.fasta"), sequences=join(path, "silva-138-99-seqs-515-806.qza"), taxonomy=join(path, "silva-138-99-tax-515-806.qza"), format="qiime_qza", ) self.assertEqual(exp, found) exp, found = _obtain_fasta( join(path, "qiime_flat.fasta"), sequences=join(path, "silva-138-99-seqs-515-806.qza"), taxonomy=join(path, "silva-138-99-tax-515-806_reordered.qza"), format="qiime_qza", ) self.assertEqual(exp, found) exp, found = _obtain_fasta( join(path, "qiime_flat_noambig.fasta"), sequences=join(path, "silva-138-99-seqs-515-806.qza"), taxonomy=join(path, "silva-138-99-tax-515-806.qza"), format="qiime_qza", ambig=["uncultured"], ambig_sp=[{"regex": "[ _]sp\.?$"}] ) self.assertEqual(exp, found) # _obtain_fasta( # sequences="http://ftp.microbio.me/greengenes_release/2022.10/2022.10.backbone.full-length.fna.qza", # taxonomy="http://ftp.microbio.me/greengenes_release/2022.10/2022.10.backbone.tax.qza", # format="qiime_qza", # ambig=["uncultured"], # ambig_sp=[{"regex": "[ _]sp\.?$"}] # ) def test_qiime(self): """ Flat QIIME-style annotations in FASTA descriptions Same data as in test_qiime_qza. """ path = join(self.data, "qiime") exp, found = _obtain_fasta( join(path, "qiime_out1.fasta"), file=join(path, "qiime_input.fasta"), format="qiime", ambig=["uncultured"] ) self.assertEqual(exp, found) exp, found = _obtain_fasta( join(path, "qiime_out2.fasta"), file=join(path, "qiime_input.fasta"), format="qiime", ambig=["uncultured", "human_gut"], ambig_sp=[{"regex": "sp\.?$"}] ) self.assertEqual(exp, found) def test_ambig_matcher(self): m = AmbigMatcher([], None) assert m.clean(["s__undefined"]) == ["s__undefined"] m = AmbigMatcher(["undefined"], None) assert m.clean(["k__Fungi", "s__undefined"]) == ["k__Fungi", "s__"] assert m.clean(["k__Fungi", "s__uncultured"]) == ["k__Fungi", "s__uncultured"] m = AmbigMatcher(["undefined"], [" sp."]) assert m.clean(["k__undefined", "s__undefined sp."]) == ["k__", "s__"] assert m.clean(["k__Fungi", "s__Some sp.345"]) == ["k__Fungi", "s__"] m = AmbigMatcher([], [{"regex": "\ssp\.$"}]) assert m.clean(["k__undefined", "g__Genusp."]) == ["k__undefined", "g__Genusp."] m = AmbigMatcher([], [{"regex": "\ssp\.$"}]) assert m.clean(["k__undefined", "s__undefined sp."]) == ["k__undefined", "s__"] assert m.clean(["k__Fungi", "s__Some sp.345"]) == ["k__Fungi", "s__Some sp.345"] m = AmbigMatcher([], [{"regex": "\ssp\.$"}]) m.clean(["k__"]) with self.assertRaises(Exception) as ctx: m.clean(["k__", "s__"]) self.assertTrue("length mismatch" in str(ctx.exception)) m = AmbigMatcher([]) m.clean(["k__Kingdom", "g__Genus"]) with self.assertRaises(Exception) as ctx: m.clean(["k__Kingdom", "s__Genus"]) self.assertTrue("rank mismatch" in str(ctx.exception)) #### Entry point #### if __name__ == '__main__': try: with file_logging(snakemake.log[0]): obtain_taxdb(snakemake.input.yml, snakemake.output.db) except NameError: unittest.main() |
```python
import os
import re
from tempfile import mkstemp
import unittest

from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

from tax_helpers import zstd_fasta_reader, zstd_fasta_writer, RANK_TRANS
from utils import file_logging


# translates rank codes to a shortened rank name (first three characters, lowercase)
rank_codes = dict(zip(*(
    RANK_TRANS.values(),
    (k[:3].lower() for k in RANK_TRANS.keys())
)))


def rank_propagate(input, output, unknown_root="Unknown", unknown_suffix="incertae_sedis"):
    """
    Does rank propagation of undefined *internal* names, while leaving the
    *terminal* undefined (empty) names empty. Thus, the output is a consistent
    lineage with all consecutive names being defined.
    If the first rank (usually kingdom/domain) is undefined, but some lower
    ranks are defined, then we fill in "Unknown". All lower rank names are
    composed as: <lowest defined rank>_<rank code>_<unknown_suffix>.
    """
    prefix_re = re.compile('([a-z]+)__')
    with zstd_fasta_reader(input) as records, zstd_fasta_writer(output) as out:
        for rec in records:
            lineage = rec.description
            lineage = [r.strip() for r in lineage.split(';')]
            # first, determine the last defined rank
            n = len(lineage)
            for n in range(n - 1, -1, -1):
                is_defined = prefix_re.fullmatch(lineage[n]) is None
                if is_defined:
                    break
            # then, rank propagate all internal ranks
            # (we don't have to re-check the last defined item, so lineage[:n] is fine)
            modified = False
            last_defined_taxon = None
            last_defined_name = None
            for i, taxon in enumerate(lineage[:n]):
                m = prefix_re.fullmatch(taxon)
                if m is None:
                    # we have a name
                    last_defined_taxon = taxon
                else:
                    # empty -> construct a name
                    if last_defined_name is None and last_defined_taxon is not None:
                        # we do this splitting only once, if required
                        last_defined_name = last_defined_taxon.split("__", 1)[1]
                    rank_char = m.group(1)
                    try:
                        rank_code = rank_codes[rank_char]
                    except KeyError:  # dict lookups raise KeyError, not IndexError
                        raise Exception(f"Unknown rank code: '{rank_char}'")
                    if last_defined_name is None:
                        name = last_defined_name = unknown_root
                    else:
                        name = f"{last_defined_name}_{rank_code}_{unknown_suffix}"
                    lineage[i] = taxon + name
                    modified = True
            if modified:
                rec.description = ";".join(lineage)
            out.write_record(rec)


#### Tests ####

def _do_propagate(records):
    infile = mkstemp()[1]
    outfile = mkstemp()[1]
    with zstd_fasta_writer(infile) as w:
        for i, r in enumerate(records):
            w.write_record(SeqRecord(Seq(r[1]), id=str(i), description=r[0]))
    rank_propagate(infile, outfile)
    with zstd_fasta_reader(outfile) as recs:
        out = [(rec.description, str(rec.seq)) for rec in recs]
    os.remove(infile)
    os.remove(outfile)
    return out


class Tester(unittest.TestCase):
    # input -> output of rank propagation
    lineages = [
        (
            "k__Kingdom;f__Family;g__Genus;s__Genus species",
            "k__Kingdom;f__Family;g__Genus;s__Genus species",
        ),
        (
            "k__Kingdom;f__Family;g__;s__",
            "k__Kingdom;f__Family;g__;s__",
        ),
        (
            "k__Kingdom;f__;g__;s__",
            "k__Kingdom;f__;g__;s__",
        ),
        # we don't fill in the 'Unknown' rank if the whole lineage is empty
        (
            "k__;s__",
            "k__;s__",
        ),
        (
            "k__;p__ ;c__ ; g__Genus",
            "k__Unknown;p__Unknown_phy_incertae_sedis;c__Unknown_cla_incertae_sedis;g__Genus",
        ),
        (
            "k__Kingdom;p__; f__; g__Genus;s__Species",
            "k__Kingdom;p__Kingdom_phy_incertae_sedis;f__Kingdom_fam_incertae_sedis;g__Genus;s__Species",
        ),
    ]

    def test_propagation(self):
        modified = _do_propagate((orig, "") for orig, _ in self.lineages)
        modified = (lineage for lineage, seq in modified)
        for l, modified in zip(self.lineages, modified):
            _, expected = l
            self.assertEqual(modified, expected)


#### Entry point ####

if __name__ == '__main__':
    try:
        with file_logging(snakemake.log[0]):
            rank_propagate(snakemake.input.db, snakemake.output.db)
    except NameError:
        unittest.main()
```
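Outside Snakemake, the propagation step boils down to a single call; a minimal sketch with placeholder paths (the keyword values shown are simply the defaults from the function signature):

```python
# Hedged standalone run; both paths are made up and refer to zstd-compressed
# FASTA files with the lineage stored in the record description, as used by
# the surrounding reference-database steps.
rank_propagate(
    "refdb/filtered.fasta.zst",    # input: lineages may contain internal gaps
    "refdb/propagated.fasta.zst",  # output: internal gaps filled in
    unknown_root="Unknown",
    unknown_suffix="incertae_sedis",
)
```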
```python
import os

import yaml

from utils import file_logging


def write_db_config(dbconfig, outfile, exclude=None):
    if exclude is not None:
        for k in exclude:
            try:
                del dbconfig[k]
            except KeyError:
                pass
    yml = yaml.safe_dump(dbconfig)
    # check for the case of a hash collision after updating a ref. database
    # within the same project. Of course, the probability is close to zero
    # and this will not detect collisions across projects.
    if os.path.exists(outfile):
        with open(outfile) as f:
            assert f.read() == yml, (
                "Potential hash collision for database {}. Try deleting the "
                "contents of 'refdb', empty the Snakemake cache or rename the "
                "database in taxonomy.yaml"
            ).format(dbconfig["name"])
    with open(outfile, "w") as out:
        out.write(yml)


with file_logging(snakemake.log[0]):
    write_db_config(
        snakemake.params.dbconfig,
        snakemake.output.yml,
        exclude=snakemake.params.get("exclude", None)
    )
```
```python
from os.path import relpath, dirname

import yaml

from utils import file_logging


snakefile_content = """
configfile: "{configfile}"

module uvsnake:
    snakefile: "{snakefile}"
    config: config

use rule * from uvsnake as uvsnake_*

# special target rules for preprocessing
# before applying either UNOISE3 or UPARSE
rule uvsnake_prepare:
    input:
        trim_dir=expand(
            "workdir/prepare_paired/2_trim/{{sample}}/{{sample}}_{{dir_}}.log",
            sample=uvsnake.config["_sample_names"],
            dir_=["fwd", "rev"]
        ),
        report="results/sample_report.tsv",
"""


def write_config(sample_tab, config_out, snakefile, snakefile_out, primer_config, usearch_config):
    # generate the configuration
    out = {}
    workdir = dirname(snakefile_out)
    out["input"] = {"sample_file": relpath(sample_tab, workdir)}
    # prepare primers:
    # uvsnake has almost the same configuration, but does not have the
    # marker concept, so we merge primers from all markers.
    # Primers have been ensured to be unique across markers, so
    # there will not be any name clashes
    out["primers"] = {
        "forward": [],
        "reverse": [],
        "trim_settings": primer_config.pop("trim_settings")
    }
    for cfg in primer_config.values():
        for _dir, primers in cfg.items():
            out["primers"][_dir] += primers
    config_keys = ["defaults", "merge", "filter", "unoise3", "uparse", "otutab"]
    for k in config_keys:
        out[k] = usearch_config[k]
    out["merge"].pop("expected_length", None)
    with open(config_out, "w") as f:
        yaml.safe_dump(out, f, sort_keys=False)

    # generate the Snakefile
    with open(snakefile_out, "w") as o:
        o.write(snakefile_content.format(
            configfile=relpath(config_out, workdir),
            snakefile=snakefile,
        ))


with file_logging(snakemake.log[0]):
    write_config(
        sample_tab=snakemake.input.sample_tab,
        config_out=snakemake.output.config,
        snakefile_out=snakemake.output.snakefile,
        **snakemake.params
    )
```
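The exact structure of `primer_config` and `usearch_config` is defined elsewhere in the workflow configuration; the sketch below only mirrors how `write_config()` accesses them above, with placeholder names and empty sections, to make the data flow easier to follow:

```python
# Hedged sketch of the expected parameter shapes; all names and values are
# placeholders, not the pipeline's actual configuration.
import os

os.makedirs("uvsnake", exist_ok=True)

primer_config = {
    "trim_settings": {},  # passed through to the uvsnake config unchanged
    "ITS": {              # one (placeholder) marker; all markers get merged
        "forward": [],    # primer definitions in uvsnake's own format
        "reverse": [],
    },
}
usearch_config = {
    k: {} for k in ["defaults", "merge", "filter", "unoise3", "uparse", "otutab"]
}

write_config(
    sample_tab="unique_samples/samples.tsv",
    config_out="uvsnake/config.yaml",
    snakefile="/path/to/uvsnake/Snakefile",
    snakefile_out="uvsnake/Snakefile",
    primer_config=primer_config,
    usearch_config=usearch_config,
)
```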
```python
from os.path import abspath, dirname
from subprocess import check_call
import sys

from utils import file_logging


def run_uvsnake(snakefile, command, threads=1):
    cmd = [
        "snakemake",
        "--directory", dirname(snakefile),
        "--use-conda",
        "--cache",
        "--cores", str(threads),
        "--snakefile", abspath(snakefile),
        f"uvsnake_{command}"
    ]
    print("Running {}...".format(" ".join(cmd)), file=sys.stderr)
    check_call(cmd, stdout=sys.stdout, stderr=sys.stderr)


with file_logging(snakemake.log[0]):
    run_uvsnake(
        snakefile=snakemake.input.snakefile,
        command=snakemake.params.command,
        threads=snakemake.threads,
    )
```
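For illustration, the wrapper above simply shells out to Snakemake; with made-up paths and an assumed `unoise3` target it would behave roughly like this:

```python
# Hedged usage sketch: the arguments are placeholders, and 'unoise3' stands for
# whichever uvsnake target rule was configured. The resulting command line is
# approximately:
#
#   snakemake --directory results/wf/uvsnake --use-conda --cache --cores 8 \
#       --snakefile /abs/path/to/results/wf/uvsnake/Snakefile uvsnake_unoise3
#
run_uvsnake("results/wf/uvsnake/Snakefile", "unoise3", threads=8)
```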
```
    shell:
        "rm -Rf input workdir logs"
```

```
    shell:
        "rm -Rf results input unique_samples workdir logs refdb"
```