Snakemake workflow to cluster genomes (e.g. metagenome-assembled genomes) into species and strains
Authors
- Silas Kieser (@silask)
Usage
Step 1: Install workflow
If you simply want to use this workflow, download and extract the latest release. If you intend to modify and further develop this workflow, fork this repository. Please consider providing any generally applicable modifications via a pull request.
In any case, if you use this workflow in a paper, don't forget to give credit to the authors by citing the URL of this repository and, if available, its DOI (see above).
Step 2: Configure workflow
Configure the workflow according to your needs by editing the file `config.yaml`.
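The exact options depend on the release you are using; as a rough, hypothetical sketch, the key names below appear in the code snippets further down (`fasta_extension`, `quality_score`, `strains_threshold`), but the example values are assumptions, so verify them against the `config.yaml` shipped with the workflow:

```yaml
# Hypothetical config.yaml sketch -- check key names and values against the shipped config.yaml
fasta_extension: .fasta                           # extension of the genome fasta files in the genome folder
quality_score: "Completeness - 5*Contamination"   # assumed example formula, evaluated on the genome-quality table
strains_threshold: auto                           # ANI threshold for strain clustering, or 'auto'
```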
Step 3: Execute workflow
Test your configuration by performing a dry-run via

```
snakemake -n
```

Execute the workflow locally via

```
snakemake --cores $N
```

using `$N` cores, or run it in a cluster environment via

```
snakemake --cluster qsub --jobs 100
```

or

```
snakemake --drmaa --jobs 100
```

See the Snakemake documentation for further details.
Testing
Test cases are in the subfolder `.test`. They should be executed via continuous integration with Travis CI.
Code Snippets
```
run:
    from common.genome_stats import get_many_genome_stats
    import pandas as pd

    d = pd.read_csv(input[0], sep='\t', index_col=0, squeeze=True, usecols=[0])
    filenames = d.index.map(lambda s: f"genomes/{s}{config['fasta_extension']}")
    del d

    get_many_genome_stats(filenames, output[0], threads)
```
```
script:
    "../scripts/many_minimap.py"
```
```
run:
    import os
    import shutil
    import pandas as pd

    sep = '\t'

    print(input)

    # write the first table with header, then append the remaining tables
    D = pd.read_csv(input[0], sep=sep)
    n_cols = D.shape[1]
    D.to_csv(output[0], sep=sep)

    for file in input[1:]:
        D = pd.read_csv(file, sep=sep)
        assert n_cols == D.shape[1], (
            f"File {file} doesn't have the same number of columns as the one before. "
            "Cannot concatenate."
        )
        D.to_csv(output[0], sep=sep, header=False, mode='a')

    # remove the folder with the partial tables once they are concatenated
    in_dir = os.path.dirname(input[0])
    shutil.rmtree(in_dir)
```
```
shell:
    "snakemake -s {params.path}/rules/mummer.smk "
    "--config comparison_list='{input.comparison_list}' "
    " genome_folder='{input.genome_folder}' "
    " subset={wildcards.subset} "
    " genome_stats={input.genome_stats} "
    " --rerun-incomplete "
    "-j {threads} --nolock 2> {log}"
```
```
run:
    import os
    import shutil
    import pandas as pd

    # read the per-subset ANI tables and concatenate them
    Mummer = {}
    for file in input:
        Mummer[io.simplify_path(file)] = pd.read_csv(file, index_col=[0, 1], sep='\t')

    M = pd.concat(Mummer, axis=0)
    M.index = M.index.droplevel(0)
    M.to_csv(output[0], sep='\t')

    # remove the per-subset results
    ani_dir = os.path.dirname(input[0])
    shutil.rmtree(ani_dir)

    # sns.jointplot('ANI', 'Coverage', data=M.query('ANI>0.98'), kind='hex', gridsize=100, vmax=200)
```
```
script:
    "../scripts/group_species.py"
```
```
script:
    "../scripts/group_strains.py"
```
```
run:
    mapping = get_representative_mapping(input.cluster_file, wildcards.resolution_level)

    output_dir = output.dir
    os.makedirs(output_dir)
    input_dir = os.path.relpath(input.dir, start=output_dir)

    for genome in mapping.unique():
        os.symlink(os.path.join(input_dir, genome + '.fasta'),
                   os.path.join(output_dir, genome + '.fasta'))
```
```
run:
    import pandas as pd

    df = pd.read_csv(input.cluster_file, sep='\t')
    os.makedirs(output.dir)

    # write one multi-fasta per species, containing its representative strains
    for species_name, sp in df.groupby('Species'):
        with open(os.path.join(output.dir, species_name + '.fasta'), 'w') as fasta_out:
            for genome in sp.Representative_Strains.unique():
                with open(os.path.join(input.dir, genome + '.fasta')) as fasta_in:
                    fasta_out.write(fasta_in.read())
```
```
run:
    from glob import glob

    with open(output[0], 'w') as f:
        f.write('\n'.join(glob(f"{input[0]}/*{config['fasta_extension']}")) + '\n')
```
```
shell:
    "mash sketch -o {params.out_name} -p {threads} -s {params.s} -k {params.k} -l {input[0]} 2> {log}"
```
```
shell:
    "mash dist -p {threads} -d {params.d} "
    "{input.genomes} {input.genomes} > {output[0]} 2> {log}"
```
```
shell:
    "bindash sketch "
    "--outfname={output[0]} "
    "--nthreads={threads} "
    "--sketchsize64={params.sketchsize64} "
    "--kmerlen={params.k} "
    "{params.extra} "
    "--listfname={input[0]} 2> {log}"
```
```
shell:
    "bindash dist "
    "--nthreads={threads} "
    "--mthres={params.d} "
    "--outfname={output} {input[0]} 2> {log}"
```
```
run:
    # pick the parser matching the tool that produced the distance table
    if wildcards.tool == "mummer":
        open_function = gd.load_mummer
    elif wildcards.tool == "minimap":
        open_function = gd.load_minimap
    elif wildcards.tool == "bindash":
        open_function = gd.load_bindash
    elif wildcards.tool == "fastani":
        open_function = gd.load_fastani
    elif wildcards.tool == "mash":
        open_function = gd.load_mash
    else:
        raise Exception(f"Don't know how to load table from tool: {wildcards.tool}")

    M = open_function(input[0]).drop(['Identity'], axis=1)
    M.to_parquet(output[0], engine="pyarrow")
```
```
run:
    import os

    F = gd.load_mash(input[0])
    G = gd.to_graph(F.query(f"Distance<={params.threshold}"))
    if hasattr(G, 'selfloop_edges'):
        G.remove_edges_from(G.selfloop_edges())

    os.makedirs(output[0])

    # write the graph edges (genome pairs) into chunks of N lines
    fout = None
    for i, e in enumerate(G.edges()):
        if (i % int(params.N)) == 0:
            n = int(i // params.N) + 1
            if fout is not None:
                fout.close()
            fout = open(f"{output[0]}/subset_{n}.txt", 'w')

        fout.write("\t".join(sorted(e)) + '\n')
```
```python
import pandas as pd
import scipy.spatial as sp
import scipy.cluster.hierarchy as hc
from sklearn.metrics import silhouette_score
import numpy as np
from common import genome_pdist as gd
import networkx as nx


def automatic_cluster_species(
    Dist, seed_thresholds=[0.92, 0.97], linkage_method="average"
):

    linkage = hc.linkage(sp.distance.squareform(Dist), method=linkage_method)

    def get_Nclusters(threshold):
        labels = hc.fcluster(linkage, (1 - threshold), criterion="distance")
        return max(labels)

    N_range = [get_Nclusters(t) for t in seed_thresholds]

    if (N_range[1] - N_range[0]) > 100:
        print(f"Need to evaluate more than {N_range[1]-N_range[0]} thresholds")

    assert ~np.isnan(N_range).any(), "N range is not defined"

    Scores = gd.evaluate_clusters_range(
        np.arange(min(N_range), max(N_range) + 1), Dist, linkage_method=linkage_method
    )

    if N_range[0] == N_range[1]:
        labels = hc.fcluster(linkage, (1 - seed_thresholds[0]), criterion="distance")
    else:
        N_species = Scores.Silhouette_score.idxmax()
        labels = hc.fcluster(linkage, N_species, criterion="maxclust")

    return Scores, labels


def threshold_based_clustering(Dist, threshold, linkage_method="average"):
    assert (threshold > 0.9) & (
        threshold < 1
    ), f"threshold should be between 0.9 and 1 or 'auto', threshold was {threshold}"
    linkage = hc.linkage(sp.distance.squareform(Dist), method=linkage_method)
    labels = hc.fcluster(linkage, (1 - threshold), criterion="distance")
    Scores = gd.evaluate_clusters_thresholds(
        [threshold], Dist, linkage_method=linkage_method
    )
    return Scores, labels


if __name__ == "__main__":

    linkage_method = snakemake.params.linkage_method
    threshold = snakemake.params.threshold
    quality_score_formula = snakemake.config["quality_score"]

    Q = pd.read_csv(snakemake.input.genome_info, sep="\t", index_col=0)
    quality_score = Q.eval(quality_score_formula)

    assert (
        not quality_score.isnull().any()
    ), "I have NA quality values for the quality score; it seems not all of the values defined in the quality_score_formula are present for all entries in tables/Genome_quality.tsv"

    # load genome distances
    M = gd.load_parquet(snakemake.input[0])

    # genome distance to graph if ANI > 0.9
    G = gd.to_graph(M.query(f"Identity>=@threshold"))
    if hasattr(G, "selfloop_edges"):
        G.remove_edges_from(G.selfloop_edges())

    # prepare table for species number
    mag2Species = pd.DataFrame(index=Q.index, columns=["SpeciesNr", "Species"])
    mag2Species.index.name = "genome"

    last_species_nr = 0

    for cc in nx.connected_components(G):

        Mcc = M.loc[
            (
                M.index.levels[0].intersection(cc),
                M.index.levels[1].intersection(cc),
            ),
        ]

        Dist = 1 - gd.pairewise2matrix(Mcc, fillna=Mcc.Identity.min())

        if threshold == "auto":
            Scores, labels = automatic_cluster_species(
                Dist, linkage_method=linkage_method
            )
        else:
            Scores, labels = threshold_based_clustering(
                Dist, threshold, linkage_method=linkage_method
            )

        # enter values of labels to species table
        mag2Species.loc[Dist.index, "SpeciesNr"] = labels + last_species_nr
        last_species_nr = mag2Species.SpeciesNr.max()

    # genomes not connected to any other genome get their own species number
    missing_species = mag2Species.SpeciesNr.isnull()
    N_missing_species = sum(missing_species)

    mag2Species.loc[missing_species, "SpeciesNr"] = (
        np.arange(last_species_nr, last_species_nr + N_missing_species) + 1
    )

    Scores["N_clusters"] += N_missing_species
    Scores.to_csv(snakemake.output.scores, sep="\t", index=False)

    # assert (
    #     Scores.loc[Scores.Silhouette_score.idxmax(), "N_clusters"]
    #     == mag2Species.SpeciesNr.max()
    # ), "error in calculation of N species"

    print(f"Identified {mag2Species.SpeciesNr.max()} species")

    # create proper species names
    n_leading_zeros = len(str(max(labels)))
    format_int = "sp{:0" + str(n_leading_zeros) + "d}"
    mag2Species["Species"] = mag2Species.SpeciesNr.apply(format_int.format)

    # select representative
    mag2Species["Representative_Species"] = gd.best_genome_from_table(
        mag2Species.Species, quality_score
    )

    mag2Species.to_csv(snakemake.output.cluster_file, sep="\t")

    Q = Q.join(mag2Species)
    Q.to_csv(snakemake.input.genome_info, sep="\t")
```
```python
import pandas as pd
import scipy.spatial as sp
import scipy.cluster.hierarchy as hc
import numpy as np
from common import genome_pdist as gd

if __name__ == "__main__":

    linkage_method = snakemake.params.linkage_method
    treshold = 0.995
    quality_score_formula = snakemake.config["quality_score"]

    M = gd.load_parquet(snakemake.input.dists)
    mag2species = pd.read_csv(snakemake.input.mag2species, index_col=0, sep="\t")
    Strains = mag2species.copy()

    # keep only genome pairs that belong to the same species
    M["Species"] = mag2species.loc[M.index.get_level_values(0), "Species"].values
    M["Species2"] = mag2species.loc[M.index.get_level_values(1), "Species"].values
    M = M.query("Species==Species2")

    strains_threshold = snakemake.config.get("strains_threshold", "auto")

    # simple threshold-based splitting
    if strains_threshold != "auto":

        for species, data in M.groupby("Species"):

            ID = data.Identity.unstack()
            all_indexes = ID.index.union(ID.columns)
            ID = ID.reindex(index=all_indexes, columns=all_indexes).fillna(0)
            ID = ID + ID.T
            ID.values[np.eye(ID.shape[0], dtype=bool)] = 1

            Dist = 1 - ID.fillna(0.8)
            Dist.clip(0, 1, inplace=True)

            linkage = hc.linkage(sp.distance.squareform(Dist), method=linkage_method)

            labels = hc.fcluster(
                linkage, (1 - float(strains_threshold)) / 2, criterion="distance"
            )

            Strains.loc[ID.index, "StrainNr"] = labels

    # detect threshold automatically
    else:

        # detect strain gap
        fraction_below_treshold = (M.Identity < treshold).groupby(
            M.Species
        ).sum() / M.groupby("Species").size()

        have_strain_gap = fraction_below_treshold > 0.05

        for species, data in M.groupby("Species"):

            if have_strain_gap[species]:

                ID = data.Identity.unstack()
                all_indexes = ID.index.union(ID.columns)
                ID = ID.reindex(index=all_indexes, columns=all_indexes).fillna(0)
                ID = ID + ID.T
                ID.values[np.eye(ID.shape[0], dtype=bool)] = 1

                Dist = 1 - ID.fillna(0.8)
                Dist.clip(0, 1, inplace=True)

                linkage = hc.linkage(
                    sp.distance.squareform(Dist), method=linkage_method
                )

                labels = hc.fcluster(linkage, (1 - treshold) / 2, criterion="distance")
                Nmax = max(labels)

                assert Nmax < 500, "Need to evaluate more than 500 thresholds"
                assert ~np.isnan(Nmax), "N range is not defined"

                Scores = gd.evaluate_clusters_range(
                    np.arange(2, Nmax + 1), Dist, linkage_method=linkage_method
                )

                N_strains = Scores.Silhouette_score.idxmax()

                if ~np.isnan(N_strains):
                    labels = hc.fcluster(linkage, N_strains, criterion="maxclust")

                Strains.loc[ID.index, "StrainNr"] = labels

    # finalize naming
    Strains["Strain"] = (
        "Strain"
        + Strains.SpeciesNr.astype(int).astype("str")
        + "_"
        + Strains.StrainNr.dropna().astype(int).astype("str")
    )
    Strains.loc[Strains.Strain.isnull(), "Strain"] = Strains.loc[
        Strains.Strain.isnull(), "Species"
    ]

    print(f"Identified {Strains.Strain.unique().size} strains")

    Q = pd.read_csv(snakemake.input.genome_info, sep="\t", index_col=0)
    quality_score = Q.eval(quality_score_formula)

    Strains["Representative_Strain"] = gd.best_genome_from_table(
        Strains.Strain, quality_score
    )

    Strains.to_csv(snakemake.output.mag2strain, sep="\t")

    for col in ["Strain", "Representative_Strain"]:
        Q[col] = Strains[col]
    Q.to_csv(snakemake.input.genome_info, sep="\t")
```
```python
import sys, os
import pandas as pd

sys.stdout = open(snakemake.log[0], "w")
sys.stderr = open(snakemake.log[0], "a")

from snakemake.shell import shell
from common.alignments import parse_paf_files


def run_minimap(
    ref,
    query,
    paf,
    options=snakemake.params.minimap_extra,
    threads=snakemake.threads,
    log=snakemake.log[0],
):
    os.makedirs(os.path.dirname(paf), exist_ok=True)
    shell(
        "minimap2 {options} -t {threads} {ref} {query} > {paf}.tmp 2> {log} ;"
        "mv {paf}.tmp {paf} 2> {log}"
    )


def many_minimap(
    alignment_list, genome_folder, paf_folder, genome_stats_file, extension=".fasta"
):
    stats = pd.read_csv(genome_stats_file, index_col=0, sep="\t")

    os.makedirs(paf_folder, exist_ok=True)
    paf_files = []

    with open(alignment_list) as f:
        for line in f:

            # use the larger genome as reference and the smaller as query
            genome_pair = line.strip().split()
            genome_query, genome_ref = (
                stats.loc[genome_pair, "Length"].sort_values().index
            )

            paf_file = os.path.join(paf_folder, genome_ref, genome_query + ".paf")

            if not os.path.exists(paf_file):
                # run minimap
                fasta_ref = os.path.join(genome_folder, genome_ref + extension)
                fasta_query = os.path.join(genome_folder, genome_query + extension)

                run_minimap(fasta_ref, fasta_query, paf_file)

            paf_files.append(paf_file)

    return paf_files


paf_files = many_minimap(
    snakemake.input.alignment_list,
    snakemake.input.genome_folder,
    snakemake.params.paf_folder,
    snakemake.input.genome_stats,
    snakemake.params.extension,
)

parse_paf_files(
    paf_files, snakemake.input.genome_stats, snakemake.output.alignments_stats
)
```
Support
- Future updates