Create Environment
Install Mambaforge:
curl -L https://github.com/con
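Once Mambaforge is installed, create and activate the workflow's conda environment before running Snakemake. A minimal sketch, assuming the repository ships a conda environment file named environment.yaml (the actual file and environment names may differ):

    # run the downloaded Mambaforge installer, then reopen the shell
    bash Mambaforge-*.sh -b
    # build the environment described by the workflow's environment file (file name assumed)
    mamba env create -f environment.yaml
    # activate it; replace <env-name> with the name defined inside environment.yaml
    conda activate <env-name>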
Code Snippets
shell:
    "abricate {params} {input} > {output.csv}"
    " && python3 {scripts_directory}get_abricate_info_list.py {output.csv} {output.yaml}"

shell:
    "abricate {params} {input} > {output.csv}"
    " && python3 {scripts_directory}get_abricate_info_list.py {output.csv} {output.yaml}"

shell:
    "abricate {params} {input} > {output.csv}"
    " && python3 {scripts_directory}get_abricate_info_list.py {output.csv} {output.yaml}"
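The curly-brace fields in these snippets are Snakemake placeholders that are filled in per rule ({input}, {output.csv}, {params}, and so on). As a purely illustrative expansion for a hypothetical sample named sample01, with an example abricate database and an assumed scripts directory of ../workflow/scripts/:

    abricate --db ncbi sample01.fasta > sample01_abricate.csv \
      && python3 ../workflow/scripts/get_abricate_info_list.py sample01_abricate.csv sample01_abricate.yaml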
shell:
    "fastqc {input} -o {output.dir} {params} && python3 {scripts_directory}move_fastqc_output.py {wildcards.sample}"

shell:
    "fastqc {input} -o {output.dir} {params} && python3 {scripts_directory}move_fastqc_output.py {wildcards.sample}"

shell:
    "fastqc {input.read_1} {input.read_2} -o {output.dir} {params} -t {threads}"

shell:
    "fastqc {input.read} -o {output.dir} {params} -t {threads}"

shell:
    "fasttree {params} {input} > {output.tree} && cp {output.tree} {output.nwk}"

shell:
    "fasttree {params} {input} > {output.tree} && cp {output.tree} {output.nwk}"

shell:
    "fasttree {params} {input} > {output}"

shell:
    "cp {input.tree} {output.tree} &&"
    "cp {input.tree} {output.nwk}"

shell:
    "freebayes {params} -f {input.ref} {input.samples} > {output}"

shell:
    "python {coverage_script} -i {input.depth} -r {REFERENCE_FASTA}"
    " -o {output.coverage} -t {params.threshold} -g {REFERENCE_GB}"

shell:
    "python {scripts_directory}merge_coverage.py '{input}' {output.coverage_regular} {output.coverage_translate} {REFERENCE_GB}"

shell:
    "mkdir align_samples/{wildcards.sample}/reference -p && "
    "cp {REFERENCE_FASTA} align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta && "
    "bwa index align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta && "
    "bwa mem align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta {input.reads_1} {input.reads_2} | "
    "samtools view -u -T align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta -q {params.minqual} | "
    "samtools sort --reference align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta > {output}"
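Expanded for a hypothetical paired-end sample named sample01 aligned against a reference called ref (file names and the mapping-quality cutoff of 20 are illustrative, not the workflow's defaults), the rule above runs roughly:

    mkdir align_samples/sample01/reference -p && \
    cp ref.fasta align_samples/sample01/reference/ref.fasta && \
    bwa index align_samples/sample01/reference/ref.fasta && \
    bwa mem align_samples/sample01/reference/ref.fasta sample01_R1.fastq.gz sample01_R2.fastq.gz | \
    samtools view -u -T align_samples/sample01/reference/ref.fasta -q 20 | \
    samtools sort --reference align_samples/sample01/reference/ref.fasta > align_samples/sample01/sample01.sorted.bam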
shell:
    "mkdir align_samples/{wildcards.sample}/reference -p && "
    "cp {REFERENCE_FASTA} align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta && "
    "bwa index align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta && "
    "bwa mem align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta {input.reads_1} | "
    "samtools view -u -T align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta -q {params.minqual} | "
    "samtools sort --reference align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta > {output}"

shell:
    "bwa mem -k 5 -T 16 align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta {PRIMER_FASTA} | samtools view -b -F 4 > {params.folder}primers.bam"
    " && bedtools bamtobed -i {params.folder}primers.bam > {params.folder}primers.bed &&"
    "samtools sort -o {input.pre_snps}.sorted {input.pre_snps} "
    "&& ivar trim -b {params.folder}primers.bed -i {input.pre_snps}.sorted -m 0 -q 0 -p {output} -e "
    "&& samtools sort -o {output} {output}"

shell:
    " ivar trim -m 0 -q 0 -e -b {params.folder}primers.bed -p {params.prefix}.trimmed -i {input.bam}"
    " && samtools sort -o {params.prefix}.trimmed.sorted.bam {params.prefix}.trimmed.bam"
    " && samtools index {params.prefix}.trimmed.sorted.bam"
    " && samtools mpileup -A -d 0 -Q 0 {input.bam} | ivar consensus -m 0 -n N -p {params.prefix}.ivar_consensus"
    " && ./../workflow/scripts/run_check_consensus {params.prefix}.ivar_consensus.fa align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta"
    " && bwa index -p {params.prefix}.ivar_consensus {params.prefix}.ivar_consensus.fa"
    " && bwa mem -k 5 -T 16 {params.prefix}.ivar_consensus {PRIMER_FASTA} | samtools view -bS -F 4 | samtools sort -o {params.folder}primers_consensus.bam"
    " && samtools mpileup -A -d 0 --reference {params.prefix}.ivar_consensus.fa -Q 0 {params.folder}primers_consensus.bam | ivar variants -p {params.folder}primers_consensus -t 0.03"
    " && bedtools bamtobed -i {params.folder}primers_consensus.bam > {params.folder}primers_consensus.bed"
    " && ivar getmasked -i {params.folder}primers_consensus.tsv -b {params.folder}primers_consensus.bed -f {PRIMER_FASTA}.pair_information.tsv -p {params.folder}primer_mismatchers_indices"
    " && ivar removereads -i {params.prefix}.trimmed.sorted.bam -p {params.prefix}.masked.bam -t {params.folder}primer_mismatchers_indices.txt -b {params.folder}primers.bed"
    " && samtools sort -o {params.prefix}.masked.sorted.bam {params.prefix}.masked.bam"
    " && cp {params.prefix}.masked.sorted.bam {output}"
    " && samtools index {output}"

shell:
    "samtools mpileup -A -d 600000 -B -Q 0 {input.bam} --reference align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta | "
    "ivar variants -p align_samples/{wildcards.sample}/iVar/snps -q {params.mapqual} -t {params.minfrac} -m {params.mincov} align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta "

shell:
    "samtools mpileup -d 0 -A -Q {params.mapqual} {input.bam} --reference align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta | "
    "ivar consensus -p align_samples/{wildcards.sample}/iVar/{params.consensus} -q {params.mapqual} -t {params.minfrac} -n N -m {params.mincov} "

shell:
    "samtools depth {params} {input.i} | bgzip -c > {output.only_depth} "
    "&& tabix -p vcf {output.only_depth} && gunzip -c {output.only_depth} > {output.depth} "
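The depth step above can be reproduced by hand. A sketch for a hypothetical sorted BAM of sample01, assuming {params} carries samtools depth's -aa flag (report every position, including zero-coverage ones):

    samtools depth -aa sample01.sorted.bam | bgzip -c > sample01.depth.gz \
      && tabix -p vcf sample01.depth.gz \
      && gunzip -c sample01.depth.gz > sample01.depth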
shell:
    "python ../workflow/scripts/split_consensus.py {input.consensus} {input.depth} {REFERENCE_GB} {output.consensus}"

shell:
    "python ../workflow/scripts/split_depth_file.py {input.zipped} {REFERENCE_GB}"

shell:
    "python ../workflow/scripts/mask_consensus_by_deep.py align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta {input.first_consensus} {output.align_file} {wildcards.seg}"

shell:
    "mafft --thread {threads} {params} {input.align_file} > {output.aligned_file}"

shell:
    "python ../workflow/scripts/msa_masker.py -i {input.align_file} -df {input.depth} -o {output} {params}"

shell:
    "python ../workflow/scripts/get_consensus_medaka.py '{input}' {output}"

shell:
    "python {scripts_directory}mask_regions.py {input.consensus} {output.final_consensus} {params} "

shell:
    "python {scripts_directory}convert_vcf.py {input.tsv_file} {output.vcf_file} "

shell:
    "mafft --thread {threads} {params} {input.align_file} > {output.aligned_file}"

shell:
    "mafft --thread {threads} {params} {input.align_file} > {output.aligned_file}"

shell:
    "mafft --thread {threads} {params} {input} > {output}"

shell:
    "mafft --thread {threads} {params} {input} > {output}"

shell:
    "mafft --thread {threads} {params} {input} > {output}"

shell:
    "mafft --thread {threads} {params} {input} > {output}"

shell:
    "cp {input} {output}"
shell:
    "mkdir {output} && cp {software_parameters_path} projects/{wildcards.project}/"

shell:
    "mkdir projects/{wildcards.project}/sample_{wildcards.sample}/ -p && "
    " cp -r {params} projects/{wildcards.project}/sample_{wildcards.sample}/ "

shell:
    "python {scripts_directory}generate_AllConsensus.py {input.coverage} {REFERENCE_GB} '{input.every_consensus}' {REFERENCE_FASTA} {output.AllConsensus} {output.all_consensus_no_ref} "
    "&& python {scripts_directory}concat_segments.py '{input.every_consensus}' {REFERENCE_GB} {output.All_nt} {input.coverage} {REFERENCE_FASTA} {output.All_nt_only_90plus}"

shell:
    "python {scripts_directory}split_files_by_locus.py {input} projects/{PROJECT_NAME}/main_result {REFERENCE_GB}"

shell:
    "rm -r align_samples/{wildcards.sample}/medaka/ && mkdir align_samples/{wildcards.sample}/medaka/ && cp {input.ref} {output.ref_out} && medaka_consensus -i {input.i} -d {output.ref_out} -o align_samples/{wildcards.sample}/medaka -t {threads} -m r941_min_high_g360"
    " && cp {output.i} {output.i2}"

shell:
    "samtools depth {params} {input.i} | bgzip -c > {output.only_depth} "
    "&& tabix -p vcf {output.only_depth} && gunzip -c {output.only_depth} > {output.depth} "

shell:
    "python3 {scripts_directory}split_depth_file.py {input} {REFERENCE_GB}"

shell:
    "medaka variant --verbose {input.ref} {input.hdf} {output.vcf}"

shell:
    "medaka tools annotate {input.vcf} {input.ref} {input.bam} {output.snps} && "
    "bgzip {output.snps} -c > {output.vcf_zipped} && tabix {output.vcf_zipped}"

shell:
    "touch {output.temp_file} && "
    "python {scripts_directory}add_freq_medaka_consensus.py {input.normal_reference_fasta} {input.vcf_file} {input.file_coverage} "
    "{output.vcf_file_out} {output.vcf_file_out_removed_by_filter} {output.temp_file} {output.normal_reference_fasta_removed} {output.temp_file} {params} &&"
    "bgzip -c -f {output.vcf_file_out} > {output.vcf_file_out_compr} && tabix {output.vcf_file_out_compr}"

shell:
    "bcftools consensus -s SAMPLE -f {input.ref} {input.vcf_ziped} -o {output}"

shell:
    "python {scripts_directory}mask_consensus_by_deep.py {REFERENCE_FASTA} {input.first_consensus} {output.align_file} {wildcards.seg}"

shell:
    "python {scripts_directory}msa_masker.py -i {input.align_file} -df {input.depth} -o {output} {params}"

shell:
    "python {scripts_directory}get_consensus_medaka.py '{input}' {output}"

shell:
    "python {scripts_directory}mask_regions.py {input} {output} {params}"
shell:
    "gunzip -cd {input} | NanoFilt {params} | gzip > {output}"

shell:
    "mkdir {output.dir} -p && NanoStat --fastq {input} --outdir {output.dir} -n {wildcards.sample}_stats.txt -t {threads}"

shell:
    "NanoStat --fastq {input} --outdir {output.dir} -n {wildcards.sample}_stats.txt -t {threads}"

shell:
    "abricate {params} {REFERENCE_FASTA} > {output.csv}"
    " && python3 {scripts_directory}get_abricate_info.py {output.csv} {output.yaml}"

shell:
    "echo 'This is not SARS-CoV-2' > {output}"

shell:
    "pangolin {input.consensus} --outfile {output} -t {threads}"

shell:
    "seqret {params} -sequence {input} -outseq {output}"

shell:
    "seqret {params} -sequence {input} -outseq {output}"

shell:
    "seqret {params} -sequence {input} -outseq {output}"

shell:
    "seqret {params} -sequence {input} -outseq {output}"

shell:
    "seqret {params} -sequence {input} -outseq {output}"

shell:
    "seqret {params} -sequence {input} -outseq {output}"

shell:
    "seqret {params} -sequence {input} -outseq {output}"

shell:
    "seqret {params} -sequence {input} -outseq {output}"
shell:
    "rm -r align_samples/{wildcards.sample}/snippy/ && "
    "snippy --cpus {threads} --pe1 {input.reads_1} --pe2 {input.reads_2} --ref {REFERENCE_FASTA} --outdir align_samples/{wildcards.sample}/snippy/ {params}"

shell:
    "rm -r align_samples/{wildcards.sample}/snippy/ && "
    "snippy --cpus {threads} --se {input.read} --ref {REFERENCE_FASTA} --outdir align_samples/{wildcards.sample}/snippy/ {params}"

shell:
    "gunzip -c {input.zipped} > {output.unzipped}"

shell:
    "python3 {scripts_directory}split_depth_file.py align_samples/{wildcards.sample}/snippy/depth/snps.depth {REFERENCE_GB} "
    "&& touch {output.unzipped}"

shell:
    "python {scripts_directory}mask_consensus_by_deep.py {REFERENCE_FASTA} {input.first_consensus} {output.align_file} {wildcards.seg}"

shell:
    "python {scripts_directory}msa_masker.py -i {input.align_file} -df {input.depth} -o {output} {params}"

shell:
    "python {scripts_directory}get_consensus_medaka.py '{input}' {output}"

shell:
    "python {scripts_directory}mask_regions.py {input} {output} {params}"

shell:
    "python {scripts_directory}create_snpeff_text.py $CONDA_PREFIX {input.ref_gb} {input.ref_fa} {REFERENCE_NAME} {output} "

shell:
    "{replace} {input.snp_file} &&"
    "snpEff {params} -v {REFERENCE_NAME} {input.snp_file} > {output}"

shell:
    "{replace} {input.i1} && "
    "snpEff {params} -v {REFERENCE_NAME} {input.i1} > {output}"

shell:
    "spades.py -t {threads} {params} -s {input} -o {output.dir}"

shell:
    "spades.py -t {threads} {params} -1 {input.read_1} -2 {input.read_2} -o {output.dir}"

shell:
    "python {scripts_directory}translate.py {REFERENCE_GB} {input.Alignment_nt} {output} '{wildcards.locus}' '{wildcards.gene}' {input.coverage}"

shell:
    "trimmomatic PE "
    "{input} "
    "{output.read_1} "
    "{output.read_un1} "
    "{output.read_2} "
    "{output.read_un2} "
    "-threads {threads} "
    "{params}"

shell:
    "trimmomatic SE "
    "-threads {threads} "
    "{input} "
    "{output} "
    "{params}"
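For illustration, {params} typically carries the Trimmomatic trimming steps. A hedged single-end example for a hypothetical sample01 (quality and length settings are assumptions, not the workflow's defaults):

    trimmomatic SE \
      -threads 4 \
      sample01.fastq.gz \
      sample01.trimmed.fastq.gz \
      SLIDINGWINDOW:5:20 LEADING:3 TRAILING:3 MINLEN:35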
shell:
    "python {scripts_directory}variants.py '{input}' '{output}' validated_variants"

shell:
    "python {scripts_directory}variants.py '{input}' {output} minor_iSNVs"

shell:
    "python {scripts_directory}variants.py '{input}' {output} minor_iSNVs_inc_indels"

shell:
    "python {scripts_directory}proportions_iSNVs_graph.py '{input}' {output.out_file}"

shell:
    "echo 'No sample with all segments above 90% coverage for the value given in the user/parameters.yaml file.' > {output}"
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 | import os import sys from Bio import SeqIO from Bio.Seq import MutableSeq import pysam def get_type_variation(ref, alt): """return type of variation based on change possible in variation type snp Single Nucleotide Polymorphism A => T mnp Multiple Nuclotide Polymorphism GC => AT ins Insertion ATT => AGTT del Deletion ACGG => ACG complex Combination of snp/mnp """ if ref == alt == 1: return "snp" if ref > alt: return "del" if ref < alt: return "ins" return "mnp" if ref == alt and alt > 1 else "complex" def variant_has_SR_DPSP_AR(variant): return "SR" in variant and "DPSP" in variant and "AR" in variant def read_text_file(file_name): """ read text file and put the result in an vector """ vect_out = [] with open(file_name, "r", encoding="utf-8") as handle: for line in handle: sz_temp = line.strip() if len(sz_temp) == 0: continue vect_out.append(sz_temp) return vect_out def get_coverage_by_pos( file_coverage, chr_name, position_start, position_end, temp_file ): if not os.path.exists(file_coverage): return -1 cmd = f"tabix {file_coverage} {chr_name}:{position_start}-{position_end} > {temp_file}" os.system(cmd) # get number vect_lines = read_text_file(temp_file) if ( len(vect_lines) == 1 and len(vect_lines[0].split("\t")) == 3 and isinstance(int(vect_lines[0].split("\t")[2]), int) ): return int(vect_lines[0].split("\t")[2]) return -1 def add_freq_ao_ad_and_type_to_vcf( vcf_file, vcf_file_out, vcf_file_out_removed_by_filter, freq_vcf_limit, coverage_limit, file_coverage, temp_file, ): """add FREQ, AO, AF and TYPE to VCF, FREQ=AO/DP This case is used in MEDAKA only :param vcf_file_out_removed_by_filter -> can be None, keep the variants that are filter by freq_vcf_limit vcffile must be gzip and tbi included :param coverage_limit -> filter by this coverage (this is necessary because medaka doesn't have) :param cut off for VCF freq returns: vcf file with freq, AO and AF """ FREQ = "FREQ" AO = "AO" RO = "RO" AF = "AF" TYPE = "TYPE" DP_COMPOSED = "DP_COMPOSED" # this is used to get # read the input file vcf_hanlder = pysam.VariantFile(vcf_file, "r") if ( FREQ in vcf_hanlder.header.info 
and AO in vcf_hanlder.header.info and AF in vcf_hanlder.header.info ): vcf_hanlder.close() return vcf_hanlder_write = pysam.VariantFile(vcf_file_out, "w") if vcf_file_out_removed_by_filter is not None: vcf_hanlder_write_removed_by_filter = pysam.VariantFile( vcf_file_out_removed_by_filter, "w" ) if FREQ not in vcf_hanlder.header.info: vcf_hanlder.header.info.add( FREQ, number="A", type="Float", description="Ratio of AO/(DPSP-AR)" ) if AO not in vcf_hanlder.header.info: vcf_hanlder.header.info.add( AO, number="A", type="Integer", description="Alternate allele observation count, SR (alt1 fwd + alt1 rev, etc.)", ) if RO not in vcf_hanlder.header.info: vcf_hanlder.header.info.add( RO, number="1", type="Integer", description="Reference allele observation count, SR (ref fwd + ref rev)", ) if AF not in vcf_hanlder.header.info: vcf_hanlder.header.info.add( AF, number="R", type="Integer", description="Number of observation for each allele, SR (ref fwd + ref rev, alt1 fwd + alt1 rev, etc.)", ) if TYPE not in vcf_hanlder.header.info: vcf_hanlder.header.info.add( TYPE, number="A", type="String", description="The type of allele, either snp, mnp, ins, del, or complex", ) if DP_COMPOSED not in vcf_hanlder.header.info: vcf_hanlder.header.info.add( DP_COMPOSED, number="1", type="String", description="Coverage at position (DPSP-AR)/(samtools -aa). First is collected by Medaka, Second is collected by samtools.", ) # write the header for variant_header_records in vcf_hanlder.header.records: vcf_hanlder_write.header.add_record(variant_header_records) if vcf_file_out_removed_by_filter is not None: vcf_hanlder_write_removed_by_filter.header.add_record( variant_header_records ) for variant_sample in vcf_hanlder.header.samples: vcf_hanlder_write.header.add_sample(variant_sample) if vcf_file_out_removed_by_filter is not None: vcf_hanlder_write_removed_by_filter.header.add_sample(variant_sample) for variant in vcf_hanlder: # DP must be replaced by DPSP. 
DPSP is the # sum of all reads Span and Ambiguous if variant_has_SR_DPSP_AR(variant.info): # SR=0,0,15,6 # don't process this VCF because has a low coverage total_deep = int(variant.info["DPSP"]) - sum( int(x) for x in variant.info["AR"] ) total_deep_samtools = get_coverage_by_pos( file_coverage, variant.chrom, variant.pos, variant.pos, temp_file, ) if coverage_limit > 0 and total_deep_samtools < coverage_limit: continue if ((len(variant.info["SR"]) // 2) - 1) != len(variant.alts): # vcf_hanlder_write.write(variant) continue # different numbers of Alleles and References # extra info vect_out_ao = [] # AO out_ro = -1 # RO vect_out_af = [] # AF vect_out_freq = [] # FREQ vect_out_freq_filtered = [] # FREQ vect_out_type = [] # TYPE for value_ in range(0, len(variant.info["SR"]), 2): if value_ > 0: allele_count = int(variant.info["SR"][value_]) + int( variant.info["SR"][value_ + 1] ) if total_deep > 0: # incongruences in Medaka, # these values are collected in different stages of the Medaka workflow, (email from support@nanoporetech.com at 23 Dec 2020) if total_deep <= allele_count: vect_out_freq.append(100) else: freq_value = allele_count / float(total_deep) if freq_value >= freq_vcf_limit: vect_out_freq.append( float("{:.1f}".format(freq_value * 100)) ) elif vcf_file_out_removed_by_filter is not None: vect_out_freq_filtered.append( float("{:.1f}".format(freq_value * 100)) ) # print(variant.pos, variant.ref, str(variant.alts), variant.info['DP'], vect_out_ao[-1], vect_out_freq[-1]) vect_out_ao.append(allele_count) vect_out_type.append( get_type_variation( len(variant.ref), len(variant.alts[(value_ - 2) >> 1]), ) ) vect_out_af.append( int(variant.info["SR"][value_]) + int(variant.info["SR"][value_ + 1]) ) if out_ro == -1: out_ro = int(variant.info["SR"][value_]) + int( variant.info["SR"][value_ + 1] ) # has some variant to save if vect_out_freq: if out_ro > -1: variant.info[RO] = (out_ro, ) variant.info[AO] = tuple(vect_out_ao) variant.info[AF] = tuple(vect_out_af) variant.info[TYPE] = tuple(vect_out_type) variant.info[DP_COMPOSED] = (f"{total_deep}/{total_deep_samtools}", ) variant.info[FREQ] = tuple(vect_out_freq) # Only save the ones with FREQ vcf_hanlder_write.write(variant) # save the filtered if vect_out_freq_filtered: if out_ro > -1: variant.info[RO] = (out_ro, ) variant.info[AO] = tuple(vect_out_ao) variant.info[AF] = tuple(vect_out_af) variant.info[TYPE] = tuple(vect_out_type) variant.info[DP_COMPOSED] = (f"{total_deep}/{total_deep_samtools}", ) variant.info[FREQ] = tuple(vect_out_freq_filtered) # Only save the ones with FREQ vcf_hanlder_write_removed_by_filter.write(variant) vcf_hanlder_write.close() vcf_hanlder.close() if vcf_file_out_removed_by_filter is not None: vcf_hanlder_write_removed_by_filter.close() return vcf_file_out def compute_masking_sites( sequence, ranges=None, single_positions=None, from_beggining=None, from_end=None, ) -> list[int]: length = len(sequence) masking_sites = [] if ranges is not None: for pos in ranges: # print(ranges) pos[0] = int(pos[0]) pos[1] = int(pos[1]) if pos[1] < length and pos[1] > pos[0]: masking_sites.extend(iter(range(pos[0] - 1, pos[1] - 1))) if single_positions is not None: for pos in single_positions: pos = int(pos) if pos < length: masking_sites.append(pos - 1) if from_beggining is not None: masking_sites.extend(iter(range(int(from_beggining)))) if from_end is not None: masking_sites.extend(iter(range(length - int(from_end), length))) return masking_sites def main(): freq_vcf_limit: float = float(sys.argv[10]) coverage_limit: float = 
float(sys.argv[9]) element_name_old = "" vect_sites = [] vect_ranges = [] final_mask_consensus = [] vcf_file = sys.argv[2] vcf_file_out = sys.argv[4] vcf_file_out_removed_by_filter = sys.argv[5] file_coverage = sys.argv[3] temp_file = sys.argv[8] add_freq_ao_ad_and_type_to_vcf( vcf_file, vcf_file_out, vcf_file_out_removed_by_filter, freq_vcf_limit, coverage_limit, file_coverage, temp_file, ) final_vcf_with_removed_variants = sys.argv[5] vcf_hanlder = pysam.VariantFile(final_vcf_with_removed_variants, "r") vect_sites = [] vect_ranges = [] for variant in vcf_hanlder: if element_name_old != variant.chrom: element_name_old = variant.chrom # MEDAKA output must have "TYPE" in info if variant.info["TYPE"][0] == "snp": vect_sites.append(str(variant.pos)) elif variant.info["TYPE"][0] == "ins": vect_sites.append(str(variant.pos)) elif variant.info["TYPE"][0] == "del": vect_ranges.append( f"{variant.pos + len(variant.alts[0])}-{variant.pos - len(variant.alts[0]) + len(variant.ref)}" ) else: vect_ranges.append(f"{variant.pos}-{variant.pos + len(variant.ref) - 1}") sites = "".join( i if idx == len(vect_sites) - 1 else f"{i}," for idx, i in enumerate(vect_sites) ) regions = "".join( i if idx == len(vect_ranges) - 1 else f"{i}," for idx, i in enumerate(vect_ranges) ) # print(sites) # print(regions) consensus = sys.argv[1] output = sys.argv[7] final_mask_consensus = [] ranges, single_positions, from_beggining, from_end = ( [x.split("-") for x in regions.split(",")] if regions != "" else None, sites.split(",") if sites != "" else None, None, None, ) ref_insertions = 0 for record in SeqIO.parse(consensus, "fasta"): sequence = MutableSeq(record.seq) masking_sites = compute_masking_sites( sequence, ranges, single_positions, from_beggining, from_end ) masking_sites = sorted(masking_sites) print(masking_sites) # Taken from insaflu ref_pos = 0 gap = 0 for idx in range(len(sequence)): if sequence[idx] == "-": # print("Hello") gap += 1 # print(gap) # ref_insertions += 1 # continue print(idx) if ref_pos in masking_sites: print( f"placing N in position {ref_pos + ref_insertions}, it was a {sequence[ref_pos + ref_insertions]}" ) sequence[ref_pos + ref_insertions] = "N" ref_pos += 1 if (ref_pos + ref_insertions) >= len(record.seq): break # End of insaflu code record.seq = sequence final_mask_consensus.append(record) print(final_mask_consensus[0].seq[8650]) SeqIO.write(final_mask_consensus, output, "fasta") if __name__ == "__main__": main() |
from Bio import SeqIO


def is_fasta(filename: str) -> bool:
    """
    The is_fasta function checks if a file is in FASTA format.

    :param filename: str: Specify the file to be checked
    :return: True if the file is a fasta file, and false otherwise
    :doc-author: Trelent
    """
    with open(filename, "r", encoding="utf-8") as handle:
        fasta = SeqIO.parse(handle, "fasta")
        return any(fasta)


def is_genbank(filename: str) -> bool:
    """
    The is_gbk function checks if a file is in GenBank format.

    :param filename: str: Specify the file to be checked
    :return: True if the file is a genbank file, and false otherwise
    :doc-author: Trelent
    """
    with open(filename, "r", encoding="utf-8") as handle:
        gbk = SeqIO.parse(handle, "genbank")
        return any(gbk)


def same_identifiers(fasta_file: str, gbk_file: str) -> bool:
    """
    The same_identifiers function checks whether the IDs in a FASTA file
    match those in a GenBank file. It does this by opening both files and
    parsing them with the SeqIO module from Biopython. The function returns
    True if the lists of IDs are identical, and False otherwise.

    :param fasta_file: str: Specify the path to the fasta file
    :param gbk_file: str: Specify the path to the genbank file
    :return: True if the order of the ids in both files are equal
    :doc-author: Trelent
    """
    with open(fasta_file, "r", encoding="utf-8") as handle:
        fasta = SeqIO.parse(handle, "fasta")  # type: ignore
        fasta_ids: list[str] = [record.id for record in fasta]
    with open(gbk_file, "r", encoding="utf-8") as handle:
        gbk = SeqIO.parse(handle, "genbank")
        gbk_locus: list[str] = [record.name for record in gbk]
    return sorted(fasta_ids) == sorted(gbk_locus)
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 | import sys import csv import re from Bio import SeqIO from extract_gb_info import get_locus from yaml_io import read_yaml def get_fasta_reference_concat(fasta_ref): """ The get_fasta_reference_concat function takes a fasta file as inputand returns the concatenated reference sequence. The function is used to create a single, concatenated reference sequence from multiple contigs in the assembly. :param fasta_ref: Specify the reference genome to be used for mapping :return: The concatenated reference sequence in the form of a seqrecord """ concat_reference = [] count = 0 for record in SeqIO.parse(fasta_ref, "fasta"): if count == 0: concat_reference.append(record) start = fasta_ref.index("reference") end = fasta_ref.index(".fasta") # why + 11 => 16/05 concat_reference[0].id = fasta_ref[start + 11 : end] else: concat_reference[0].seq += record.seq count += 1 return concat_reference[0] def get_id_sequence_from_consensus(consensus, sample_name, value_list): """ The get_id_sequence_from_consensus function takes a fasta file of consensus sequences and returns the sequence of the locus that is specified by the value_list. The first record in this fasta file will be returned with its id changed to sample_name. This function assumes that there are no duplicate records in consensus, which may not be true. :param consensus: Define the consensus sequence file :param sample_name: Name the sequence in the seqrecord object :param value_list: Specify which segments to concatenate :return: A seqrecord object that contains the consensus sequence for a given sample :doc-author: Trelent """ concat_consensus = [] count = 1 starting_segment = value_list[0] for record in SeqIO.parse(consensus, "fasta"): if count == starting_segment: concat_consensus.append(record) concat_consensus[0].id = sample_name elif count in value_list: concat_consensus[0].seq += record.seq count += 1 return concat_consensus[0] def get_id_sequence_from_consensus_strict(consensus, sample_name, value_list): """ The get_id_sequence_from_consensus_strict function takes a consensus sequence file, the sample name, and a list of integers. It then parses through the consensus sequence file to find all sequences that match the starting segment in the list. The function concatenates these sequences together into one record and assigns it an ID based on its sample name. It returns this record. 
:param consensus: Specify the path to a fasta file containing the consensus sequence for each sample :param sample_name: Name the sequence :param value_list: Specify the segments that will be concatenated :return: A seqrecord object from a fasta file :doc-author: Trelent """ concat_consensus = [] count = 1 starting_segment = value_list[0] for record in SeqIO.parse(consensus, "fasta"): if count == starting_segment: concat_consensus.append(record) concat_consensus[0].id = "" concat_consensus[0].id = sample_name concat_consensus[0].description = "" elif count in value_list: concat_consensus[0].seq += record.seq count += 1 return concat_consensus[0] def create_consensus_file_for_alignment( consensus_list: list, reference_gb: str, output: str, coverage_file: str, reference_fasta: str, output_only_90_plus: str, ): """ The create_consensus_file_for_alignment function takes a list of files, the reference genome in genbank format, the species name and an output file name as input. It then creates a fasta file with all the sequences from the consensus files that have coverage above 90%. The function also takes into account if it is flu or not. If it is flu it will only include samples that have 8 loci covered at 90% or more in one file and in the other it will concatonate all segments that have 90% coverage or more. :param consensus: Specify the path to the directory containing all of your consensus sequences :param reference_gb: Get the name of the reference sequence in a genbank file :param species: Determine which reference file to use :param output: Name the output file :param coverage_file: Specify the file containing all the coverage values for each sample :param fasta_file: Get the reference sequence from the fasta file :param output_only_90_plus: Create a fasta file with only the sequences that have at least 90% coverage :return: A fasta file with the consensus sequences of all samples that have a coverage over 90% :doc-author: Trelent """ with open(coverage_file, encoding="UTF8", newline="") as csvfile: csv_reader = csv.reader(csvfile, delimiter=",") coverage_list = list(csv_reader) dic_directory = read_yaml("../config/constants.yaml") software_parameters = read_yaml(dic_directory["software_parameters"]) coverage_value = software_parameters["min_coverage_consensus"] for row in coverage_list: for value in range(len(row) - 1, 0, -1): # print(row[value]) if float(row[value]) > coverage_value: row[value] = value else: row.pop(value) coverage_dic = {} for i in coverage_list: coverage_dic[i[0]] = i[1:] fasta_reference_concat = get_fasta_reference_concat(reference_fasta) new_concat_file = [fasta_reference_concat] new_concat_file_strict = [fasta_reference_concat] if len(get_locus(reference_gb)) > 1: for file in consensus_list: sample_name = re.findall("(?<=sample_)(.*?)(?=/)", file)[0] if len(coverage_dic[sample_name]) > 0: new_sequence = get_id_sequence_from_consensus( file, sample_name, coverage_dic[sample_name] ) new_concat_file.append(new_sequence) if len(coverage_dic[sample_name]) == 8: new_sequence = get_id_sequence_from_consensus_strict( file, sample_name, coverage_dic[sample_name] ) new_concat_file_strict.append(new_sequence) SeqIO.write(new_concat_file, output_only_90_plus, "fasta") SeqIO.write(new_concat_file_strict, output, "fasta") else: for file in consensus_list: sample_name = re.findall("(?<=sample_)(.*?)(?=/)", file)[0] if len(coverage_dic[sample_name]) == 1: new_sequence = get_id_sequence_from_consensus_strict( file, sample_name, coverage_dic[sample_name] ) 
new_concat_file_strict.append(new_sequence) SeqIO.write(new_concat_file_strict, output_only_90_plus, "fasta") SeqIO.write(new_concat_file_strict, output, "fasta") if __name__ == "__main__": CONSENSUS_LIST = sys.argv[1].split(" ") REFERENCE_GB = sys.argv[2] OUTPUT_FILE = sys.argv[3] COVERAGE_FILE = sys.argv[4] REFERENCE_FASTA = sys.argv[5] OUTPUT_FILE_WITH_90_PLUS_COVERAGE = sys.argv[6] create_consensus_file_for_alignment( CONSENSUS_LIST, REFERENCE_GB, OUTPUT_FILE, COVERAGE_FILE, REFERENCE_FASTA, OUTPUT_FILE_WITH_90_PLUS_COVERAGE, ) |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | import os import sys import re import errno import argparse def parse_args(args=None): Description = "Convert iVar variants tsv file to vcf format." Epilog = """Example usage: python ivar_variants_to_vcf.py <FILE_IN> <FILE_OUT>""" parser = argparse.ArgumentParser(description=Description, epilog=Epilog) parser.add_argument("FILE_IN", help="Input tsv file.") parser.add_argument("FILE_OUT", help="Full path to output vcf file.") parser.add_argument( "-po", "--pass_only", dest="PASS_ONLY", help="Only output variants that PASS all filters.", action="store_true", ) parser.add_argument( "-af", "--allele_freq_thresh", type=float, dest="ALLELE_FREQ_THRESH", default=0, help="Only output variants where allele frequency greater than this number (default: 0).", ) return parser.parse_args(args) def make_dir(path): if not len(path) == 0: try: os.makedirs(path) except OSError as exception: if exception.errno != errno.EEXIST: raise def ivar_variants_to_vcf(FileIn, FileOut, passOnly=False, minAF=0): filename = os.path.splitext(FileIn)[0] header = ( "##fileformat=VCFv4.2\n" "##source=iVar\n" '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">\n' '##FILTER=<ID=PASS,Description="Result of p-value <= 0.05">\n' '##FILTER=<ID=FAIL,Description="Result of p-value > 0.05">\n' '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">\n' '##FORMAT=<ID=REF_DP,Number=1,Type=Integer,Description="Depth of reference base">\n' '##FORMAT=<ID=REF_RV,Number=1,Type=Integer,Description="Depth of reference base on reverse reads">\n' '##FORMAT=<ID=REF_QUAL,Number=1,Type=Integer,Description="Mean quality of reference base">\n' '##FORMAT=<ID=ALT_DP,Number=1,Type=Integer,Description="Depth of alternate base">\n' '##FORMAT=<ID=ALT_RV,Number=1,Type=Integer,Description="Deapth of alternate base on reverse reads">\n' '##FORMAT=<ID=ALT_QUAL,Number=1,Type=String,Description="Mean quality of alternate base">\n' '##FORMAT=<ID=ALT_FREQ,Number=1,Type=String,Description="Frequency of alternate base">\n' ) header += ( "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\t" + filename + "\n" ) varList = [] varCountDict = {"SNP": 0, "INS": 0, "DEL": 0} OutDir = os.path.dirname(FileOut) make_dir(OutDir) fout = open(FileOut, "w") fout.write(header) with open(FileIn) as f: for line in f: if not re.match("REGION", line): line = re.split("\t", line) CHROM = line[0] POS = line[1] ID = "." 
REF = line[2] ALT = line[3] var_type = "SNP" if ALT[0] == "+": ALT = REF + ALT[1:] var_type = "INS" elif ALT[0] == "-": REF += ALT[1:] ALT = line[2] var_type = "DEL" QUAL = line[9] pass_test = line[13] if pass_test == "TRUE": FILTER = "PASS" else: FILTER = "FAIL" INFO = "DP=" + line[11] FORMAT = "GT:REF_DP:REF_RV:REF_QUAL:ALT_DP:ALT_RV:ALT_QUAL:ALT_FREQ" SAMPLE = ( "1:" + line[4] + ":" + line[5] + ":" + line[6] + ":" + line[7] + ":" + line[8] + ":" + line[9] + ":" + line[10] ) oline = ( CHROM + "\t" + POS + "\t" + ID + "\t" + REF + "\t" + ALT + "\t" + QUAL + "\t" + FILTER + "\t" + INFO + "\t" + FORMAT + "\t" + SAMPLE + "\n" ) writeLine = True if passOnly and FILTER != "PASS": writeLine = False if float(line[10]) < minAF: writeLine = False if (CHROM, POS, REF, ALT) in varList: writeLine = False else: varList.append((CHROM, POS, REF, ALT)) if writeLine: varCountDict[var_type] += 1 fout.write(oline) fout.close() ## Print variant counts to pass to MultiQC # varCountList = [(k, str(v)) for k, v in sorted(varCountDict.items())] # print("\t".join(["sample"] + [x[0] for x in varCountList])) # print("\t".join([filename] + [x[1] for x in varCountList])) def main(args=None): args = parse_args(args) ivar_variants_to_vcf( args.FILE_IN, args.FILE_OUT, args.PASS_ONLY, args.ALLELE_FREQ_THRESH ) if __name__ == "__main__": sys.exit(main()) |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | import subprocess import sys from extract_gb_info import get_id_version, get_locus CONFIG_FILE = "../workflow/db/snpeff.config" DATA_DIRECTORY = "../workflow/db/data/" def prepare_snpeff_run(ref_path_gb, locus, ref_path_fa, reference_name, output): """ The prepare_snpeff_run function creates a snpEFF database for the reference genome. It takes as input: - The path to the reference genome in GenBank format, and - The name of the output file that will be created by this function. This function creates a directory called 'config/data' if it does not already exist, and then creates three files inside that directory: genes.gbk, sequences.fa, and snpeff_genes_prod_snpsiftdb-passed-only-geneidmap (the last one is created by running SnpSift). :param snpeff_path: Specify the path to snpeff :param ref_path_gb: Specify the path to the genbank file of a reference genome :param locus: Determine the reference genome to use for snpeff :param ref_path_fa: Specify the path to the reference genome fasta file :param reference_name: Name the reference files in the snpeff config file :param output: Create a file that indicates the database is ready :return: A text file with a message "database ready!" :doc-author: Trelent """ if len(locus) > 1: text = [ f"{reference_name}.chromosome : {' '.join(str(locus))}\n", f"{reference_name}.genome : flu\n", ] for i in locus: text.append( f"{reference_name}.{i}.codonTable : Bacterial_and_Plant_Plastid\n" ) else: text = [ f"\n{reference_name}.genome: {reference_name}\n", f"{reference_name}.{get_id_version(ref_path_gb)}.codonTable : Bacterial_and_Plant_Plastid\n", ] with open(CONFIG_FILE, "r", encoding="UTF8") as snpeff_config: lines = snpeff_config.readlines() joined = "".join(lines) find = joined.find("".join(text)) if find == -1: subprocess.run( f"mkdir {DATA_DIRECTORY}{reference_name} -p", shell=True, check=False ) subprocess.run( f"cat {ref_path_gb} > {DATA_DIRECTORY}{reference_name}/genes.gbk", shell=True, check=False, ) subprocess.run( f"cat {ref_path_fa} > {DATA_DIRECTORY}{reference_name}/sequences.fa", shell=True, check=False, ) with open(CONFIG_FILE, "a", encoding="UTF8") as snpeff: snpeff.writelines(text) subprocess.run( f"snpEff build -genbank {reference_name} -c {CONFIG_FILE} > {output}", shell=True, check=False, ) else: with open(output, "w", encoding="UTF8") as out: out.write("Database Ready!") if __name__ == "__main__": SNPEFF_PATH = sys.argv[1] REFERENCE_GB = sys.argv[2] REFERENCE_FASTA = sys.argv[3] REFERENCE_NAME = sys.argv[4] OUTPUT = sys.argv[5] SEGMENTS_LIST = get_locus(REFERENCE_GB) prepare_snpeff_run( REFERENCE_GB, SEGMENTS_LIST, REFERENCE_FASTA, REFERENCE_NAME, OUTPUT ) |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | import os import sys from extract_gb_info import get_id_version, get_locus def prepare_snpeff_run(snpeff_path, ref_path_gb, locus, ref_path_fa, reference_name): if len(locus) > 1: text = [ f"{reference_name}.chromosome : {' '.join(str(locus))}\n", f"{reference_name}.genome : flu\n", ] for i in locus: text.append( f"{reference_name}.{i}.codonTable : Bacterial_and_Plant_Plastid\n" ) else: text = [ f"\n{reference_name}.genome: {reference_name}\n", f"{reference_name}.{get_id_version(ref_path_gb)}.codonTable : Bacterial_and_Plant_Plastid\n", ] with open( f"{snpeff_path}/share/snpeff-4.3.1t-5/snpEff.config", "r", encoding="UTF8" ) as snpeff_config_file: lines = snpeff_config_file.readlines() joined = "".join(lines) find = joined.find("".join(text)) if find == -1: os.system( f"mkdir {snpeff_path}/share/snpeff-4.3.1t-5/data/{reference_name} ") os.system( f"cat {ref_path_gb} > {snpeff_path}/share/snpeff-4.3.1t-5/{reference_name}/genes.gbk" ) os.system( f"cat {ref_path_fa} > {snpeff_path}/share/snpeff-4.3.1t-5/{reference_name}/sequences.fa" ) with open( f"{snpeff_path}/share/snpeff-4.3.1t-5/snpEff.config", "a", encoding="UTF8" ) as snpeff: snpeff.writelines(text) os.system(f"snpEff build -genbank {reference_name}") if __name__ == "__main__": SNPEFF_PATH = sys.argv[1] REFERENCE_GB = sys.argv[2] REFERENCE_FASTA = sys.argv[3] REFERENCE_NAME = sys.argv[4] SEGMENTS_LIST = get_locus(REFERENCE_GB) prepare_snpeff_run( SNPEFF_PATH, REFERENCE_GB, SEGMENTS_LIST, REFERENCE_FASTA, REFERENCE_NAME ) |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 | from Bio import SeqIO def get_locus(genbank_file): """ The get_locus function takes a genbank file and returns the locus number of that record. If there is only one record in the genbank file, it will return the possible_name. If there are multiple records, it will return a list of all locus numbers. :param genbank_file: Open the genbank file, and then parse it using seqio :param possible_name: Determine if the file is a single genbank file or not :return: A list of locus numbers :doc-author: Trelent """ locus = [] with open(genbank_file, encoding="utf-8") as handle_gb: for record in SeqIO.parse(handle_gb, "genbank"): locus.append(record.name) return locus def get_locus_len(genbank_file): """ The get_locus function takes a genbank file and returns the locus number of that record. If there is only one record in the genbank file, it will return the possible_name. If there are multiple records, it will return a list of all locus numbers. :param genbank_file: Open the genbank file, and then parse it using seqio :param possible_name: Determine if the file is a single genbank file or not :return: A list of locus numbers :doc-author: Trelents """ locus = [] with open(genbank_file, encoding="utf-8") as handle_gb: for record in SeqIO.parse(handle_gb, "genbank"): locus.append(len(record.seq)) return locus def get_locus_and_genes(genbank_file): """ The get_locus_and_genes function takes a genbank file as input and returns a dictionary of locus tags and genes. The function iterates through the features in each record, checking if they are CDSs. If so, it checks if the gene qualifier is present or not. If it is present, then that value is used for the gene name; otherwise, the locus_tag is used instead. 
:param genbank_file:str: Specify the file that contains the genbank information :return: A dictionary with the locus tag as key and a list of gene names as value :doc-author: Trelent """ locus_gene = {} with open(genbank_file, encoding="utf-8") as handle_gb: for record in SeqIO.parse(handle_gb, "genbank"): locus_gene[record.name] = [] for feat in record.features: if feat.type == "CDS": if feat.qualifiers.get("locus_tag", False): locus_gene[record.name].append( feat.qualifiers["locus_tag"][0].replace(" ", "_") ) elif feat.qualifiers.get("gene", False): locus_gene[record.name].append( feat.qualifiers["gene"][0].replace(" ", "_") ) elif feat.qualifiers.get("note", False): locus_gene[record.name].append( feat.qualifiers["note"][0].replace(" ", "_") ) elif feat.qualifiers.get("product", False): locus_gene[record.name].append( feat.qualifiers["product"][0].replace(" ", "_") ) return locus_gene def get_gb_name(genbank_file): with open(genbank_file, encoding="utf-8") as handle_gb: gbk = list(SeqIO.parse(handle_gb, "genbank"))[0] return gbk.name def get_id_version(genbank_file): """ The get_locus function takes a genbank file and returns the locus number of that record. If there is only one record in the genbank file, it will return the possible_name. If there are multiple records, it will return a list of all locus numbers. :param genbank_file: Open the genbank file, and then parse it using seqio :param possible_name: Determine if the file is a single genbank file or not :return: A list of locus numbers :doc-author: Trelent """ with open(genbank_file, encoding="utf-8") as handle_gb: for line in handle_gb: if "VERSION" in line: return line.split(" ")[-1].strip() return "" def get_positions_gb(genbank_file): """ The get_positions_gb function takes a genbank file as input and returns a list of lists. Each sublist contains the name of the gene, and its start and end positions in that particular sequence. The get_positions_gb function is called by other functions to retrieve this information. :param genbank_file: Open the genbank file and parse it :return: A list of lists :doc-author: Trelent """ positions = [] with open(genbank_file, encoding="utf-8") as handle_gb: for record in SeqIO.parse(handle_gb, "genbank"): for feat in record.features: if feat.type == "CDS": if feat.qualifiers.get("locus_tag", False): positions.append( [ feat.qualifiers.get("locus_tag")[0].replace(" ", "_"), feat.location, ] ) elif feat.qualifiers.get("gene", False): positions.append( [ feat.qualifiers["gene"][0].replace(" ", "_"), feat.location, ] ) elif feat.qualifiers.get("note", False): positions.append( [ feat.qualifiers["note"][0].replace(" ", "_"), feat.location, ] ) elif feat.qualifiers.get("product", False): positions.append( [ feat.qualifiers["product"][0].replace(" ", "_"), feat.location, ] ) positions_clean = [] for idx, gene in enumerate(positions): positions_clean.append((gene[0], [])) for part in gene[1].parts: positions_clean[idx][1].append([int(part.start), int(part.end)]) handle_gb.close() return positions_clean def get_genes(genbank_file): """ The get_genes function takes a genbank file as input and returns a list of all the genes in that file. The function is used to create a list of all the genes in each genome. 
:param genbank_file: Specify the name of the file that contains all of the genes in a genome :return: A list of all the genes in the genbank file :doc-author: Trelent """ genes = [] with open(genbank_file, encoding="utf-8") as handle_gb: for record in SeqIO.parse(handle_gb, "genbank"): for features in record.features: if features.type == "CDS": if features.qualifiers.get("gene"): genes.append(features.qualifiers["gene"][0].replace(" ", "_")) elif features.qualifiers.get("locus_tag"): genes.append( features.qualifiers["locus_tag"][0].replace(" ", "_") ) elif features.qualifiers.get("note"): genes.append(features.qualifiers["note"][0].replace(" ", "_")) elif features.qualifiers.get("product"): genes.append( features.qualifiers["product"][0].replace(" ", "_") ) handle_gb.close() return genes def get_identification_version(segments, reference_gb): """ The get_identification_version function takes a list of SeqRecord objects and the path to a reference GenBank file. It returns the identification and version numbers for that GenBank file. :param segments:list: Determine if the reference is a single segment or not :param reference_gb:str: Specify the reference genome in genbank format :return: A tuple with the identification and version of the reference genome :doc-author: Trelent """ if len(segments) == 1: version_id = get_id_version(reference_gb).split(".") identification = version_id[0] if len(version_id) > 0 else "" version = version_id[1] if len(version_id) > 1 else "" if len(version_id) == 1: new_id = get_gb_name(reference_gb) return new_id, "" else: identification = "" version = "" return identification, version def get_identification_version_string(segments, reference_gb): """ The get_identification_version function takes a list of SeqRecord objects and the path to a reference GenBank file. It returns the identification and version numbers for that GenBank file. :param segments:list: Determine if the reference is a single segment or not :param reference_gb:str: Specify the reference genome in genbank format :return: A tuple with the identification and version of the reference genome :doc-author: Trelent """ if len(segments) == 1: version_id = get_id_version(reference_gb).split(".") identification = version_id[0] if len(version_id) > 0 else "" version = version_id[1] if len(version_id) > 1 else "" if len(version_id) == 1: new_id = get_gb_name(reference_gb) return new_id else: identification = "" version = "" return f"{identification}.{version}" |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | import sys import csv from yaml_io import read_yaml from Bio import SeqIO from extract_gb_info import get_locus def get_locus_out_id(string_id: str): return string_id[len(string_id) - string_id[::-1].index("__"):] def convert_index_to_locus(coverage: dict[str, list[int]], genbank: str) -> dict[str, list[str]]: locus_names: list[str] = get_locus(genbank) for sample in coverage: for idx, value in enumerate(coverage[sample]): coverage[sample][idx] = locus_names[value-1] return coverage def generate_AllConsensus( coverage_file, reference_gb, input_files, reference_fasta, output_w_ref, output_no_ref, ): """ The create_consensus_file_for_alignment function takes a list of files, the reference genome in genbank format, the species name and an output file name as input. It then creates a fasta file with all the sequences from the consensus files that have coverage above 90%. The function also takes into account if it is flu or not. If it is flu it will only include samples that have 8 loci covered at 90% or more in one file and in the other it will concatonate all segments that have 90% coverage or more. :param consensus: Specify the path to the directory containing all of your consensus sequences :param reference_gb: Get the name of the reference sequence in a genbank file :param species: Determine which reference file to use :param output: Name the output file :param coverage_file: Specify the file containing all the coverage values for each sample :param fasta_file: Get the reference sequence from the fasta file :param output_only_90_plus: Create a fasta file with only the sequences that have at least 90% coverage :return: A fasta file with the consensus sequences of all samples that have a coverage over 90% :doc-author: Trelent """ with open(coverage_file, newline="") as csvfile: csv_reader = csv.reader(csvfile, delimiter=",") coverage_list = list(csv_reader) dic_directory = read_yaml("../config/constants.yaml") software_parameters = read_yaml(dic_directory["software_parameters"]) coverage_value = software_parameters["min_coverage_consensus"] for row in coverage_list: for value in range(len(row) - 1, 0, -1): if float(row[value]) >= coverage_value: row[value] = value else: row.pop(value) coverage_dic = {} for i in coverage_list: coverage_dic[i[0]] = i[1:] coverage_dic = convert_index_to_locus(coverage_dic, reference_gb) final_consensus_w_ref = [] final_consensus_no_ref = [] for record in SeqIO.parse(reference_fasta, "fasta"): final_consensus_w_ref.append(record) for sample in coverage_dic: for file in input_files: if sample in file: record: SeqIO.SeqRecord for record in SeqIO.parse(file, "fasta"): if get_locus_out_id(record.id) in coverage_dic[sample]: final_consensus_w_ref.append(record) final_consensus_no_ref.append(record) SeqIO.write(final_consensus_w_ref, output_w_ref, "fasta") SeqIO.write(final_consensus_no_ref, output_no_ref, "fasta") if __name__ == "__main__": coverage_file = sys.argv[1] reference_gb = sys.argv[2] input_files = sys.argv[3].split(" ") reference_fasta = sys.argv[4] output_w_ref = sys.argv[5] output_no_ref = sys.argv[6] generate_AllConsensus( coverage_file, reference_gb, input_files, reference_fasta, output_w_ref, output_no_ref, ) |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | import csv import re from collections import Counter import sys def get_abricate_info(path): """ The get_abricate_info function takes in a path to an abricate file and returns the most common species and genus found in that file. The function first opens the csv file, reads it into a list of lists, then iterates through each entry to find all entries with "genus" or "species" as their header. If they have "genus", then they are added to a list of genera. If they have "species", then they are added to a list of species. After this is done for every entry, we use Counter from collections (imported at top) :param path:str: Specify the path to the abricate file :return: A list of two strings: the most common species and genus found in a given abricate file """ with open(path, encoding="utf-8") as csv_handler: reader = csv.reader(csv_handler) info_list = list(reader) genus_list = [] species_list = [] for entry_list in info_list: entry = entry_list[0] if "genus" in entry: genus = re.findall("(?<=genus~~~)(.*?)(?=~~~)", entry)[0] genus_list.append(genus) elif "species" in entry: species = re.findall("(?<=species~~~)(.*?)(?=~~~)", entry)[0] species_list.append(species) if genus_list and species_list: final_genus = set(genus_list) final_species = set(species_list) return [final_species, final_genus] elif genus_list and not species_list: final_genus = set(genus_list) return ["", final_genus] elif not genus_list and species_list: final_species = set(species_list) return [final_species, ""] else: return ["", ""] def write_to_file(output_path, info_list) -> None: """ The write_to_file function writes the species and genus names to a file. The function takes two arguments: output_path, which is the path of the file we want to write to; and info_list, which is a list containing both species and genus names. The function returns None. :param output_path:str: Specify the path to the output file :param info_list:list[str]: Store the information that will be written to the file :return: None """ species = info_list[0] genus = info_list[1] with open(output_path, "w", encoding="utf-8") as output_handler: output_handler.write(f"Species: {species}\n") output_handler.write(f"Genus: {genus}") if __name__ == "__main__": input_file = sys.argv[1] output_file = sys.argv[2] write_to_file(output_file, get_abricate_info(input_file)) |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | import csv import re from collections import Counter import sys def get_abricate_info(path): """ The get_abricate_info function takes in a path to an abricate file and returns the most common species and genus found in that file. The function first opens the csv file, reads it into a list of lists, then iterates through each entry to find all entries with "genus" or "species" as their header. If they have "genus", then they are added to a list of genera. If they have "species", then they are added to a list of species. After this is done for every entry, we use Counter from collections (imported at top) :param path:str: Specify the path to the abricate file :return: A list of two strings: the most common species and genus found in a given abricate file """ with open(path, encoding="utf-8") as csv_handler: reader = csv.reader(csv_handler) info_list = list(reader) genus_list = [] species_list = [] for entry_list in info_list: entry = entry_list[0] if "genus" in entry: genus = re.findall("(?<=insaflu~~~)(.*?)(?=_genus)", entry)[0] genus_list.append(genus) elif "species" in entry: species = re.findall("(?<=insaflu~~~)(.*?)(?=_species)", entry)[0] species_list.append(species) if genus_list and species_list: final_genus = max(Counter(genus_list).items(), key=lambda x: x[1])[0] final_species = max(Counter(species_list).items(), key=lambda x: x[1])[0] return [final_species, final_genus] elif genus_list and not species_list: final_genus = max(Counter(genus_list).items(), key=lambda x: x[1])[0] return ["", final_genus] elif not genus_list and species_list: final_species = max(Counter(species_list).items(), key=lambda x: x[1])[0] return [final_species, ""] else: return ["", ""] def write_to_file(output_path, info_list) -> None: """ The write_to_file function writes the species and genus names to a file. The function takes two arguments: output_path, which is the path of the file we want to write to; and info_list, which is a list containing both species and genus names. The function returns None. :param output_path:str: Specify the path to the output file :param info_list:list[str]: Store the information that will be written to the file :return: None """ species = info_list[0] genus = info_list[1] with open(output_path, "w", encoding="utf-8") as output_handler: output_handler.write(f"Species: {species}\n") output_handler.write(f"Genus: {genus}") if __name__ == "__main__": input_file = sys.argv[1] output_file = sys.argv[2] write_to_file(output_file, get_abricate_info(input_file)) |
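The sketch below shows how the lookaround regexes and Counter are combined to pick the most frequent genus/species hit; the abricate GENE strings are made up to match the patterns and may differ from real insaflu-database entries:

import re
from collections import Counter

# Hypothetical abricate GENE fields shaped so the lookaround patterns match.
entries = [
    "10__insaflu~~~influenza_genus~~~M1",
    "11__insaflu~~~influenza_genus~~~M2",
    "12__insaflu~~~A_species~~~HA",
]

genus_list = [re.findall("(?<=insaflu~~~)(.*?)(?=_genus)", e)[0]
              for e in entries if "genus" in e]
species_list = [re.findall("(?<=insaflu~~~)(.*?)(?=_species)", e)[0]
                for e in entries if "species" in e]

# Pick the most frequent hit, as get_abricate_info does.
print(max(Counter(genus_list).items(), key=lambda x: x[1])[0])    # influenza
print(max(Counter(species_list).items(), key=lambda x: x[1])[0])  # A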
import sys
import re
from Bio import SeqIO

if __name__ == "__main__":
    consensus_files = sys.argv[1].split(" ")
    output_file = sys.argv[2]
    final_consensus_file = []
    for consensus_file in consensus_files:
        aligned_sequence = list(SeqIO.parse(consensus_file, "fasta"))[1]
        aligned_sequence.id = f"{re.findall('(?<=align_samples/)(.*?)(?=/)',consensus_file)[0]}__{aligned_sequence.id}"
        final_consensus_file.append(aligned_sequence)
    with open(output_file, "w", encoding="utf-8") as handle_fasta_out_align:
        SeqIO.write(final_consensus_file, handle_fasta_out_align, "fasta")
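A quick sketch of the sample-name extraction used above, with a hypothetical per-sample path under align_samples/:

import re

consensus_file = "align_samples/sample_A/snippy/sample_A_consensus.fasta"  # hypothetical path
record_id = "MN908947"

sample = re.findall("(?<=align_samples/)(.*?)(?=/)", consensus_file)[0]
print(f"{sample}__{record_id}")  # sample_A__MN908947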
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 | from Bio import SeqIO from optparse import OptionParser import sys import random import os import gzip import glob def get_locus_name_len(genbank_file): locus_name = [] locus_len = [] with open(genbank_file, encoding="utf-8") as handle_gb: for record in SeqIO.parse(handle_gb, "genbank"): locus_name.append(record.name) locus_len.append(str(len(record.seq))) return locus_name, locus_len class Util(object): ''' classdocs ''' FORMAT_FASTA = "fasta" FORMAT_FASTQ = "fastq" EXTENSION_ZIP = ".gz" TEMP_DIRECTORY = "/tmp" COVERAGE_TEMP_DIRECTORY = "getCoverage" COUNT_DNA_TEMP_DIRECTORY = "count_dna_temp_directory" def __init__(self): ''' Constructor ''' pass def get_temp_file(self, file_name, sz_type): main_path = os.path.join( self.TEMP_DIRECTORY, self.COUNT_DNA_TEMP_DIRECTORY) if (not os.path.exists(main_path)): os.makedirs(main_path) while 1: return_file = os.path.join(main_path, "count_dna_" + file_name + "_" + str( random.randrange(100000, 999999, 10)) + "_file" + sz_type) if (not os.path.exists(return_file)): return return_file def is_integer(self, n_value): try: int(n_value) return True except ValueError: return False def is_gzip(self, file_name): return True if ( file_name.rfind(".gz") == len(file_name) - 3) else False def __get_temp_file__(self, index_file_to_process, sz_type): main_path = os.path.join( self.TEMP_DIRECTORY, self.COVERAGE_TEMP_DIRECTORY) if (not os.path.exists(main_path)): os.makedirs(main_path) while 1: return_file = os.path.join(main_path, "seq_dna_" + str(index_file_to_process) + "_" + str( random.randrange(10000, 99999, 10)) + "_file." 
+ sz_type) if (not os.path.exists(return_file)): return return_file class CoverageElement(object): """ Only have the number of reads and average """ def __init__(self, element): self.element = element self.dt_data = {} def add_coverage(self, type_coverage, coverage): self.dt_data[type_coverage] = coverage def get_coverage(self, type_coverage): return self.dt_data.get(type_coverage, None) def __str__(self): return "Element: {} {}".format(self.element, self.dt_data) class Coverage(object): """ Only have the number of reads and average """ COVERAGE_ALL = "CoverageAll" COVERAGE_MORE_DEFINED_BY_USER = "CoverageMoreDefinedByUser" def __init__(self, genbank, limit_defined_by_user=10): self.limit_defined_by_user = limit_defined_by_user self.dt_data = {} self.ratio_value_defined_by_user = 0 self.genbank = genbank def get_dict_data(self): return self.dt_data def add_coverage(self, element, type_coverage, coverage): if (element in self.dt_data): self.dt_data[element].add_coverage(type_coverage, coverage) else: self.dt_data[element] = CoverageElement(element) self.dt_data[element].add_coverage(type_coverage, coverage) def get_coverage(self, element, type_coverage): if (element in self.dt_data): return self.dt_data[element].get_coverage(type_coverage) raise Exception("Error: there's no key like this: " + element) def __str__(self): sz_return = "snps" names, _ = get_locus_name_len(self.genbank) for name in names: sz_return += f"\t{name}" sz_return += "\t" for key in self.dt_data: sz_return += "{}\t".format( self.get_coverage( key, Coverage.COVERAGE_ALL)) print(sz_return) for key in self.dt_data: sz_return += "{}\t".format( self.get_coverage( key, Coverage.COVERAGE_MORE_DEFINED_BY_USER)) return sz_return class ParseFile(object): ''' classdocs ''' def __init__(self): ''' Constructor ''' self.data_file = None self.reference_dict = {} self.vect_reference = [] self.util = Util() def is_gzip(self, file_name): return True if ( file_name.rfind(".gz") == len(file_name) - 3) else False def parse_file(self, file_name): """ """ self.data_file = DataFile(file_name) if (self.util.is_gzip(file_name)): handle = gzip.open(file_name, mode='rt') else: handle = open(file_name) for line in handle: sz_temp = line.strip().lower() if (len(sz_temp) == 0 or sz_temp[0] == '#'): continue self.data_file.add_data(line) handle.close() return self.data_file def read_reference_fasta(self, reference_file): """ test if the reference_file and ge the handle """ if (not os.path.exists(reference_file)): raise Exception( "Can't locate the reference file: '" + reference_file + "'") # set temp file name temp_file_name = reference_file # create temp file b_temp_file = False if self.util.is_gzip(reference_file): b_temp_file = True temp_file_name = self.util.get_temp_file( "reference_file_", ".fasta") cmd = "gzip -cd " + reference_file + " > " + temp_file_name os.system(cmd) for rec in SeqIO.parse(temp_file_name, 'fasta'): self.reference_dict[rec.id] = len(str(rec.seq)) self.vect_reference.append(rec.id) # remove temp file if necessary if b_temp_file and temp_file_name: os.remove(temp_file_name) class DataFile(object): ''' classdocs ''' util = Util() def __init__(self, file_name): ''' Constructor ''' self.file_name = file_name self.vect_chromosomes = [] self.dict_data = {} self.dict_data_coverage = {} self.previous_position = -1 def get_vect_chromosomes(self): return self.vect_chromosomes def get_dict_data(self): return self.dict_data def add_data(self, line): if (len(line) == 0 or line[0] == '#'): return vect_data = line.split() if 
(len(vect_data) != 3): raise Exception("File: " + self.file_name + "\nThis line must have three values '" + line + "'") if (not self.util.is_integer(vect_data[1])): raise Exception("File: " + self.file_name + "\nLine: '" + line + "'\nThe locus need to be integer") if (not self.util.is_integer(vect_data[2])): raise Exception("File: " + self.file_name + "\nLine: '" + line + "'\nThe coverage need to be integer") if (vect_data[0] in self.dict_data): if (int(vect_data[1]) <= (self.previous_position)): raise Exception("File: " + self.file_name + "\nLine: '" + line + "'\nThe locus need to be greater than the predecessor in the file") self.dict_data[vect_data[0]].append([vect_data[1], vect_data[2]]) self.previous_position = int(vect_data[1]) else: self.vect_chromosomes.append(vect_data[0]) self.dict_data[vect_data[0]] = [[vect_data[1], vect_data[2]]] self.previous_position = int(vect_data[1]) def get_coverage(self, sz_chromosome, length_chromosome): if (sz_chromosome not in self.dict_data): return 0 if (sz_chromosome in self.dict_data_coverage): return self.dict_data_coverage[sz_chromosome] if (length_chromosome == 0): return 0 # medaka sometimes creates bigger references than the original, difference 2 or 3 number of bases if (len(self.dict_data[sz_chromosome]) > (length_chromosome * 1.10) or len(self.dict_data[sz_chromosome]) < (length_chromosome - (length_chromosome * 0.10))): raise Exception("Chromosome '%s' has different sizes. Coverage: %d; Reference: %d" % ( sz_chromosome, len(self.dict_data[sz_chromosome]), length_chromosome)) sum_total = 0 for data_ in self.dict_data[sz_chromosome]: sum_total += int(data_[1]) self.dict_data_coverage[sz_chromosome] = sum_total / \ float(length_chromosome) return self.dict_data_coverage[sz_chromosome] def get_ratio_more_than(self, sz_chromosome, length_chromosome, value): if (sz_chromosome not in self.dict_data): return 0 if (length_chromosome == 0): return 0 # medaka sometimes creates bigger references than the original, difference 2 or 3 number of bases if (len(self.dict_data[sz_chromosome]) > (length_chromosome * 1.10) or len(self.dict_data[sz_chromosome]) < (length_chromosome - (length_chromosome * 0.10))): raise Exception("Chromosome '%s' has different sizes. 
Coverage: %d; Reference: %d" % ( sz_chromosome, len(self.dict_data[sz_chromosome]), length_chromosome)) sum_total = 0 for data_ in self.dict_data[sz_chromosome]: sum_total += (1 if (int(data_[1]) > value) else 0) return sum_total / float(length_chromosome) class GetCoverage(object): """ get coverage from deep.gz file need deep.gz file and reference """ util = Util() def __init__(self): self.vect_files_processed = [] self.reference_dict = {} self.vect_reference = [] def get_dict_reference(self): return self.reference_dict def get_vect_reference(self): return self.vect_reference def get_vect_files_processed(self): return self.vect_files_processed def process_input_files(self, input_path): """ test if input_file is directory or file and read all files in the directory """ print("Collecting files in: " + input_path) if (os.path.isfile(input_path)): if (os.path.exists(input_path)): self.vect_files_processed.append(input_path) else: self.vect_files_processed = glob.glob(input_path) def get_dict_with_coverage(self, deep_file): """ get a dictonary of elements with coverage """ self.reference_dict = {} self.vect_reference = [] parse_file = ParseFile() data_file = parse_file.parse_file(deep_file) dt_out = {} for key in data_file.get_dict_data().keys(): dt_out[key] = [int(value_[1]) for value_ in data_file.get_dict_data()[key]] return dt_out # def get_coverage(self, deep_file, reference, limit_defined_by_user = None, limit_defined_to_project = None): def get_coverage(self, deep_file, reference, limit_defined_by_user=10): """ get an instance of coverage """ self.reference_dict = {} self.vect_reference = [] parse_file = ParseFile() data_file = parse_file.parse_file(deep_file) self.read_reference_fasta(reference) coverage = Coverage(genbank=options.genbank) for chromosome in self.vect_reference: if (chromosome not in self.reference_dict): raise Exception("Can't locate the chromosome '" + chromosome + "' in reference file") coverage.add_coverage(chromosome, Coverage.COVERAGE_ALL, "%.1f" % ( data_file.get_coverage(chromosome, self.reference_dict[chromosome]))) coverage.add_coverage(chromosome, Coverage.COVERAGE_MORE_DEFINED_BY_USER, "%.1f" % (data_file.get_ratio_more_than(chromosome, self.reference_dict[chromosome], limit_defined_by_user - 1) * 100)) return coverage def read_reference_fasta(self, reference_file): """ test if the reference_file and ge the handle """ if (not os.path.exists(reference_file)): raise Exception( "Can't locate the reference file: '" + reference_file + "'") # set temp file name temp_file_name = reference_file # create temp file b_temp_file = False if self.util.is_gzip(reference_file): b_temp_file = True temp_file_name = self.util.get_temp_file( "reference_file_", ".fasta") cmd = "gzip -cd " + reference_file + " > " + temp_file_name os.system(cmd) for rec in SeqIO.parse(temp_file_name, 'fasta'): self.reference_dict[rec.id] = len(str(rec.seq)) self.vect_reference.append(rec.id) ### if b_temp_file and temp_file_name: os.remove(temp_file_name) if __name__ == '__main__': """ V0.6 release 21/11/2017 add - ratio as a parameter V0.5 release 30/09/2017 Fix - length chromosome V0.4 release 30/09/2017 Fix - when coverage doesn't have at all coverage doesn't appear in the results V0.3 release 30/09/2017 Fix - get coverage 0 for chromosomes that are missing in coverage file V0.2 release 30/09/2017 Add - need the reference file check if all chromosomes that are in the coverage are the ones that are in the reference. 
get the size of the chromosomes from the reference add chromosome length to the report V0.1 release 12/09/2017 Add - coverage average base on the files <chromosome> <position> <deep coverage>: """ b_debug = False if (b_debug): dir_path = os.path.dirname(os.path.realpath(__file__)) input_file = os.path.join(dir_path, "test/files/*.depth") output_file = "tmp_out_get_coverage.csv" reference = os.path.join(dir_path, "test/files/ref/ref_H3.fasta") threshold = 10 get_coverage = GetCoverage() get_coverage.process_input_files(input_file) handle = open(output_file, "w") for file_to_process in get_coverage.get_vect_files_processed(): coverage = get_coverage.get_coverage(deep_file=file_to_process, reference=reference, limit_defined_by_user=threshold) print(coverage) handle.write("{},{}\n".format( str(os.path.basename(file_to_process)).split(".")[0], coverage)) # print("{},{}\n".format(str(os.path.basename(file_to_process)).split(".")[0],coverage)) handle.close() sys.exit(1) else: parser = OptionParser( usage="%prog [-h] -i -r -o [-t]", version="%prog 0.6", add_help_option=False) parser.add_option("-i", "--input", type="string", dest="input", help="Input file or path with coverage files. Can be zipped.", metavar="IN_FILE") parser.add_option("-r", "--reference", type="string", dest="reference", help="Reference file to get the length of the chromosomes to check his name.", metavar="REF_FILE") parser.add_option("-g", "--genbank", type="string", dest="genbank", help="Genbank file") parser.add_option("-o", "--output", type="string", dest="output", help="Output file name", metavar="OUT_FILE") parser.add_option("-t", "--threshold", type="int", dest="threshold", help="Minimum coverage per position (default 10)", metavar="OUT_FILE") parser.add_option('-h', '--help', dest='help', action='store_true', help='show this help message and exit') (options, args) = parser.parse_args() if (options.help): parser.print_help() print("") print("\tCreate an output file with several averages about the coverage.") print("\tOnly runs in linux or mac.") print( "\texample: python getCoverage -i '/usr/local/zpto/*.gz' -r reference.fasta -o resultsOut.csv") print("\texample: python getCoverage -i '/usr/local/zpto/*.gz' -r reference.fasta -o resultsOut.csv -t 30") print("") print( "\tThe input coverage files must be in this format '<chromosome> <position> <deep coverage>'") sys.exit(0) if (len(args) != 0): parser.error( "incorrect number of arguments, no of arguments: " + str(len(args))) if not options.input: # parser.error('Name of the input files/path is not specified') if not options.genbank: # parser.error('Name of the input files/path is not specified') if not options.reference: # parser.error('Name of the reference file not specified') if not options.output: # parser.error('Output file is not specified') threshold = 10 if (options.threshold): threshold = options.threshold locus_name, locus_len = get_locus_name_len(options.genbank) get_coverage = GetCoverage() get_coverage.process_input_files(options.input) handle = open(options.output, "w") # print(get_coverage.get_vect_files_processed()) for file_to_process in get_coverage.get_vect_files_processed(): coverage = get_coverage.get_coverage(deep_file=file_to_process, reference=options.reference, limit_defined_by_user=threshold) print(file_to_process) print(str(os.path.basename(file_to_process)).split(".")[0]) handle.write("\n") handle.write("Chromosome\n") print(locus_name, locus_len) name = "Name\t" + "\t".join(locus_name) handle.write(name.strip() + "\n") lenght = "Lenght\t" + 
"\t".join(locus_len) handle.write(lenght.strip() + "\n") handle.write("\n") handle.write("Coverage\tRatio>0\tRatio>9\n") handle.write("{}\n".format( str(coverage).strip())) # print("{},{}".format(str(os.path.basename(file_to_process)).split(".")[0],coverage)) handle.close() |
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | def get_trimmomatic_parameters(software_parameters): trimmomatic_params = ( f'ILLUMINACLIP {software_parameters["trimmo-ILLUMINACLIP"]} -phred33' if software_parameters["trimmo-ILLUMINACLIP"] is not None else "-phred33" ) trimmomatic_params += ( f' HEADCROP:{software_parameters["trimmo-HEADCROP"]}' if software_parameters["trimmo-HEADCROP"] is not None else "" ) trimmomatic_params += ( f' CROP:{software_parameters["trimmo-CROP"]}' if software_parameters["trimmo-CROP"] is not None else "" ) trimmomatic_params += ( f' SLIDINGWINDOW:{software_parameters["trimmo-SLIDINGWINDOW1"]}:{software_parameters["trimmo-SLIDINGWINDOW2"]}' if software_parameters["trimmo-SLIDINGWINDOW2"] is not None else "" ) trimmomatic_params += ( f' LEADING:{software_parameters["trimmo-LEADING"]}' if software_parameters["trimmo-LEADING"] is not None else "" ) trimmomatic_params += ( f' TRAILING:{software_parameters["trimmo-TRAILING"]}' if software_parameters["trimmo-TRAILING"] is not None else "" ) trimmomatic_params += ( f' MINLEN:{software_parameters["trimmo-MINLEN"]}' if software_parameters["trimmo-MINLEN"] is not None else "" ) trimmomatic_params += " TOPHRED33" return trimmomatic_params def mask_regions_parameters(software_parameters): mask = ( f'-s {software_parameters["MASK_SITES"]} ' if software_parameters["MASK_SITES"] is not None else "" ) mask += ( f'-r {software_parameters["MASK_REGIONS"]} ' if software_parameters["MASK_REGIONS"] is not None else "" ) mask += ( f'-b {software_parameters["MASK_F_BEGINNING"]} ' if software_parameters["MASK_F_BEGINNING"] is not None else "" ) mask += ( f'-e {software_parameters["MASK_F_END"]} ' if software_parameters["MASK_F_END"] is not None else "" ) return mask def get_snippy_parameters(software_parameters): snippy_parameters = ( f' --mapqual {software_parameters["mapqual"]}' if software_parameters["mapqual"] else "" ) snippy_parameters += ( f' --mincov {software_parameters["mincov"]}' if software_parameters["mincov"] else "" ) snippy_parameters += ( f' --minfrac {software_parameters["minfrac"] }' if software_parameters["minfrac"] else "" ) return snippy_parameters def get_nanofilt_parameters(software_parameters): nanofilt_parameters = ( f' -q {software_parameters["QUALITY"]}' if software_parameters["QUALITY"] is not None else "" ) nanofilt_parameters += ( f' -l {software_parameters["LENGTH"]}' if software_parameters["LENGTH"] is not None else "" ) nanofilt_parameters += ( f' --headcrop {software_parameters["HEADCROP"]}' if software_parameters["HEADCROP"] is not None else "" ) nanofilt_parameters += ( f' --tailcrop {software_parameters["TAILCROP"]}' if software_parameters["TAILCROP"] is not None else "" ) return nanofilt_parameters |
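The parameter builders above all follow the same conditional-concatenation pattern. The sketch below reproduces it for a hypothetical subset of the Trimmomatic keys (values are illustrative only, not defaults of the workflow):

# Sketch of the conditional parameter-string building, with hypothetical values.
software_parameters = {
    "trimmo-ILLUMINACLIP": None,
    "trimmo-SLIDINGWINDOW1": 5,
    "trimmo-SLIDINGWINDOW2": 20,
    "trimmo-MINLEN": 35,
}

params = "-phred33"
if software_parameters["trimmo-SLIDINGWINDOW2"] is not None:
    params += (
        f' SLIDINGWINDOW:{software_parameters["trimmo-SLIDINGWINDOW1"]}'
        f':{software_parameters["trimmo-SLIDINGWINDOW2"]}'
    )
if software_parameters["trimmo-MINLEN"] is not None:
    params += f' MINLEN:{software_parameters["trimmo-MINLEN"]}'
params += " TOPHRED33"
print(params)  # -phred33 SLIDINGWINDOW:5:20 MINLEN:35 TOPHRED33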
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 | from typing import Optional import pandas from scripts.yaml_io import read_yaml dic_directory = read_yaml("config/constants.yaml") class Data: """Object with sample data information""" def __init__(self, file: str, consensus_illumina) -> None: self.user_df: pandas.DataFrame = pandas.read_csv(file) self.consensus_illumina = consensus_illumina def get_sample_names(self) -> list[str]: """ The get_sample_names function returns a list of sample names from the user_df dataframe. :param self: Access the attributes and methods of the class in python :return: A list of the sample names in the user_df :doc-author: Trelent """ return list(self.user_df["sample_name"]) def get_sample_1(self) -> list[str]: """ The get_sample_1 function returns a list of the fastq files associated with the sample. This is accomplished by returning the values in the "fastq" column of a pandas dataframe. :param self: Access the attributes and methods of the class in python :return: A list of the fastq file names from the user_df dataframe :doc-author: Trelent """ return list(self.user_df["fastq1"]) def get_sample_2(self) -> list[Optional[str]]: """ The get_sample_2 function returns a list of the second fastq file for each sample. This is useful when you want to run a program that requires paired-end reads. :param self: Access the attributes and methods of the class in python :return: A list of the values in the column "fastq2" from the user_df :doc-author: Trelent """ return list(self.user_df["fastq2"]) def get_sample_type(self) -> dict[str, str]: """ The get_sample_type function returns a dictionary of sample names and their corresponding sequencing technology. The function is called by the get_sample_metadata function to create a dictionary of metadata for each sample. :param self: Reference the class instance :return: A dictionary with sample names as keys and the corresponding assembler type as values :doc-author: Trelent """ names = self.get_sample_names() tech_type: pandas.Series = self.user_df["tech"] type_dic: dict[str, str] = {} for index, _ in enumerate(names): type_dic[names[index]] = ( "medaka" if tech_type[index] == "ont" else self.consensus_illumina ) return type_dic def get_dic(self) -> dict[str, dict[str, Optional[str]]]: """ The get_dic function creates a dictionary of dictionaries. The outer dictionary is keyed by sample names, and the inner dictionary contains keys for each column in the user_df dataframe. 
For example: {'sample_name': {'fastq2': 'path/to/file', 'fastq2': 'path/to/file', ...}, ...} :param self: Refer to the object that is calling the function :return: A dictionary with the sample names as keys and a subdictionary of fastq files, tech, and species :doc-author: Trelent """ dic: dict = {} names = self.get_sample_names() for name in names: dic[name] = {} for idx, sample_name_1 in enumerate(self.get_sample_1()): dic[names[idx]]["fastq1"] = ( sample_name_1 if isinstance(sample_name_1, str) else None ) for idx, sample_name_2 in enumerate(self.get_sample_2()): dic[names[idx]]["fastq2"] = ( sample_name_2 if isinstance(sample_name_2, str) else None ) for idx, tech in enumerate(list(self.user_df["tech"])): dic[names[idx]]["tech"] = tech if isinstance(tech, str) else None return dic def get_options(self) -> list[Optional[dict[str, dict[str, Optional[str]]]]]: """ The get_options function returns a list of dictionaries. The first dictionary is the paired illumina samples, the second is th single illumina samples, and the third is ont_samples. The fourth dictionary contains all of the information for each sample in a project and will be used to create yaml files for Snakemake. Lastly, it :param self: Access the attributes and methods of the class in python :return: A list of dictionaries :doc-author: Trelent """ project_dic = self.get_dic() final_dictionary: Optional[dict[str, dict[str, Optional[str]]]] = {} single_illumina: Optional[dict[str, dict[str, Optional[str]]]] = {} paired_illumina: Optional[dict[str, dict[str, Optional[str]]]] = {} ont_samples = {} for names, _ in project_dic.items(): if ( project_dic[names]["fastq1"] and project_dic[names]["fastq2"] and project_dic[names]["tech"] == "illumina" ): paired_illumina[names] = project_dic[names] final_dictionary[names] = project_dic[names] elif ( project_dic[names]["fastq1"] and not project_dic[names]["fastq2"] and project_dic[names]["tech"] == "illumina" ): single_illumina[names] = project_dic[names] final_dictionary[names] = project_dic[names] elif ( project_dic[names]["fastq1"] and not project_dic[names]["fastq2"] and project_dic[names]["tech"] == "ont" ): ont_samples[names] = project_dic[names] final_dictionary[names] = project_dic[names] return [ paired_illumina, single_illumina, ont_samples, final_dictionary, ] def get_assembler(self) -> str: """ The get_assembler function returns the assembler used in the pipeline. :param self: Access the attributes and methods of the class in python :return: The assembler that is being used in the current instance of the class :doc-author: Trelent """ return self.consensus_illumina def get_data_in_align_form( assembler: str, single_illumina: dict[str, Optional[str]], paired_illumina: dict[str, Optional[str]], ) -> list[dict[str, Optional[str]]]: """ The get_data_in_align_form function takes in the assembler, single_illumina and paired_illumina dictionaries. It then checks to see if the assembler is snippy or iVar. If it is snippy, it returns the paired illumina dictionary as well as both single illumina dictionaries. If it is iVar, then it returns both paired illumina dictionaries and both single illumina dictionaries. 
:param assembler:str: Determine which assembler to use :param single_illumina:dict[str: Store the name of the file that contains the single-end illumina reads :param Optional[str]]: Indicate that the parameter is optional :param paired_illumina:dict[str: Store the paired illumina reads :param Optional[str]]: Indicate that the value can be none :param : Determine which assembler to use :return: A list of four dictionaries :doc-author: Trelent """ paired_illumina_snippy: dict[str, Optional[str]] = {} single_illumina_snippy: dict[str, Optional[str]] = {} paired_illumina_ivar: dict[str, Optional[str]] = {} single_illumina_ivar: dict[str, Optional[str]] = {} if assembler == "snippy": paired_illumina_snippy = paired_illumina single_illumina_snippy = single_illumina paired_illumina_ivar = {} single_illumina_ivar = {} elif assembler == "iVar": paired_illumina_ivar = paired_illumina single_illumina_ivar = single_illumina paired_illumina_snippy = {} single_illumina_snippy = {} return [ paired_illumina_snippy, single_illumina_snippy, paired_illumina_ivar, single_illumina_ivar, ] |
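To show how Data.get_options buckets samples, here is a self-contained sketch of the same classification rules applied to a hypothetical dictionary with the shape produced by get_dic():

# Sketch of the sample-bucketing rules in get_options (hypothetical samples).
project_dic = {
    "s1": {"fastq1": "s1_R1.fastq.gz", "fastq2": "s1_R2.fastq.gz", "tech": "illumina"},
    "s2": {"fastq1": "s2.fastq.gz", "fastq2": None, "tech": "illumina"},
    "s3": {"fastq1": "s3.fastq.gz", "fastq2": None, "tech": "ont"},
}

paired_illumina, single_illumina, ont_samples = {}, {}, {}
for name, info in project_dic.items():
    if info["fastq1"] and info["fastq2"] and info["tech"] == "illumina":
        paired_illumina[name] = info
    elif info["fastq1"] and not info["fastq2"] and info["tech"] == "illumina":
        single_illumina[name] = info
    elif info["fastq1"] and not info["fastq2"] and info["tech"] == "ont":
        ont_samples[name] = info

print(sorted(paired_illumina))  # ['s1']
print(sorted(single_illumina))  # ['s2']
print(sorted(ont_samples))      # ['s3']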
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | from Bio import SeqIO def detach_reference(input_file, ref): """ The detach_reference function takes a fasta file and removes the first record from it, and saves that record to a new file. The function returns the filename of the original fasta file without its first sequence. :param input_file: Specify the file to be parsed :param ref: Specify the name of the file that will be created to store the reference sequence :return: The reference sequence :doc-author: Trelent """ with open(input_file, "r", encoding="utf8") as handle_input: record_list = list(SeqIO.parse(handle_input, "fasta")) reference = record_list.pop(0) with open(input_file, "w", encoding="utf8") as handle_input_write: SeqIO.write(record_list, handle_input_write, "fasta") with open(ref, "w", encoding="utf8") as handle_reference: SeqIO.write(reference, handle_reference, "fasta") def attach_reference(input_file, ref): """ The attach_reference function takes a fasta file and attaches the reference sequence to it. It is used in the main function to attach the reference sequence to each of the input files. :param input_file: Specify the name of the file that contains the sequence records :param ref: Specify the reference sequence :return: The input file with the reference sequence at the beginning of the list :doc-author: Trelent """ with open(ref, "r", encoding="utf8") as handle: reference = list(SeqIO.parse(handle, "fasta"))[0] with open(input_file, "r", encoding="utf8") as handle: records = list(SeqIO.parse(handle, "fasta")) records.insert(0, reference) with open(input_file, "w", encoding="utf8") as handle: SeqIO.write(records, handle, "fasta") |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | import sys from Bio import SeqIO if __name__ == "__main__": reference_fasta = sys.argv[1] consensus_file = sys.argv[2] output_file = sys.argv[3] segment_name = sys.argv[4] with open(reference_fasta, "r", encoding="utf-8") as handle_fasta: dt_consensus = SeqIO.to_dict(SeqIO.parse(consensus_file, "fasta")) for record in SeqIO.parse(handle_fasta, "fasta"): if record.id == segment_name: # make mask # get sequences vect_out_fasta_to_align = [] record_id = record.id record.id = "" record.id = record_id + "_ref" # delete descriptions vect_out_fasta_to_align.append(record) vect_out_fasta_to_align.append(dt_consensus[record_id]) with open(output_file, "w", encoding="utf-8") as handle_fasta_out_align: SeqIO.write( vect_out_fasta_to_align, handle_fasta_out_align, "fasta" ) |
import argparse
from Bio import SeqIO
from Bio.Seq import MutableSeq


def compute_masking_sites(
    sequence,
    ranges=None,
    single_positions=None,
    from_beggining=None,
    from_end=None,
):
    length = len(sequence)
    masking_sites = []
    if ranges is not None:
        for pos in ranges:
            pos[0] = int(pos[0])
            pos[1] = int(pos[1])
            if pos[1] < length and pos[1] > pos[0]:
                masking_sites.extend(iter(range(pos[0] - 1, pos[1] - 1)))
    if single_positions is not None:
        for pos in single_positions:
            pos = int(pos)
            if pos < length:
                masking_sites.append(pos - 1)
    if from_beggining is not None:
        masking_sites.extend(iter(range(int(from_beggining))))
    if from_end is not None:
        masking_sites.extend(iter(range(length - int(from_end), length)))
    return masking_sites


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("consensus", type=str, help="Consensus File Path")
    parser.add_argument("output", type=str, help="Output File Path")
    parser.add_argument(
        "-r",
        "--regions",
        type=str,
        help="Pass a string with the format x-y,n-z,...",
    )
    parser.add_argument(
        "-s",
        "--single",
        type=str,
        help="Pass a string with the format x,y,n,z,...",
    )
    parser.add_argument(
        "-b", "--beggining", type=int, help="Pass an integer with the format x"
    )
    parser.add_argument(
        "-e", "--end", type=int, help="Pass an integer with the format x"
    )
    args = parser.parse_args()
    ranges = None
    consensus = args.consensus or None
    output = args.output or None
    # Need to Error if output is missing
    single_positions = args.single.split(",") if args.single else None
    if args.regions:
        ranges = [x.split("-") for x in args.regions.split(",")]
    from_beggining = args.beggining or None
    from_end = args.end or None
    final_mask_consensus = []
    for record in SeqIO.parse(consensus, "fasta"):
        sequence = MutableSeq(record.seq)
        masking_sites = compute_masking_sites(
            sequence, ranges, single_positions, from_beggining, from_end
        )
        # Taken from insaflu
        ref_pos = 0
        ref_insertions = 0
        # for _ in range(len(sequence)):
        #     if sequence[_] == "-":
        #         ref_insertions += 1
        #         continue
        #     if ref_pos in masking_sites:
        #         sequence[ref_pos + ref_insertions] = "N"
        #     ref_pos += 1
        #     if (ref_pos + ref_insertions) >= len(record.seq):
        #         break
        for position in masking_sites:
            sequence[position] = "N"
        # End of insaflu code
        record.seq = sequence
        final_mask_consensus.append(record)
    with open(output, "w") as handle_fasta_out:
        SeqIO.write(final_mask_consensus, handle_fasta_out, "fasta")
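The sketch below walks through the masking-site index arithmetic on a toy 20-nt sequence; the region, single positions, and crop values are hypothetical, and the 1-based to 0-based conversion mirrors the loops above:

# Sketch of the masking-site index arithmetic (toy inputs, 1-based positions).
sequence = list("ACGTACGTACGTACGTACGT")
length = len(sequence)

ranges = [[5, 9]]         # --regions "5-9"
single_positions = ["2"]  # --single "2"
from_beggining = 3        # --beggining 3
from_end = 2              # --end 2

masking_sites = []
for start, end in ranges:
    if end < length and end > start:
        masking_sites.extend(range(start - 1, end - 1))  # 1-based -> 0-based, as in the script
for pos in single_positions:
    if int(pos) < length:
        masking_sites.append(int(pos) - 1)
masking_sites.extend(range(from_beggining))              # first N positions
masking_sites.extend(range(length - from_end, length))   # last N positions

for position in masking_sites:
    sequence[position] = "N"
print("".join(sequence))  # NNNTNNNNACGTACGTACNN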
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | import re import sys import csv import yaml from Bio import SeqIO def get_locus(genbank_file): """ The get_locus function takes a genbank file and returns the locus number of that record. If there is only one record in the genbank file, it will return the possible_name. If there are multiple records, it will return a list of all locus numbers. :param genbank_file: Open the genbank file, and then parse it using seqio :param possible_name: Determine if the file is a single genbank file or not :return: A list of locus numbers :doc-author: Trelent """ locus = [] with open(genbank_file, encoding="utf-8") as handle_gb: for record in SeqIO.parse(handle_gb, "genbank"): locus.append(record.name) return locus def get_locus_len(genbank_file): """ The get_locus function takes a genbank file and returns the locus number of that record. If there is only one record in the genbank file, it will return the possible_name. If there are multiple records, it will return a list of all locus numbers. :param genbank_file: Open the genbank file, and then parse it using seqio :param possible_name: Determine if the file is a single genbank file or not :return: A list of locus numbers :doc-author: Trelents """ locus = [] with open(genbank_file, encoding="utf-8") as handle_gb: for record in SeqIO.parse(handle_gb, "genbank"): locus.append(len(record.seq)) return locus def read_yaml(yaml_file_path: str) -> dict: """ The read_yaml function reads a yaml file and returns the contents as a dictionary. The read_yaml function accepts one argument, which is the path to the yaml file. :param yaml_file_path: Specify the path to the yaml file that is going to be read :return: A dictionary :doc-author: Trelent """ with open(yaml_file_path, encoding="utf-8") as yaml_file: return yaml.load(yaml_file, Loader=yaml.FullLoader) def merge_coverage(file_list, output_file): """ The merge_coverage function takes a list of coverage files and merges them into one file. The first argument is the list of files, the second argument is the name for the output file. 
:param file_list: Store the list of files that will be merged :return: A list of lists with the coverage information for each sample :doc-author: Trelent """ with open(file_list[0], "r", encoding="utf8") as file_handler: length = file_handler.readlines()[3].split()[1] species = get_locus(REFERENCE_GB) length = get_locus_len(REFERENCE_GB) super_header = ["Name", *species, "", ""] super_header_1 = ["Length", *length, "", ""] software_parameters = read_yaml(dic_directory["software_parameters"]) header = [ "SAMPLES", "Mean depth of coverage", r"% of size covered by at least 1-fold", r"% of size covered by at least X-fold", ] sub_header = ["", *species, "", *species, "X-Fold", *species] final_output = [super_header, super_header_1, header, sub_header] for file in file_list: with open(file, "r", encoding="utf8") as f: sample_name = re.findall("(?<=/)(.*?)(?=_coverage.csv)", file)[0].split( "/" )[0] info = f.readlines()[6].split() sample_t = sample_type[sample_name] if sample_t in ("snippy", "iVar"): sample_cov = software_parameters["mincov"] elif sample_t == "medaka": sample_cov = software_parameters["mincov_medaka"] else: raise Exception( "There is no implementation for other tools than snippy, iVar and medaka." ) info[0] = sample_name print("This is info prev", info) info.insert(1 + len(species), "") print("This is info prev", info) info.insert(2 + len(species) * 2, sample_cov) print("This is info last", info) final_output.append(info) with open(output_file, mode="w", encoding="utf8") as f: f_writer = csv.writer( f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL ) f_writer.writerows(final_output) def get_coverage(filename: str, n_locus: int): """ The get_coverage function takes a filename as an argument and returns the coverage of each locus in that file. The function also takes n_locus as an argument, which is the number of loci in the file. :param filename: Specify the file to be read :param n_locus: Specify the number of loci in the file :return: A list of the coverage for each locus in a given sample :doc-author: Trelent """ with open(filename, newline="", encoding="UTF8") as csvfile: coverage_list = list(csv.reader(csvfile, delimiter="\t")) sample_name = re.findall("(?<=/)(.*?)(?=_coverage.csv)", filename)[0].split( "/" )[0] # for i in coverage_list: # print(i) coverage_list = coverage_list[-1][-n_locus:] coverage_list.insert(0, sample_name) csvfile.close() return coverage_list def create_script_readable_coverage_file(coverage_files, output, n_locus): """ The create_script_readable_coverage_file function takes a list of coverage files and outputs a script readable coverage file. The output is the name of the file you want to create. The last argument is n_locus which specifies how many loci are in each coverage file. 
:param coverage_files: Specify the files that contain the coverage data :param output: Specify the name of the output file :param n_locus: Indicate the number of locus in the genome :return: A list of lists, where each sublist contains the coverage each locus :doc-author: Trelent """ coverage_list = coverage_files n_locus = int(n_locus) coverage_translate = [] for filename in coverage_list: coverage_translate.append(get_coverage(filename, n_locus)) with open(output, mode="w", encoding="UTF8") as output_file: f_writer = csv.writer(output_file, delimiter=",") f_writer.writerows(coverage_translate) output_file.close() if __name__ == "__main__": COVERAGE_FILES = sys.argv[1].split() OUTPUT_FILE_1 = sys.argv[2] OUTPUT_FILE_2 = sys.argv[3] REFERENCE_GB = sys.argv[4] dic_directory = read_yaml("../config/constants.yaml") sample_type = read_yaml(dic_directory["config_file"])["sample_type"] NUMBER_OF_LOCUS = len(get_locus(REFERENCE_GB)) merge_coverage(COVERAGE_FILES, OUTPUT_FILE_1) create_script_readable_coverage_file( COVERAGE_FILES, OUTPUT_FILE_2, NUMBER_OF_LOCUS) |
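To show what get_coverage does with one per-sample report, here is a small sketch; the column layout and file name are illustrative assumptions, only the slicing and the sample-name regex come from the script above:

import re

filename = "samples/sample_A/sample_A_coverage.csv"  # hypothetical path
n_locus = 3
coverage_rows = [
    ["Name", "PB2", "PB1", "PA"],
    ["Length", "2280", "2274", "2151"],
    ["sample_A", "250.1", "98.7", "97.2", "99.0"],  # last row: depth then per-locus values
]

sample_name = re.findall("(?<=/)(.*?)(?=_coverage.csv)", filename)[0].split("/")[0]
row = coverage_rows[-1][-n_locus:]
row.insert(0, sample_name)
print(row)  # ['sample_A', '98.7', '97.2', '99.0']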
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | import sys import os from yaml_io import read_yaml if __name__ == "__main__": dic_directory: dict[str, str] = read_yaml("../config/constants.yaml") SAMPLE = sys.argv[1] config_user = read_yaml(f"{dic_directory['config_file']}") fastq1 = config_user["samples"][SAMPLE]["fastq1"] fastq2 = config_user["samples"][SAMPLE]["fastq2"] if fastq1 and fastq2: run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq1}_fastqc.html ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_1_fastqc.html" os.system(f"{run}") run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq2}_fastqc.html ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_2_fastqc.html" os.system(f"{run}") run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq1}_fastqc.zip ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_1_fastqc.zip" os.system(f"{run}") run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq2}_fastqc.zip ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_2_fastqc.zip" os.system(f"{run}") elif fastq1: run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq1}_fastqc.html ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_fastqc.html" os.system(f"{run}") run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq1}_fastqc.zip ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_fastqc.zip" os.system(f"{run}") |
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 | import os import re import gzip import codecs import csv import argparse import datetime as dt from Bio import SeqIO def import_seqs(fasta_file): """Imports sequences from a FASTA file. Parameters ---------- fasta_file : str Path to the FASTA file. Returns ------- seqs : list of list List with one sublist per sequence in the FASTA file. Each sublist has two elements: The identifier/header of the sequence (str) and the sequence (str). """ seqs = [] for record in SeqIO.parse(fasta_file, "fasta"): seqid = record.id sequence = str(record.seq) seqs.append([seqid, sequence]) return seqs def read_depth_file(depth_file, sample_map): """read depth file""" with gzip.open(depth_file, "rb") if depth_file.endswith(".gz") else open( depth_file, "r" ) as df: lines = ( csv.reader(codecs.iterdecode(df, "utf-8"), delimiter="\t") if depth_file.endswith(".gz") else csv.reader(df, delimiter="\t") ) for l in lines: if len(l) < 3 or len(l[0].strip()) == 0: continue if l[0] in sample_map: sample_map[l[0]].append([l[0], l[1], l[2]]) else: sample_map[l[0]] = [[l[0], l[1], l[2]]] return sample_map def import_depth(depth_data, sample_ids=[]): """Import depth files, Parameters ---------- depth_data : str Path to the depth files or file itself, can be in gz format. Returns ------- sample_map : dictionary with name of sequence and list of depth """ sample_map = {} # list depth files and match sample to depth file if os.path.isdir(depth_data) is True: depth_files = os.listdir(depth_data) sample_map = {} for i in sample_ids: match = [f for f in depth_files if i in f] if len(match) > 0: file = os.path.join(depth_data, match[0]) sample_map = read_depth_file(file, sample_map) else: print("Could not find a depth file for " "sample {0}".format(i)) elif os.path.isfile(depth_data) is True: sample_map = read_depth_file(depth_data, sample_map) return sample_map def process_data(reference_sequence, samples, sample_map, cutoff, mask_gaps): """Mask sample sequences, Parameters ---------- reference_sequence : str Path to the depth files or file itself, can be in gz format. samples : list list of all elements, [[<name>, <sequence>], ...] 
sample_map : dictionary dictionary with name of sequence and list of depth Returns ------- None """ # determine gaps in reference # match '-' in reference ref_gaps = list(re.finditer("(-)+", reference_sequence)) ref_gaps = [s.span() for s in ref_gaps] gaps = {s[0]: s[1] - s[0] for s in ref_gaps} for i, sample in enumerate(samples): seqid = sample[0] sequence = sample[1] # determine initial and final subsequences with '-' left_trimmed = sequence.lstrip("-") left_pos = len(sequence) - len(left_trimmed) right_trimmed = sequence.rstrip("-") right_pos = len(sequence) - (len(sequence) - len(right_trimmed)) assert seqid in sample_map, "Error, SeqId-{} not in depth file".format(seqid) depth_info = sample_map[seqid] # get positions with zero coverage zero_cov = [d for d in depth_info if int(d[2]) <= cutoff] # shift low depth positions based on gaps on reference for p in gaps: current_p = p shift = gaps[p] zero_cov = [ [z[0], str(int(z[1]) + shift), z[2]] if int(z[1]) - 1 >= current_p else [z[0], z[1], z[2]] for z in zero_cov ] # after adjusting for gaps on reference we can remove # positions on trimmed regions zero_cov = [ z for z in zero_cov if int(z[1]) > left_pos and int(z[1]) <= right_pos ] total = len(zero_cov) print( "\nFound a total of {0} positions with " "low depth for sample {1}.\n".format(total, seqid) ) print("Masking low depth positions in {0}...".format(seqid)) splitted = list(sequence) masked = 0 zero_pos = [] for pos in zero_cov: # Python indexing starts at 0 low_cov_pos = int(pos[1]) - 1 current = splitted[low_cov_pos] if current != "-" or mask_gaps is True: splitted[low_cov_pos] = "N" # print('{0}: {1} --> N'.format(pos[1], current)) zero_pos.append(pos[1]) masked += 1 print("Masked {0}/{1}".format(masked, total)) samples[i][1] = "".join(splitted) # # save info about positions below coverage # with open(pos_file, 'a') as pf: # pos_info = '{0}\t{1}\t{2}\n'.format(seqid, total, # ','.join(zero_pos)) # pf.write(pos_info) def main(input_fasta, depth_data, output_fasta, cutoff, mask_gaps): start_date = dt.datetime.now() start_date_str = dt.datetime.strftime(start_date, "%Y-%m-%dT%H:%M:%S") print("Started at: {0}\n".format(start_date_str)) # import sequences in MSA FASTA file msa_seqs = import_seqs(input_fasta) # keep reference and samples separate reference = msa_seqs[0] reference_id = reference[0] samples = msa_seqs[1:] sample_ids = [s[0] for s in samples] print("Reference:\n{0}".format(reference_id)) print( "\nAligned samples:\n{0}\nTotal: {1}".format( "\n".join(sample_ids), len(sample_ids) ) ) # read depth information sample_map = import_depth(depth_data, sample_ids) # out_dir = os.path.dirname(output_fasta) # pos_basename = os.path.basename(output_fasta).split('.fasta')[0] # pos_file = os.path.join(out_dir, pos_basename+'_pos') # process data process_data(reference[1], samples, sample_map, cutoff, mask_gaps) # write masked sequences to FASTA with open(output_fasta, "w") as out: reference_record = [">{0}\n{1}".format(reference[0], reference[1])] samples_records = [">{0}\n{1}".format(s[0], s[1]) for s in samples] records = reference_record + samples_records out_text = "\n".join(records) out.write(out_text) end_date = dt.datetime.now() end_date_str = dt.datetime.strftime(end_date, "%Y-%m-%dT%H:%M:%S") delta = end_date - start_date minutes, seconds = divmod(delta.total_seconds(), 60) print("\nFinished at: {0}".format(end_date_str)) print("Elapsed time: {0:.0f}m{1:.0f}s".format(minutes, seconds)) def parse_arguments(): def msg(name=None): # simple command with default options simple_cmd 
= ( " python msa_low_cov_masker.py -i input.fasta " "-df depth_files -o out.fasta" ) # different depth of coverage value depth_command = ( " python msa_low_cov_masker.py -i input.fasta " "-df depth_files -o out.fasta --c 10" ) # mask gaps contained in sequences gaps_command = ( " python msa_low_cov_masker.py -i input.fasta " "-df depth_files -o out.fasta --c 10 --g" ) usage_msg = ( "\nSimple command with default options:\n\n{0}\n" "\nDifferent depth of coverage value:\n\n{1}\n" "\nMask gaps in the middle of aligned sequences:" "\n\n{2}\n".format(simple_cmd, depth_command, gaps_command) ) return usage_msg parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, usage=msg(), ) parser.add_argument( "-i", type=str, required=True, dest="input_fasta", help="Path to the input FASTA file that contains " "the Multiple Sequence Alignment.", ) parser.add_argument( "-df", type=str, required=True, dest="depth_files", help="Path to the directory with TSV " "files that contain depth of " "sequencing data. One file per " "sample (files must contain the " "identifier of the sample in the name).", ) parser.add_argument( "-o", type=str, required=True, dest="output_fasta", help="Path to the output FASTA file " "to which the masked sequences " "will be saved.", ) parser.add_argument( "--c", type=int, required=False, default=0, dest="cutoff", help="Positions with a depth value equal " "or below the value of this argument " "will be substituted by N (default=0).", ) parser.add_argument( "--g", required=False, action="store_true", dest="mask_gaps", help="If the process should mask gaps (-) " "with low depth (default=False).", ) args = parser.parse_args() return [ args.input_fasta, args.depth_files, args.output_fasta, args.cutoff, args.mask_gaps, ] if __name__ == "__main__": args = parse_arguments() main(args[0], args[1], args[2], args[3], args[4]) |
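A simplified sketch of the depth-based masking performed by process_data above: positions whose depth is at or below the cutoff become N. The depth values are hypothetical and the reference-gap shifting is deliberately omitted:

sample_seq = list("ACGTACGTAC")
depth_info = [["chr1", str(pos), str(depth)]
              for pos, depth in enumerate([30, 0, 25, 3, 40, 50, 0, 60, 70, 80], start=1)]
cutoff = 3

zero_cov = [d for d in depth_info if int(d[2]) <= cutoff]
for pos in zero_cov:
    low_cov_pos = int(pos[1]) - 1   # depth files are 1-based
    if sample_seq[low_cov_pos] != "-":
        sample_seq[low_cov_pos] = "N"
print("".join(sample_seq))  # ANGNACNTAC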
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | import re import csv import sys def get_info_dic(info_list): """ The get_info_dic function takes a list of strings and returns a dictionary with the first item in each string as the key and the second as its value. For example, if info_list = ['ID=gene:Solyc00g005000.2.3', 'Name=Potri.001G000100'] then get_info_dic(info_list) will return {'ID':'gene:Solyc00g005000.2.3', 'Name':'Potri001G000100'}. :param info_list: Create a dictionary of the information contained in the info column :return: A dictionary of the information in each line :doc-author: Trelent """ info_dic = {} for item in info_list: pieces = item.split("=") info_dic[pieces[0]] = pieces[1] return info_dic def get_row_dict(row): """ The get_row_dict function takes a row from the vcf file and returns a dictionary with the following keys: CHROM, POS, ID, REF, ALT. :param row: Pass the row of data from the vcf file to the function :return: A dictionary with the chrom, pos, id, ref and alt values from a row in the vcf file :doc-author: Trelent """ row_dic = { "CHROM": row[0], "POS": row[1], "ID": row[2], "REF": row[3], "ALT": row[4], "QUAL": row[5], } return row_dic def create_graph_csv(files_path, output_file): """ The create_graph_csv function creates a csv file with the following information: - Sample name - Number of variants with frequency less than 50% (less_50) - Number of variants between 90 and 50% (between_90_50) :param files_path: Pass the path of the files that will be used to create a csv file :param output_file: Specify the name of the output file :return: A list of lists with the number of variants that are less than 50% frequency and between 90% and 50% :doc-author: Trelent """ file_final = [["Sample", "Less 50", "Between 90 and 50"]] for file in files_path: count_less = 0 count_more_50_less_90 = 0 with open(file, "r") as invcf: for line in invcf: try: if line.startswith("#"): continue new_entry = [] line = line.strip().split() info = line[7].split(";") info_dic = get_info_dic(info) row_dic = get_row_dict(line) if "ANN" not in info_dic: info_dic["ANN"] = "|||||||||||||||||||||||||||||||||||||||" info_dic["FTYPE"] = "" else: info_dic["FTYPE"] = "CDS" if ( "snp" in info_dic["TYPE"] or "del" in info_dic["TYPE"] or "ins" in info_dic["TYPE"] ): freq = round(float(info_dic["AO"]) / float(info_dic["DP"]), 2) if freq < 0.50: count_less += 1 elif freq > 0.50 and freq < 0.90: count_more_50_less_90 += 1 except: continue file_final.append( [ re.findall("(?<=/)(.*?)(?=_snpeff.vcf)", file)[0], count_less, count_more_50_less_90, ] ) # type: ignore with open(output_file, mode="w") as f: f_writer = csv.writer(f, delimiter=",") f_writer.writerows(file_final) f.close() if __name__ == "__main__": files_path = sys.argv[1].split(" ") output_file = sys.argv[2] create_graph_csv(files_path, output_file) |
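The allele-frequency binning in create_graph_csv can be illustrated on its own; the AO/DP values below are hypothetical INFO fields, and the thresholds mirror the script above:

variants = [
    {"TYPE": "snp", "AO": "12", "DP": "100"},   # freq 0.12 -> "Less 50"
    {"TYPE": "del", "AO": "70", "DP": "100"},   # freq 0.70 -> "Between 90 and 50"
    {"TYPE": "snp", "AO": "99", "DP": "100"},   # freq 0.99 -> not counted
]

count_less = 0
count_more_50_less_90 = 0
for info_dic in variants:
    if any(t in info_dic["TYPE"] for t in ("snp", "del", "ins")):
        freq = round(float(info_dic["AO"]) / float(info_dic["DP"]), 2)
        if freq < 0.50:
            count_less += 1
        elif 0.50 < freq < 0.90:
            count_more_50_less_90 += 1

print(count_less, count_more_50_less_90)  # 1 1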
from random import choices


def random_string(length):
    """
    The random_string function returns a random string of the requested length.

    :param length: Determine the length of the random string
    :return: A string of random characters
    :doc-author: Trelent
    """
    abc = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    return "".join(choices(abc, k=length))
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | import sys import csv from Bio import SeqIO from extract_gb_info import get_locus def parse_depth(depth_file): with open(depth_file, "r", encoding="UTF8") as depth: reader = csv.reader(depth) out = [] for i in reader: out.append(i[0].split("\t")) return out def get_consensus(consensus): with open(consensus, "r", encoding="UTF8") as handle_fasta: return list(SeqIO.parse(handle_fasta, "fasta"))[0].seq def split_consensus(consensus, depth, locus, output): depth = parse_depth(depth) locus = get_locus(locus) dic = {} for i in locus: dic[i] = "" for entry in depth: dic[entry[0]] = entry[1] numbers_list = [] for i in locus: numbers_list.append(int(dic[i])) consensus_seq = get_consensus(consensus) # print(len(consensus_seq)) # consensus_seq = "".join(['A' for x in range(0,13136)]) # print(consensus_seq) last_list = [] # count_nt = 0 seq = " " new_dic = {} count = 0 for loc in locus: new_dic[loc] = [count, count + int(dic[loc])] # print(new_dic[loc]) count += int(dic[loc]) for loc in new_dic: small_part = consensus_seq[new_dic[loc][0] : new_dic[loc][1]] seq = small_part last_list.append(seq) with open(output, "w", encoding="UTF8") as out: for idx, seq in enumerate(last_list): out.writelines(f">{locus[idx]}\n") out.writelines(seq) out.writelines("\n") if __name__ == "__main__": CONSENSUS = sys.argv[1] DEPTH = sys.argv[2] LOCUS = sys.argv[3] OUTPUT = sys.argv[4] split_consensus(consensus=CONSENSUS, depth=DEPTH, locus=LOCUS, output=OUTPUT) |
import sys
from extract_gb_info import get_locus


def split_depth_file(file_path, reference_gb):
    """
    The split_depth_file function splits a samtools depth file into one file per
    reference locus. It derives the output directory from the path of the input
    depth file, reads the locus names from the reference GenBank file, and then
    writes the lines belonging to each locus into "<locus>.depth" next to the
    original file.

    :param file_path: Path of the depth file to split
    :param reference_gb: Reference GenBank file used to obtain the locus names
    :return: None; one depth file per locus is written to disk
    :doc-author: Trelent
    """
    last_pos = len(file_path) - file_path[::-1].index("/")
    path = file_path[:last_pos]
    reference_list = get_locus(reference_gb)
    for name in reference_list:
        with open(file_path, "r", encoding="utf-8") as handle_depth:
            new_file = []
            for line in handle_depth.readlines():
                if line.split()[0] == str(name):
                    new_file.append(line)
            if new_file:
                with open(f"{path}{name}.depth", "w", encoding="utf8") as output_file:
                    output_file.writelines(new_file)


if __name__ == "__main__":
    DEPTH_FILE = sys.argv[1]
    REFERENCE_GB = sys.argv[2]
    split_depth_file(DEPTH_FILE, REFERENCE_GB)
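The same routing can be shown without a reference GenBank file: each depth line is assigned to a locus by its first column. A sketch with hypothetical locus names and in-memory depth lines instead of files:

# Sketch of the per-locus routing done by split_depth_file.
locus_names = ["locus_1", "locus_2"]
depth_lines = [
    "locus_1\t1\t120\n",
    "locus_1\t2\t118\n",
    "locus_2\t1\t95\n",
]

for name in locus_names:
    matching = [line for line in depth_lines if line.split()[0] == name]
    if matching:
        print(f"{name}.depth would contain {len(matching)} line(s)")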
import sys
from Bio import SeqIO
from extract_gb_info import get_locus


def get_locus_out_id(string_id: str):
    # Return the part of the identifier after the last "__", or False if absent.
    if string_id.find("__") == -1:
        return False
    return string_id[len(string_id) - string_id[::-1].index("__"):]


def prepare_msa_masker(alignment, outdir, reference_gb):
    """
    The prepare_msa_masker function takes a multiple sequence alignment and a
    reference GenBank file as input. For each locus in the reference genome it
    writes the alignment records belonging to that locus into a fasta file inside
    the locus subdirectory of outdir.

    :param alignment: Path to the alignment file
    :param outdir: Directory containing one subdirectory per locus
    :param reference_gb: Reference GenBank file used to obtain the locus names
    :return: None; one fasta file per locus is written to outdir
    :doc-author: Trelent
    """
    locus_names = get_locus(reference_gb)
    locus_records = {x: [] for x in locus_names}
    for locus in locus_records:
        for record in SeqIO.parse(alignment, "fasta"):
            if locus == record.id or locus == get_locus_out_id(record.id):
                locus_records[locus].append(record)
    for locus, list_of_records in locus_records.items():
        SeqIO.write(
            list_of_records,
            f"{outdir}/{locus}/Alignment_nt_{locus}.fasta",
            "fasta",
        )


if __name__ == "__main__":
    alignment = sys.argv[1]
    outdir = sys.argv[2]
    reference_gb = sys.argv[3]
    prepare_msa_masker(alignment, outdir, reference_gb)
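get_locus_out_id only inspects the record identifier, so it can be checked directly. The identifiers below are hypothetical but follow the "sample__locus" convention the function expects:

# Quick check of get_locus_out_id (defined above).
print(get_locus_out_id("sample_A__locus_1"))  # 'locus_1'
print(get_locus_out_id("locus_1"))            # False (no '__' separator)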
import sys
import csv
from Bio import SeqIO
import extract_gb_info as ggb
from yaml_io import read_yaml
import textwrap


def write_fasta(dictionary: dict[str, str], filename: str):
    # Write each id/sequence pair as a fasta record, wrapped at 70 characters.
    with open(filename, "w") as fasta:
        for key, value in dictionary.items():
            fasta.write(f">{key}\n")
            fasta.write("\n".join(textwrap.wrap(str(value), 70)))
            fasta.write("\n")


def get_reference(file_name: str) -> SeqIO.SeqRecord:
    with open(file_name, "r") as handle:
        return next(SeqIO.parse(handle, "fasta"))


def get_ref_adjusted_positions(
    alignment: str, positions: list[tuple[str, list[list[int]]]], gene
) -> list[tuple[str, list[list[int]]]]:
    # Map gene coordinates from the ungapped reference onto alignment coordinates.
    new_positions = []
    ref = get_reference(alignment)
    index_list = [idx for idx, nucleotide in enumerate(ref) if nucleotide != "-"]
    for gene_group in positions:
        if gene_group[0] == gene:
            for group in gene_group[1]:
                group[0] = index_list[group[0]]
                if group[1] >= len(index_list):
                    group[1] = index_list[-1]
                else:
                    group[1] = index_list[group[1]]
            new_positions.append(gene_group)
    return new_positions


def get_coverage_to_translate_matrix(filename: str) -> dict[str, list[str]]:
    with open(filename) as csvfile:
        csv_reader = csv.reader(csvfile)
        coverage_dic: dict[str, list[str]] = {row[0]: row[1:] for row in csv_reader}
        return coverage_dic


def get_position_in_list(reference: str, locus: str) -> int:
    list_of_locus: list[str] = ggb.get_locus(reference)
    return list_of_locus.index(locus)


def get_best_translation(record, pos: list[int]) -> str:
    # Translate the region in all three reading frames and keep the frame
    # with the fewest internal stop codons.
    options = ["", "", ""]
    options[0] = record.seq[pos[0]: pos[1]].ungap().translate(table=11, to_stop=False)
    options[1] = record.seq[pos[0] + 1: pos[1]].ungap().translate(table=11, to_stop=False)
    options[2] = record.seq[pos[0] + 2: pos[1]].ungap().translate(table=11, to_stop=False)
    idx = 0
    min_idx = 0
    min_value = options[0][:-1].count("*")
    for entry in options[1:]:
        idx += 1
        new_value = entry[:-1].count("*")
        if new_value < min_value:
            min_value = new_value
            min_idx = idx
    return options[min_idx]


def get_new_consensus(
    alignment: str,
    reference_id: str,
    positions: list[tuple[str, list[list[int]]]],
    coverage_dic: dict[str, list[str]],
    coverage_value: float,
    gene_idx: int,
) -> dict[str, dict[str, str]]:
    # alignment and reference_id are passed explicitly here; the original
    # relied on them being module-level globals set in the __main__ block.
    new_consensus: dict[str, dict[str, str]] = {}
    for gene_structure in positions:
        gene_name = gene_structure[0]
        gene_positions = gene_structure[1]
        new_consensus[gene_name] = {}
        for pos in gene_positions:
            for record in SeqIO.parse(alignment, "fasta"):
                if record.id == reference_id:
                    if new_consensus[gene_name].get(record.id, False):
                        new_consensus[gene_name][record.id] += get_best_translation(record, pos)
                    else:
                        new_consensus[gene_name][record.id] = get_best_translation(record, pos)
                    continue
                # Skip samples whose gene region is more than 10% undetermined.
                seq = "".join(
                    [str(record.seq)[p[0]: p[1]].replace("-", "") for p in gene_positions]
                )
                gene_length = sum([p[1] - p[0] for p in gene_positions])
                if seq.count("N") / gene_length > 0.1:
                    continue
                identifier = record.id
                record_coverage = float(
                    coverage_dic[identifier[: identifier.index(f"__{reference_id}")]][gene_idx]
                )
                if record_coverage >= coverage_value:
                    if new_consensus[gene_name].get(identifier, False):
                        new_consensus[gene_name][identifier] += get_best_translation(record, pos)
                    else:
                        new_consensus[gene_name][identifier] = get_best_translation(record, pos)
    return new_consensus


def write_fast_aa(
    reference: str,
    alignment: str,
    output: str,
    reference_id: str,
    gene: str,
    coverage: str,
):
    coverage_dic = get_coverage_to_translate_matrix(coverage)
    constants: dict[str, str] = read_yaml("../config/constants.yaml")
    coverage_value: int = read_yaml(constants["software_parameters"])[
        "min_coverage_consensus"
    ]
    gene_idx = get_position_in_list(reference, reference_id)
    positions: list[tuple[str, list[list[int]]]] = ggb.get_positions_gb(reference)
    positions = get_ref_adjusted_positions(alignment, positions, gene)
    new_consensus = get_new_consensus(
        alignment, reference_id, positions, coverage_dic, coverage_value, gene_idx
    )
    for value in new_consensus.values():
        write_fasta(value, output)


if __name__ == "__main__":
    reference = sys.argv[1]
    alignment = sys.argv[2]
    output = sys.argv[3]
    reference_id = sys.argv[4]
    gene = sys.argv[5]
    coverage = sys.argv[6]
    write_fast_aa(reference, alignment, output, reference_id, gene, coverage)
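The frame selection in get_best_translation amounts to translating a region in all three reading frames and keeping the one with the fewest internal stop codons. The sketch below reproduces just that idea with plain Biopython translation on an ungapped, hypothetical sequence (the script itself additionally strips alignment gaps with Seq.ungap() before translating):

# Standalone sketch of the "fewest internal stop codons" frame choice,
# using a short hypothetical coding sequence.
from Bio.Seq import Seq

seq = Seq("ATGAAACCCGGGTTTTAA")  # hypothetical ORF, in frame 0

translations = []
for frame in range(3):
    sub = seq[frame:]
    sub = sub[: len(sub) - len(sub) % 3]  # drop the trailing partial codon
    translations.append(sub.translate(table=11, to_stop=False))

stops = [str(t)[:-1].count("*") for t in translations]
best_frame = stops.index(min(stops))
print(best_frame, translations[best_frame])  # 0 MKPGF*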
import csv
import re
# from typing_extensions import TypedDict
import sys

DESCRIPTORS = [
    "CHROM",
    "POS",
    "TYPE",
    "REF",
    "ALT",
    "FREQ",
    "COVERAGE",
    "EVIDENCE",
    "FTYPE",
    "STRAND",
    "NT_POS",
    "AA_POS",
    "EFFECT",
    "NT CHANGE",
    "AA CHANGE",
    "AA CHANGE ALT",
    "LOCUS_TAG",
    "GENE",
    "PRODUCT",
    "VARIANTS IN INCOMPLETE LOCUS",
]


# class SnakemakeEntry(TypedDict):
#     chrom: str
#     pos: int
#     ref: str
#     alt: str
#     qual: float
#     info: dict[str, str]
#     ann: dict[str, str]
#     names: list[str]
#     values: list[str]


def remove_smallcase(string):
    # Keep only the characters that are not lowercase letters.
    return "".join(letter for letter in string if not letter.islower())


def smaller(string: str) -> str:
    # Abbreviate an amino-acid change (e.g. HGVS.p notation) by keeping the
    # two-character prefix and dropping lowercase letters from the rest.
    return string[:2] + remove_smallcase(string[2:])


def convert_list_to_dict(data: list[str]) -> tuple[dict[str, str], dict[str, str]]:
    # Split the INFO column into a generic key/value dictionary plus a
    # separate dictionary for the snpEff ANN annotation, if present.
    snp_annotation = [
        "Allele",
        "Annotation",
        "Annotation_Impact",
        "Gene_Name",
        "Gene_ID",
        "Feature_Type",
        "Feature_ID",
        "Transcript_BioType",
        "Rank",
        "HGVS.c",
        "HGVS.p",
        "cDNA.pos/cDNA.length",
        "CDS_CDSLength",
        "AA.pos/AA.length",
        "Distance",
        "ERRORS/WARNINGS/INFO",
    ]
    new_dict = {}
    ann_dict = {}
    for item in data:
        if item:
            k, v = item.split("=")
            if k.upper() == "ANN":
                new_v = v.split("|")
                ann_dict = {snp_annotation[idx]: new_v[idx] for idx in range(16)}
            else:
                new_dict[k] = v
    return new_dict, ann_dict


def get_multiple_freq(AO, DP):
    # For multi-allelic records, return the smallest allele frequency.
    dp_value = float(DP)
    freqs = [float(v) / dp_value for v in AO.split(",")]
    return round(min(freqs), 2)


def get_type(a, b):
    # Infer the variant type from the lengths of REF (a) and ALT (b).
    if a == b:
        return "snp"
    elif a > b:
        return "del"
    elif a < b:
        return "ins"
    else:
        return "complex"


def get_info_on(dictionary, key, before=""):
    # Return e.g. "ALT:12" when the key exists, otherwise an empty string.
    return before + dictionary.get(key) if dictionary.get(key, False) else ""


def analyse_file(input_file):
    with open(input_file) as handler:
        lines = [line for line in handler.readlines() if line[0] != "#"]
    vcf_data = []
    for line in lines:
        line = line.split()
        info, ann = convert_list_to_dict(line[7].split(";"))
        entry = {
            "chrom": line[0],
            "pos": int(line[1]),
            "ref": line[3],
            "alt": line[4],
            "qual": float(line[5]),
            "info": info,
            "ann": ann,
            "names": line[8].split(":"),
            "values": line[9].split(":"),
        }
        get_freq = entry["info"].get("FREQ", False)
        if not get_freq:
            try:
                get_freq = get_multiple_freq(entry["info"]["AO"], entry["info"]["DP"])
            except KeyError:
                get_freq = round(float(entry["values"][-1]), 2)
        else:
            get_freq = round(float(get_freq) / 100, 2)
        default = [
            entry["chrom"],
            entry["pos"],
            entry["info"].get("TYPE", get_type(len(entry["ref"]), len(entry["alt"]))),
            entry["ref"],
            entry["alt"],
            get_freq,
            get_info_on(entry["info"], "DP"),
            get_info_on(entry["info"], "AO", f"{entry['alt']}:"),
            get_info_on(entry["info"], "RO", f"{entry['ref']}:"),
        ]
        if entry.get("ann"):
            ann = entry["ann"]
            default.extend(
                [
                    "CDS",
                    "+",
                    ann["CDS_CDSLength"],
                    ann["AA.pos/AA.length"],
                    ann["Annotation"],
                    ann["HGVS.c"],
                    ann["HGVS.p"],
                    smaller(ann["HGVS.p"]),
                    ann["Gene_Name"],
                    ann["Gene_Name"],
                    "yes",
                ]
            )
        else:
            default.extend(["", "", "", "", "", "", "", "", "", "", ""])
        vcf_data.append(default)
    return vcf_data


def word_in_list(word, check_list):
    return any(item in word for item in check_list)


def comparison(value1, value2, signal):
    if signal == "bigger":
        return value1 >= value2
    elif signal == "smaller":
        return value1 < value2


def filter_variants(
    data: list[list[str]], signal: str, list_of_types: list[str], identifier: str
) -> list[list[str]]:
    filtered_data = []
    for entry in data:
        entry_d = {DESCRIPTORS[idx]: value for idx, value in enumerate(entry)}
        if not comparison(entry_d["FREQ"], 0.51, signal):
            continue
        if not word_in_list(entry_d["TYPE"], list_of_types):
            continue
        entry.insert(0, identifier)
        filtered_data.append(entry)
    return filtered_data


def validated_variants(files_path, output_file, signal, re_expression, list_of_types):
    """
    The validated_variants function takes a list of vcf files and outputs a csv file
    with the following columns: ID, CHROM, POS, TYPE, REF, ALT, FREQ (allele frequency),
    COVERAGE (total depth at that position), EVIDENCE (reads supporting the variant),
    FTYPE (feature type), STRAND, NT_POS, AA_POS, EFFECT (effect on gene function as
    reported by snpEff), plus the remaining annotation columns in DESCRIPTORS.

    :param files_path: Paths of the vcf files to process
    :param output_file: Specify the name of the file that will be created
    :return: None; the filtered variant table is written to output_file
    :doc-author: Trelent
    """
    vcf_final = [["ID"] + DESCRIPTORS]
    for file in files_path:
        identifier = re.findall(re_expression, file)[0]
        clean_file = analyse_file(file)
        filtered_file = filter_variants(clean_file, signal, list_of_types, identifier)
        vcf_final.extend(filtered_file)
    with open(output_file, mode="w") as f:
        f_writer = csv.writer(f, delimiter=",")
        f_writer.writerows(vcf_final)


if __name__ == "__main__":
    files_path = sys.argv[1].split(" ")
    output_file = sys.argv[2]
    type_of_file = sys.argv[3]
    if type_of_file == "validated_variants":
        signal = "bigger"
        re_expression = "(?<=sample__)(.*?)(?=_snpeff.vcf)"
        list_of_words = ["snp", "mnp", "complex", "ins", "del"]
    elif type_of_file == "minor_iSNVs_inc_indels":
        signal = "smaller"
        re_expression = "(?<=/freebayes/)(.*?)(?=_snpeff.vcf)"
        list_of_words = ["snp", "del", "ins"]
    elif type_of_file == "minor_iSNVs":
        signal = "smaller"
        re_expression = "(?<=/freebayes/)(.*?)(?=_snpeff.vcf)"
        list_of_words = ["snp", "complex"]
    else:
        raise RuntimeError(f"Unknown output type: {type_of_file}")
    validated_variants(
        files_path,
        output_file,
        signal,
        re_expression,
        list_of_words,
    )
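The INFO-parsing helpers above are pure functions, so they can be sanity-checked in isolation. A minimal sketch with convert_list_to_dict, get_multiple_freq and get_type from the script above in scope, using a hypothetical freebayes-style INFO string (no ANN field, so the snpEff dictionary comes back empty):

info_field = "AB=0;AO=51;DP=53;TYPE=snp"
info, ann = convert_list_to_dict(info_field.split(";"))

print(info["TYPE"], ann)                          # snp {}
print(get_multiple_freq(info["AO"], info["DP"]))  # 0.96
print(get_type(len("C"), len("CT")))              # ins (ALT longer than REF)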
import yaml


def read_yaml(yaml_file_path: str) -> dict:
    with open(yaml_file_path, encoding="utf-8") as yaml_file:
        return yaml.load(yaml_file, Loader=yaml.FullLoader)


def write_yaml(yaml_file_path: str, dump_dict: dict) -> None:
    with open(yaml_file_path, "w", encoding="utf-8") as file:
        yaml.dump(dump_dict, file)
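A round-trip sketch for the two helpers, writing to a temporary file so the example does not touch the workflow's config directory; the path and contents are made up:

# Round-trip usage sketch for write_yaml / read_yaml (defined above).
import tempfile
import os

params = {"min_coverage_consensus": 30, "threads": 4}

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "example.yaml")
    write_yaml(path, params)
    print(read_yaml(path) == params)  # True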
Support
Do you know this workflow well? If so, you can request seller status and start supporting this workflow.
Created: 1yr ago
Updated: 1yr ago
Maintainers: public
URL: https://github.com/RdrigoE/insaflu_snakemake
Name: insaflu_snakemake
Version: v1.0.0
Downloaded: 0
Copyright: Public Domain
License: None
Keywords:
- Future updates