Help improve this workflow!
This workflow has been published but could be further improved with some additional meta data:- Keyword(s) in categories input, output
You can help improve this workflow by suggesting the addition or removal of keywords, suggest changes and report issues, or request to become a maintainer of the Workflow .
This is a Snakemake pipeline designed to take Oxford Nanopore Technologies data from fastq's to variant calls. In additions to traditional SNVs and indels, this pipeline will also call methylation using nanopolish.
The
Snakefile
is under
workflow
.
The functions performed by this tool are as follows:
Alignment
- minimap2
SV Calling
-
Sniffles
-
CuteSV
-
SVIM
SNV/indel Calling
- Clair3
Read-phasing
- longphase
Methylation tag linking
- In-house pipeline
There are dummy rules which can be added to the snakemake command which run specific tests. They are listed below:
-
all_align
- Runs minimap2 for all samples
-
all_sv
- Runs all SV callers for all samples
-
all_snv
- Runs Clair3 for all samples
-
all_haplotag
- Runs haplotype phasing using longphase for all samples
-
all_haplotag_methyl
- Haplotags bams and links methylation bams for all samples
-
all_minimap_methyl
- Links methylation to unphased bams for all samples
Code Snippets
21 22 | script: "../scripts/split_faidx.py" |
49 50 51 52 53 | shell: """ samtools fqidx {input.fastq} -r {input.batch_file} > $TMPDIR/scatteritem.fastq minimap2 -t {threads} --MD --secondary=no --eqx -x map-ont -a {input.ref} $TMPDIR/scatteritem.fastq | samtools view -S -b | samtools sort -T $TMPDIR > {output.sorted_bam} """ |
80 81 82 83 | shell: """ samtools merge -@{threads} {output.scatter_merged_bam} {input.sorted_bams} """ |
106 107 108 109 | shell: """ samtools merge -@{threads} {output.merged_bam} {input.scatter_merged_bam} """ |
132 133 134 135 | shell: """ samtools index {input.merged_bam} """ |
17 18 | script: "../scripts/append_mod_tags.py" |
20 21 22 23 | shell: """ {params.script_dir}/longphase haplotag --snp-file={input.snv_vcf} --bam-file={input.bam} --qualityThreshold=1 -t {threads} --sv-file={input.sv_vcf} -o $( echo {output.bam} | sed 's/.bam//' ) """ |
46 47 48 49 | shell: """ samtools index {input.merged_bam} """ |
18 19 20 21 22 23 24 25 26 | shell: """ run_clair3.sh --bam_fn={input.merged_bam} --sample_name={wildcards.sample} --ref_fn={input.ref} --threads={threads} --platform=ont --model_path=$(dirname $( which run_clair3.sh ) )/models/ont_guppy5 --output=$(dirname {output.vcf}) --ctg_name={wildcards.chrom} --enable_phasing if [[ ( -f $( echo {output.vcf} | sed 's/phased_//' ) ) && ( ! -f {output.vcf} ) ]]; then cp $( echo {output.vcf} | sed 's/phased_//' ) {output.vcf} elif [[ ( ! -f $( echo {output.vcf} | sed 's/phased_//' ) ) && ( ! -f {output.vcf} ) ]]; then zcat $(dirname {output.vcf})/pileup.vcf.gz | grep ^# | bgzip -c > {output.vcf} fi """ |
48 49 50 51 | shell: """ bcftools concat -O v -o {output.vcf} {input.vcf} """ |
76 77 78 79 | shell: """ sniffles -i {input.merged_bam} --reference {input.ref} --output-rnames -v {output.vcf} """ |
105 106 107 108 109 | shell: """ zcat {input.ref} > {output.cuteSV_ref} || cat {input.ref} > {output.cuteSV_ref} cuteSV -t {threads} --genotype --max_cluster_bias_INS 100 --diff_ratio_merging_INS 0.3 --max_cluster_bias_DEL 100 --diff_ratio_merging_DEL 0.3 {input.merged_bam} {output.cuteSV_ref} {output.cuteSV_vcf} $( dirname {output.cuteSV_vcf} ) """ |
135 136 137 138 139 | shell: """ svim alignment --sample {wildcards.sample} $( dirname {output.vcf_tmp} ) {input.merged_bam} {input.ref} cp -l {output.vcf_tmp} {output.vcf} """ |
162 163 164 165 166 | shell: """ bcftools sort -o /dev/stdout -O v {input.vcf} | bgzip -c > {output.zipped} tabix {output.zipped} """ |
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 | import os import sys import time import itertools import pickle import sqlite3 from multiprocessing import Pool import pysam # LOGGING sys.stdout = open(snakemake.log[0], "w") def create_database(db_name): if os.path.exists(db_name): os.remove(db_name) conn = sqlite3.connect(db_name) c = conn.cursor() c.execute("CREATE TABLE meth_tags (qname TEXT PRIMARY KEY, tag BLOB)") # commit changes and close connection conn.commit() conn.close() def get_time(): time_rn = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) return time_rn def fetch_modified_bases(modified_obj, db_name) -> None: """ Fetch base modification tags Mm & Ml :param modified_obj: An unsorted bam pysam object with just methylation tags :param db_name: :return: None """ print(f"Opening {modified_obj.filename.decode()} to fetch tags. {get_time()}") # Start database connection conn = sqlite3.connect(db_name) c = conn.cursor() for read in modified_obj.fetch(until_eof=True): if read.has_tag("Mm") or read.has_tag("MM"): tags = read.get_tags() qname = read.query_name # serialize the tags list serialized_list = pickle.dumps(tags) c.execute( "INSERT INTO meth_tags VALUES (?, ?)", (str(qname), sqlite3.Binary(serialized_list)), ) modified_obj.close() # commit changes and close connection conn.commit() conn.close() print( f"Base modification tags fetched for {modified_obj.filename.decode()}. {get_time()}" ) def write_linked_tags(bam, db_name, out_file) -> None: """ Write out merged bam with Mm tags and possibly Ml, and its index. :param bam: An aligned bam :param db_name: :param out_file: :return: None """ # Connect to db conn = sqlite3.connect(db_name) c = conn.cursor() appended_tags = pysam.AlignmentFile(out_file, "wb", template=bam) for read in bam.fetch(until_eof=True): result = c.execute( "SELECT tag FROM meth_tags WHERE qname = ?", (str(read.qname),) ).fetchone() if result: deserialized_tag = pickle.loads(result[0]) read.set_tags(read.get_tags() + deserialized_tag) appended_tags.write(read) print(f"File written to: {out_file}") appended_tags.close() # write index pysam.index(out_file) print(f"Index written for {out_file}.bai") def collect_tags(methyl_sn_input: list, db_name: str) -> None: # methyl_sn_input: snakemake input """ Collect optional tags from ONT bam with methyl calls :param methyl_sn_input: a list of file paths pointing to methyl bam :param db_name: :return: None """ if not len(methyl_sn_input) == 1: for bam in methyl_sn_input: methyl_bam = pysam.AlignmentFile(bam, "rb", check_sq=False) fetch_modified_bases(methyl_bam, db_name) else: methyl_bam = pysam.AlignmentFile(methyl_sn_input[0], "rb", check_sq=False) fetch_modified_bases(methyl_bam, db_name) def make_subset_bams(input_bam, prefix) -> list[str]: subset_size = 100 * 1024 * 1024 # 100MB in bytes if os.path.getsize(input_bam.filename.decode()) < subset_size: subset_size = int(subset_size / 10) subset_idx = 0 subset_size_bytes = 0 current_subset = None bam_file_list = [] for read in input_bam: # If the current subset is None or its size has exceeded the subset size, create a new subset if current_subset is None or subset_size_bytes >= subset_size: # If this is not the first subset, close the previous subset file if current_subset is not None: current_subset.close() pysam.index(current_subset.filename.decode()) # Create a new subset file with a name based on the subset index subset_idx += 1 current_subset = pysam.AlignmentFile( f"{prefix}_tmp.{subset_idx}.bam", "wb", template=input_bam ) bam_file_list.append(f"{prefix}_tmp.{subset_idx}.bam") # Write the current read to the current subset file current_subset.write(read) subset_size_bytes = os.path.getsize(current_subset.filename.decode()) # Close the last subset file current_subset.close() pysam.index(current_subset.filename.decode()) input_bam.close() return bam_file_list def combine_the_chunked(bams: list[str], merge_output: str): aln_bams = [pysam.AlignmentFile(x, check_sq=False) for x in bams] out_bam = pysam.AlignmentFile(merge_output, "wb", template=aln_bams[0]) for bam in aln_bams: for records in bam: out_bam.write(records) bam.close() out_bam.close() pysam.index(merge_output) def run_pool(bam_file: str, db_name, output_file) -> None: aln_bam = pysam.AlignmentFile(bam_file, "rb") write_linked_tags(aln_bam, db_name, output_file) # wait for the file to become available while not os.path.exists(bam_file): time.sleep(1) # file is available, remove it clean_up_temps([bam_file]) def clean_up_temps(files: list, suffix=".bai"): for f in files: index = f + suffix try: os.remove(f) os.remove(index) print("removed: ", f) print("removed: ", index) except FileNotFoundError: pass def main(): # Grabbing from snakemake threads = snakemake.threads methyl_collection = snakemake.input.methyl_bam aln_bam = snakemake.input.aln_bam bam = pysam.AlignmentFile(aln_bam, check_sq=False) prefix = os.path.join(snakemake.resources.tmpdir, snakemake.wildcards.sample) final_output = snakemake.output.linked_bam db_name = f"{prefix}-meth_tags.db" create_database(db_name=db_name) # Populate the database with methylation tags collect_tags(methyl_collection, db_name=db_name) # Make the chunks chunked_bams_names = make_subset_bams(input_bam=bam, prefix=prefix) link_bam_output_names = [ x.replace("_tmp.", "_tmp-linked.") for x in chunked_bams_names ] with Pool(threads) as p: p.starmap( run_pool, zip(chunked_bams_names, itertools.repeat(db_name), link_bam_output_names), ) p.close() p.join() combine_the_chunked(bams=link_bam_output_names, merge_output=final_output) # CLEANING UP! clean_up_temps(link_bam_output_names) clean_up_temps([db_name]) if __name__ == "__main__": sys.exit(main()) |
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | if __name__ == "__main__": NIDS = len(snakemake.output.batches) batch_dict = {} for i in range(NIDS): batch_dict[i] = [] with open(snakemake.input.fai, "r") as infile: fai_list = [line.split("\t")[0] for line in infile] for j in range(len(fai_list)): batch_dict[j % NIDS].append(fai_list[j]) outs = [open(f, "w+") for f in snakemake.output.batches] for i in range(NIDS): outs[i].write("\n".join(batch_dict[i]) + "\n") outs[i].close() |
Support
Do you know this workflow well? If so, you can
request seller status , and start supporting this workflow.
Created: 1yr ago
Updated: 1yr ago
Maitainers:
public
URL:
https://github.com/EichlerLab/ONT_pipelines
Name:
ont_pipelines
Version:
1
Downloaded:
0
Copyright:
Public Domain
License:
MIT License
Keywords:
- Future updates
Related Workflows

ENCODE pipeline for histone marks developed for the psychENCODE project
psychip pipeline is an improved version of the ENCODE pipeline for histone marks developed for the psychENCODE project.
The o...

Near-real time tracking of SARS-CoV-2 in Connecticut
Repository containing scripts to perform near-real time tracking of SARS-CoV-2 in Connecticut using genomic data. This pipeli...

snakemake workflow to run cellranger on a given bucket using gke.
A Snakemake workflow for running cellranger on a given bucket using Google Kubernetes Engine. The usage of this workflow ...

ATLAS - Three commands to start analyzing your metagenome data
Metagenome-atlas is a easy-to-use metagenomic pipeline based on snakemake. It handles all steps from QC, Assembly, Binning, t...
raw sequence reads
Genome assembly
Annotation track
checkm2
gunc
prodigal
snakemake-wrapper-utils
MEGAHIT
Atlas
BBMap
Biopython
BioRuby
Bwa-mem2
cd-hit
CheckM
DAS
Diamond
eggNOG-mapper v2
MetaBAT 2
Minimap2
MMseqs
MultiQC
Pandas
Picard
pyfastx
SAMtools
SemiBin
Snakemake
SPAdes
SqueezeMeta
TADpole
VAMB
CONCOCT
ete3
gtdbtk
h5py
networkx
numpy
plotly
psutil
utils
metagenomics

RNA-seq workflow using STAR and DESeq2
This workflow performs a differential gene expression analysis with STAR and Deseq2. The usage of this workflow is described ...

This Snakemake pipeline implements the GATK best-practices workflow
This Snakemake pipeline implements the GATK best-practices workflow for calling small germline variants. The usage of thi...