A Snakemake workflow to run and benchmark structure learning (a.k.a. causal discovery) algorithms for probabilistic graphical models.
Benchpress [1] is a Snakemake workflow in which structure learning algorithms, possibly implemented in different languages, can be executed and compared. The computations scale seamlessly to multiple cores or, as the Snakemake documentation puts it, "... to server, cluster, grid and cloud environments, without the need to modify the workflow definition". The documentation is available at https://benchpressdocs.readthedocs.io.
Benchpress provides the following main functionalities:
- Benchmarks - benchmark publicly available structure learning algorithms.
- Algorithm development - benchmark your own algorithm alongside the existing ones while developing it.
- Data analysis - estimate the underlying graph structure for your own dataset(s).
You may also have a look at this Medium story for an introduction.
Citing
```bibtex
@misc{rios2021benchpress,
  title={Benchpress: a scalable and versatile workflow for benchmarking structure learning algorithms for graphical models},
  author={Felix L. Rios and Giusi Moffa and Jack Kuipers},
  year={2021},
  eprint={2107.03863},
  archivePrefix={arXiv},
  primaryClass={stat.ML}
}
```
Contact
For problems, bug reports, or questions, please raise an issue or open a discussion thread.
Contributing
Contributions are very welcome. See CONTRIBUTING.md for instructions.
- Fork it!
- Create your feature branch: `git checkout -b my-new-feature`
- Commit your changes: `git commit -am 'Add some feature'`
- Push to the branch: `git push origin my-new-feature`
- Open a pull request
License
This project is licensed under the GPL-2.0 License - see the LICENSE file for details.
References
[1] Felix L. Rios, Giusi Moffa, and Jack Kuipers. Benchpress: a scalable and versatile workflow for benchmarking structure learning algorithms for graphical models. arXiv preprint arXiv:2107.03863, 2021.
Code Snippets
```python
def adjmat_estimate_path_mcmc(algorithm):
    ret = "{output_dir}/adjmat_estimate/" \
          "adjmat=/{adjmat}/" \
          "parameters=/{bn}/" \
          "data=/{data}/" \
          "algorithm=/" + pattern_strings[algorithm] + "/" + pattern_strings["mcmc_est"] + "/" \
          "seed={replicate}/" \
          "adjmat.csv"
    return ret


def alg_output_seqgraph_path_fine_match(algorithm):
    return "{output_dir}/adjvecs/" \
           "adjmat=/{adjmat}/" \
           "parameters=/{bn}/" \
           "data=/{data}/" \
           "algorithm=/" + pattern_strings[algorithm] + "/" + \
           "seed={replicate}/" \
           "adjvecs.csv"


def alg_output_seqgraph_path(algorithm):
    return "{output_dir}/adjvecs/{data}/" \
           "algorithm=/" + pattern_strings[algorithm] + "/" + \
           "seed={replicate}/" \
           "adjvecs_tobecompressed.csv"


def alg_output_seqgraph_path_nocomp(algorithm):
    return "{output_dir}/adjvecs/{data}/" \
           "algorithm=/" + pattern_strings[algorithm] + "/" + \
           "seed={replicate}/" \
           "adjvecs.csv"


### Standard algorithms
def alg_output_adjmat_path(algorithm):
    return "{output_dir}/adjmat_estimate/{data}/" \
           "algorithm=/" + pattern_strings[algorithm] + "/" + \
           "seed={replicate}/" \
           "adjmat.csv"


def alg_output_time_path(algorithm):
    return "{output_dir}/time/{data}/" \
           "algorithm=/" + pattern_strings[algorithm] + "/" + \
           "seed={replicate}/" \
           "time.txt"


# This is code repetition, yes...
def alg_output_ntests_path(algorithm):
    return "{output_dir}/ntests/{data}/" \
           "algorithm=/" + pattern_strings[algorithm] + "/" + \
           "seed={replicate}/" \
           "ntests.txt"


def alg_input_data():
    return "{output_dir}/data/{data}/seed={replicate}.csv"


def time_path(algorithm):
    ret = "{output_dir}/time/" \
          "adjmat=/{adjmat}/" \
          "parameters=/{bn}/" \
          "data=/{data}/" \
          "algorithm=/" + pattern_strings[algorithm] + "/" + \
          "seed={replicate}/" \
          "time.txt"
    return ret


def data_path():
    return "{output_dir}/data/adjmat=/{adjmat}/parameters=/{bn}/data=/{data}/seed={replicate}.csv"


def adjmat_true_path():
    return "{output_dir}/adjmat/{adjmat}.csv",
```
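To make the role of these helpers concrete, here is a standalone sketch (the algorithm id "pcalg" and its alpha parameter are made up for illustration): the functions return Snakemake wildcard patterns, not concrete file paths.

```python
# Standalone sketch: a made-up pattern string for a hypothetical algorithm id.
pattern_strings = {"pcalg": "pcalg/alg_params=/alpha={alpha}"}


def alg_output_adjmat_path(algorithm):
    return "{output_dir}/adjmat_estimate/{data}/" \
           "algorithm=/" + pattern_strings[algorithm] + "/" + \
           "seed={replicate}/" \
           "adjmat.csv"


print(alg_output_adjmat_path("pcalg"))
# {output_dir}/adjmat_estimate/{data}/algorithm=/pcalg/alg_params=/alpha={alpha}/seed={replicate}/adjmat.csv
```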
```python
def get_seed_range(seed_range):
    if seed_range is None:
        return [1]
    else:
        return range(seed_range[0], seed_range[1] + 1)


def active_algorithm_files(wildcards):
    with open(configfilename) as json_file:
        conf = json.load(json_file)

    algs = active_algorithms()
    alg_filenames = ["results/output/benchmarks/"
                     + conf["benchmark_setup"]["evaluation"]["benchmarks"]["filename_prefix"]
                     + alg + ".csv" for alg in algs]

    return alg_filenames


def active_algorithms(eval_method="benchmarks"):
    with open(configfilename) as json_file:
        conf = json.load(json_file)

    algs = []

    if (eval_method == "mcmc_traj_plots") or (eval_method == "mcmc_autocorr_plots") or (eval_method == "mcmc_heatmaps"):
        benchmarks_alg_ids = [benchmarks_dict["id"]
                              for benchmarks_dict in config["benchmark_setup"]["evaluation"][eval_method]
                              if benchmarks_dict["active"] == True]
        for alg, alg_conf_list in config["resources"]["structure_learning_algorithms"].items():
            for alg_conf_id in benchmarks_alg_ids:
                if alg_conf_id in [ac["id"] for ac in alg_conf_list]:
                    algs.append(alg)

    elif eval_method == "benchmarks":
        benchmarks_alg_ids = config["benchmark_setup"]["evaluation"]["benchmarks"]["ids"]
        for alg, alg_conf_list in config["resources"]["structure_learning_algorithms"].items():
            for alg_conf_id in benchmarks_alg_ids:
                if alg_conf_id in [ac["id"] for ac in alg_conf_list]:
                    algs.append(alg)
    else:
        benchmarks_alg_ids = [benchmarks_dict
                              for benchmarks_dict in config["benchmark_setup"]["evaluation"][eval_method]]
        for alg, alg_conf_list in config["resources"]["structure_learning_algorithms"].items():
            for alg_conf_id in benchmarks_alg_ids:
                if alg_conf_id in [ac["id"] for ac in alg_conf_list]:
                    algs.append(alg)

    return list(set(algs))


def get_active_rules(wildcards):
    """
    This function returns empty "trigger files" for the types of evaluation that are used.
    Yes, this is ugly. Should maybe have a directory output instead.
    """
    rules = []
    for key, val in config["benchmark_setup"]["evaluation"].items():
        # Check if boolean or list or object with nonempty ids field.
        if isinstance(val, dict) and val["ids"] != []:  # TODO: this was OrderedDict, so might have to impose order somewhere.
            rules.append("results/output/" + key + "/" + key + ".done")
        if isinstance(val, bool) and val is True:
            rules.append("results/output/" + key + "/" + key + ".done")
        if isinstance(val, list) and val != []:
            if key == "mcmc_traj_plots" or key == "mcmc_heatmaps" or key == "mcmc_autocorr_plots":
                for item in val:
                    if ("active" not in item) or item["active"] == True:
                        rules.append("results/output/" + key + "/" + key + ".done")
                        break
            else:
                rules.append("results/output/" + key + "/" + key + ".done")
    return rules


def check_system_requirements():
    import subprocess
    from snakemake.utils import min_version
    # To update Snakemake using Mamba run
    # mamba update -c conda-forge -c bioconda snakemake
    min_version("7.30.1")

    # Check that Apptainer or Singularity >= 3.2 is installed.
    (apptainer_ecode, apptainer_outp) = subprocess.getstatusoutput("apptainer --version")
    (ecode, outp) = subprocess.getstatusoutput("singularity --version")

    # Check that either Singularity or Apptainer is installed.
    if ecode != 0 and apptainer_ecode != 0:
        raise Exception("Benchpress requires Singularity >= 3.2 or Apptainer.")

    # If Singularity (and not Apptainer) is installed, check the version of Singularity.
    if ecode == 0 and apptainer_ecode != 0:
        # The output is usually on the format: singularity version x.y.z
        version_string = outp.split()[-1]  # Get the version number as the last word
        v1 = version_string.split(".")[0]  # major version
        v2 = version_string.split(".")[1]  # minor version
        smkver = float(v1 + "." + v2)  # Make it a number for comparison
        if smkver < 3.2:
            raise Exception(
                "You have " + outp + ". Benchpress requires Singularity >= 3.2."
            )
```
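For example, the seed handling above behaves as follows (a standalone sketch that duplicates the function so it can be run on its own; the input values are made up):

```python
def get_seed_range(seed_range):
    # None means a single default seed; otherwise the range is inclusive of both endpoints.
    if seed_range is None:
        return [1]
    return range(seed_range[0], seed_range[1] + 1)


print(list(get_seed_range(None)))     # [1]
print(list(get_seed_range([1, 5])))   # [1, 2, 3, 4, 5]
```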
```python
def dict_to_path(d):
    if len(d) == 0:
        return ""

    c = d[0].copy()  # take the first element in the list. BUG
    c.pop("id")  # remove id from the string as only the parameters should identify the computation.
    if "burnin_frac" in c:
        c.pop("burnin_frac")
    if "threshold" in c:
        c.pop("threshold")
    if "mcmc_estimator" in c:
        c.pop("mcmc_estimator")
    if "active" in c:
        c.pop("active")
    if "standardized" in c:
        c.pop("standardized")

    sep = "/"
    # HACK: Should put standardized first if it exists..
    tmp = [key + "={" + key + "}" for key, val in c.items()]
    ret = sep.join(tmp)
    return ret


# The pattern strings are generated from the json config file.
pattern_strings = {}

# Structure learning algorithms.
# May be good to add all since they might be input for other algs.
for alg in config["resources"]["structure_learning_algorithms"].keys():
    pattern_strings[alg] = alg + "/alg_params=/" + dict_to_path(config["resources"]["structure_learning_algorithms"][alg])

# Graph modules
for module in config["resources"]["graph"]:
    pattern_strings[module] = module + "/" + dict_to_path(config["resources"]["graph"][module])

# Parameters modules
for module in config["resources"]["parameters"]:
    pattern_strings[module] = module + "/" + dict_to_path(config["resources"]["parameters"][module])

# Data modules
for module in config["resources"]["data"]:
    pattern_strings[module] = module + "/" + dict_to_path(config["resources"]["data"][module])

# Evaluation strings. These do not follow exactly the same logic as the above, but it works.
pattern_strings["mcmc_traj_plots"] = "mcmc_traj_plots/" + dict_to_path(config["benchmark_setup"]["evaluation"]["mcmc_traj_plots"])
pattern_strings["mcmc_autocorr_plots"] = "mcmc_autocorr_plots/" + dict_to_path(config["benchmark_setup"]["evaluation"]["mcmc_autocorr_plots"])
pattern_strings["mcmc_heatmaps"] = "mcmc_heatmaps/" + dict_to_path(config["benchmark_setup"]["evaluation"]["mcmc_heatmaps"])

# Estimation parameters of MCMC algorithms
pattern_strings["mcmc_est"] = "mcmc_params/" \
                              "mcmc_estimator={mcmc_estimator}/" \
                              "threshold={threshold}/" \
                              "burnin_frac={burnin_frac}"
```
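The following standalone sketch shows what this path construction produces for one hypothetical configuration entry (the id and parameter names are invented; the helper is simplified to a single loop over the excluded keys):

```python
# Simplified re-expression of dict_to_path for illustration only.
def dict_to_path(d):
    if len(d) == 0:
        return ""
    c = d[0].copy()
    c.pop("id")
    for key in ("burnin_frac", "threshold", "mcmc_estimator", "active", "standardized"):
        c.pop(key, None)
    return "/".join(key + "={" + key + "}" for key in c)


# Hypothetical config entry for one algorithm run.
conf_entries = [{"id": "pc-gauss", "alpha": 0.05, "test": "gaussCItest"}]
print(dict_to_path(conf_entries))
# alpha={alpha}/test={test}
```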
```python
def id_to_alg(id):
    for key, alg in config["resources"]["structure_learning_algorithms"].items():
        for obj in alg:
            if obj["id"] == id:
                return key
    return None


"""
This file produces a dict where the paths for all algorithms are generated based on the
algorithm pattern strings and the values in the config file.

MCMC methods are special since the estimation parameters should not mix with the
parameters that define the run.

Order MCMC is special in the sense that it can define a startspace by means of the id of
some algorithm. Thus the id has to be extracted into a path string first.
"""


def idtopath(idlist):
    # idlist can either be None, an id, or a list of ids.
    # The id may correspond to an MCMC alg; then the estimator parameters should be added too.
    alg = id_to_alg(idlist)
    vals = config["resources"]["structure_learning_algorithms"][alg][0]

    if idlist is None:
        return "None"
    if isinstance(idlist, list):  # TODO: not supported yet
        if id_to_alg(idlist[0]) in mcmc_modules:
            return
        else:
            return [json_string[startalg][0] for startalg in idlist]
    else:
        if alg in mcmc_modules:
            return expand(pattern_strings[alg] + "/" + pattern_strings["mcmc_est"], **vals)
        else:
            return expand(pattern_strings[alg], **vals)


json_string = {}

# Generate strings from the config file.
for alg in config["resources"]["structure_learning_algorithms"]:
    # Some algorithms take input graphs. These are treated separately.
    has_input_algs = ["bidag_order_mcmc", "bidag_partition_mcmc", "bdgraph_pip"]
    if (alg not in has_input_algs) and (alg not in mcmc_modules):  # not the mcmc_modules yet
        json_string.update({val["id"]: expand(pattern_strings[alg], **val)
                            for val in config["resources"]["structure_learning_algorithms"][alg]})

# These are special and have to come last since they take input strings as start space.
# The start space path has to be generated first.
if "bidag_order_mcmc" in pattern_strings:
    order_mcmc_list = config["resources"]["structure_learning_algorithms"]["bidag_order_mcmc"]
    for items in order_mcmc_list:
        items["startspace_algorithm"] = idtopath(items["startspace_algorithm"])

    json_string.update({val["id"]: expand(pattern_strings["bidag_order_mcmc"] + "/" + pattern_strings["mcmc_est"], **val,)
                        for val in order_mcmc_list})

if "bdgraph_pip" in pattern_strings:
    bdgraph_pip_list = config["resources"]["structure_learning_algorithms"]["bdgraph_pip"]
    # The path to the startspace algorithm is extended here
    for items in bdgraph_pip_list:
        items["startalg"] = idtopath(items["startalg"])

    json_string.update({val["id"]: expand(pattern_strings["bdgraph_pip"] + "/" + pattern_strings["mcmc_est"], **val,)
                        for val in bdgraph_pip_list})

if "bidag_partition_mcmc" in pattern_strings:
    bidag_partition_mcmc_list = config["resources"]["structure_learning_algorithms"]["bidag_partition_mcmc"]
    # The path to the startspace algorithm is extended here
    for items in bidag_partition_mcmc_list:
        items["startspace_algorithm"] = idtopath(items["startspace_algorithm"])

    json_string.update({val["id"]: expand(pattern_strings["bidag_partition_mcmc"] + "/" + pattern_strings["mcmc_est"], **val,)
                        for val in bidag_partition_mcmc_list})

# All MCMC algs
for alg in mcmc_modules:
    if alg in pattern_strings:
        json_string.update({val["id"]: expand(pattern_strings[alg] + "/" + pattern_strings["mcmc_est"], **val)
                            for val in config["resources"]["structure_learning_algorithms"][alg]})

# Since we don't want the mcmc_est when we call the trajectory directly.
json_string_mcmc_noest = {}
for alg in mcmc_modules:
    if alg in pattern_strings:
        json_string_mcmc_noest.update({val["id"]: expand(pattern_strings[alg], **val,)
                                       for val in config["resources"]["structure_learning_algorithms"][alg]})


# Evaluation strings
def gen_evaluation_string_from_conf(method, alg_id):
    # This essentially converts a dict (from an evaluation method conf) into a path string
    # following a pattern specified in pattern_strings.
    eval_dict = next(item for item in config["benchmark_setup"]["evaluation"][method] if item["id"] == alg_id)
    return expand(pattern_strings[method], **eval_dict)


# Graph strings
def gen_adjmat_string_from_conf(adjmat_id, seed):
    # find the adjmat_gen_method from adjmat_gen_id
    for module in config["resources"]["graph"]:
        if adjmat_id in [c["id"] for c in config["resources"]["graph"][module]]:
            adjmat_dict = next(item for item in config["resources"]["graph"][module] if item["id"] == adjmat_id)
            return expand(pattern_strings[module] + "/seed={seed}", **adjmat_dict, seed=seed)

    if adjmat_id is not None and Path("resources/adjmat/myadjmats/" + adjmat_id).is_file():
        filename_no_ext = os.path.splitext(os.path.basename(adjmat_id))[0]
        return "myadjmats/" + filename_no_ext  # this could be hepar2.csv e.g.
    elif adjmat_id is None:
        return None


# Parameter strings
def gen_parameter_string_from_conf(gen_method_id, seed):
    for module in config["resources"]["parameters"]:
        if gen_method_id in [c["id"] for c in config["resources"]["parameters"][module]]:
            curconf = next(item for item in config["resources"]["parameters"][module] if item["id"] == gen_method_id)
            return expand(pattern_strings[module] + "/seed={seed}", **curconf, seed=seed)

    if Path("resources/parameters/myparams/bn.fit_networks/" + str(gen_method_id)).is_file():
        return "bn.fit_networks/" + gen_method_id  # gen_method_id could be hepar2.rds e.g.
    elif Path("resources/parameters/myparams/sem_params/" + str(gen_method_id)).is_file():
        return "sem_params/" + gen_method_id  # gen_method_id could be hepar2.rds e.g.
    elif gen_method_id is None:
        return None


# Data strings
def gen_data_string_from_conf(data_id, seed, seed_in_path=True):
    if Path("resources/data/mydatasets/" + data_id).is_file():
        num_lines = sum(1 for line in open("resources/data/mydatasets/" + data_id)) - 1
        return "fixed" + \
               "/filename=" + data_id + \
               "/n=" + str(num_lines) + \
               "/seed=" + str(seed)
    elif Path("resources/data/mydatasets/" + data_id).exists():
        paths = Path("resources/data/mydatasets/").glob(data_id + '/*.csv')
        files = [x.name for x in paths if x.is_file()]
        return ["fixed" +
                "/filename=" + data_id + "/" + f +
                "/n=" + str(None) +
                "/seed=" + str(seed) for f in files]
    else:
        for module in config["resources"]["data"]:
            if data_id in [c["id"] for c in config["resources"]["data"][module]]:
                # Find the data entry from the resources
                data = next(item for item in config["resources"]["data"][module] if item["id"] == data_id)
                if seed_in_path:  # seed_in_path is a HACK..
                    return expand(pattern_strings[module]
                                  + "/standardized={standardized}"  # standardized has to come last, see standardize rule
                                  + "/seed={seed}", seed=seed, **data)
                else:
                    return expand(pattern_strings[module] + "/standardized={standardized}", **data)
```
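For orientation, the helpers above assume a configuration roughly shaped like the sketch below. Only the nesting and key names are taken from the code; the module names, ids, and values are invented placeholders:

```python
# Hypothetical config sketch; not a complete or authoritative Benchpress config.
config = {
    "benchmark_setup": {
        "data": [
            {"graph_id": "my_graph", "parameters_id": "my_params", "data_id": "my_data"},
        ],
        "evaluation": {
            "benchmarks": {"filename_prefix": "example/", "ids": ["my_alg_run"]},
        },
    },
    "resources": {
        "structure_learning_algorithms": {
            # Each algorithm maps to a list of parameter configurations; "id" names one run.
            "some_algorithm": [{"id": "my_alg_run", "alpha": 0.05}],
        },
        "graph": {"some_graph_module": [{"id": "my_graph"}]},
        "parameters": {"some_parameters_module": [{"id": "my_params"}]},
        "data": {"some_data_module": [{"id": "my_data", "n": 100, "standardized": False}]},
    },
}
```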
```python
available_conf_ids = []
for alg, alg_conf_avail in config["resources"]["structure_learning_algorithms"].items():
    for alg_conf in alg_conf_avail:
        available_conf_ids.append(alg_conf["id"])

# Check that all ids in the benchmarks section actually exist.
for benchmarksitem in config["benchmark_setup"]["evaluation"]["benchmarks"]["ids"]:
    if benchmarksitem not in available_conf_ids:
        raise Exception(benchmarksitem + " not available.\nThe available id's are:\n{ids}".format(ids=sorted(available_conf_ids)))

# Check that all ids in the graph_plots actually exist.
for benchmarksitem in config["benchmark_setup"]["evaluation"]["graph_plots"]:
    if benchmarksitem not in available_conf_ids:
        raise Exception(benchmarksitem + " not available.\nThe available id's are:\n{ids}".format(ids=sorted(available_conf_ids)))

# Check that the startspace for order mcmc exists.
# for alg_conf in config["resources"]["structure_learning_algorithms"]["bidag_order_mcmc"]:
#     if alg_conf["startspace_algorithm"] not in set(available_conf_ids + [None]) - {c["id"] for c in config["resources"]["structure_learning_algorithms"]["bidag_order_mcmc"]}:
#         raise Exception(alg_conf["startspace_algorithm"] + " not available startspace for order_mcmc.\n"
#                         "The available are:\n" + str(sorted(list(set(available_conf_ids) - {c["id"] for c in config["resources"]["structure_learning_algorithms"]["bidag_order_mcmc"]}))))


def validate_data_setup(config, dict):
    # Check that the adjmat exists
    available_conf_ids = []
    for alg, alg_conf_avail in config["resources"]["graph"].items():
        for alg_conf in alg_conf_avail:
            available_conf_ids.append(alg_conf["id"])
    available_conf_ids += os.listdir("resources/adjmat/myadjmats")

    # ROC requires a true graph
    if config["benchmark_setup"]["evaluation"]["benchmarks"]["ids"] != []:
        if dict["graph_id"] is None:
            raise Exception("ROC evaluation requires graph_id.\n"
                            "The available graph id's are:\n" + str(sorted(available_conf_ids)))
        if not dict["graph_id"] in available_conf_ids:
            raise Exception(dict["graph_id"] + " is not an available graph id.\n"
                            "The available graph id's are:\n" + str(sorted(available_conf_ids)))

    # graph_true_stats also requires a true graph
    if config["benchmark_setup"]["evaluation"]["graph_true_stats"] == True:
        if dict["graph_id"] is None:
            raise Exception("graph_true_stats requires graph_id.\n"
                            "The available graph id's are:\n" + str(sorted(available_conf_ids)))
        if not dict["graph_id"] in available_conf_ids:
            raise Exception(dict["graph_id"] + " is not an available graph id.\n"
                            "The available graph id's are:\n" + str(sorted(available_conf_ids)))

    available_data_files = os.listdir("resources/data/mydatasets")

    # Check that the data exists
    available_conf_ids = []
    for alg, alg_conf_avail in config["resources"]["data"].items():
        for alg_conf in alg_conf_avail:
            available_conf_ids.append(alg_conf["id"])
    available_conf_ids += available_data_files

    if dict["data_id"] not in available_conf_ids:
        raise Exception(str(dict["data_id"]) + " is not an available data id.\n"
                        "The available data id's are:\n" + str(sorted(available_conf_ids)))

    # Check that graph, parameters, and data are properly combined.

    # Check that the parameters exist
    available_conf_ids = []
    for alg, alg_conf_avail in config["resources"]["parameters"].items():
        for alg_conf in alg_conf_avail:
            available_conf_ids.append(alg_conf["id"])
    available_conf_ids += os.listdir("resources/parameters/myparams/bn.fit_networks")
    available_conf_ids += os.listdir("resources/parameters/myparams/sem_params")

    if dict["data_id"] not in available_data_files and dict["parameters_id"] not in available_conf_ids:
        raise Exception(str(dict["parameters_id"]) + " is not an available parameter id.\n"
                        "The available parameter id's are:\n" + str(sorted(available_conf_ids)))


for data_setup in config["benchmark_setup"]["data"]:
    validate_data_setup(config, data_setup)
```
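To illustrate how these checks surface in practice, here is a standalone sketch with made-up ids (not the workflow's own output): a misspelled id raises an exception listing the valid ones.

```python
# Standalone illustration of the id validation above; the ids are invented.
available_conf_ids = ["fges_run_1", "pc_run_1"]
requested_id = "pc_run_2"  # not configured anywhere

if requested_id not in available_conf_ids:
    raise Exception(requested_id + " not available.\nThe available id's are:\n{ids}".format(
        ids=sorted(available_conf_ids)))
```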
```python
import sys
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import networkx as nx
import numpy as np
import matplotlib
import os

from mcmc_utils import *

matplotlib.use('Agg')
sns.set_style("whitegrid")

# Treating the case when empty files are created. Such files
# are created if the algorithm was timed out.
if os.stat(snakemake.input["traj"]).st_size > 0:
    df = pd.read_csv(snakemake.input["traj"], sep=",")
    edgesymb = get_edge_symb(df)

    if snakemake.params["estimator"] == "map":
        df_adjmat = get_max_score_graph(df)
        df_adjmat.to_csv(snakemake.output["adjmat"], index=False)

    if snakemake.params["estimator"] == "heatmap":
        df_heatmap = estimate_heatmap(df, float(snakemake.params["burnin_frac"]), edgesymb)
        df_heatmap.to_csv(snakemake.output["heatmap"], index=False)

    if snakemake.params["estimator"] == "threshold":
        df_heatmap = estimate_heatmap(df, float(snakemake.params["burnin_frac"]), edgesymb)
        df_adjmat = (df_heatmap > float(snakemake.params["threshold"])) * 1
        df_adjmat.to_csv(snakemake.output["adjmat"], index=False)

else:
    if snakemake.params["estimator"] == "map" or snakemake.params["estimator"] == "threshold":
        open(snakemake.output["adjmat"], 'a').close()
    if snakemake.params["estimator"] == "heatmap":
        open(snakemake.output["heatmap"], 'a').close()
```
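As a toy illustration of the "threshold" estimator used above (the values are invented, and the edge-symbol handling and the real heatmap computation are omitted): posterior edge probabilities above the threshold become edges in the estimated adjacency matrix.

```python
import pandas as pd

# Posterior edge probabilities for a two-node toy example (invented values).
df_heatmap = pd.DataFrame([[0.0, 0.9],
                           [0.1, 0.0]], columns=["A", "B"])

# Thresholding at 0.5 turns the heatmap into a 0/1 adjacency matrix, as in the script above.
df_adjmat = (df_heatmap > 0.5) * 1
print(df_adjmat)
#    A  B
# 0  0  1
# 1  0  0
```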
```python
shell:
    "cp -r {input} {output}"
```
```python
script:
    "scripts/evaluation/graphtraj_est.py"
```