Script to convert the PheWAS Catalog database to JSON evidence, with Ensembl gene IDs (ENSG) and EFO IDs.
This repository contains a collection of modules which generate evidence for several internal data sources (“internal” meaning that the code is maintained by the data team; the data itself usually comes from sources outside Open Targets).
How to set up and update the environment
The file requirements.txt contains only the direct dependencies, with their exact versions pinned. Only this file should be edited directly.
Additionally, the file requirements-frozen.txt contains all direct and transitive dependencies with their exact versions pinned. It is generated automatically from the former file and is intended for reproducing the exact working environment (as much as possible) as of any particular commit. This file should not be edited directly.
These are the steps to update an environment:
- Add, delete, or update packages as necessary in requirements.txt
- Install the requirements with pip3 install -r requirements.txt
- Make sure everything works
- Update the frozen file with pip3 freeze > requirements-frozen.txt
- Include both files, requirements.txt and requirements-frozen.txt, with your PR
Make sure to always work in a clean virtual environment to avoid any surprises.
How to generate the evidence
The commands below will create a Google Cloud instance, SSH into it, install the necessary dependencies, and then generate, validate, and upload the evidence. Tweak the commands as necessary.
To run this, the following conditions related to service accounts need to be satisfied:
- The service account must have the Storage Admin role for two buckets, otar000-evidence_input and otar001-core.
- The service account must have the Compute Admin and Service Account User roles in the open-targets-eu-dev project.
- The user running the code must have access to use the service account.
By default, the generated evidence will be validated using the latest master snapshot of the JSON schema. This can be tweaked in configuration.yaml → global → schema.
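For reference, the sketch below shows roughly what that validation amounts to, using the generic jsonschema package rather than the pipeline's own validator. The schema URL and the evidence file name are placeholders/assumptions, not taken from the configuration.

```python
# Rough sketch only (not the pipeline's validation code): check each evidence
# string in a gzipped JSON-lines file against a schema snapshot.
import gzip
import json

import requests
from jsonschema import Draft7Validator

# Assumed location of the master snapshot; adjust to whatever
# configuration.yaml -> global -> schema points to.
SCHEMA_URL = "https://raw.githubusercontent.com/opentargets/json_schema/master/opentargets.json"

validator = Draft7Validator(requests.get(SCHEMA_URL).json())

with gzip.open("cancer_biomarkers.json.gz", "rt") as evidence_file:  # placeholder path
    for line_number, line in enumerate(evidence_file, start=1):
        for error in validator.iter_errors(json.loads(line)):
            print(f"line {line_number}: {error.message}")
```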
# Set parameters.
export INSTANCE_NAME=evidence-generation
export INSTANCE_ZONE=europe-west1-d
# Create the instance and SSH.
gcloud compute instances create \
${INSTANCE_NAME} \
--project=open-targets-eu-dev \
--zone=${INSTANCE_ZONE} \
--machine-type=n1-highmem-32 \
--service-account=426265110888-compute@developer.gserviceaccount.com \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--create-disk=auto-delete=yes,boot=yes,device-name=${INSTANCE_NAME},image=projects/ubuntu-os-cloud/global/images/ubuntu-2004-focal-v20210927,mode=rw,size=2000,type=projects/open-targets-eu-dev/zones/europe-west1-d/diskTypes/pd-balanced
gcloud compute ssh --zone ${INSTANCE_ZONE} ${INSTANCE_NAME}
screen
# Install the system dependencies.
sudo apt update
sudo apt install -y openjdk-8-jdk-headless python3-pip python3.8-venv
# Create a key for the GCP service account:
export FILENAME=~/gcp-account.json
export ACCOUNT_NAME=426265110888-compute@developer.gserviceaccount.com
export PROJECT=open-targets-eu-dev
gcloud iam service-accounts keys create ${FILENAME} \
--iam-account=${ACCOUNT_NAME} \
--project ${PROJECT}
# Store the credentials in the default environment variable:
export GOOGLE_APPLICATION_CREDENTIALS=${FILENAME}
# Activate the environment and install Python dependencies.
git clone https://github.com/opentargets/evidence_datasource_parsers
cd evidence_datasource_parsers
python3 -m venv env
source env/bin/activate
pip3 install --upgrade pip setuptools
pip3 install -r requirements-frozen.txt
export PYTHONPATH="$PYTHONPATH:$(pwd)"
At this point, we are ready to run the Snakemake pipeline. The following options are available:
- snakemake --cores all: Display help (the list of possible rules to be run) and do not run anything.
- snakemake --cores all --until local: Generate all files, but do not upload them to Google Cloud Storage. The files generated in this way do not have prefixes, e.g. cancer_biomarkers.json.gz. This is done intentionally, so that the pipeline can be re-run the next day without having to re-generate all the files. It is also possible to locally run only a single rule by substituting its name instead of “local”.
- snakemake --cores all --until all: Generate all files and then upload them to Google Cloud Storage.
All individual parser rules are strictly local. The only rule which uploads files to Google Cloud Storage (all at once) is "all".
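To make the local/all split concrete, here is a schematic Snakefile fragment. It is not taken from the repository's Snakefile; the rule bodies, paths, and upload command are illustrative only.

```python
# Schematic Snakefile fragment. Parser rules only write local files; the
# "all" rule is the single place where outputs are copied to GCS.
rule cancer_biomarkers:
    output:
        "cancer_biomarkers.json.gz"
    shell:
        # The real rule also passes the parser's input tables.
        "python3 modules/cancerBiomarkers.py --output_file {output}"

rule local:
    # Pseudo-rule: requesting it builds every parser output without uploading.
    input:
        "cancer_biomarkers.json.gz",

rule all:
    # The only rule that uploads, all files at once.
    input:
        "cancer_biomarkers.json.gz",
    shell:
        "gsutil cp {input} gs://otar000-evidence_input/"
```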
Some additional parameters which can be useful for debugging:
- --keep-incomplete: Keep the output files of failed jobs. Must only be used with local runs. Note that Snakemake uses the presence of the output files to decide which jobs to run, so the incomplete files must be removed after investigation and before any re-runs of the workflow.
- --dry-run: Do not run the workflow, and only show the list of jobs to be run.
Processing Genetics Portal evidence
Evidence generation for the Genetics Portal is not automated in the Snakefile. It can be done separately using the following commands. It is planned that this step will eventually become part of the Genetics Portal release pipeline.
gcloud dataproc clusters create \
cluster-l2g-data \
--image-version=1.4 \
--properties=spark:spark.debug.maxToStringFields=100,spark:spark.executor.cores=31,spark:spark.executor.instances=1 \
--master-machine-type=n1-standard-32 \
--master-boot-disk-size=1TB \
--zone=europe-west1-d \
--single-node \
--max-idle=5m \
--region=europe-west1 \
--project=open-targets-genetics
gcloud dataproc jobs submit pyspark \
--cluster=cluster-l2g-data \
--project=open-targets-genetics \
--region=europe-west1 \
modules/GeneticsPortal.py -- \
--locus2gene gs://genetics-portal-data/l2g/200127 \
--toploci gs://genetics-portal-data/v2d/200207/toploci.parquet \
--study gs://genetics-portal-data/v2d/200207/studies.parquet \
--threshold 0.05 \
--variantIndex gs://genetics-portal-data/variant-annotation/190129/variant-annotation.parquet \
--ecoCodes gs://genetics-portal-data/lut/vep_consequences.tsv \
--outputFile gs://genetics-portal-analysis/l2g-platform-export/data/genetics_portal_evidence.json.gz
Notes on how this repository is organised
Each module in modules/ corresponds to one evidence generator. Modules which are shared by multiple generators reside in common/. The Conda virtual environment files, as well as instructions on how to maintain them, are available in envs/.
Historical notes on individual parsers
Note that some information in this section may be obsolete. It is provided for historical purposes and will eventually be migrated into the source code of individual parsers or into the Snakefile.
ClinGen
The ClinGen parser processes the Gene Validity Curations table that can be downloaded from https://search.clinicalgenome.org/kb/gene-validity. As of January 2021, the downloadable file contains six header lines that look as follows:
CLINGEN GENE VALIDITY CURATIONS
FILE CREATED: 2021-01-18
WEBPAGE: https://search.clinicalgenome.org/kb/gene-validity
++++++++++,++++++++++++++,+++++++++++++,++++++++++++++++++,+++++++++,+++++++++,+++++++++,++++++++++++++,+++++++++++++,+++++++++++++++++++
GENE SYMBOL,GENE ID (HGNC),DISEASE LABEL,DISEASE ID (MONDO),MOI,SOP,CLASSIFICATION,ONLINE REPORT,CLASSIFICATION DATE,GCEP
+++++++++++,++++++++++++++,+++++++++++++,++++++++++++++++++,+++++++++,+++++++++,+++++++++,++++++++++++++,+++++++++++++,+++++++++++++++++++
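Because of those six header lines, the file cannot be read as a plain CSV. As a minimal sketch, they could be skipped like this with pandas (the real parser does the equivalent in Spark; the file name is a placeholder):

```python
# Skip the six metadata lines of the Gene Validity Curations download.
# File line indices (0-based): 0-3 are banner/separator lines, 4 holds the
# column names, 5 is another separator row, and the data starts at line 6.
import pandas as pd

clingen = pd.read_csv(
    "Clingen-Gene-Disease-Summary.csv",  # placeholder for the downloaded file
    skiprows=[0, 1, 2, 3, 5],            # line 4 is kept and becomes the header
)
print(clingen[["GENE SYMBOL", "DISEASE LABEL", "CLASSIFICATION"]].head())
```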
Gene2Phenotype
The Gene2Phenotype parser processes the four gene panels (Developmental Disorders - DD, eye disorders, skin disorders and cancer) that can be downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads/.
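A minimal sketch of pooling the downloaded panel files into one table is shown below; the file names are placeholders and the layout of the CSVs is assumed.

```python
# Pool the downloaded Gene2Phenotype panel CSVs into a single table,
# recording which panel each record came from (file names are placeholders).
import pandas as pd

panel_files = {
    "DD": "DDG2P.csv.gz",
    "Eye": "EyeG2P.csv.gz",
    "Skin": "SkinG2P.csv.gz",
    "Cancer": "CancerG2P.csv.gz",
}

panels = pd.concat(
    [pd.read_csv(path).assign(panel=name) for name, path in panel_files.items()],
    ignore_index=True,
)
print(panels.groupby("panel").size())
```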
Genomics England Panel App
The Genomics England parser processes the associations between genes and diseases described in the Gene Panels Data table. This data is provided by Genomics England and can be downloaded from the otar000-evidence_input bucket.
The source table is then formatted into a compressed set of JSON lines following the schema of the version to be used.
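As an illustration of that last step, a sketch of converting a tab-separated source table into compressed JSON lines follows. The file and column names are placeholders, and the real parser also maps the fields onto the evidence schema.

```python
# Turn a tab-separated gene panels table into gzipped JSON lines, one record
# per row. The file name and column renames below are illustrative only.
import pandas as pd

gene_panels = pd.read_csv("gene_panels.tsv", sep="\t")

evidence = gene_panels.rename(
    columns={"Symbol": "targetFromSourceId", "Phenotypes": "diseaseFromSource"}
)
evidence.to_json(
    "genomics_england.json.gz", orient="records", lines=True, compression="gzip"
)
```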
IntOGen
The intOGen parser generates evidence strings from three files that need to be in the working directory or in the resources directory:
- intogen_cancer2EFO_mapping.tsv: Mappings of intOGen cancer type acronyms to EFO terms. This is currently a manually curated file, given that intOGen does not use a specific ontology to model cancers.
- intogen_cohorts.tsv: Contains information about the analysed cohorts and can be downloaded from the intOGen website. In the current implementation, the total number of samples included in each cohort is used to calculate the percentage of samples that carry a relevant mutation in a driver gene (see the sketch below).
- intogen_Compendium_Cancer_Genes.tsv: Contains the genes that have been identified as drivers in at least one cohort, together with information about the methods that yielded significant results, the q-value, and additional data about mutations. It can be downloaded from the same place as the cohorts file.
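A rough sketch of how these three files might be combined is given below. It is not the parser's actual code, and the column names (COHORT, SAMPLES, MUTATED_SAMPLES, CANCER_TYPE, SYMBOL) are assumptions.

```python
# Combine the three intOGen inputs and express the mutated samples as a
# fraction of each cohort. Column names are assumed, not verified.
import pandas as pd

drivers = pd.read_csv("intogen_Compendium_Cancer_Genes.tsv", sep="\t")
cohorts = pd.read_csv("intogen_cohorts.tsv", sep="\t")
cancer2efo = pd.read_csv("intogen_cancer2EFO_mapping.tsv", sep="\t")

evidence = (
    drivers
    .merge(cohorts, on="COHORT", how="left")          # adds the SAMPLES count
    .merge(cancer2efo, on="CANCER_TYPE", how="left")  # adds the EFO mapping
    .assign(
        mutatedSamplesFraction=lambda df: df["MUTATED_SAMPLES"] / df["SAMPLES"]
    )
)
print(evidence[["SYMBOL", "COHORT", "mutatedSamplesFraction"]].head())
```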
CRISPR
The CRISPR parser processes three files available in the resources directory:
- crispr_evidence.tsv: Main file that contains the prioritisation score as well as target, disease and publication information. It was adapted from supplementary table 6 in the Behan et al. 2019 paper.
- crispr_descriptions.tsv: File used to extract the number of targets prioritised per cancer type, as well as to retrieve the cell lines from the next file (see the sketch below).
- crispr_cell_lines.tsv: Contains three columns with the cell line, tissue and cancer type information. It is used to extract the cell lines in which the target has been identified as essential. The file was adapted from supplementary table 1 in the Behan et al. 2019 paper.
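As a simplified sketch, the tissue-level part of that lookup could look as follows; the disease-level join is analogous, and the column names are indicative only.

```python
# Simplified sketch of the tissue-level lookup: attach cell lines to the
# per-cancer-type descriptions. Column names are indicative only.
import pandas as pd

descriptions = pd.read_csv("crispr_descriptions.tsv", sep="\t")
cell_lines = pd.read_csv("crispr_cell_lines.tsv", sep="\t")

tissue_level = descriptions.rename(
    columns={"tissue_or_cancer_type": "tissue"}
).merge(cell_lines, on="tissue", how="inner")

print(tissue_level.head())
```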
PROGENy
The PROGENy parser processes three files:
- progeny_normalVStumor_opentargets.txt: Main file that contains a systematic comparison of pathway activities between normal and primary TCGA samples in 14 tumor types using PROGENy. This file can be downloaded from the otar000-evidence_input bucket.
- cancer2EFO_mappings.tsv: File containing the mappings between each cancer-type acronym and its corresponding disease term in EFO. This file can be found in the resources directory.
- pathway2Reactome_mappings.tsv: File containing the mappings between each analysed pathway and its corresponding target and Reactome ID. This file can be found in the resources directory.
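A minimal sketch of enriching the PROGENy results with the two mapping files is shown below; the join column names are assumptions.

```python
# Enrich the PROGENy comparison with the EFO and Reactome mappings.
# The join column names are assumptions.
import pandas as pd

progeny = pd.read_csv("progeny_normalVStumor_opentargets.txt", sep="\t")
cancer2efo = pd.read_csv("cancer2EFO_mappings.tsv", sep="\t")
pathway2reactome = pd.read_csv("pathway2Reactome_mappings.tsv", sep="\t")

evidence = (
    progeny
    .merge(cancer2efo, on="Cancer_type_acronym", how="left")
    .merge(pathway2reactome, on="Pathway", how="left")
)
print(evidence.head())
```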
SystemsBiology
This parser processes two files stored in a Google Cloud Storage bucket, gs://otar000-evidence_input/SysBio/data_files. The sysbio_evidence-31-01-2019.tsv file contains evidence data, while sysbio_publication_info_nov2018.tsv contains study-level information.
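A sketch of how the two files could be combined follows, assuming they share a study identifier column; the key name used below is an assumption.

```python
# Join the evidence rows with their study-level metadata.
# "study_id" is an assumed shared key; the real files may use another column.
import pandas as pd

evidence = pd.read_csv("sysbio_evidence-31-01-2019.tsv", sep="\t")
publications = pd.read_csv("sysbio_publication_info_nov2018.tsv", sep="\t")

annotated = evidence.merge(publications, on="study_id", how="left")
print(annotated.head())
```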
SLAPenrich
The SLAPenrich parser processes two files:
- slapenrich_opentargets.tsv: Main file that contains a systematic comparison of somatic mutations from TCGA across 25 different cancer types against a collection of pathway gene sets from Reactome. This file can be downloaded from the otar000-evidence_input bucket.
- cancer2EFO_mappings.tsv: File containing the mappings between each cancer-type acronym and its corresponding disease term in EFO. This file can be found in the resources directory.
IMPC
Generates the mouse model target-disease evidence by querying the IMPC SOLR API.
The base of the evidence is the disease_model_summary table, which is unique on the combination of (model_id, disease_id). When target information is added, an original row may explode into multiple evidence strings. As a result, the final output is unique on the combination of (biologicalModelId, targetFromSourceId, targetInModelId, diseaseFromSourceId).
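A schematic illustration of why rows multiply when target information is added is given below; the identifiers and column names are simplified and purely illustrative.

```python
# Schematic illustration: one (model_id, disease_id) row from
# disease_model_summary yields several evidence strings once the
# model-to-gene mapping is joined in. IDs and column names are simplified.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

disease_model_summary = spark.createDataFrame(
    [("MGI:0000001", "ORPHA:93552")], ["model_id", "disease_id"]
)
model_genes = spark.createDataFrame(
    [
        ("MGI:0000001", "ENSG00000157764", "MGI:88190"),
        ("MGI:0000001", "ENSG00000171862", "MGI:109583"),
    ],
    ["model_id", "targetFromSourceId", "targetInModelId"],
)

# Two evidence rows come out of the single disease-model row above.
disease_model_summary.join(model_genes, on="model_id", how="inner").show()
```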
Code Snippets
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 | import argparse import logging import re from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import ( array_distinct, col, collect_set, concat, element_at, explode, lit, initcap, regexp_extract, regexp_replace, size, split, struct, translate, trim, udf, upper, when ) from pyspark.sql.types import StringType, ArrayType from common.evidence import write_evidence_strings ALTERATIONTYPE2FUNCTIONCSQ = { # TODO: Map BIA 'MUT': 'SO_0001777', # somatic_variant 'CNA': 'SO_0001563', # copy_number_change 'FUS': 'SO_0001882', # feature_fusion 'EXPR': None, 'BIA': None } DRUGRESPONSE2EFO = { 'Responsive': 'GO_0042493', # response to drug 'Not Responsive': 'EFO_0020002', # lack of efficacy 'Resistant': 'EFO_0020001', # drug resistance 'Increased Toxicity': 'EFO_0020003', # drug toxicity 'Increased Toxicity (Myelosupression)': 'EFO_0007053', # myelosuppression 'Increased Toxicity (Ototoxicity)': 'EFO_0006951', # ototoxicity 'Increased Toxicity (Hyperbilirubinemia)': 'HP_0002904', # Hyperbilirubinemia 'Increased Toxicity (Haemolytic Anemia)': 'EFO_0005558' # hemolytic anemia } GENENAMESOVERRIDE = { # Correct gene names to use their approved symbol 'C15orf55': 'NUTM1', 'MLL': 'KMT2A', 'MLL2': 'KMT2D' } class cancerBiomarkersEvidenceGenerator: def __init__(self): # Create spark session self.spark = ( SparkSession.builder .appName('cancer-biomarkers') .getOrCreate() ) self.evidence = None self.get_variantId_udf = udf( cancerBiomarkersEvidenceGenerator.get_variantId, StringType() ) self.zip_alterations_with_type_udf = udf( cancerBiomarkersEvidenceGenerator.zip_alterations_with_type, ArrayType(ArrayType(StringType())) ) def compute( self, biomarkers_table: str, source_table: str, disease_table: str, drug_index: str, output_file: str ) -> None: """Loads and processes inputs to generate the Cancer Biomarkers evidence strings""" # Import data biomarkers_df = self.spark.read.csv(biomarkers_table, sep='\t', header=True) source_df = self.spark.read.json(source_table).select( col('label').alias('niceName'), 'source', 'url') disease_df = self.spark.read.json(disease_table).select( regexp_replace(col('name'), '_', '').alias('tumor_type'), regexp_extract(col('url'), r'[^/]+$', 
0).alias('diseaseFromSourceMappedId')) drugs_df = self.spark.read.parquet(drug_index).select( col('id').alias('drugId'), col('name').alias('drug')) # Process inputs to generate evidence strings evidence = self.process_biomarkers( biomarkers_df, source_df, disease_df, drugs_df ) # Write evidence strings write_evidence_strings(self.evidence, output_file) logging.info(f'{evidence.count()} evidence strings have been saved to {output_file}.') def process_biomarkers( self, biomarkers_df: DataFrame, source_df: DataFrame, disease_df: DataFrame, drugs_df: DataFrame ) -> DataFrame: """The diverse steps to prepare and enrich the input table""" biomarkers_enriched = ( biomarkers_df .select( 'Biomarker', 'IndividualMutation', array_distinct(split(col('Alteration'), ';')).alias('alterations'), array_distinct(split(col('Gene'), ';')).alias('gene'), split(col('AlterationType'), ';').alias('alteration_types'), array_distinct(split(col("PrimaryTumorTypeFullName"), ";")).alias('tumor_type_full_name'), array_distinct(split(col('Drug'), ';|,')).alias('drug'), 'DrugFullName', 'Association', 'gDNA', array_distinct(split(col('EvidenceLevel'), ',')).alias('confidence'), array_distinct(split(col('Source'), ';')).alias('source') ) .withColumn('confidence', explode(col('confidence'))) .withColumn('tumor_type_full_name', explode(col('tumor_type_full_name'))) .withColumn('tumor_type', translate(col('tumor_type_full_name'), ' -', '')) .withColumn('drug', explode(col('drug'))) .withColumn('drug', translate(col('drug'), '[]', '')) .withColumn('gene', explode(col('gene'))) .replace(to_replace=GENENAMESOVERRIDE, subset=['gene']) .withColumn('gene', upper(col('gene'))) # At this stage alterations and alteration_types are both arrays # Disambiguation when the biomarker consists of multiple alterations is needed # This is solved by: # 1. Zipping both fields - tmp consists of a list of alteration/type tuples # 2. tmp is exploded - tmp consists of the alteration/type tuple # 3. alteration & alteration_type columns are overwritten with the elements in the tuple .withColumn( 'tmp', self.zip_alterations_with_type_udf(col('alterations'), col('alteration_types'))) .withColumn('tmp', explode(col('tmp'))) .withColumn('alteration_type', element_at(col('tmp'), 2)) .withColumn( 'alteration', when( ~col('IndividualMutation').isNull(), col('IndividualMutation') ) .otherwise(element_at(col('tmp'), 1)) ) .drop('tmp') # Clean special cases on the alteration string .withColumn( 'alteration', when( col('alteration') == 'NRAS:.12.,.13.,.59.,.61.,.117.,.146.', col('Biomarker') # 'NRAS (12,13,59,61,117,146)' ) .when( # Cleans strings like 'ARAF:.' 
col('alteration').contains(':.'), translate(col('alteration'), ':.', '') ) .when( # Fusion genes are described with '__' # biomarker is a cleaner representation when there's one alteration (col('alteration').contains('__')) & (~col('Biomarker').contains('+')), col('Biomarker') ) .otherwise(col('alteration')) ) # Split source into literature and urls # literature contains PMIDs # urls are enriched from the source table if not a CT .withColumn('source', explode(col('source'))) .withColumn('source', trim(regexp_extract(col('source'), r'(PMID:\d+)|([\w ]+)', 0).alias('source'))) .join(source_df, on='source', how='left') .withColumn( 'literature', when(col('source').startswith('PMID'), regexp_extract(col('source'), r'(PMID:)(\d+)', 2)) ) .withColumn( 'urls', when( col('source').startswith('NCT'), struct( lit('Clinical Trials').alias('niceName'), concat(lit('https://clinicaltrials.gov/ct2/show/'), col('source')).alias('url') ) ) .when( (~col('source').startswith('PMID')) | (~col('source').startswith('NCIT')), struct(col('niceName'), col('url')) ) ) # The previous conditional clause creates a struct regardless of # whether any condition is met. The empty struct is replaced with null .withColumn('urls', when(~col('urls.niceName').isNull(), col('urls'))) # Enrich data .withColumn('functionalConsequenceId', col('alteration_type')) .replace(to_replace=ALTERATIONTYPE2FUNCTIONCSQ, subset=['functionalConsequenceId']) .replace(to_replace=DRUGRESPONSE2EFO, subset=['Association']) .join(disease_df, on='tumor_type', how='left') .withColumn('drug', upper(col('drug'))) .withColumn( # drug class is coalesced when the precise name of the medicine is not provided 'drug', when(col('drug') == '', col('DrugFullName')).otherwise(col('drug'))) .join(drugs_df, on='drug', how='left') .withColumn('drug', initcap(col('drug'))) # Translate variantId .withColumn( 'variantId', when(~col('gDNA').isNull(), self.get_variantId_udf(col('gDNA'))) ) # Assign a GO ID when a gene expression data is reported .withColumn( 'geneExpressionId', when( (col('alteration_type') == 'EXPR') & (col('alteration').contains('over')), 'GO_0010628' ) .when( (col('alteration_type') == 'EXPR') & (col('alteration').contains('under')), 'GO_0010629' ) .when( (col('alteration_type') == 'EXPR') & (col('alteration').contains('norm')), 'GO_0010467' ) ) # Create variant struct .withColumn( 'variant', when( col('alteration_type') != 'EXPR', struct( col('alteration').alias('name'), col('variantId').alias('id'), col('functionalConsequenceId') ) ) ) # Create geneExpression struct .withColumn( 'geneExpression', when( col('alteration_type') == 'EXPR', struct( col('alteration').alias('name'), col('geneExpressionId').alias('id')) ) ) ) pre_evidence = ( biomarkers_enriched .withColumn('datasourceId', lit('cancer_biomarkers')) .withColumn('datatypeId', lit('affected_pathway')) .withColumnRenamed('tumor_type_full_name', 'diseaseFromSource') .withColumnRenamed('drug', 'drugFromSource') # diseaseFromSourceMappedId, drugId populated above .withColumnRenamed('Association', 'drugResponse') # confidence, literature and urls populated above .withColumnRenamed('gene', 'targetFromSourceId') .withColumnRenamed('Biomarker', 'biomarkerName') # variant, geneExpression populated above .drop( 'tumor_type', 'source', 'alteration', 'alteration_type', 'IndividualMutation', 'geneExpressionId', 'gDNA', 'functionalConsequenceId', 'variantId', 'DrugFullName', 'niceName', 'url') ) # Group evidence self.evidence = ( pre_evidence .groupBy('datasourceId', 'datatypeId', 'drugFromSource', 
'drugId', 'drugResponse', 'targetFromSourceId', 'diseaseFromSource', 'diseaseFromSourceMappedId', 'confidence', 'biomarkerName') .agg( collect_set('literature').alias('literature'), collect_set('urls').alias('urls'), collect_set('variant').alias('variant'), collect_set('geneExpression').alias('geneExpression'), ) # Replace empty lists with null values .withColumn('literature', when(size(col('literature')) == 0, lit(None)).otherwise(col('literature'))) .withColumn('urls', when(size(col('urls')) == 0, lit(None)).otherwise(col('urls'))) .withColumn('variant', when(size(col('variant')) == 0, lit(None)).otherwise(col('variant'))) .withColumn( 'geneExpression', when(size(col('geneExpression')) == 0, lit(None)) .otherwise(col('geneExpression'))) # Collect variant info into biomarkers struct .withColumn( 'biomarkers', struct( 'variant', 'geneExpression' )) .drop('variant', 'geneExpression') .distinct() ) return self.evidence @staticmethod def get_variantId(gDNA: str) -> str: ''' Converts the genomic coordinates to the CHROM_POS_REF_ALT notation Ex.: 'chr14:g.105243048G_T' --> '14_105243048_G_T' ''' translate_dct = {'chr': '', ':g.': '_', '>': '_', 'del': '_', 'ins': '_'} try: for k, v in translate_dct.items(): gDNA = gDNA.replace(k, v) x, head, tail = re.split(r'^(.*?_\d+)', gDNA) if bool(re.search(r'\d+', tail)): tail = re.split(r'^(_\d+_)', tail)[-1] return head + '_' + tail except AttributeError: return @staticmethod def zip_alterations_with_type(alterations, alteration_type): ''' Zips in a tuple the combination of the alteration w/ its correspondent type so that when multiple alterations are reported, these can be disambiguated. By expanding the array of alteration types it accounts for the cases when several alterations are reported but only one type is given. Ex.: alterations = ['MET:Y1230C', 'Y1235D'] alteration_type = ['MUT'] --> [('MET:Y1230C', 'MUT'), ('Y1235D', 'MUT')] ''' alteration_types = alteration_type * len(alterations) return list(zip(alterations, alteration_types)) if __name__ == '__main__': parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--biomarkers_table', help='Input TSV table containing association data for the biomarkers.', required=True) parser.add_argument('--source_table', required=True, help='Input JSON file with the annotations to reference the source of the associations.') parser.add_argument('--disease_table', help='Input JSON file with the mapping of the tumor types to EFO IDs.', required=True) parser.add_argument('--drug_index', help='Directory of parquet files that stores OT\'s disease index.', required=True) parser.add_argument('--output_file', help='Output gzipped json file containing the evidence.', required=True) args = parser.parse_args() logging.basicConfig( level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) cancerBiomarkersEvidenceGenerator().compute( args.biomarkers_table, args.source_table, args.disease_table, args.drug_index, args.output_file ) |
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | import argparse import logging import sys import pyspark.sql.functions as f from pyspark.sql.dataframe import DataFrame from common.evidence import initialize_sparksession, write_evidence_strings def main(chembl_evidence: str, predictions: str, output_file: str) -> None: """ This module adds the studyStopReasonCategories to the ChEMBL evidence as a result of the categorisation of the clinical trial reason to stop. Args: chembl_evidence: Input gzipped JSON with the evidence submitted by ChEMBL. predictions: Input JSON containing the categories of the clinical trial reason to stop. Instructions for applying the ML model here: https://github.com/ireneisdoomed/stopReasons. output_file: Output gzipped json file containing the ChEMBL evidence with the additional studyStopReasonCategories field. log_file: Destination of the logs generated by this script. Defaults to None. """ logging.info(f'ChEMBL evidence JSON file: {chembl_evidence}') logging.info(f'Classes of reason to stop table: {predictions}') chembl_df = spark.read.json(chembl_evidence).repartition(200).persist() predictions_df = ( spark.read.json(predictions) .transform(prettify_subclasses) .distinct() ) # Join datasets early_stopped_evd_df = ( # Evidence with a given reason to stop is always supported by a single NCT ID chembl_df.filter(f.col('studyStopReason').isNotNull()) .select( "*", f.explode(f.col("urls.url")).alias("nct_id") ).withColumn("nct_id", f.element_at(f.split(f.col("nct_id"), "/"), -1)) .join(predictions_df, on='nct_id', how='left').drop('nct_id') .distinct() ) # We expect that ~10% of evidence strings have a reason to stop assigned # It is asserted that this fraction is between 9 and 11% of the total count total_count = chembl_df.count() early_stopped_count = early_stopped_evd_df.count() if not 0.08 < early_stopped_count / total_count < 0.11: raise AssertionError(f'The fraction of evidence with a CT reason to stop class is not as expected ({early_stopped_count / total_count}).') logging.info('Evidence strings have been processed. Saving...') enriched_chembl_df = ( chembl_df.filter(f.col('studyStopReason').isNull()) .unionByName(early_stopped_evd_df, allowMissingColumns=True) ) assert enriched_chembl_df.count() == chembl_df.count() write_evidence_strings(enriched_chembl_df, output_file) logging.info(f'{total_count} evidence strings have been saved to {output_file}. Exiting.') def prettify_subclasses(predictions_df: DataFrame) -> DataFrame: """ List of categories must be converted formatted with a nice name. 
""" CATEGORIESMAPPINGS = { 'Business_Administrative': 'Business or administrative', 'Logistics_Resources': 'Logistics or resources', 'Covid19': 'COVID-19', 'Safety_Sideeffects': 'Safety or side effects', 'Endpoint_Met': 'Met endpoint', 'Insufficient_Enrollment': 'Insufficient enrollment', 'Negative': 'Negative', 'Study_Design': 'Study design', 'Invalid_Reason': 'Invalid reason', 'Study_Staff_Moved': 'Study staff moved', 'Another_Study': 'Another study', 'No_Context': 'No context', 'Regulatory': 'Regulatory', 'Interim_Analysis': 'Interim analysis', 'Success': 'Success', 'Ethical_Reason': 'Ethical reason', 'Insufficient_Data': 'Insufficient data', 'Uncategorised': 'Uncategorised' } sub_mapping_col = f.map_from_entries(f.array(*[f.struct(f.lit(k), f.lit(v)) for k, v in CATEGORIESMAPPINGS.items()])) return ( predictions_df .select('nct_id', 'subclasses', sub_mapping_col.alias('prettyStopReasonsMap')) # Create a MapType column to convert each element of the subclasses array .withColumn('studyStopReasonCategories', f.expr('transform(subclasses, x -> element_at(prettyStopReasonsMap, x))')) .drop('subclasses', 'prettyStopReasonsMap') ) def get_parser(): """Get parser object for script ChEMBL.py.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( '--chembl_evidence', help='Input gzipped JSON with the evidence submitted by ChEMBL', type=str, required=True, ) parser.add_argument( '--predictions', help='Input TSV containing the categories of the clinical trial reason to stop. Instructions for applying the ML model here: https://github.com/ireneisdoomed/stopReasons.', type=str, required=True, ) parser.add_argument( '--output', help='Output gzipped json file following the target safety liabilities data model.', type=str, required=True ) parser.add_argument( '--log_file', help='Destination of the logs generated by this script. Defaults to None', type=str, nargs='?', default=None ) return parser if __name__ == '__main__': args = get_parser().parse_args() # Logger initializer. If no log_file is specified, logs are written to stderr logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) if args.log_file: logging.config.fileConfig(filename=args.log_file) else: logging.StreamHandler(sys.stderr) global spark spark = initialize_sparksession() main( chembl_evidence=args.chembl_evidence, predictions=args.predictions, output_file=args.output ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 | import argparse import logging from typing import List import pandas as pd import pyspark.sql.functions as f import pyspark.sql.types as t from pyspark.sql import DataFrame from common.evidence import initialize_sparksession, write_evidence_strings PROBES_SETS = [ "Bromodomains chemical toolbox", "Chemical Probes for Understudied Kinases", "Chemical Probes.org", "Gray Laboratory Probes", "High-quality chemical probes", "MLP Probes", "Nature Chemical Biology Probes", "Open Science Probes", "opnMe Portal", "Probe Miner (suitable probes)", "Protein methyltransferases chemical toolbox", "SGC Probes", "Tool Compound Set", "Concise Guide to Pharmacology 2019/20", "Kinase Chemogenomic Set (KCGS)", "Kinase Inhibitors (best-in-class)", "Novartis Chemogenetic Library (NIBR MoA Box)", "Nuisance compounds in cellular assays", ] def collapse_cols_data_in_array( df: DataFrame, source_cols: List[str], destination_col: str ) -> DataFrame: """Collapses the data in a single column when the information is one-hot encoded. 
Args: df (DataFrame): Dataframe containing the data for the different probes source_cols (List[str]): List of columns containing the data to be collapsed destination_col (str): Name of the column where the array will be stored Returns: DataFrame: Dataframe with a new column containing the sources that have data for a specific probe Examples: >>> probes_df = spark.createDataFrame([("A", 1, 0), ("B", 1, 1)], ["probe", "source1", "source2"]) >>> collapse_cols_data_in_array(df, ["source1", "source2"], "datasource").show() +-----+-------+-------+------------------+ |probe|source1|source2| datasource| +-----+-------+-------+------------------+ | A| 1| 0| [source1]| | B| 1| 1|[source1, source2]| +-----+-------+-------+------------------+ <BLANKLINE> """ # Escape the name of the columns in case they contain spaces source_cols = [f"`{e}`" for e in source_cols if " " in e] return df.withColumn( destination_col, f.array([f.when(df[c] == 1, c.replace(r"`", "")) for c in source_cols]), ).withColumn( destination_col, f.array_except(f.col(destination_col), f.array(f.lit(None))) ) def clean_origin_col(): """Removes the substring ' probe' from the origin column to just state if the probe has been reported from an experimental or computational approach.""" return f.array_distinct( f.expr("transform(origin, x -> trim(regexp_replace(x, ' probe', '')))") ) def extract_hq_flag(): """Returns a flag indicating if the probe is high-quality or not.""" return f.when( f.array_contains(f.col("datasourceIds"), "High-quality chemical probes"), f.lit(True), ).otherwise(f.lit(False)) def convert_stringified_array_to_array(col_name: str) -> DataFrame: """Converts a column of stringified arrays to an array column. Args: col_name: Name of the column that contains the stringified array Examples: >>> df = spark.createDataFrame([("['BI-1829', 'No control']")], t.StringType()) >>> df.select(convert_stringified_array_to_array("value").alias("res")).show(truncate=False) +---------------------+ |res | +---------------------+ |[BI-1829, No control]| +---------------------+ <BLANKLINE> """ return f.split(f.translate(col_name, "[]'", ""), ", ").cast( t.ArrayType(t.StringType()) ) def replace_dash(col_name): """Converts to null those values that only contain `-`.""" return f.when(f.col(col_name).cast(t.StringType()) != "-", f.col(col_name)) def process_scores(col_name): """Helper function to refactor the score processing logic.""" return ( f.when(f.col(col_name).contains("-"), replace_dash(col_name)) .when(f.col(col_name) == 0, f.lit(None)) .otherwise(f.col(col_name)) ).cast(t.IntegerType()) def process_probes_data(probes_excel: str) -> List[DataFrame]: """Metadata about the compound and the scores given by the different sources.""" return ( spark.createDataFrame( pd.read_excel( probes_excel, sheet_name="PROBES", header=0, index_col=0, ) # Probes that do not have an associated target are marked as nulls .query("target.notnull()") .reset_index() .drop("control_smiles", axis=1) ) # Collect list of datasources for each probe .transform( lambda df: collapse_cols_data_in_array(df, PROBES_SETS, "datasourceIds") ) # Collecting the list of detection methods of the probe .transform( lambda df: collapse_cols_data_in_array( df, ["experimental probe", "calculated probe"], "origin", ) ).select( "pdid", f.col("compound_name").alias("id"), clean_origin_col().alias("origin"), # Flag the high-quality probes and remove this from the list of datasources extract_hq_flag().alias("isHighQuality"), f.explode( f.array_except( f.col("datasourceIds"), 
f.array(f.lit("High-quality chemical probes")), ) ).alias("datasourceId"), replace_dash("control_name").alias("control"), ) ) def process_probes_targets_data(probes_excel: str) -> DataFrame: """Collection of targets associated with the probes and their scores.""" return ( spark.createDataFrame( pd.read_excel( probes_excel, sheet_name="PROBES TARGETS", header=0, index_col=0 ) # Probes that do not have an associated target are marked with "-" .query("gene_name != '-'") .reset_index() .drop("control_smiles", axis=1) ) .filter(f.col("organism") == "Homo sapiens") .withColumn( "mechanismOfAction", f.when( f.col("action") != "-", f.split(f.col("action"), ";"), ), ) .select( "pdid", f.col("target").alias("targetFromSource"), "mechanismOfAction", process_scores("`P&D probe-likeness score`").alias("probesDrugsScore"), process_scores("`Probe Miner Score`").alias("probeMinerScore"), process_scores("`Cells score (Chemical Probes.org)`").alias("scoreInCells"), process_scores("`Organisms score (Chemical Probes.org)`").alias( "scoreInOrganisms" ), ) ) def process_probes_sets_data(probes_excel: str) -> DataFrame: """Metadata about the different sources of probes.""" return ( spark.createDataFrame( pd.read_excel( probes_excel, sheet_name="COMPOUNDSETS", header=0, index_col=0 ) ) .selectExpr("COMPOUNDSET as datasourceId", "SOURCE_URL as url") .filter(f.col("url").startswith("http")) ) def process_targets_xrefs(probes_excel: str) -> DataFrame: """Look-up table between the gene symbols and the UniProt IDs.""" return spark.createDataFrame( pd.read_excel( probes_excel, sheet_name="TARGETS", header=0, index_col=0 ).reset_index() ).selectExpr("target as targetFromSource", "uniprot as targetFromSourceId") def process_drugs_xrefs(drugs_xrefs: str) -> DataFrame: """Look-up table between the probes IDs in P&Ds and ChEMBL.""" return ( spark.read.csv(drugs_xrefs, header=True) .selectExpr("pdid", "ChEMBL as drugId") .filter(f.col("drugId").isNotNull()) ) def main(probes_excel: str, drugs_xrefs: str) -> DataFrame: """Main logic of the script.""" probes_df = process_probes_data(probes_excel) probes_targets_df = process_probes_targets_data(probes_excel) probes_sets_df = process_probes_sets_data(probes_excel) targets_xref_df = process_targets_xrefs(probes_excel) drugs_xref_df = process_drugs_xrefs(drugs_xrefs) grouping_cols = [ "targetFromSourceId", "id", "drugId", "mechanismOfAction", "origin", "control", "isHighQuality", "probesDrugsScore", "probeMinerScore", "scoreInCells", "scoreInOrganisms", ] return ( probes_targets_df.join(probes_df, on="pdid", how="left") .join(targets_xref_df, on="targetFromSource", how="left") .join(probes_sets_df, on="datasourceId", how="left") .join(drugs_xref_df, on="pdid", how="left") .groupBy(grouping_cols) .agg( f.collect_set( f.struct( f.col("datasourceId").alias("niceName"), f.col("url").alias("url") ) ).alias("urls") ) ) def get_parser(): """Get parser object for script ChemicalProbes.py.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--probes_excel_path", help="Path to the Probes&Drugs Probes XLSX that contains the main tables.", type=str, required=True, ) parser.add_argument( "--probes_mappings_path", help="Path to the Probes&Drugs Compounds ID mapping standardised CSV that contains mappings between probes and ChEMBLIDs.", type=str, required=True, ) parser.add_argument( "--output", help="Output gzipped json file following the chemical probes data model.", type=str, required=True, ) return parser if __name__ == "__main__": args = get_parser().parse_args() 
logging.basicConfig( level=logging.INFO, format="%(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) global spark spark = initialize_sparksession() out = main( probes_excel=args.probes_excel_path, drugs_xrefs=args.probes_mappings_path ) write_evidence_strings(out, args.output) logging.info(f"Probes dataset has been saved to {args.output}. Exiting.") |
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | import argparse import logging from pyspark.conf import SparkConf from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import array, col, lit, monotonically_increasing_id, struct, trim from pyspark.sql.types import StringType, StructType, TimestampType from common.ontology import add_efo_mapping from common.evidence import write_evidence_strings def main(input_file: str, output_file: str, cache_dir: str, local: bool = False) -> None: # Initialize spark session if local: sparkConf = ( SparkConf() .set('spark.driver.memory', '15g') .set('spark.executor.memory', '15g') .set('spark.driver.maxResultSize', '0') .set('spark.debug.maxToStringFields', '2000') .set('spark.sql.execution.arrow.maxRecordsPerBatch', '500000') ) spark = ( SparkSession.builder .config(conf=sparkConf) .master('local[*]') .getOrCreate() ) else: sparkConf = ( SparkConf() .set('spark.driver.maxResultSize', '0') .set('spark.debug.maxToStringFields', '2000') .set('spark.sql.execution.arrow.maxRecordsPerBatch', '500000') ) spark = ( SparkSession.builder .config(conf=sparkConf) .getOrCreate() ) # Read and process Clingen's table into evidence strings clingen_df = read_input_file(input_file, spark_instance=spark) logging.info('Gene Validity Curations table has been imported. Processing evidence strings.') evidence_df = process_clingen(clingen_df) evidence_df = add_efo_mapping(evidence_strings=evidence_df, spark_instance=spark, ontoma_cache_dir=cache_dir) logging.info('Disease mappings have been added.') write_evidence_strings(evidence_df, output_file) logging.info(f'{evidence_df.count()} evidence strings have been saved to {output_file}') def read_input_file(input_file: str, spark_instance) -> DataFrame: """ Reads Gene Validity Curations CSV file into a Spark DataFrame forcing the schema The first 6 rows of this file include metadata that needs to be dropped """ clingen_schema = ( StructType() .add('GENE SYMBOL', StringType()) .add('GENE ID (HGNC)', StringType()) .add('DISEASE LABEL', StringType()) .add('DISEASE ID (MONDO)', StringType()) .add('MOI', StringType()) .add('SOP', StringType()) .add('CLASSIFICATION', StringType()) .add('ONLINE REPORT', StringType()) .add('CLASSIFICATION DATE', TimestampType()) .add('GCEP', StringType()) ) clingen_df = ( spark_instance.read.csv(input_file, schema=clingen_schema) # The first 6 rows of the GVC file include metadata that needs to be dropped .withColumn('idx', monotonically_increasing_id()) .filter(col('idx') > 5) .drop('idx') ) return clingen_df def process_clingen(clingen_df: DataFrame) -> DataFrame: """ The JSON Schema format is applied to the df """ return ( clingen_df .withColumn('datasourceId', lit('clingen')) .withColumn('datatypeId', lit('genetic_literature')) .withColumn('urls', struct(col('ONLINE REPORT').alias('url'))) .select( 'datasourceId', 'datatypeId', trim(col('GENE SYMBOL')).alias('targetFromSourceId'), col('DISEASE LABEL').alias('diseaseFromSource'), col('DISEASE ID (MONDO)').alias('diseaseFromSourceId'), array(col('MOI')).alias('allelicRequirements'), 
array(col('urls')).alias('urls'), col('CLASSIFICATION').alias('confidence'), col('GCEP').alias('studyId') ) ) if __name__ == "__main__": # Parse CLI arguments parser = argparse.ArgumentParser(description='Parse ClinGen gene-disease associations from Gene Validity Curations') parser.add_argument('--input_file', help='Name of csv file downloaded from https://search.clinicalgenome.org/kb/gene-validity', type=str, required=True) parser.add_argument('--output_file', help='Absolute path of the gzipped JSON evidence file.', type=str, required=True) parser.add_argument('--log_file', type=str, help='Optional filename to redirect the logs into.') parser.add_argument('--cache_dir', required=False, help='Directory to store the OnToma cache files in.') parser.add_argument( '--local', action='store_true', required=False, default=False, help='Flag to indicate if the script is executed locally or on the cluster' ) args = parser.parse_args() # Initialize logging: logging.basicConfig( level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) if args.log_file: logging.config.fileConfig(filename=args.log_file) # Report input data: logging.info(f'Clingen input file path: {args.input_file}') logging.info(f'Evidence output file path: {args.output_file}') logging.info(f'Cache directory: {args.cache_dir}') main( input_file=args.input_file, output_file=args.output_file, cache_dir=args.cache_dir, local=args.local ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | import argparse import logging import sys from pyspark.conf import SparkConf from pyspark.sql import SparkSession, DataFrame from pyspark.sql.types import FloatType from pyspark.sql.functions import col, collect_set, split, element_at, struct, lit from common.evidence import write_evidence_strings # A few genes do not have Ensembl IDs in the data file provided CRISPR_SYMBOL_MAPPING = { 'CASC5': 'ENSG00000137812', 'CIRH1A': 'ENSG00000141076', 'EFTUD1': 'ENSG00000140598', 'ENSG00000163660': 'ENSG00000163660', 'KIAA0947': 'ENSG00000164151', 'KIAA1432': 'ENSG00000107036', 'NDNL2': 'ENSG00000185115', 'SRPR': 'ENSG00000182934', 'ZNF259': 'ENSG00000109917' } def main(desc_file, evid_file, cell_file, out_file): sparkConf = ( SparkConf() .set('spark.driver.memory', '15g') .set('spark.executor.memory', '15g') .set('spark.driver.maxResultSize', '0') .set('spark.debug.maxToStringFields', '2000') .set('spark.sql.execution.arrow.maxRecordsPerBatch', '500000') ) spark = ( SparkSession.builder .config(conf=sparkConf) .master('local[*]') .getOrCreate() ) # Log parameters: logging.info(f'Evidence file: {evid_file}') logging.info(f'Description file: {desc_file}') logging.info(f'Cell type annotation: {cell_file}') logging.info(f'Output file: {out_file}') # Read files: evidence_df = ( spark.read.csv(evid_file, sep='\t', header=True) .drop('pmid', 'gene_set_name', 'disease_name') ) cell_lines_df = spark.read.csv(cell_file, sep='\t', header=True) description_df = spark.read.csv(desc_file, sep='\t', header=True) # Logging dataframe stats: logging.info(f'Number of evidence: {evidence_df.count()}') logging.info(f'Number of descriptions: {description_df.count()}') logging.info(f'Number of cell/tissue annotation: {cell_lines_df.count()}') # Tissues and cancer types are annotated together in the same column (tissue_or_cancer_type) # To disambiguate one from another, the column is combined with the cell lines # First on the tissue level: tissue_desc = ( description_df .withColumnRenamed('tissue_or_cancer_type', 'tissue') .join(cell_lines_df, on='tissue', how='inner') ) # And then on the disease level: cell_desc = ( description_df .withColumnRenamed('tissue_or_cancer_type', 'diseaseFromSource') .join(cell_lines_df, on='diseaseFromSource', how='inner') ) merged_annotation = ( # Concatenating the above generated dataframes: cell_desc .union(tissue_desc) # Aggregating by disease and method: .groupBy('diseaseFromSource', 'efo_id', 'method') # The cell annotation is aggregated in a list of struct: .agg( collect_set(struct( col('name'), col('id'), col('tissue'), col('tissueId') )).alias('diseaseCellLines') ) .drop('method') ) # Joining merged annotation with evidence: pooled_evidence_df = ( evidence_df .select( col('target_id').alias('targetFromSourceId'), col('disease_id').alias('efo_id'), col('score').alias('resourceScore').cast(FloatType()), ) # Some of the target identifier are not Ensembl Gene id - replace them: .replace(to_replace=CRISPR_SYMBOL_MAPPING, 
subset=['targetFromSourceId']) # Merging with descriptions: .join(merged_annotation, on='efo_id', how='outer') # From EFO uri, generate EFO id: .withColumn( 'diseaseFromSourceMappedId', element_at(split(col('efo_id'), '/'), -1).alias('diseaseFromSourceMappedId') ) .drop('efo_id') # Adding constants: .withColumn('datasourceId', lit('crispr')) .withColumn('datatypeId', lit('affected_pathway')) .persist() ) logging.info(f'Saving {pooled_evidence_df.count()} CRISPR evidence in JSON format, to: {out_file}') write_evidence_strings(pooled_evidence_df, out_file) if __name__ == "__main__": # Parse CLI arguments: parser = argparse.ArgumentParser( description='Parse essential cancer genes identified using CRISPR assays and prioritised in Project Score' ) parser.add_argument('-d', '--descriptions_file', help='Name of tsv file with the description of the method per cancer type', type=str, required=True) parser.add_argument('-e', '--evidence_file', help='Name of tsv file with the priority score', type=str, required=True) parser.add_argument('-c', '--cell_types_file', help='Name of tsv file with cell line names per cancer type', type=str, required=True) parser.add_argument('-o', '--output_file', help='Name of evidence file. (gzip compressed json)', type=str, required=True) parser.add_argument('-l', '--log_file', help='Name of log file. If not provided logs are written to standard error.', type=str, required=False) args = parser.parse_args() desc_file = args.descriptions_file evid_file = args.evidence_file cell_file = args.cell_types_file out_file = args.output_file # If no logfile is specified, logs are written to stderr logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) if args.log_file: logging.config.fileConfig(filename=args.log_file) else: logging.StreamHandler(sys.stderr) main(desc_file, evid_file, cell_file, out_file) |
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | import argparse from functools import reduce import logging import sys from pyspark.sql import SparkSession, DataFrame from common.evidence import initialize_sparksession, write_evidence_strings from BrainCRISPR import CRISPRBrain def crispr_brain_wrapper(spark: SparkSession, crispr_brain_mapping: str) -> DataFrame: """Simple wrapper around CRISPRBrain class to generate and extract evidence.""" crispr_brain_parser = CRISPRBrain(spark, crispr_brain_mapping) crispr_brain_parser.create_crispr_brain_evidence() return crispr_brain_parser.get_dataframe() def main(spark: SparkSession, crispr_brain_mapping: str) -> DataFrame: crispr_screen_evidence_sets = [ # Generate evidence from CRISPR Brain: crispr_brain_wrapper(spark, crispr_brain_mapping).persist(), # Further datasets can be added here: ] # Combining all sources into one single dataframe: combined_evidence = reduce( lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), crispr_screen_evidence_sets, ) logging.info(f"Total number of CRISPR screen evidence: {combined_evidence.count()}") return combined_evidence def get_parser(): """Parsing command line argument for crispr screen ingestion.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--crispr_brain_mapping", help="Curation for CRISPR Brain dataset containing disease mapping and contrast.", type=str, required=True, ) parser.add_argument( "--output", help="Output gzipped json file.", type=str, required=True, ) parser.add_argument( "--log_file", help="Destination of the logs generated by this script. Defaults to None", type=str, nargs="?", default=None, ) return parser if __name__ == "__main__": args = get_parser().parse_args() # Logger initializer. If no log_file is specified, logs are written to stderr logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) if args.log_file: logging.FileHandler(args.log_file) else: logging.StreamHandler(sys.stderr) spark = initialize_sparksession() evidence_df = main( spark, args.crispr_brain_mapping, ) write_evidence_strings(evidence_df, args.output) logging.info(f"Evidence strings have been saved to {args.output}. Exiting.") |
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 | import argparse import logging import sys from pyspark.conf import SparkConf from pyspark.sql import DataFrame, SparkSession, functions as f, types as t from common.ontology import add_efo_mapping from common.evidence import initialize_sparksession, write_evidence_strings G2P_mutationCsq2functionalCsq = { "uncertain": "SO_0002220", # function_uncertain_variant "absent gene product": "SO_0002317", # absent_gene_product "altered gene product structure": "SO_0002318", # altered_gene_product_structure "5_prime or 3_prime UTR mutation": "SO_0001622", # UTR_variant "increased gene product level": "SO_0002315", # increased_gene_product_level "cis-regulatory or promotor mutation": "SO_0001566", # regulatory_region_variant } def main( dd_file: str, eye_file: str, skin_file: str, cancer_file: str, cardiac_file: str, output_file: str, cache_dir: str, local: bool = False, ) -> None: # Initialize spark session spark = initialize_sparksession() # Read and process G2P's tables into evidence strings gene2phenotype_df = read_input_files( spark, dd_file, eye_file, skin_file, cancer_file, cardiac_file ) logging.info( "Gene2Phenotype panels have been imported. Processing evidence strings." 
) evidence_df = process_gene2phenotype(gene2phenotype_df) evidence_df = add_efo_mapping( evidence_strings=evidence_df, ontoma_cache_dir=cache_dir, spark_instance=spark ) logging.info("Disease mappings have been added.") # Saving data: write_evidence_strings(evidence_df, output_file) logging.info( f"{evidence_df.count()} evidence strings have been saved to {output_file}" ) def read_input_files( spark: SparkSession, dd_file: str, eye_file: str, skin_file: str, cancer_file: str, cardiac_file: str, ) -> DataFrame: """ Reads G2P's panel CSV files into a Spark DataFrame forcing the schema """ gene2phenotype_schema = ( t.StructType() .add("gene symbol", t.StringType()) .add("gene mim", t.IntegerType()) .add("disease name", t.StringType()) .add("disease mim", t.StringType()) .add("confidence category", t.StringType()) .add("allelic requirement", t.StringType()) .add("mutation consequence", t.StringType()) .add("phenotypes", t.StringType()) .add("organ specificity list", t.StringType()) .add("pmids", t.StringType()) .add("panel", t.StringType()) .add("prev symbols", t.StringType()) .add("hgnc id", t.IntegerType()) .add("gene disease pair entry date", t.TimestampType()) .add("cross cutting modifier", t.StringType()) .add("mutation consequence flag", t.StringType()) .add("confidence value flag", t.StringType()) .add("comments", t.StringType()) .add("variant consequence", t.StringType()) .add("disease ontology", t.StringType()) ) return ( spark.read.option("multiLine", True) .option("encoding", "UTF-8") .csv( [dd_file, eye_file, skin_file, cancer_file, cardiac_file], schema=gene2phenotype_schema, enforceSchema=True, header=True, sep=",", quote='"', ) # Dropping one incomplete evidence due to parsing problem: # All data: 4094 # Filtered data: 4093 .filter(f.col("gene symbol").isNotNull() & f.col("panel").isNotNull()) ) def process_gene2phenotype(gene2phenotype_df: DataFrame) -> DataFrame: """ The JSON Schema format is applied to the df """ return gene2phenotype_df.select( # Split pubmed IDs to list: f.split(f.col("pmids"), ";").alias("literature"), # Phenotypes are excluded from the schema: # f.split(f.col("phenotypes"), ";").alias("phenotypes"), # Renaming a few columns: f.col("gene symbol").alias("targetFromSourceId"), f.col("panel").alias("studyId"), f.col("confidence category").alias("confidence"), # Parsing allelic requirements: f.when( f.col("allelic requirement").isNotNull(), f.array(f.col("allelic requirement")), ).alias("allelicRequirements"), # Parsing disease from source identifier: f.when(f.col("disease ontology").isNotNull(), f.col("disease ontology")) .when( (~f.col("disease mim").contains("No disease mim")) & (f.col("disease mim").isNotNull()), f.concat(f.lit("OMIM:"), f.col("disease mim")), ) .otherwise(f.lit(None)) .alias("diseaseFromSourceId"), # Cleaning disease names: f.regexp_replace(f.col("disease name"), r".+-related ", "").alias( "diseaseFromSource" ), # Map functional consequences: translate(G2P_mutationCsq2functionalCsq)("mutation consequence").alias( "variantFunctionalConsequenceId" ), # Adding constant columns: f.lit("gene2phenotype").alias("datasourceId"), f.lit("genetic_literature").alias("datatypeId"), ) def translate(mapping): """ Mapping consequences - to SO codes """ def translate_(col): return mapping.get(col) return f.udf(translate_, t.StringType()) if __name__ == "__main__": # Parse CLI arguments parser = argparse.ArgumentParser( description="Parse Gene2Phenotype gene-disease files downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads/" ) 
parser.add_argument( "-d", "--dd_panel", help="DD panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads", required=True, type=str, ) parser.add_argument( "-e", "--eye_panel", help="Eye panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads", required=True, type=str, ) parser.add_argument( "-s", "--skin_panel", help="Skin panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads", required=True, type=str, ) parser.add_argument( "-c", "--cancer_panel", help="Cancer panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads", required=True, type=str, ) parser.add_argument( "-cr", "--cardiac_panel", help="Cardiac panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads", required=True, type=str, ) parser.add_argument( "-o", "--output_file", help="Absolute path of the gzipped, JSON evidence file.", required=True, type=str, ) parser.add_argument( "-l", "--log_file", help="Filename to store the parser logs.", type=str ) parser.add_argument( "--cache_dir", required=False, help="Directory to store the OnToma cache files in.", ) parser.add_argument( "--local", action="store_true", required=False, default=False, help="Flag to indicate if the script is executed locally or on the cluster", ) args = parser.parse_args() # Get parameters dd_file = args.dd_panel eye_file = args.eye_panel skin_file = args.skin_panel cancer_file = args.cancer_panel cardiac_file = args.cardiac_panel output_file = args.output_file log_file = args.log_file cache_dir = args.cache_dir local = args.local # Configure logger: logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) if log_file: logging.config.fileConfig(fname=log_file) else: logging.StreamHandler(sys.stderr) # Report input data: logging.info(f"DD panel file: {dd_file}") logging.info(f"Eye panel file: {eye_file}") logging.info(f"Skin panel file: {skin_file}") logging.info(f"Cancer panel file: {cancer_file}") logging.info(f"Cardiac panel file: {cardiac_file}") # Calling main: main( dd_file, eye_file, skin_file, cancer_file, cardiac_file, output_file, cache_dir ) |
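The script above maps free-text mutation consequences to Sequence Ontology identifiers through a small dictionary-backed UDF (`translate`). Below is a minimal sketch of the same pattern on a toy DataFrame; the local Spark session, the abbreviated mapping and the example values are illustrative only and not part of the pipeline.

from pyspark.sql import SparkSession, functions as f, types as t

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Abbreviated, illustrative copy of the consequence-to-SO mapping used above.
consequence_to_so = {
    "absent gene product": "SO_0002317",
    "altered gene product structure": "SO_0002318",
}

def translate(mapping):
    # Wrap dict.get in a UDF so unmapped values become null instead of raising.
    return f.udf(lambda value: mapping.get(value), t.StringType())

df = spark.createDataFrame(
    [("altered gene product structure",), ("unknown consequence",)],
    ["mutation consequence"],
)
df.select(
    translate(consequence_to_so)("mutation consequence").alias(
        "variantFunctionalConsequenceId"
    )
).show(truncate=False)
# The mapped term yields SO_0002318; the unmapped toy value yields null.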
import argparse
from functools import partial, reduce
import logging
import sys

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T

from common.evidence import initialize_sparksession, write_evidence_strings
from AzGeneBurden import main as process_az_gene_burden
from GenebassGeneBurden import main as process_genebass_gene_burden


def main(
    az_binary_data: str,
    az_quant_data: str,
    curated_data: str,
    genebass_data: str,
) -> DataFrame:
    """This module brings together and exports target/disease evidence generated by AzGeneBurden.py, GenebassGeneBurden.py and the manual gene burden curation."""
    burden_evidence_sets = [
        # Generate evidence from AZ data:
        process_az_gene_burden(az_binary_data, az_quant_data).persist(),
        # Generate evidence from manual data:
        process_gene_burden_curation(curated_data).persist(),
        # Generate evidence from GeneBass data:
        process_genebass_gene_burden(genebass_data).persist(),
    ]

    unionByDiffSchema = partial(DataFrame.unionByName, allowMissingColumns=True)
    evd_df = reduce(unionByDiffSchema, burden_evidence_sets).distinct()
    logging.info(f'Total number of gene_burden evidence: {evd_df.count()}')

    return evd_df


def process_gene_burden_curation(curated_data: str) -> DataFrame:
    """Process manual gene burden evidence."""
    logging.info(f'File with the curated burden associations: {curated_data}')
    manual_df = read_gene_burden_curation(curated_data)
    logging.info(f'Total number of imported manual gene_burden evidence: {manual_df.count()}')

    manual_df = (
        manual_df
        # The columns largely follow the schema; only minor parsing is needed.
        # 1. Confidence intervals are split depending on whether the effect is an odds ratio or a beta.
        .withColumn(
            'oddsRatioConfidenceIntervalLower',
            F.when(F.col('oddsRatio').isNotNull(), F.col('ConfidenceIntervalLower'))
        )
        .withColumn(
            'oddsRatioConfidenceIntervalUpper',
            F.when(F.col('oddsRatio').isNotNull(), F.col('ConfidenceIntervalUpper'))
        )
        .withColumn('betaConfidenceIntervalLower', F.when(F.col('beta').isNotNull(), F.col('ConfidenceIntervalLower')))
        .withColumn('betaConfidenceIntervalUpper', F.when(F.col('beta').isNotNull(), F.col('ConfidenceIntervalUpper')))
        .drop('ConfidenceIntervalLower', 'ConfidenceIntervalUpper')
        # 2. Collect PMID and allelic requirements in an array
        .withColumn('literature', F.array(F.col('literature')))
        .withColumn(
            'allelicRequirements',
            F.when(F.col('allelicRequirements').isNotNull(), F.array(F.col('allelicRequirements'))),
        )
        # 3. Split the sex column to form an array
        .withColumn('sex', F.split(F.col('sex'), ', '))
        # 4.
Add hardcoded values and drop URLs (will be handled by the FE) and HGNC symbols .withColumn('datasourceId', F.lit('gene_burden')) .withColumn('datatypeId', F.lit('genetic_association')) .drop('url', 'targetFromSource') .distinct() ) return manual_df def read_gene_burden_curation(curated_data: str) -> DataFrame: """Read manual gene burden curation from remote to a Spark DataFrame.""" schema = T.StructType( [ T.StructField('projectId', T.StringType(), True), T.StructField('targetFromSource', T.StringType(), True), T.StructField('targetFromSourceId', T.StringType(), True), T.StructField('diseaseFromSource', T.StringType(), True), T.StructField('diseaseFromSourceMappedId', T.StringType(), True), T.StructField('resourceScore', T.DoubleType(), True), T.StructField('pValueMantissa', T.DoubleType(), True), T.StructField('pValueExponent', T.IntegerType(), True), T.StructField('oddsRatio', T.DoubleType(), True), T.StructField('ConfidenceIntervalLower', T.DoubleType(), True), T.StructField('ConfidenceIntervalUpper', T.DoubleType(), True), T.StructField('beta', T.DoubleType(), True), T.StructField('sex', T.StringType(), True), T.StructField('ancestry', T.StringType(), True), T.StructField('ancestryId', T.StringType(), True), T.StructField('cohortId', T.StringType(), True), T.StructField('studySampleSize', T.IntegerType(), True), T.StructField('studyCases', T.IntegerType(), True), T.StructField('studyCasesWithQualifyingVariants', T.IntegerType(), True), T.StructField('allelicRequirements', T.StringType(), True), T.StructField('studyId', T.StringType(), True), T.StructField('statisticalMethod', T.StringType(), True), T.StructField('statisticalMethodOverview', T.StringType(), True), T.StructField('literature', T.StringType(), True), T.StructField('url', T.StringType(), True), ] ) spark.sparkContext.addFile(curated_data) return SparkSession.getActiveSession().read.csv( SparkFiles.get(curated_data.split('/')[-1]), sep='\t', header=True, schema=schema ) def get_parser(): """Get parser object for script GeneBurden.py.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( '--az_binary_data', help='Input parquet files with AZ\'s PheWAS associations of binary traits.', type=str, required=True, ) parser.add_argument( '--az_quant_data', help='Input parquet files with AZ\'s PheWAS associations of quantitative traits.', type=str, required=True, ) parser.add_argument( '--curated_data', help='Input remote CSV file containing the gene burden manual curation.', type=str, required=True, ) parser.add_argument( '--genebass_data', help='Input parquet files with Genebass\'s burden associations.', type=str, required=True, ) parser.add_argument( '--output', help='Output gzipped json file following the gene_burden evidence data model.', type=str, required=True, ) parser.add_argument( '--log_file', help='Destination of the logs generated by this script. Defaults to None', type=str, nargs='?', default=None, ) return parser if __name__ == "__main__": args = get_parser().parse_args() # Logger initializer. 
If no log_file is specified, logs are written to stderr logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) if args.log_file: logging.config.fileConfig(filename=args.log_file) else: logging.StreamHandler(sys.stderr) spark = initialize_sparksession() evd_df = main( az_binary_data=args.az_binary_data, az_quant_data=args.az_quant_data, curated_data=args.curated_data, genebass_data=args.genebass_data, ) write_evidence_strings(evd_df, args.output) logging.info(f'Evidence strings have been saved to {args.output}. Exiting.') |
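The gene burden module concatenates evidence from sources whose columns only partially overlap by folding `DataFrame.unionByName(allowMissingColumns=True)` over the list of DataFrames. The sketch below demonstrates that union pattern on two toy frames; the column names and values are made up for illustration.

from functools import partial, reduce

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Two toy evidence sets with different effect-size columns.
az = spark.createDataFrame([("ENSG01", 0.5)], ["targetFromSourceId", "beta"])
genebass = spark.createDataFrame([("ENSG02", 2.1)], ["targetFromSourceId", "oddsRatio"])

# allowMissingColumns=True fills columns absent from one side with nulls.
union_by_diff_schema = partial(DataFrame.unionByName, allowMissingColumns=True)
combined = reduce(union_by_diff_schema, [az, genebass]).distinct()
combined.show()
# Each row keeps its own effect-size column; the other one is null.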
import argparse
import logging
import os
import pathlib
import shutil
import tempfile
import urllib.request

import pronto
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as pf
from pyspark.sql.types import StructType, StructField, StringType
import requests
from retry import retry

from common.ontology import add_efo_mapping
from common.evidence import detect_spark_memory_limit, write_evidence_strings

# The tables and their fields to fetch from SOLR. Other tables (not currently used): gene, disease_gene_summary.
IMPC_SOLR_TABLES = {
    # Mouse to human mappings.
    'gene_gene': ('gene_id', 'hgnc_gene_id'),
    'ontology_ontology': ('mp_id', 'hp_id'),
    # Mouse model and disease data.
    'mouse_model': ('model_id', 'model_phenotypes'),
    'disease': ('disease_id', 'disease_phenotypes'),
    'disease_model_summary': ('model_id', 'model_genetic_background', 'model_description', 'disease_id',
                              'disease_term', 'disease_model_avg_norm', 'disease_model_max_norm', 'marker_id'),
    'ontology': ('ontology', 'phenotype_id', 'phenotype_term'),
}

# List of fields on which to enforce uniqueness by only keeping the record with the highest score.
UNIQUE_FIELDS = [
    # Specific to IMPC.
    'diseaseFromSource',  # Original disease name.
'targetInModel', # Mouse gene name. 'biologicalModelAllelicComposition', # Mouse model property. 'biologicalModelGeneticBackground', # Mouse model property. # General. 'diseaseFromSourceMappedId', # EFO mapped disease ID. 'targetFromSourceId', # Ensembl mapped human gene ID. ] class ImpcSolrRetriever: """Retrieve data from the IMPC SOLR API and save the CSV files to the specified location.""" # Mouse model data from IMPC SOLR. IMPC_SOLR_HOST = 'http://www.ebi.ac.uk/mi/impc/solr/phenodigm/select' # The largest table is about 7 million records. The one billion limit is used as an arbitrary high number to # retrieve all records in one large request, which maximises the performance. IMPC_SOLR_BATCH_SIZE = 1000000000 IMPC_SOLR_TIMEOUT = 3600 # The decorator ensures that the requests are retried in case of network or server errors. @retry(tries=3, delay=5, backoff=1.2, jitter=(1, 3)) def get_number_of_solr_records(self, data_type): params = {'q': '*:*', 'fq': f'type:{data_type}', 'rows': 0} response = requests.get(self.IMPC_SOLR_HOST, params=params, timeout=self.IMPC_SOLR_TIMEOUT) response.raise_for_status() # Check for HTTP errors. This will be caught by @retry. return response.json()['response']['numFound'] @retry(tries=3, delay=5, backoff=1.2, jitter=(1, 3)) def query_solr(self, data_type, start): """Request one batch of SOLR records of the specified data type and write it into a temporary file.""" list_of_columns = [column.split(' > ')[0] for column in IMPC_SOLR_TABLES[data_type]] params = {'q': '*:*', 'fq': f'type:{data_type}', 'start': start, 'rows': self.IMPC_SOLR_BATCH_SIZE, 'wt': 'csv', 'fl': ','.join(list_of_columns)} response = requests.get(self.IMPC_SOLR_HOST, params=params, timeout=self.IMPC_SOLR_TIMEOUT, stream=True) response.raise_for_status() # Write records as they appear to avoid keeping the entire response in memory. with tempfile.NamedTemporaryFile('wt', delete=False) as tmp_file: response_lines = response.iter_lines(decode_unicode=True) header = next(response_lines) if start == 0: # Only write the header for the first requested batch. tmp_file.write(header + '\n') number_of_records = 0 for line in response_lines: number_of_records += 1 tmp_file.write(line + '\n') return number_of_records, tmp_file.name def fetch_data(self, data_type, output_filename): """Fetch all rows of the requested data type to the specified location.""" total_records = self.get_number_of_solr_records(data_type) assert total_records != 0, f'SOLR did not return any data for {data_type}.' with open(output_filename, 'wb') as outfile: start, total = 0, 0 # Initialise the counters. while True: number_of_records, tmp_filename = self.query_solr(data_type, start) with open(tmp_filename, 'rb') as tmp_file: shutil.copyfileobj(tmp_file, outfile) os.remove(tmp_filename) # Increment the counters. start += self.IMPC_SOLR_BATCH_SIZE total += number_of_records # Exit when all documents have been retrieved. if total == total_records: break class IMPC: """Retrieve the data, load it into Spark, process and write the resulting evidence strings.""" # Human gene mappings. HGNC_DATASET_URI = 'http://ftp.ebi.ac.uk/pub/databases/genenames/hgnc/tsv/hgnc_complete_set.txt' HGNC_DATASET_FILENAME = 'hgnc_complete_set.txt' # Mouse gene mappings. MGI_DATASET_URI = 'http://www.informatics.jax.org/downloads/reports/MGI_Gene_Model_Coord.rpt' MGI_DATASET_FILENAME = 'MGI_Gene_Model_Coord.rpt' # Mouse PubMed references. 
MGI_PUBMED_URI = 'http://www.informatics.jax.org/downloads/reports/MGI_PhenoGenoMP.rpt' MGI_PUBMED_FILENAME = 'MGI_PhenoGenoMP.rpt' # Mammalian Phenotype ontology. MGI_MP_URI = 'http://www.informatics.jax.org/downloads/reports/mp.owl' MGI_MP_FILENAME = 'MGI_mp.owl' IMPC_FILENAME = 'impc_solr_{data_type}.csv' def __init__(self, logger, cache_dir, local): self.logger = logger self.cache_dir = cache_dir self.gene_mapping, self.literature, self.mouse_phenotype_to_human_phenotype = [None] * 3 self.model_mouse_phenotypes, self.disease_human_phenotypes, self.disease_model_summary = [None] * 3 self.ontology, self.mp_terms, self.hp_terms, self.mp_class = [None] * 4 self.evidence, self.mouse_phenotypes = [None] * 2 # Initialize spark session spark_mem_limit = detect_spark_memory_limit() spark_conf = ( SparkConf() .set('spark.driver.memory', f'{spark_mem_limit}g') .set('spark.executor.memory', f'{spark_mem_limit}g') .set('spark.driver.maxResultSize', '0') .set('spark.debug.maxToStringFields', '2000') .set('spark.sql.execution.arrow.maxRecordsPerBatch', '500000') ) self.spark = ( SparkSession.builder .config(conf=spark_conf) .master('local[*]') .getOrCreate() ) def update_cache(self): """Fetch the Ensembl gene ID and SOLR data into the local cache directory.""" pathlib.Path(self.cache_dir).mkdir(parents=False, exist_ok=True) self.logger.info('Fetching human gene ID mappings from HGNC.') urllib.request.urlretrieve(self.HGNC_DATASET_URI, os.path.join(self.cache_dir, self.HGNC_DATASET_FILENAME)) self.logger.info('Fetching mouse gene ID mappings from MGI.') urllib.request.urlretrieve(self.MGI_DATASET_URI, os.path.join(self.cache_dir, self.MGI_DATASET_FILENAME)) self.logger.info('Fetching mouse PubMed references from MGI.') urllib.request.urlretrieve(self.MGI_PUBMED_URI, os.path.join(self.cache_dir, self.MGI_PUBMED_FILENAME)) self.logger.info('Fetching Mammalian Phenotype ontology definitions from MGI.') urllib.request.urlretrieve(self.MGI_MP_URI, os.path.join(self.cache_dir, self.MGI_MP_FILENAME)) self.logger.info('Fetching data from IMPC SOLR.') impc_solr_retriever = ImpcSolrRetriever() for data_type in IMPC_SOLR_TABLES: self.logger.info(f'Fetching data type {data_type}.') filename = os.path.join(self.cache_dir, self.IMPC_FILENAME.format(data_type=data_type)) impc_solr_retriever.fetch_data(data_type, filename) def load_tsv(self, filename, column_names=None): if column_names: schema = StructType([ StructField(column_name, StringType(), True) for column_name in column_names ]) header = False else: schema = None header = True return self.spark.read.csv(os.path.join(self.cache_dir, filename), sep='\t', header=header, schema=schema, nullValue='null') def load_solr_csv(self, data_type): """Load the CSV from SOLR; rename and select columns as specified.""" df = self.spark.read.csv( os.path.join(self.cache_dir, self.IMPC_FILENAME.format(data_type=data_type)), header=True ) column_name_mappings = [column_map.split(' > ') for column_map in IMPC_SOLR_TABLES[data_type]] columns_to_rename = {mapping[0]: mapping[1] for mapping in column_name_mappings if len(mapping) == 2} new_column_names = [mapping[-1] for mapping in column_name_mappings] # Rename columns. for old_column_name, new_column_name in columns_to_rename.items(): df = df.withColumnRenamed(old_column_name, new_column_name) # Restrict only to the columns we need. 
return df.select(new_column_names) def load_data_from_cache(self): """Load SOLR and MRI data from the downloaded TSV/CSV files into Spark and prepare for processing.""" # MGI gene ID → MGI gene name, Ensembl mouse gene ID. mgi_gene_id_to_ensembl_mouse_gene_id = ( self.load_tsv(self.MGI_DATASET_FILENAME) .withColumnRenamed('1. MGI accession id', 'targetInModelMgiId') .withColumnRenamed('3. marker symbol', 'targetInModel') .withColumnRenamed('11. Ensembl gene id', 'targetInModelEnsemblId') .filter(pf.col('targetInModelMgiId').isNotNull()) .select('targetInModelMgiId', 'targetInModel', 'targetInModelEnsemblId') # E.g. 'MGI:87859', 'Abl1', 'ENSMUSG00000026842'. ) # MGI gene ID → HGNC ID. mouse_gene_to_human_gene = ( self.load_solr_csv('gene_gene') .withColumnRenamed('gene_id', 'targetInModelMgiId') .select('targetInModelMgiId', 'hgnc_gene_id') # E.g. 'MGI:1346074', 'HGNC:4024'. ) # HGNC ID → Ensembl human gene ID. hgnc_gene_id_to_ensembl_human_gene_id = ( self.load_tsv(self.HGNC_DATASET_FILENAME) .withColumnRenamed('hgnc_id', 'hgnc_gene_id') .withColumnRenamed('ensembl_gene_id', 'targetFromSourceId') .select('hgnc_gene_id', 'targetFromSourceId') # E.g. 'HGNC:5', 'ENSG00000121410'. ) # Using the three datasets above, we construct the complete gene mapping from MGI gene ID (the only type of # identifier used in the source data) to gene name, mouse Ensembl ID and human Ensembl ID. In cases where # mappings are not one to one, joins will handle the necessary explosions. self.gene_mapping = ( mgi_gene_id_to_ensembl_mouse_gene_id .join(mouse_gene_to_human_gene, on='targetInModelMgiId', how='inner') .join(hgnc_gene_id_to_ensembl_human_gene_id, on='hgnc_gene_id', how='inner') # For both the evidence and mousePhenotypes datasets, entries without human gene mapping are unusable. .filter(pf.col('targetFromSourceId').isNotNull()) .select('targetInModelMgiId', 'targetInModel', 'targetInModelEnsemblId', 'targetFromSourceId') # E.g. 'MGI:87859', 'Abl1', 'ENSMUSG00000026842', 'ENSG00000121410'. ) # Mouse to human phenotype mappings. self.mouse_phenotype_to_human_phenotype = ( self.load_solr_csv('ontology_ontology') .select('mp_id', 'hp_id') # E.g. 'MP:0000745', 'HP:0100033'. ) # Mouse model and disease data. On loading, we split lists of phenotypes in the `mouse_model` and `disease` # tables and keep only the ID. For example, one row with 'MP:0001529 abnormal vocalization,MP:0002981 increased # liver weight' becomes two rows with 'MP:0001529' and 'MP:0002981'. Also note that the models are accessioned # with the same prefix ('MGI:') as genes, but they are separate entities. self.model_mouse_phenotypes = ( self.load_solr_csv('mouse_model') .withColumn('mp_id', pf.expr(r"regexp_extract_all(model_phenotypes, '(MP:\\d+)', 1)")) .withColumn('mp_id', pf.explode('mp_id')) .select('model_id', 'mp_id') # E. g. 'MGI:3800884', 'MP:0001304'. ) self.disease_human_phenotypes = ( self.load_solr_csv('disease') .withColumn('hp_id', pf.expr(r"regexp_extract_all(disease_phenotypes, '(HP:\\d+)', 1)")) .withColumn('hp_id', pf.explode('hp_id')) .select('disease_id', 'hp_id') # E.g. 'OMIM:609258', 'HP:0000545 Myopia'. ) self.disease_model_summary = ( self.load_solr_csv('disease_model_summary') .withColumnRenamed('model_genetic_background', 'biologicalModelGeneticBackground') .withColumnRenamed('model_description', 'biologicalModelAllelicComposition') # In Phenodigm, the scores report the association between diseases and animal models, not genes. 
The # phenotype similarity is computed using an algorithm called OWLSim which expresses the similarity in terms # of the Jaccard Index (simJ) or Information Content (IC). Therefore, to compute the score you can take the # maximum score of both analyses (disease_model_max_norm) or a combination of them both # (disease_model_avg_norm). In the Results and discussion section of the Phenodigm paper, the methods are # compared to a number of gold standards. It is concluded that the geometric mean of both analyses is the # superior metric and should therefore be used as the score. .withColumn('resourceScore', pf.col('disease_model_avg_norm').cast('float')) .drop('disease_model_avg_norm') .withColumnRenamed('marker_id', 'targetInModelMgiId') .select('model_id', 'biologicalModelGeneticBackground', 'biologicalModelAllelicComposition', 'disease_id', 'disease_term', 'resourceScore', 'targetInModelMgiId') # E. g. 'MGI:2681494', 'C57BL/6JY-smk', 'smk/smk', 'ORPHA:3097', 'Meacham Syndrome', 91.6, 'MGI:98324'. ) self.ontology = ( self.load_solr_csv('ontology') # E.g. 'HP', 'HP:0000002', 'Abnormality of body height'. .filter((pf.col('ontology') == 'MP') | (pf.col('ontology') == 'HP')) ) assert self.ontology.select('phenotype_id').distinct().count() == self.ontology.count(), \ f'Encountered multiple names for the same term in the ontology table.' # Process ontology information to enable MP and HP term lookup based on the ID. self.mp_terms, self.hp_terms = ( self.ontology .filter(pf.col('ontology') == ontology_name) .withColumnRenamed('phenotype_id', f'{ontology_name.lower()}_id') .withColumnRenamed('phenotype_term', f'{ontology_name.lower()}_term') .select(f'{ontology_name.lower()}_id', f'{ontology_name.lower()}_term') for ontology_name in ('MP', 'HP') ) # Process MP definitions to extract high level classes for each term. mp = pronto.Ontology(os.path.join(self.cache_dir, self.MGI_MP_FILENAME)) high_level_classes = set(mp['MP:0000001'].subclasses(distance=1)) - {mp['MP:0000001']} mp_class = [ [term.id, mp_high_level_class.id, mp_high_level_class.name] for mp_high_level_class in high_level_classes for term in mp_high_level_class.subclasses() ] self.mp_class = self.spark.createDataFrame( data=mp_class, schema=['modelPhenotypeId', 'modelPhenotypeClassId', 'modelPhenotypeClassLabel'] # E.g. 'MP:0000275', 'MP:0005385', 'cardiovascular system phenotype' ) # Literature cross-references which link a given mouse phenotype and a given mouse target. mgi_pubmed = ( self.load_tsv( self.MGI_PUBMED_FILENAME, column_names=['_0', '_1', '_2', 'mp_id', 'literature', 'targetInModelMgiId'] ) .select('mp_id', 'literature', 'targetInModelMgiId') .distinct() # Separate and explode targets. .withColumn('targetInModelMgiId', pf.split(pf.col('targetInModelMgiId'), r'\|')) .withColumn('targetInModelMgiId', pf.explode('targetInModelMgiId')) # Separate and explode literature references. .withColumn('literature', pf.split(pf.col('literature'), r'\|')) .withColumn('literature', pf.explode('literature')) .select('mp_id', 'literature', 'targetInModelMgiId') # E.g. 'MP:0000600', '12529408', 'MGI:97874'. ) # Literature references for a given (model, gene) combination. 
self.literature = ( self.disease_model_summary .select('model_id', 'targetInModelMgiId') .distinct() .join(self.model_mouse_phenotypes, on='model_id', how='inner') .join(mgi_pubmed, on=['targetInModelMgiId', 'mp_id'], how='inner') .groupby('model_id', 'targetInModelMgiId') .agg(pf.collect_set(pf.col('literature')).alias('literature')) .select('model_id', 'targetInModelMgiId', 'literature') ) @staticmethod def _cleanup_model_identifier(dataset): return ( # Model ID adjustments. First, strip the trailing modifiers, where present. The original ID, used for table # joins, may look like 'MGI:6274930#hom#early', where the first part is the allele ID and the second # specifies the zygotic state. There can be several models for the same allele ID with different phenotypes. # However, this information is also duplicated in `biologicalModelGeneticBackground` (for example: # 'C57BL/6NCrl,Ubl7<em1(IMPC)Tcp> hom early'), so in this field we strip those modifiers. dataset .withColumn( 'biologicalModelId', pf.split(pf.col('model_id'), '#').getItem(0) ) .drop('model_id') # Second, we only want to output the model names from the MGI namespace. An example of something we *don't* # want is 'NOT-RELEASED-025eb4a791'. This will be converted to null. .withColumn( 'biologicalModelId', pf.when(pf.col('biologicalModelId').rlike(r'^MGI:\d+$'), pf.col('biologicalModelId')) ) ) def generate_IMPC_evidence_strings(self, score_cutoff): """Generate the evidence by renaming, transforming and joining the columns.""" # Map mouse model phenotypes into human terms. model_human_phenotypes = ( self.model_mouse_phenotypes .join(self.mouse_phenotype_to_human_phenotype, on='mp_id', how='inner') .select('model_id', 'hp_id') ) # We are reporting all mouse phenotypes for a model, regardless of whether they can be mapped into any human # disease. all_mouse_phenotypes = ( self.model_mouse_phenotypes .join(self.mp_terms, on='mp_id', how='inner') .groupby('model_id') .agg( pf.collect_set(pf.struct( pf.col('mp_id').alias('id'), pf.col('mp_term').alias('label') )).alias('diseaseModelAssociatedModelPhenotypes') ) .select('model_id', 'diseaseModelAssociatedModelPhenotypes') ) # For human phenotypes, we only want to include the ones which are present in the disease *and* also can be # traced back to the model phenotypes through the MP → HP mapping relationship. matched_human_phenotypes = ( # We start with all possible pairs of model-disease associations. self.disease_model_summary.select('model_id', 'disease_id') # Add all disease phenotypes. Now we have: model_id, disease_id, hp_id (from disease). .join(self.disease_human_phenotypes, on='disease_id', how='inner') # Only keep the phenotypes which also appear in the mouse model (after mapping). .join(model_human_phenotypes, on=['model_id', 'hp_id'], how='inner') # Add ontology terms in addition to IDs. Now we have: model_id, disease_id, hp_id, hp_term. .join(self.hp_terms, on='hp_id', how='inner') .groupby('model_id', 'disease_id') .agg( pf.collect_set(pf.struct( pf.col('hp_id').alias('id'), pf.col('hp_term').alias('label') )).alias('diseaseModelAssociatedHumanPhenotypes') ) .select('model_id', 'disease_id', 'diseaseModelAssociatedHumanPhenotypes') ) self.evidence = ( # This table contains all unique (model_id, disease_id) associations which form the base of the evidence # strings. self.disease_model_summary # Filter out the associations with a low score. .filter(~(pf.col('resourceScore') < score_cutoff)) # Add mouse gene mapping information. The mappings are not necessarily one to one. 
When this happens, join # will handle the necessary explosions, and a single row from the original table will generate multiple # evidence strings. This adds the fields 'targetFromSourceId', 'targetInModelEnsemblId', and # 'targetFromSourceId'. .join(self.gene_mapping, on='targetInModelMgiId', how='inner') # Add all mouse phenotypes of the model → `diseaseModelAssociatedModelPhenotypes`. .join(all_mouse_phenotypes, on='model_id', how='left') # Add the matched model/disease human phenotypes → `diseaseModelAssociatedHumanPhenotypes`. .join(matched_human_phenotypes, on=['model_id', 'disease_id'], how='left') # Add literature references → 'literature'. .join(self.literature, on=['model_id', 'targetInModelMgiId'], how='left') ) self.evidence = ( # Post-process model ID field. self._cleanup_model_identifier(self.evidence) # Rename the disease data columns. .withColumnRenamed('disease_id', 'diseaseFromSourceId') .withColumnRenamed('disease_term', 'diseaseFromSource') # Add constant value columns. .withColumn('datasourceId', pf.lit('impc')) .withColumn('datatypeId', pf.lit('animal_model')) ) # Add EFO mapping information. self.evidence = add_efo_mapping(evidence_strings=self.evidence, spark_instance=self.spark, ontoma_cache_dir=self.cache_dir) # In case of multiple records with the same unique fields, keep only the one record with the highest score. This # is done to avoid duplicates where multiple source ontology records map to the same EFO record with slightly # different scores. w = Window.partitionBy([pf.col(c) for c in UNIQUE_FIELDS]).orderBy(pf.col('resourceScore').desc()) self.evidence = ( self.evidence .withColumn('row', pf.row_number().over(w)) .filter(pf.col('row') == 1) .drop('row') ) # Ensure stable column order. self.evidence = self.evidence.select( 'biologicalModelAllelicComposition', 'biologicalModelGeneticBackground', 'biologicalModelId', 'datasourceId', 'datatypeId', 'diseaseFromSource', 'diseaseFromSourceId', 'diseaseFromSourceMappedId', 'diseaseModelAssociatedHumanPhenotypes', 'diseaseModelAssociatedModelPhenotypes', 'literature', 'resourceScore', 'targetFromSourceId', 'targetInModel', 'targetInModelEnsemblId', 'targetInModelMgiId' ) def generate_mouse_phenotypes_dataset(self): """Generate the related mousePhenotypes dataset for the corresponding widget in the target object.""" mouse_phenotypes = ( # Extract base model-target associations. self.disease_model_summary .select('model_id', 'biologicalModelAllelicComposition', 'biologicalModelGeneticBackground', 'targetInModelMgiId') .distinct() # Add gene mapping information. .join(self.gene_mapping, on='targetInModelMgiId', how='inner') # Add mouse phenotypes. .join(self.model_mouse_phenotypes, on='model_id', how='inner') .join(self.mp_terms, on='mp_id', how='inner') # Add literature references. .join(self.literature, on=['model_id', 'targetInModelMgiId'], how='left') # Rename fields. .withColumnRenamed('mp_id', 'modelPhenotypeId') .withColumnRenamed('mp_term', 'modelPhenotypeLabel') # Join phenotype class information. .join(self.mp_class, on='modelPhenotypeId', how='inner') ) # Post-process model ID field. mouse_phenotypes = self._cleanup_model_identifier(mouse_phenotypes) # Convert the schema from flat to partially nested, grouping related models and phenotype classes. 
self.mouse_phenotypes = ( mouse_phenotypes .groupby( 'targetInModel', 'targetInModelMgiId', 'targetInModelEnsemblId', 'targetFromSourceId', 'modelPhenotypeId', 'modelPhenotypeLabel' ) .agg( pf.collect_set( pf.struct( pf.col('biologicalModelAllelicComposition').alias('allelicComposition'), pf.col('biologicalModelGeneticBackground').alias('geneticBackground'), pf.col('biologicalModelId').alias('id'), pf.col('literature') ) ).alias('biologicalModels'), pf.collect_set( pf.struct( pf.col('modelPhenotypeClassId').alias('id'), pf.col('modelPhenotypeClassLabel').alias('label') ) ).alias('modelPhenotypeClasses') ) ) def write_datasets(self, evidence_strings_filename, mouse_phenotypes_filename): """Dump the Spark evidence dataframe as a compressed JSON file. The order of the evidence strings is not maintained, and they are returned in random order as collected by Spark.""" for dataset, outfile in ((self.evidence, evidence_strings_filename), (self.mouse_phenotypes, mouse_phenotypes_filename)): logging.info(f'Processing dataset {outfile}') write_evidence_strings(dataset, outfile) def main( cache_dir, evidence_output, mouse_phenotypes_output, score_cutoff, use_cached=False, log_file=None, local=False ) -> None: # Initialize the logger based on the provided log file. If no log file is specified, logs are written to STDERR. logging_config = { 'level': logging.INFO, 'format': '%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s', 'datefmt': '%Y-%m-%d %H:%M:%S', } if log_file: logging_config['filename'] = log_file logging.basicConfig(**logging_config) # Process the data. impc = IMPC(logging, cache_dir, local) if not use_cached: logging.info('Update the HGNC/MGI/SOLR cache.') impc.update_cache() logging.info('Load gene mappings and SOLR data from local cache.') impc.load_data_from_cache() logging.info('Build the evidence strings.') impc.generate_IMPC_evidence_strings(score_cutoff) logging.info('Generate the mousePhenotypes dataset.') impc.generate_mouse_phenotypes_dataset() logging.info('Collect and write the datasets.') impc.write_datasets(evidence_output, mouse_phenotypes_output) if __name__ == '__main__': parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) req = parser.add_argument_group('required arguments') req.add_argument('--cache-dir', required=True, help='Directory to store the HGNC/MGI/SOLR cache files in.') req.add_argument('--output-evidence', required=True, help='Name of the json.gz file to output the evidence strings into.') req.add_argument('--output-mouse-phenotypes', required=True, help='Name of the json.gz file to output the mousePhenotypes dataset into.') parser.add_argument('--score-cutoff', help=( 'Discard model-disease associations with the `disease_model_avg_norm` score less than this value. The score ' 'ranges from 0 to 100.' ), type=float, default=0.0) parser.add_argument('--use-cached', help='Use the existing cache and do not update it.', action='store_true') parser.add_argument('--log-file', help='Optional filename to redirect the logs into.') parser.add_argument( '--local', action='store_true', required=False, default=False, help='Flag to indicate if the script is executed locally or on the cluster' ) args = parser.parse_args() main(args.cache_dir, args.output_evidence, args.output_mouse_phenotypes, args.score_cutoff, args.use_cached, args.log_file, args.local) |
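A key step in the IMPC module is the window-based deduplication: records sharing the same combination of UNIQUE_FIELDS are ranked by `resourceScore` and only the top-scoring one is kept. The following is a minimal sketch of that pattern on toy data; the local session, values and the reduced set of partition columns are illustrative.

from pyspark.sql import SparkSession, Window, functions as pf

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Two records that are identical on the unique fields but carry different scores.
evidence = spark.createDataFrame(
    [("ENSG01", "EFO_0000001", 85.0), ("ENSG01", "EFO_0000001", 91.6)],
    ["targetFromSourceId", "diseaseFromSourceMappedId", "resourceScore"],
)

# Rank duplicates within each unique-field combination by descending score
# and keep only the first row, mirroring the deduplication step above.
w = Window.partitionBy("targetFromSourceId", "diseaseFromSourceMappedId").orderBy(
    pf.col("resourceScore").desc()
)
deduplicated = (
    evidence.withColumn("row", pf.row_number().over(w))
    .filter(pf.col("row") == 1)
    .drop("row")
)
deduplicated.show()
# Only the 91.6-scoring record survives.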
from __future__ import annotations

import argparse
import sys
import logging

from pyspark.sql import DataFrame, functions as f

from common.evidence import (
    write_evidence_strings,
    initialize_sparksession,
)


class intogenEvidenceGenerator:
    def __init__(
        self: intogenEvidenceGenerator,
        inputGenes: str,
        inputCohorts: str,
        diseaseMapping: str | None,
    ) -> None:
        # Initialize spark session:
        self.spark = initialize_sparksession()

        # Reading cohort table:
        cohorts = self.spark.read.csv(
            inputCohorts, sep=r"\t", header=True, inferSchema=True
        ).drop("SAMPLES")

        # Reading cancer driver genes:
        self.evidence = (
            self.spark.read.csv(inputGenes, sep=r"\t", header=True, inferSchema=True)
            # Join with cohort table:
            .join(cohorts, on="COHORT", how="inner").select(
                # Adding constant columns:
                f.lit("somatic_mutation").alias("datatypeId"),
                f.lit("intogen").alias("datasourceId"),
                # Extracting gene fields:
                f.col("SYMBOL").alias("targetFromSourceId"),
                # Extracting cohort specific annotation:
                f.col("COHORT").alias("cohortId"),
                f.col("COHORT_NICK").alias("cohortShortName"),
                f.col("COHORT_NAME").alias("cohortDescription"),
                # Splitting methods:
                f.split(f.col("METHODS"), ",").alias("significantDriverMethods"),
                # Generating mutated samples column:
                f.array(
                    f.struct(
                        # Parse functional consequences:
                        f.when(f.col("ROLE") == "Act", "SO_0002053")  # Gain of function
                        .when(f.col("ROLE") == "LoF", "SO_0002054")  # Loss of function
                        .otherwise(None)
                        .alias("functionalConsequenceId"),
                        # Extract samples:
                        f.col("SAMPLES").alias("numberSamplesTested"),
                        f.col("SAMPLES").alias("numberMutatedSamples"),
                    )
                ).alias("mutatedSamples"),
                # Adding disease name:
                f.col("CANCER").alias("Cancer_type_acronym"),
                f.col("CANCER_NAME").alias("diseaseFromSource"),
                # Adding score:
                f.col("QVALUE_COMBINATION").alias("resourceScore"),
            )
        )

        # Mapping step executed if mapping file is provided:
        if diseaseMapping is not None:
            logging.info(f"Applying disease mapping from {diseaseMapping}.")
            self.evidence = self.cancer2EFO(diseaseMapping)

            # Extracting stats about mapping:
            unmapped_evidence_count = self.evidence.filter(
                f.col("diseaseFromSourceMappedId").isNull()
            ).count()
            unmapped_disease_count = (
                self.evidence.filter(f.col("diseaseFromSourceMappedId").isNull())
                .select("diseaseFromSource")
                .distinct()
                .count()
            )

            logging.info(
                f"Number of evidence without EFO mapping: {unmapped_evidence_count}"
            )
            logging.info(
                f"Number of diseases without EFO mapping: {unmapped_disease_count}"
            )

        # Dropping cancer type acronym:
        self.evidence = self.evidence.drop("Cancer_type_acronym")

    def write_evidence(self: intogenEvidenceGenerator, evidence_file: str) -> None:
        write_evidence_strings(self.evidence, evidence_file)

        logging.info(
            f"{self.evidence.count()} evidence strings saved into {evidence_file}. Exiting."
) def cancer2EFO(self: intogenEvidenceGenerator, diseaseMapping: str) -> DataFrame: diseaseMappingsFile = self.spark.read.csv( diseaseMapping, sep=r"\t", header=True ).select( "Cancer_type_acronym", f.trim(f.col("EFO_id")).alias("diseaseFromSourceMappedId"), ) return self.evidence.join( diseaseMappingsFile, on="Cancer_type_acronym", how="left" ) def main(inputGenes, inputCohorts, diseaseMapping, outputFile): # Logging parameters logging.info(f"intOGen driver genes table: {inputGenes}") logging.info(f"intOGen cohorts table: {inputCohorts}") logging.info(f"Output file: {outputFile}") if diseaseMapping: logging.info(f"Cancer type to EFO ID table: {diseaseMapping}") else: logging.info("Disease mapping is skipped.") # Initialize evidence builder object evidenceBuilder = intogenEvidenceGenerator(inputGenes, inputCohorts, diseaseMapping) # Write evidence file: evidenceBuilder.write_evidence(outputFile) if __name__ == "__main__": # Initiating parser parser = argparse.ArgumentParser( description="This script generates evidences for the intOGen data source." ) parser.add_argument( "-g", "--inputGenes", required=True, type=str, help="Input source .tsv file listing the driver genes across the analyzed cohorts.", ) parser.add_argument( "-c", "--inputCohorts", required=True, type=str, help="Input source .tsv file with information about the analyzed cohorts.", ) parser.add_argument( "-d", "--diseaseMapping", required=False, type=str, help="Input look-up table containing the cancer type mappings to an EFO ID.", ) parser.add_argument( "-o", "--outputFile", required=True, type=str, help="Gzipped JSON file containing the evidence strings.", ) parser.add_argument( "-l", "--logFile", help="Destination of the logs generated by this script.", type=str, required=False, ) # Parsing parameters args = parser.parse_args() inputGenes = args.inputGenes inputCohorts = args.inputCohorts diseaseMapping = args.diseaseMapping outputFile = args.outputFile # Initialize logging: logging.basicConfig( level=logging.INFO, format="%(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) if args.logFile: logging.config.fileConfig(filename=args.logFile) else: logging.StreamHandler(sys.stderr) main(inputGenes, inputCohorts, diseaseMapping, outputFile) |
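The intOGen `cancer2EFO` step is a left join from the cohort's cancer-type acronym to an EFO lookup table, followed by counting the rows left unmapped. A small sketch of that join-and-count pattern is shown below; the acronyms and identifiers are made up for illustration and do not come from the real --diseaseMapping table.

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.master("local[*]").getOrCreate()

evidence = spark.createDataFrame(
    [("AAA", "Toy tumour type A"), ("BBB", "Toy tumour type B")],
    ["Cancer_type_acronym", "diseaseFromSource"],
)
# Toy acronym-to-EFO lookup; only the first acronym has a mapping.
mapping = spark.createDataFrame(
    [("AAA", "EFO_0000001")], ["Cancer_type_acronym", "diseaseFromSourceMappedId"]
)

# Left join keeps unmapped evidence with a null diseaseFromSourceMappedId.
mapped = evidence.join(mapping, on="Cancer_type_acronym", how="left")
unmapped = mapped.filter(f.col("diseaseFromSourceMappedId").isNull()).count()
print(f"Number of evidence without EFO mapping: {unmapped}")  # 1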
import argparse
from itertools import chain
import logging
import sys
import xml.etree.ElementTree as ET

from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import array_distinct, col, create_map, lit, split

from common.ontology import add_efo_mapping
from common.evidence import initialize_sparksession, write_evidence_strings

# Association types other than these are kept; their allele origin is assigned as germline.
EXCLUDED_ASSOCIATIONTYPES = [
    'Major susceptibility factor in',
    'Part of a fusion gene in',
    'Candidate gene tested in',
    'Role in the phenotype of',
    'Biomarker tested in',
    'Disease-causing somatic mutation(s) in',
]

# Assigning variantFunctionalConsequenceId:
CONSEQUENCE_MAP = {
    'Disease-causing germline mutation(s) (loss of function) in': 'SO_0002054',
    'Disease-causing germline mutation(s) in': None,
    'Modifying germline mutation in': None,
    'Disease-causing germline mutation(s) (gain of function) in': 'SO_0002053',
}


def main(input_file: str, output_file: str, cache_dir: str) -> None:
    # Read and process Orphanet's XML file into evidence strings
    orphanet_df = parse_orphanet_xml(input_file, spark)
    logging.info('Orphanet input file has been imported. Processing evidence strings.')

    evidence_df = process_orphanet(orphanet_df)
    evidence_df = add_efo_mapping(evidence_strings=evidence_df, spark_instance=spark, ontoma_cache_dir=cache_dir)
    logging.info('Disease mappings have been added.')

    # Save data
    write_evidence_strings(evidence_df, output_file)
    logging.info(f'{evidence_df.count()} evidence strings have been saved to {output_file}')


def parse_orphanet_xml(orphanet_file: str, spark_instance) -> DataFrame:
    """
    Function to parse Orphanet xml dump and return the parsed data as a list of dictionaries.
Args: orphanet_file (str): Orphanet XML filename Returns: parsed data as a list of dictionaries """ # Reading + validating xml: tree = ET.parse(orphanet_file) assert isinstance(tree, ET.ElementTree) root = tree.getroot() assert isinstance(root, ET.Element) # Checking if the basic nodes are in the xml structure: logging.info(f"There are {root.find('DisorderList').get('count')} disease in the Orphanet xml file.") orphanet_disorders = [] for disorder in root.find('DisorderList').findall('Disorder'): # Extracting disease information: parsed_disorder = { 'diseaseFromSource': disorder.find('Name').text, 'diseaseFromSourceId': 'Orphanet_' + disorder.find('OrphaCode').text, 'type': disorder.find('DisorderType/Name').text, } # One disease might be mapped to multiple genes: for association in disorder.find('DisorderGeneAssociationList'): # For each mapped gene, an evidence is created: evidence = parsed_disorder.copy() # Not all gene/disease association is backed up by publication: try: evidence['literature'] = [ pmid.replace('[PMID]', '').rstrip() for pmid in association.find('SourceOfValidation').text.split('_') if '[PMID]' in pmid ] except AttributeError: evidence['literature'] = None evidence['associationType'] = association.find('DisorderGeneAssociationType/Name').text evidence['confidence'] = association.find('DisorderGeneAssociationStatus/Name').text # Parse gene name and id - going for Ensembl gene id only: gene = association.find('Gene') evidence['targetFromSource'] = gene.find('Name').text # Extracting ensembl gene id from cross references: ensembl_gene_id = [ xref.find('Reference').text for xref in gene.find('ExternalReferenceList') if 'ENSG' in xref.find('Reference').text ] evidence['targetFromSourceId'] = ensembl_gene_id[0] if len(ensembl_gene_id) > 0 else None # Collect evidence: orphanet_disorders.append(evidence) # Create a spark dataframe from the parsed data orphanet_df = spark_instance.createDataFrame(Row(**x) for x in orphanet_disorders) return orphanet_df def process_orphanet(orphanet_df: DataFrame) -> DataFrame: """ The JSON Schema format is applied to the df """ # Map association type to sequence ontology ID: so_mapping_expr = create_map([lit(x) for x in chain(*CONSEQUENCE_MAP.items())]) evidence_df = ( orphanet_df.filter(~col('associationType').isin(EXCLUDED_ASSOCIATIONTYPES)) .filter(~col('targetFromSourceId').isNull()) .withColumn('dataSourceId', lit('orphanet')) .withColumn('datatypeId', lit('genetic_association')) .withColumn('alleleOrigins', split(lit('germline'), '_')) .withColumn('literature', array_distinct(col('literature'))) .withColumn( 'variantFunctionalConsequenceId', so_mapping_expr.getItem(col('associationType')), ) .drop('associationType', 'type') # Select the evidence relevant fields .select( 'datasourceId', 'datatypeId', 'alleleOrigins', 'confidence', 'diseaseFromSource', 'diseaseFromSourceId', 'literature', 'targetFromSource', 'targetFromSourceId', 'variantFunctionalConsequenceId', ) .persist() ) return evidence_df if __name__ == '__main__': parser = argparse.ArgumentParser( description=( 'Parse Orphanet gene-disease annotation downloaded from ' 'http://www.orphadata.org/data/xml/en_product6.xml' ) ) parser.add_argument( '--input_file', help='Xml file containing target/disease associations.', type=str, ) parser.add_argument( '--output_file', help='Absolute path of the gzipped, JSON evidence file.', type=str, ) parser.add_argument( '--logFile', help='Destination of the logs generated by this script.', type=str, required=False, ) parser.add_argument( 
'--cache_dir', required=False, help='Directory to store the OnToma cache files in.', ) args = parser.parse_args() input_file = args.input_file output_file = args.output_file log_file = args.logFile cache_dir = args.cache_dir # Initialize logging: logging.basicConfig( level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) global spark spark = initialize_sparksession() if log_file: logging.config.fileConfig(filename=log_file) else: logging.StreamHandler(sys.stderr) main(input_file, output_file, cache_dir) |
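Unlike the Gene2Phenotype module, the Orphanet parser maps association types to Sequence Ontology identifiers with a `create_map` column expression built from the flattened dictionary items rather than a UDF. The sketch below shows that expression in isolation; it keeps only the two non-null entries of CONSEQUENCE_MAP and uses a toy DataFrame.

from itertools import chain

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, create_map, lit

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Abbreviated copy of the consequence map above, restricted to the mapped entries.
consequence_map = {
    'Disease-causing germline mutation(s) (loss of function) in': 'SO_0002054',
    'Disease-causing germline mutation(s) (gain of function) in': 'SO_0002053',
}
# chain(*items) interleaves keys and values, as expected by create_map.
so_mapping_expr = create_map([lit(x) for x in chain(*consequence_map.items())])

df = spark.createDataFrame(
    [('Disease-causing germline mutation(s) (loss of function) in',)],
    ['associationType'],
)
df.withColumn(
    'variantFunctionalConsequenceId', so_mapping_expr.getItem(col('associationType'))
).show(truncate=False)
# The association type resolves to SO_0002054.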
import argparse
import logging
import re

import requests
from pyspark.sql import functions as f, SparkSession

from common.ontology import add_efo_mapping
from common.evidence import write_evidence_strings


class PanelAppEvidenceGenerator:

    # Fixes which are applied to original PanelApp phenotype strings *before* splitting them by semicolon.
    PHENOTYPE_BEFORE_SPLIT_RE = {
        # Fixes for specific records.
        r"\(HP:0006574;\);": r"(HP:0006574);",
        r"Abruzzo-Erickson;syndrome": r"Abruzzo-Erickson syndrome",
        r"Deafness, autosomal recessive; 12": r"Deafness, autosomal recessive, 12",
        r"Waardenburg syndrome, type; 3": r"Waardenburg syndrome, type 3",
        r"Ectrodactyly, ectodermal dysplasia, and cleft lip/palate syndrome; 3":
            r"Ectrodactyly, ectodermal dysplasia, and cleft lip/palate syndrome, 3",
        # Remove curly braces. They are *sometimes* (not consistently) used to separate disease name and OMIM code, for
        # example: "{Bladder cancer, somatic}, 109800", and interfere with regular expressions for extraction.
        r"[{}]": r"",
        # Fix cases like "Aarskog-Scott syndrome, 305400Mental retardation, X-linked syndromic 16, 305400", where
        # several phenotypes are glued to each other due to formatting errors.
        r"(\d{6})([A-Za-z])": r"$1;$2",
        # Replace all tab/space sequences with a single space.
        r"[\t ]+": r" ",
        # Remove leading and/or trailing spaces around semicolons.
        r" ?; ?": r";",
    }

    # Cleanup regular expressions which are applied to the phenotypes *after* splitting.
    PHENOTYPE_AFTER_SPLIT_RE = (
        r" \(no OMIM number\)",
        r" \(NO phenotype number in OMIM\)",
        r"(no|No|NO) OMIM( phenotype|number|entry|NUMBER|NUMBER OR DISEASE)?",
        r"[( ]*(from )?PMID:? *\d+[ ).]*",
    )

    # Regular expressions for extracting ontology information.
    LEADING = r"[ ,-]*"
    SEPARATOR = r"[:_ #]*"
    TRAILING = r"[:.]*"
    OMIM_RE = LEADING + r"(OMIM|MIM)?" + SEPARATOR + r"(\d{6})" + TRAILING
    OTHER_RE = (
        LEADING
        + r"(OrphaNet: ORPHA|Orphanet|ORPHA|HP|MONDO)"
        + SEPARATOR
        + r"(\d+)"
        + TRAILING
    )

    # Regular expression for filtering out bad PMID records.
PMID_FILTER_OUT_RE = r"224,614,752,030,146,000,000,000" # Regular expressions for extracting publication information from the API raw strings. PMID_RE = [ ( # Pattern 1, e.g. '15643612' or '28055140, 27333055, 23063529'. r"^" # Start of the string. r"[\d, ]+" # A sequence of digits, commas and spaces. r"(?: |$)" # Ending either with a space, or with the end of the string. ), ( # Pattern 2, e.g. '... observed in the patient. PMID: 1908107 - publication describing function of ...' r"(?:PubMed|PMID)" # PubMed or a PMID prefix. r"[: ]*" # An optional separator (spaces/colons). r"[\d, ]+" # A sequence of digits, commas and spaces. ), ] def __init__(self, debug_output_prefix=None): self.spark = SparkSession.builder.appName("panelapp_parser").getOrCreate() self.debug_output_phenotypes_filename = None self.debug_output_pmids_filename = None self.debug_output_pmids_data = set() if debug_output_prefix: self.debug_output_phenotypes_filename = ( f"{debug_output_prefix}.phenotypes.tsv" ) self.debug_output_pmids_filename = f"{debug_output_prefix}.pmids.tsv" def __del__(self): if self.debug_output_pmids_filename: with open(self.debug_output_pmids_filename, "w") as outfile: outfile.write( "Source PMID string\tExtracted PMIDs\tNumber of extracted PMIDs\n" ) for line in sorted(set(self.debug_output_pmids_data)): outfile.write(line) def generate_panelapp_evidence( self, input_file: str, output_file: str, cache_dir: str ) -> None: logging.info("Filter and extract the necessary columns.") panelapp_df = self.spark.read.csv(input_file, sep=r"\t", header=True) # Panel version can be either a single number (e.g. 1), or two numbers separated by a dot (e.g. 3.14). We cast # either representation to float to ensure correct filtering below. (Note that conversion to float would not # work in the general case, because 3.4 > 3.14, but we only need to compare relative to 1.0.) panelapp_df = panelapp_df.withColumn( "Panel Version", panelapp_df["Panel Version"].cast("float").alias("Panel Version"), ) panelapp_df = ( panelapp_df.filter( ((f.col("List") == "green") | (f.col("List") == "amber")) & (f.col("Panel Version") >= 1.0) & (f.col("Panel Status") == "PUBLIC") ).select( "Symbol", "Panel Id", "Panel Name", "List", "Mode of inheritance", "Phenotypes", ) # The full original records are not redundant; however, uniqueness on a subset of fields is not guaranteed. .distinct() ) logging.info( "Fix typos and formatting errors which would interfere with phenotype splitting." ) panelapp_df = panelapp_df.withColumn("cleanedUpPhenotypes", f.col("Phenotypes")) for regexp, replacement in self.PHENOTYPE_BEFORE_SPLIT_RE.items(): panelapp_df = panelapp_df.withColumn( "cleanedUpPhenotypes", f.regexp_replace(f.col("cleanedUpPhenotypes"), regexp, replacement), ) logging.info("Split and explode the phenotypes.") panelapp_df = panelapp_df.withColumn( "cohortPhenotypes", f.array_distinct(f.split(f.col("cleanedUpPhenotypes"), ";")), ).withColumn("phenotype", f.explode(f.col("cohortPhenotypes"))) logging.info( "Remove specific patterns and phrases which will interfere with ontology extraction and mapping." ) panelapp_df = panelapp_df.withColumn("diseaseFromSource", f.col("phenotype")) for regexp in self.PHENOTYPE_AFTER_SPLIT_RE: panelapp_df = panelapp_df.withColumn( "diseaseFromSource", f.regexp_replace(f.col("diseaseFromSource"), f"({regexp})", ""), ) logging.info( "Extract ontology information, clean up and filter the split phenotypes." 
) panelapp_df = ( panelapp_df # Extract Orphanet/MONDO/HP ontology identifiers and remove them from the phenotype string. .withColumn( "ontology_namespace", f.regexp_extract(f.col("diseaseFromSource"), self.OTHER_RE, 1), ) .withColumn( "ontology_namespace", f.regexp_replace( f.col("ontology_namespace"), "OrphaNet: ORPHA", "ORPHA" ), ) .withColumn( "ontology_id", f.regexp_extract(f.col("diseaseFromSource"), self.OTHER_RE, 2), ) .withColumn( "ontology", f.when( (f.col("ontology_namespace") != "") & (f.col("ontology_id") != ""), f.concat( f.col("ontology_namespace"), f.lit(":"), f.col("ontology_id") ), ), ) .withColumn( "diseaseFromSource", f.regexp_replace(f.col("diseaseFromSource"), f"({self.OTHER_RE})", ""), ) # Extract OMIM identifiers and remove them from the phenotype string. .withColumn( "omim_id", f.regexp_extract(f.col("diseaseFromSource"), self.OMIM_RE, 2) ) .withColumn( "omim", f.when( f.col("omim_id") != "", f.concat(f.lit("OMIM:"), f.col("omim_id")) ), ) .withColumn( "diseaseFromSource", f.regexp_replace(f.col("diseaseFromSource"), f"({self.OMIM_RE})", ""), ) # Choose one of the ontology identifiers, keeping OMIM as a priority. .withColumn( "diseaseFromSourceId", f.when(f.col("omim").isNotNull(), f.col("omim")).otherwise( f.col("ontology") ), ) .drop("ontology_namespace", "ontology_id", "ontology", "omim_id", "omim") # Clean up the final split phenotypes. .withColumn( "diseaseFromSource", f.regexp_replace(f.col("diseaseFromSource"), r"\(\)", ""), ) .withColumn("diseaseFromSource", f.trim(f.col("diseaseFromSource"))) .withColumn( "diseaseFromSource", f.when(f.col("diseaseFromSource") != "", f.col("diseaseFromSource")), ) # Remove low quality records, where the name of the phenotype string starts with a question mark. .filter( ~( (f.col("diseaseFromSource").isNotNull()) & (f.col("diseaseFromSource").startswith("?")) ) ) # Remove duplication caused by cases where multiple phenotypes within the same record fail to generate any # phenotype string or ontology identifier. .distinct() # For records where we were unable to determine either a phenotype string nor an ontology identifier, # substitute the panel name instead. .withColumn( "diseaseFromSource", f.when( (f.col("diseaseFromSource").isNull()) & (f.col("diseaseFromSourceId").isNull()), f.col("Panel Name"), ).otherwise(f.col("diseaseFromSource")), ) .persist() ) logging.info("Fetch and join literature references.") all_panel_ids = panelapp_df.select("Panel Id").toPandas()["Panel Id"].unique() literature_references = self.fetch_literature_references(all_panel_ids) panelapp_df = panelapp_df.join( literature_references, on=["Panel Id", "Symbol"], how="left" ) if self.debug_output_phenotypes_filename: logging.info("Output tables for debugging purposes, if requested.") ( panelapp_df.select( "Phenotypes", # Original, unaltered string with all phenotypes. "cleanedUpPhenotypes", # String with phenotypes after pre-split cleanup. "phenotype", # Individual phenotype after splitting. "diseaseFromSource", # Final cleaned up disease name. "diseaseFromSourceId", # Final cleaned up disease ID. ) .distinct() .toPandas() .to_csv(self.debug_output_phenotypes_filename, sep="\t", index=False) ) logging.info( "Drop unnecessary fields and populate the final evidence string structure." ) evidence_df = ( panelapp_df.drop("Phenotypes", "cleanedUpPhenotypes", "phenotype") # allelicRequirements requires a list, but we always only have one value from PanelApp. 
.withColumn( "allelicRequirements", f.when( f.col("Mode of inheritance").isNotNull(), f.array(f.col("Mode of inheritance")), ), ) .drop("Mode of inheritance") .withColumnRenamed("List", "confidence") .withColumn("datasourceId", f.lit("genomics_england")) .withColumn("datatypeId", f.lit("genetic_literature")) # diseaseFromSourceId populated above # literature populated above .withColumnRenamed("Panel Id", "studyId") .withColumnRenamed("Panel Name", "studyOverview") .withColumnRenamed("Symbol", "targetFromSourceId") # Some residual duplication is caused by slightly different representations from `cohortPhenotypes` being # cleaned up to the same representation in `diseaseFromSource`, for example "Pontocerebellar hypoplasia type # 2D (613811)" and "Pontocerebellar hypoplasia type 2D, 613811". .distinct() ) evidence_df = add_efo_mapping( evidence_strings=evidence_df, spark_instance=self.spark, ontoma_cache_dir=cache_dir, ) logging.info("Disease mappings have been added.") write_evidence_strings(evidence_df, output_file) logging.info( f"{evidence_df.count()} evidence strings have been saved to {output_file}" ) def fetch_literature_references(self, all_panel_ids): """Queries the PanelApp API to extract all literature references for (panel ID, gene symbol) combinations.""" publications = [] # Contains tuples of (panel ID, gene symbol, PubMed ID). for panel_id in all_panel_ids: logging.debug(f"Fetching literature references for panel {panel_id!r}.") url = f"https://panelapp.genomicsengland.co.uk/api/v1/panels/{panel_id}" panel_data = requests.get(url).json() # The source data and the online data might not be in-sync, due to requesting retired panels. # We still keep these entries, but won't try to fetch gene data: if "genes" not in panel_data: logging.warn(f"Panel info could not retrieved for panel id: {panel_id}") continue for gene in panel_data["genes"]: for publication_string in gene["publications"]: publications.extend( [ (panel_id, gene["gene_data"]["gene_symbol"], pubmed_id) for pubmed_id in self.extract_pubmed_ids(publication_string) ] ) # Group by (panel ID, gene symbol) pairs and convert into a PySpark dataframe. return ( self.spark.createDataFrame( publications, schema=("Panel ID", "Symbol", "literature") ) .groupby(["Panel ID", "Symbol"]) .agg(f.collect_set("literature").alias("literature")) ) def extract_pubmed_ids(self, publication_string): """Parses the publication information from the PanelApp API and extracts PubMed IDs.""" publication_string = ( publication_string.encode("ascii", "ignore") .decode() # To get rid of zero width spaces and other special characters. .strip() .replace("\n", "") .replace("\r", "") ) pubmed_ids = [] if not re.match(self.PMID_FILTER_OUT_RE, publication_string): for regexp in self.PMID_RE: # For every known representation pattern... for occurrence in re.findall( regexp, publication_string ): # For every occurrence of this pattern... pubmed_ids.extend( re.findall(r"(\d+)", occurrence) ) # Extract all digit sequences (PubMed IDs). # Filter out: # * 0 as a value, because it is a placeholder for a missing ID; # * PubMed IDs which are too short or too long. pubmed_ids = { pubmed_id for pubmed_id in pubmed_ids if pubmed_id != "0" and len(pubmed_id) <= 8 } if self.debug_output_pmids_filename: # Output the original PMID string, the extracted PMIDs, and the total number of the records extracted. 
self.debug_output_pmids_data.add( f'{publication_string}\t{" ".join(sorted(pubmed_ids))}\t{len(pubmed_ids)}\n' ) return pubmed_ids if __name__ == "__main__": parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--input-file", required=True, type=str, help="Input TSV file with the table containing association details.", ) parser.add_argument( "--output-file", required=True, type=str, help="Output JSON file (gzip-compressed) with the evidence strings.", ) parser.add_argument( "--debug-output-prefix", required=False, type=str, help="If specified, a number of files will be created with this prefix to store phenotype/PMID cleanup data " "for debugging purposes.", ) parser.add_argument( "--cache_dir", required=False, help="Directory to store the OnToma cache files in.", ) args = parser.parse_args() PanelAppEvidenceGenerator(args.debug_output_prefix).generate_panelapp_evidence( input_file=args.input_file, output_file=args.output_file, cache_dir=args.cache_dir, ) |
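For reference, the PMID extraction logic above can be exercised on its own. The sketch below re-implements only the two regular-expression patterns and the post-filtering (placeholder zeros and over-long identifiers); the Spark session and debug bookkeeping of the class are deliberately omitted.

import re

# The two representation patterns copied from PMID_RE above.
PMID_PATTERNS = [
    r"^[\d, ]+(?: |$)",              # e.g. '15643612' or '28055140, 27333055, 23063529'
    r"(?:PubMed|PMID)[: ]*[\d, ]+",  # e.g. '... PMID: 1908107 - publication describing ...'
]

def extract_pubmed_ids(publication_string: str) -> set:
    """Standalone restatement of PanelAppEvidenceGenerator.extract_pubmed_ids."""
    pubmed_ids = []
    for pattern in PMID_PATTERNS:
        for occurrence in re.findall(pattern, publication_string):
            pubmed_ids.extend(re.findall(r"(\d+)", occurrence))
    # Drop the '0' placeholder and implausibly long identifiers:
    return {pmid for pmid in pubmed_ids if pmid != "0" and len(pmid) <= 8}

print(extract_pubmed_ids("28055140, 27333055, 23063529"))
print(extract_pubmed_ids("observed in the patient. PMID: 1908107 - publication describing function"))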
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | import sys import argparse import gzip import logging import json from pyspark.sql import SparkSession import pyspark.sql.functions as F class progenyEvidenceGenerator: def __init__(self): # Create spark session self.spark = ( SparkSession.builder .appName('progeny') .getOrCreate() ) # Initialize source table self.dataframe = None def generateEvidenceFromSource(self, inputFile, diseaseMapping, pathwayMapping, skipMapping): ''' Processing of the input file to build all the evidences from its data Returns: evidences (array): Object with all the generated evidences strings from source file ''' # Read input file self.dataframe = ( self.spark.read .option('header', 'true') .option('delimiter', '\t') .option('inferSchema', 'true') .csv(inputFile) ) # Disease mapping step if not skipMapping: try: self.dataframe = self.cancer2EFO(diseaseMapping) logging.info('Disease mappings have been imported.') except Exception as e: logging.error(f'An error occurred while importing disease mappings: \n{e}.') else: logging.info('Disease mapping has been skipped.') self.dataframe = self.dataframe.withColumn('EFO_id', F.lit(None)) self.dataframe = self.pathway2Reactome(pathwayMapping) logging.info('Pathway to reaction ID mappings have been imported.') # Build evidence strings per row logging.info('Generating evidence:') evidences = ( self.dataframe.rdd .map(progenyEvidenceGenerator.parseEvidenceString) .collect() # list of dictionaries ) return evidences def cancer2EFO(self, diseaseMapping): diseaseMappingsFile = ( self.spark .read.csv('resources/cancer2EFO_mappings.tsv', sep=r'\t', header=True) .select('Cancer_type_acronym', 'EFO_id') .withColumnRenamed('Cancer_type_acronym', 'Cancer_type') ) self.dataframe = self.dataframe.join( diseaseMappingsFile, on='Cancer_type', how='left' ) return self.dataframe def pathway2Reactome(self, pathwayMapping): pathwayMappingsFile = ( self.spark .read.csv('resources/pathway2Reactome_mappings.tsv', sep=r'\t', header=True) .withColumnRenamed('pathway', 'Pathway') ) self.dataframe = ( self.dataframe .join(pathwayMappingsFile, on='Pathway', how='inner') .withColumn('target', F.split(F.col('target'), ', ')) .withColumn('target', F.explode('target')) ) return self.dataframe @staticmethod def parseEvidenceString(row): try: evidence = { 'datasourceId': 'progeny', 'datatypeId': 'affected_pathway', 'resourceScore': row['P.Value'], 'targetFromSourceId': row['target'], 'diseaseFromSource': row['Cancer_type'], 'pathways': [ { 'id': row['reactomeId'], 'name': row['description'] } ] } # Skipping unmapped diseases: if row['EFO_id']: evidence['diseaseFromSourceMappedId'] = row['EFO_id'] return evidence except Exception: raise def main(inputFile, diseaseMapping, pathwayMapping, outputFile, skipMapping): # Logging parameters logging.info(f'PROGENy input table: {inputFile}') logging.info(f'Cancer type to EFO ID table: {diseaseMapping}') logging.info(f'Pathway to reaction ID table: {pathwayMapping}') 
logging.info(f'Output file: {outputFile}') # Initialize evidence builder object evidenceBuilder = progenyEvidenceGenerator() # Writing evidence strings into a json file evidences = evidenceBuilder.generateEvidenceFromSource(inputFile, diseaseMapping, pathwayMapping, skipMapping) with gzip.open(outputFile, 'wt') as f: for evidence in evidences: json.dump(evidence, f) f.write('\n') logging.info(f'{len(evidences)} evidence strings saved into {outputFile}. Exiting.') if __name__ == '__main__': # Initiating parser parser = argparse.ArgumentParser(description='This script generates evidences for the PROGENy data source.') parser.add_argument('-i', '--inputFile', required=True, type=str, help='Input source .tsv file.') parser.add_argument('-d', '--diseaseMapping', required=False, type=str, help='Input look-up table containing the cancer type mappings to an EFO ID.') parser.add_argument('-p', '--pathwayMapping', required=True, type=str, help='Input look-up table containing the pathway mappings to a respective target and ID in Reactome.') parser.add_argument('-o', '--outputFile', required=True, type=str, help='Gzipped JSON file containing the evidence strings.') parser.add_argument('-s', '--skipMapping', required=False, action='store_true', help='State whether to skip the disease to EFO mapping step.') parser.add_argument('-l', '--logFile', help='Destination of the logs generated by this script.', type=str, required=False) # Parsing parameters args = parser.parse_args() inputFile = args.inputFile diseaseMapping = args.diseaseMapping pathwayMapping = args.pathwayMapping outputFile = args.outputFile skipMapping = args.skipMapping # Initialize logging: logging.basicConfig( level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) if args.logFile: logging.config.fileConfig(filename=args.logFile) else: logging.StreamHandler(sys.stderr) main(inputFile, diseaseMapping, pathwayMapping, outputFile, skipMapping) |
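To make the output shape of parseEvidenceString explicit, the snippet below builds one evidence string from a hand-written row. Every value is an invented placeholder and is only meant to illustrate which fields a PROGENy evidence string carries.

import json

# Hypothetical input row, mirroring the columns used in parseEvidenceString above:
row = {
    'P.Value': 1.3e-05,
    'target': 'ENSG00000000000',    # placeholder Ensembl gene id
    'Cancer_type': 'BRCA',          # placeholder cancer type acronym
    'reactomeId': 'R-HSA-0000000',  # placeholder Reactome pathway id
    'description': 'Example pathway',
    'EFO_id': 'EFO_0000000',        # placeholder EFO id; omitted when the disease is unmapped
}

evidence = {
    'datasourceId': 'progeny',
    'datatypeId': 'affected_pathway',
    'resourceScore': row['P.Value'],
    'targetFromSourceId': row['target'],
    'diseaseFromSource': row['Cancer_type'],
    'pathways': [{'id': row['reactomeId'], 'name': row['description']}],
}
if row['EFO_id']:
    evidence['diseaseFromSourceMappedId'] = row['EFO_id']

print(json.dumps(evidence))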
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | import sys import argparse import gzip import logging import json from pyspark.sql import SparkSession import pyspark.sql.functions as F class SLAPEnrichEvidenceGenerator: def __init__(self): # Create spark session self.spark = ( SparkSession.builder .appName('SLAPEnrich') .getOrCreate() ) logging.info(f'Spark version: {self.spark.version}') # Initialize source table self.dataframe = None def generateEvidenceFromSource(self, inputFile, diseaseMapping, skipMapping): ''' Processing of the input file to build all the evidences from its data Returns: evidences (array): Object with all the generated evidences strings from source file ''' # Read input file self.dataframe = ( self.spark .read.csv(inputFile, sep=r'\t', header=True, inferSchema=True) .select('ctype', 'gene', 'pathway', 'SLAPEnrichPval') .withColumnRenamed('ctype', 'Cancer_type_acronym') .withColumnRenamed('SLAPEnrichPval', 'pval') .withColumn('pathwayId', F.split(F.col('pathway'), ': ').getItem(0)) .withColumn('pathwayDescription', F.split(F.col('pathway'), ': ').getItem(1)) ) # Filter by p-value self.dataframe = self.dataframe.filter(F.col('pval') < 1e-4) # Mapping step if not skipMapping: try: self.dataframe = self.cancer2EFO(diseaseMapping) logging.info('Disease mappings have been imported.') except Exception as e: logging.error(f'An error occurred while importing disease mappings: \n{e}.') else: logging.info('Disease mapping has been skipped.') self.dataframe = self.dataframe.withColumn('EFO_id', F.lit(None)) # Build evidence strings per row logging.info('Generating evidence:') evidences = ( self.dataframe.rdd .map(SLAPEnrichEvidenceGenerator.parseEvidenceString) .collect() ) # list of dictionaries return evidences def cancer2EFO(self, diseaseMapping): diseaseMappingsFile = ( self.spark .read.csv(diseaseMapping, sep=r'\t', header=True) .select('Cancer_type_acronym', 'EFO_id') ) self.dataframe = self.dataframe.join( diseaseMappingsFile, on='Cancer_type_acronym', how='left' ) return self.dataframe @staticmethod def parseEvidenceString(row): try: evidence = { 'datasourceId': 'slapenrich', 'datatypeId': 'affected_pathway', 'resourceScore': row['pval'], 'targetFromSourceId': row['gene'], 'diseaseFromSource': row['Cancer_type_acronym'], 'pathways': [ { 'id': row['pathwayId'], 'name': row['pathwayDescription'] } ] } # For unmapped diseases, we skip the mappedID key: if row['EFO_id']: evidence['diseaseFromSourceMappedId'] = row['EFO_id'] return evidence except Exception as e: raise def main(inputFile, diseaseMapping, outputFile, skipMapping): # Logging parameters logging.info(f'SLAPEnrich input table: {inputFile}') logging.info(f'Cancer type to EFO ID table: {diseaseMapping}') logging.info(f'Output file: {outputFile}') # Initialize evidence builder object evidenceBuilder = SLAPEnrichEvidenceGenerator() # Writing evidence strings into a json file evidences = evidenceBuilder.generateEvidenceFromSource(inputFile, diseaseMapping, skipMapping) with gzip.open(outputFile, 'wt') as f: for evidence in evidences: 
json.dump(evidence, f) f.write('\n') logging.info(f'{len(evidences)} evidence strings saved into {outputFile}. Exiting.') if __name__ == '__main__': # Initiating parser parser = argparse.ArgumentParser(description='This script generates evidences for the SLAPEnrich data source.') parser.add_argument('-i', '--inputFile', required=True, type=str, help='Input source .tsv file.') parser.add_argument('-d', '--diseaseMapping', required=False, type=str, help='Input look-up table containing the cancer type mappings to an EFO ID.') parser.add_argument('-o', '--outputFile', required=True, type=str, help='Gzipped JSON file containing the evidence strings.') parser.add_argument('-s', '--skipMapping', required=False, action='store_true', help='State whether to skip the disease to EFO mapping step.') parser.add_argument('-l', '--logFile', help='Destination of the logs generated by this script.', type=str, required=False) # Parsing parameters args = parser.parse_args() inputFile = args.inputFile diseaseMapping = args.diseaseMapping outputFile = args.outputFile skipMapping = args.skipMapping # Initialize logging: logging.basicConfig( level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) if args.logFile: logging.config.fileConfig(filename=args.logFile) else: logging.StreamHandler(sys.stderr) main(inputFile, diseaseMapping, outputFile, skipMapping) |
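The key preprocessing step above is splitting the combined 'pathway' column into an identifier and a description, then applying the p-value cut-off. A minimal local-mode sketch of that step, using a single invented row, could look like this (it assumes a working pyspark installation):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master('local[1]').appName('slapenrich_demo').getOrCreate()

# One invented input row with the columns selected by the parser above:
demo = spark.createDataFrame(
    [('BRCA', 'TP53', 'R-HSA-0000000: Example pathway', 3.2e-7)],
    ['Cancer_type_acronym', 'gene', 'pathway', 'pval'],
)

(
    demo
    .withColumn('pathwayId', F.split(F.col('pathway'), ': ').getItem(0))
    .withColumn('pathwayDescription', F.split(F.col('pathway'), ': ').getItem(1))
    .filter(F.col('pval') < 1e-4)
    .show(truncate=False)
)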
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | import math import argparse import logging import sys import pandas as pd def renormalize(n, start_range, new_range=[0.5, 1]): """ A function to scale a value from a given range to a new range. Apply the function f(x) to n using and old (start_range) and a new range where f(x) = (dNewRange / dOldRange * (n - old_range_lower_bound)) + new_lower """ delta1 = start_range[1] - start_range[0] delta2 = new_range[1] - new_range[0] max_new_range = max(new_range) min_new_range = min(new_range) if delta1 or delta2: try: normalized = (delta2 * (n - start_range[0]) / delta1) + new_range[0] except ZeroDivisionError: normalized = new_range[0] else: normalized = n # The formula results in values slightly smaller and larger than the boundaries of the new range if normalized > max_new_range: return max_new_range elif normalized < min_new_range: return min_new_range return round(normalized, 4) def generate_score(row): """ Score generation depends on the score type. """ score_type = row['score_type'] score = row['score'] min_score = row['min_score'] max_score = row['max_score'] if score_type == 'p-value': parsed_score = renormalize(math.log10(score), [math.log10(max_score), math.log10(min_score)]) elif score_type == 'rank': parsed_score = renormalize(score, [min_score, max_score]) else: parsed_score = 0.75 return parsed_score def main(studyFile, evidenceFile, out_file): logging.info(f'Output file: {out_file}') # Reading evidence: logging.info(f'Evidence file: {evidenceFile}') evidence_df = pd.read_csv(evidenceFile, sep='\t') logging.info(f'Number of evidence: {len(evidence_df)}') logging.info(f'Number of target: {len(evidence_df.target_id.unique())}') logging.info(f'Number of disease: {len(evidence_df.disease_id.unique())}') # Reading study file: logging.info(f'Study description file: {studyFile}') publication_df = pd.read_csv(studyFile, sep='\t') logging.info(f'Number of studies: {len(publication_df)}') # Merging publication with evidence data: merged = evidence_df.merge(publication_df.drop('pmid', axis=1), on='gene_set_name', how='outer', indicator=True) # Checking if merging worked just fine: if len(merged.loc[merged._merge != 'both']) != 0: logging.warning(f'{len(merged.loc[merged._merge != "both"])} rows could not be joined.') logging.warning(merged.loc[merged._merge != "both"]) # Generate evidence: merged = ( merged .assign( diseaseFromSourceMappedId=merged.disease_id.apply(lambda x: x.split('/')[-1]), datasourceId='sysbio', datatypeId='affected_pathway', literature=merged.pmid.apply(lambda x: [str(x)]), pathways=merged.gene_set_name.apply(lambda x: [{'name': x}]), resourceScore=merged.apply(generate_score, axis=1) ) .rename(columns={ 'target_id': 'targetFromSourceId', 'disease_name': 'diseaseFromSource', 'method': 'studyOverview' }) .drop(['_merge', 'max_score', 'min_score', 'score_type', 'score', 'disease_id', 'pmid', 'gene_set_name'], axis=1) .to_json(out_file, compression='gzip', orient='records', lines=True) ) logging.info('Evidence generation finished.') if __name__ == "__main__": # Parse CLI arguments parser = 
argparse.ArgumentParser(description='Generating target/disease evidence strings from SystemsBiology data.') parser.add_argument('-e', '--evidenceFile', help='Name of tsv file with the per gene evidence file.', type=str, required=True) parser.add_argument('-s', '--studyFile', help='Name of tsv file with the study descriptions.', type=str, required=True) parser.add_argument('-o', '--outputFile', help='Name of gzip compressed json output file.', type=str, required=True) parser.add_argument('-l', '--logFile', help='Name of log file. If not provided logs are written to standard error.', type=str, required=False) args = parser.parse_args() studyFile = args.studyFile evidenceFile = args.evidenceFile out_file = args.outputFile # If no logfile is specified, logs are written to stderr logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) if args.logFile: logging.config.fileConfig(filename=args.logFile) else: logging.StreamHandler(sys.stderr) main(studyFile, evidenceFile, out_file) |
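The scoring above hinges on renormalize(), which rescales a value from its original range onto [0.5, 1] and clamps out-of-range results; for the 'p-value' score type, generate_score() applies it to log10-transformed scores. A quick standalone restatement (with invented values) confirms the expected behaviour:

def renormalize_demo(n, start_range, new_range=(0.5, 1)):
    """Plain restatement of the renormalize() scaling used above."""
    delta_old = start_range[1] - start_range[0]
    delta_new = new_range[1] - new_range[0]
    scaled = (delta_new * (n - start_range[0]) / delta_old) + new_range[0]
    # Clamp to the new range and round, as the original function does:
    return round(min(max(scaled, new_range[0]), new_range[1]), 4)

assert renormalize_demo(5, (0, 10)) == 0.75   # midpoint maps to the midpoint of [0.5, 1]
assert renormalize_demo(0, (0, 10)) == 0.5    # old minimum maps to the new lower bound
assert renormalize_demo(12, (0, 10)) == 1     # values outside the old range are clamped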
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 | import argparse from functools import reduce import logging import sys from typing import Optional from pyspark import SparkFiles from pyspark.sql.dataframe import DataFrame import pyspark.sql.functions as F from common.evidence import initialize_sparksession, write_evidence_strings def main( toxcast: str, output: str, adverse_events: str, safety_risk: str, aopwiki: str, log_file: Optional[str] = None, ): """ This module puts together data from different sources that describe target safety liabilities. Args: adverse_events: Input TSV containing adverse events associated with targets that have been collected from relevant publications. Fetched from GitHub. safety_risk: Input TSV containing cardiovascular safety liabilities associated with targets that have been collected from relevant publications. Fetched from GitHub. toxcast: Input table containing biological processes associated with relevant targets that have been observed in toxicity assays. output: Output gzipped json file following the target safety liabilities data model. log_file: Destination of the logs generated by this script. Defaults to None. """ # Logger initializer. If no log_file is specified, logs are written to stderr logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) if log_file: logging.config.fileConfig(filename=log_file) else: logging.StreamHandler(sys.stderr) # Initialize spark context global spark spark = initialize_sparksession() spark.sparkContext.addFile(adverse_events) spark.sparkContext.addFile(safety_risk) logging.info("Remote files successfully added to the Spark Context.") # Load and process the input files into dataframes ae_df = process_adverse_events(SparkFiles.get(adverse_events.split("/")[-1])) sr_df = process_safety_risk(SparkFiles.get(safety_risk.split("/")[-1])) toxcast_df = process_toxcast(toxcast) aopwiki_df = process_aop(aopwiki) logging.info("Data has been processed. 
Merging...") # Combine dfs and group evidence evidence_unique_cols = [ "id", "targetFromSourceId", "event", "eventId", "datasource", "effects", "isHumanApplicable", "literature", "url", ] safety_dfs = [ae_df, sr_df, toxcast_df, aopwiki_df] safety_df = ( reduce( lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), safety_dfs ) # Collect biosample and study metadata by grouping on the unique evidence fields .groupBy(evidence_unique_cols) .agg( F.collect_set(F.col("biosample")).alias("biosamples"), F.collect_set(F.col("study")).alias("studies"), ) .withColumn( "biosamples", F.when(F.size("biosamples") != 0, F.col("biosamples")) ) .withColumn("studies", F.when(F.size("studies") != 0, F.col("studies"))) ) # Write output logging.info("Evidence strings have been processed. Saving...") write_evidence_strings(safety_df, output) logging.info( f"{safety_df.count()} evidence of safety liabilities have been saved to {output}. Exiting." ) return 0 def process_aop(aopwiki: str) -> DataFrame: """Loads and processes the AOPWiki input JSON.""" return ( spark.read.json(aopwiki) # if isHumanApplicable is False, set it to null as the lack of applicability has not been tested - it only shows lack of data .withColumn( "isHumanApplicable", F.when( F.col("isHumanApplicable") != F.lit(True), F.col("isHumanApplicable") ), ) # data bug: some events have the substring "NA" at the start - removal and trim the string .withColumn("event", F.trim(F.regexp_replace(F.col("event"), "^NA", ""))) # data bug: effects.direction need to be in lowercase, this field is an enum .withColumn( "effects", F.transform( F.col("effects"), lambda x: F.struct( F.when( x.direction == "Activation", F.lit("Activation/Increase/Upregulation"), ) .when( x.direction == "Inhibition", F.lit("Inhibition/Decrease/Downregulation"), ) .alias("direction"), x.dosing.alias("dosing"), ), ), ) # I need to convert the biosamples array into a struct so that data is parsed the same way as the rest of the sources .withColumn("biosample", F.explode_outer("biosamples")) ) def process_adverse_events(adverse_events: str) -> DataFrame: """ Loads and processes the adverse events input TSV. Ex. input record: biologicalSystem | gastrointestinal effect | activation_general efoId | EFO_0009836 ensemblId | ENSG00000133019 pmid | 23197038 ref | Bowes et al. (2012) symptom | bronchoconstriction target | CHRM3 uberonCode | UBERON_0005409 url | null Ex. output record: id | ENSG00000133019 event | bronchoconstriction datasource | Bowes et al. 
(2012) eventId | EFO_0009836 literature | 23197038 url | null biosample | {gastrointestinal, UBERON_0005409, null, null, null} effects | [{Activation/Increase/Upregulation, general}] """ ae_df = ( spark.read.csv(adverse_events, sep="\t", header=True) .select( F.col("ensemblId").alias("id"), F.col("symptom").alias("event"), F.col("efoId").alias("eventId"), F.col("ref").alias("datasource"), F.col("pmid").alias("literature"), "url", F.struct( F.col("biologicalSystem").alias("tissueLabel"), F.col("uberonCode").alias("tissueId"), F.lit(None).alias("cellLabel"), F.lit(None).alias("cellFormat"), F.lit(None).alias("cellId"), ).alias("biosample"), F.split(F.col("effect"), "_").alias("effects"), ) .withColumn( "effects", F.struct( F.when( F.col("effects")[0].contains("activation"), F.lit("Activation/Increase/Upregulation"), ) .when( F.col("effects")[0].contains("inhibition"), F.lit("Inhibition/Decrease/Downregulation"), ) .alias("direction"), F.element_at(F.col("effects"), 2).alias("dosing"), ), ) ) # Multiple dosing effects need to be grouped in the same record. effects_df = ae_df.groupBy("id", "event", "datasource").agg( F.collect_set(F.col("effects")).alias("effects") ) ae_df = ae_df.drop("effects").join( effects_df, on=["id", "event", "datasource"], how="left" ) return ae_df def process_safety_risk(safety_risk: str) -> DataFrame: """ Loads and processes the safety risk information input TSV. Ex. input record: biologicalSystem | cardiovascular sy... ensemblId | ENSG00000132155 event | heart disease eventId | EFO_0003777 liability | Important for the... pmid | 21283106 ref | Force et al. (2011) target | RAF1 uberonId | UBERON_0004535 Ex. output record: id | ENSG00000132155 event | heart disease eventId | EFO_0003777 literature | 21283106 datasource | Force et al. (2011) biosample | {cardiovascular s... study | {Important for th... """ return ( spark.read.csv(safety_risk, sep="\t", header=True) .select( F.col("ensemblId").alias("id"), "event", "eventId", F.col("pmid").alias("literature"), F.col("ref").alias("datasource"), F.struct( F.col("biologicalSystem").alias("tissueLabel"), F.col("uberonId").alias("tissueId"), F.lit(None).alias("cellLabel"), F.lit(None).alias("cellFormat"), F.lit(None).alias("cellId"), ).alias("biosample"), F.struct( F.col("liability").alias("description"), F.lit(None).alias("name"), F.lit(None).alias("type"), ).alias("study"), ) .withColumn( "event", F.when(F.col("datasource").contains("Force"), "heart disease").when( F.col("datasource").contains("Lamore"), "cardiac arrhythmia" ), ) .withColumn( "eventId", F.when(F.col("datasource").contains("Force"), "EFO_0003777").when( F.col("datasource").contains("Lamore"), "EFO_0004269" ), ) ) def process_toxcast(toxcast: str) -> DataFrame: """ Loads and processes the ToxCast input table. Ex. input record: assay_component_endpoint_name | ACEA_ER_80hr assay_component_desc | ACEA_ER_80hr, is ... biological_process_target | cell proliferation tissue | null cell_format | cell line cell_short_name | T47D assay_format_type | cell-based official_symbol | ESR1 eventId | null Ex. output record: targetFromSourceId | ESR1 event | cell proliferation eventId | null biosample | {null, null, T47D... datasource | ToxCast url | https://www.epa.g... study | {ACEA_ER_80hr, AC... 
""" return spark.read.csv(toxcast, sep="\t", header=True).select( F.trim(F.col("official_symbol")).alias("targetFromSourceId"), F.col("biological_process_target").alias("event"), "eventId", F.struct( F.col("tissue").alias("tissueLabel"), F.lit(None).alias("tissueId"), F.col("cell_short_name").alias("cellLabel"), F.col("cell_format").alias("cellFormat"), F.lit(None).alias("cellId"), ).alias("biosample"), F.lit("ToxCast").alias("datasource"), F.lit( "https://www.epa.gov/chemical-research/exploring-toxcast-data-downloadable-data" ).alias("url"), F.struct( F.col("assay_component_endpoint_name").alias("name"), F.col("assay_component_desc").alias("description"), F.col("assay_format_type").alias("type"), ).alias("study"), ) def get_parser(): """Get parser object for script TargetSafety.py.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--toxcast", help="Input table containing biological processes associated with relevant targets that have been observed in toxicity assays.", type=str, required=True, ) parser.add_argument( "--adverse_events", help="Input TSV containing adverse events associated with targets that have been collected from relevant publications. Fetched from https://raw.githubusercontent.com/opentargets/curation/master/target_safety/adverse_effects.tsv.", type=str, required=True, ) parser.add_argument( "--safety_risk", help="Input TSV containing cardiovascular safety liabilities associated with targets that have been collected from relevant publications. Fetched from https://raw.githubusercontent.com/opentargets/curation/master/target_safety/safety_risks.tsv.", type=str, required=True, ) parser.add_argument( "--aopwiki", help="Input JSON containing targets implicated in adverse outcomes as reported by the AOPWiki. Parsed from their source XML data.", type=str, required=True, ) parser.add_argument( "--output", help="Output gzipped json file following the target safety liabilities data model.", type=str, required=True, ) parser.add_argument( "--log_file", help="Destination of the logs generated by this script. Defaults to None", type=str, nargs="?", default=None, ) return parser if __name__ == "__main__": args = get_parser().parse_args() main( toxcast=args.toxcast, adverse_events=args.adverse_events, safety_risk=args.safety_risk, aopwiki=args.aopwiki, output=args.output, ) |
import argparse
import logging
import sys

from pyspark.sql import functions as f
from pyspark import SparkFiles

from common.evidence import (
    write_evidence_strings,
    initialize_sparksession,
)

# The TEP dataset is made available by the SGC as a tab-separated file:
TEPURL = "https://www.thesgc.org/sites/default/files/fileuploads/available-teps.tsv"


def main(outputFile: str) -> None:

    # Initialize spark session
    spark = initialize_sparksession()
    spark.sparkContext.addFile(TEPURL)

    # Fetching and processing the TEP table and saved as a JSON file:
    TEP_df = (
        spark.read.csv(SparkFiles.get(TEPURL.split("/")[-1]), sep="\t", header=True)
        # Generating TEP url from Gene column: SLC12A4/SLC12A6 -> https://www.thesgc.org/tep/SLC12A4SLC12A6
        .withColumn(
            "url",
            f.concat(
                f.lit("https://www.thesgc.org/tep/"),
                f.regexp_replace(f.lower(f.col("Gene")), "/", ""),
            ),
        )
        # Exploding TEPs, where multiple genes are given:
        .withColumn("targetFromSourceId", f.explode(f.split(f.col("Gene"), "/")))
        # Renaming columns:
        .withColumnRenamed("Therapeutic Area", "therapeuticArea")
        .withColumnRenamed("Description", "description")
        # Dropping columns:
        .drop(*["Gene", "version", "Date"])
        .persist()
    )

    logging.info("TEP dataset has been downloaded and formatted.")
    logging.info(f"Number of TEPs: {TEP_df.count()}")
    logging.info(
        f'Number of unique genes: {TEP_df.select("targetFromSourceId").distinct().count()}'
    )

    # Saving data:
    write_evidence_strings(TEP_df, outputFile)
    logging.info(f"TEP dataset is written to {outputFile}.")


if __name__ == "__main__":

    # Reading output file name from the command line:
    parser = argparse.ArgumentParser(
        description="This script fetches TEP data from Structural Genomics Consortium."
    )
    parser.add_argument(
        "--output_file", "-o", type=str, help="Output file. gzipped JSON", required=True
    )
    parser.add_argument(
        "--log_file",
        type=str,
        help="File into which the logs are saved",
        required=False,
    )
    args = parser.parse_args()

    # If no logfile is specified, logs are written to the standard error:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if args.log_file:
        logging.config.fileConfig(filename=args.log_file)
    else:
        logging.StreamHandler(sys.stderr)

    main(args.output_file)
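The only non-trivial transformations in this parser are the URL construction and the explosion of multi-gene TEPs into one row per gene. The local-mode sketch below runs those two steps on the 'SLC12A4/SLC12A6' example from the comment above:

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.master('local[1]').appName('tep_demo').getOrCreate()

demo = spark.createDataFrame([('SLC12A4/SLC12A6',)], ['Gene'])

(
    demo
    # SLC12A4/SLC12A6 -> https://www.thesgc.org/tep/slc12a4slc12a6
    .withColumn(
        'url',
        f.concat(f.lit('https://www.thesgc.org/tep/'),
                 f.regexp_replace(f.lower(f.col('Gene')), '/', '')),
    )
    # One evidence row per gene:
    .withColumn('targetFromSourceId', f.explode(f.split(f.col('Gene'), '/')))
    .show(truncate=False)
)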
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 | import argparse import logging import os import sys from functools import reduce from scipy import stats as st from typing import Dict, List, Any from pyspark.sql import SparkSession, functions as f, types as t from pyspark.sql.dataframe import DataFrame from pyspark.sql.window import Window from common.evidence import ( initialize_sparksession, write_evidence_strings, GenerateDiseaseCellLines, read_ppp_config, ) class EncoreEvidenceGenerator: """This parser generates disease/target evidence based on datafiles submitted by the Encore team. Input for the parser: - JSON configuration describing the experiments. For each experiment the following input files are expected: - lfc_file: log fold change values for every gene pairs, for every experiment. - bliss_file: the analysed cooperative effect for each gene pairs calculated by the BLISS analysis. - gemini_file: the analysed cooperative effect for each gene pairs calculated by the Gemini analysis. The parser joins the lfc data with the cooperativity measurements by gene pairs and cell line. Apply filter on gene pairs: - only extract gene pairs with significant log fold change. - only extract gene pairs with significant cooperative effect. 
""" def __init__( self, spark: SparkSession, cell_passport_df: DataFrame, shared_parameters: dict ) -> None: self.spark = spark self.cell_passport_df = cell_passport_df # Parsing paramters: self.log_fold_change_cutoff_p_val = shared_parameters["logFoldChangeCutoffPVal"] self.log_fold_change_cutoff_FDR = shared_parameters["logFoldChangeCutoffFDR"] self.interaction_cutoff_p_val = shared_parameters["blissCutoffPVal"] self.data_folder = shared_parameters["data_folder"] self.release_version = shared_parameters["releaseVersion"] self.release_date = shared_parameters["releaseDate"] @staticmethod @f.udf( t.ArrayType( t.StructType( [ t.StructField("targetFromSourceId", t.StringType(), False), t.StructField("targetRole", t.StringType(), False), t.StructField( "interactingTargetFromSourceId", t.StringType(), False ), t.StructField("interactingTargetRole", t.StringType(), False), ] ) ) ) def parse_targets(gene_pair: str, gene_role: str) -> List[Dict[Any, Any]]: """The gene pair string is split and assigned to the relevant role + exploding into evidence of two targets. gene pair: 'SHC1~ADAD1', where 'SHC1' is the library gene, and 'ADAD1' is the anchor gene. Both genes will be targetFromSource AND interactingTargetFromSource, while keeping their roles. """ genes = gene_pair.split("~") roles = [gene_role.replace("Combinations", "").lower(), "anchor"] assert len(genes) == 2 parsed = [] for i, (gene, role) in enumerate(zip(genes, roles)): parsed.append( { "targetFromSourceId": gene, "targetRole": role, "interactingTargetFromSourceId": genes[1] if i == 0 else genes[0], "interactingTargetRole": roles[1] if i == 0 else roles[0], } ) return parsed def get_lfc_data(self, lfc_file: str) -> DataFrame: """ The table has results from CRISPR/Cas9 experiments: the p-value, false discovery rate (fdr) and the log fold-change file describes the observed survival of the cells. Each row is a gene pair, columns contain results from the measurements. Column names contain information on the tested cell line (SIDM) and ocassionally the replicate (CSID) identifiers. This really wide table is stacked to get only three numeric columns (p-value, fold-change and FDR), plus string column with cell line. Args: lfc_file (str): str Returns: A dataframe with the following columns: id cellLineName Note1 Note2 phenotypicConsequenceLogFoldChange phenotypicConsequencePValue phenotypicConsequenceFDR """ # Fixed statistical field names: stats_fields = ["p-value", "fdr", "lfc"] # Reading the data into a single dataframe: lfc_df = self.spark.read.csv(lfc_file, sep="\t", header=True) # Collect the cell lines from the lfc file header: cell_lines = set(["_".join(x.split("_")[:-1]) for x in lfc_df.columns[4:]]) # Generating struct for each cell lines: # SIDM are Sanger model identifiers, while CSID are replicate identifiers. 
# SIDM00049_CSID1053_p-value # SIDM00049_CSID1053_fdr # SIDM00049_CSID1053_lfc # Into: SIDM00049_CSID1053: struct(p-value, fdr, lfc) expressions = map( lambda cell: ( cell, f.struct( [ f.col(f"{cell}_{x}").cast(t.FloatType()).alias(x) for x in stats_fields ] ), ), cell_lines, ) # Applying map on the dataframe: res_df = reduce(lambda DF, value: DF.withColumn(*value), expressions, lfc_df) # Stack the previously generated columns: unpivot_expression = f"""stack({len(cell_lines)}, {", ".join([f"'{x}', {x}" for x in cell_lines])} ) as (cellLineName, cellLineData)""" return ( res_df # Unpivot: .select("id", "Note1", "Note2", f.expr(unpivot_expression)) # Extracting and renaming log-fold-change parameters: .select( "id", f.split(f.col("cellLineName"), "_").getItem(0).alias("cellLineName"), "Note1", "Note2", f.col("cellLineData.lfc").alias("phenotypicConsequenceLogFoldChange"), f.col("cellLineData.p-value").alias("phenotypicConsequencePValue"), f.col("cellLineData.fdr").alias("phenotypicConsequenceFDR"), ) ) def get_bliss_data(self, bliss_file: str) -> DataFrame: """ It reads a bliss file, extracts the cell lines from the header, generates a struct for each cell line, unpivots the data, and finally extracts the bliss score and p-value Args: bliss_file (str): The path to the bliss file. Returns: A dataframe with the following columns: - id - cellLineName - geneticInteractionScore - geneticInteractionPValue - statisticalMethod """ stats_fields = [ "gene1", "gene2", "observed", "observed_expected", "pval", "zscore", ] # Read bliss file: bliss_df = self.spark.read.csv(bliss_file, sep="\t", header=True) # Collect cell-line/recplicate pairs from the headers: cell_lines = set( [ "_".join(x.split("_")[0:2]) for x in bliss_df.columns[4:] if x.startswith("SID") ] ) # Generating struct for each cell lines: expressions = map( lambda cell: ( cell, f.struct([f.col(f"{cell}_{x}").alias(x) for x in stats_fields]), ), cell_lines, ) # Applying map on the dataframe: res_df = reduce(lambda DF, value: DF.withColumn(*value), expressions, bliss_df) # Stack the previously generated columns: unpivot_expression = f"""stack({len(cell_lines)}, {", ".join([f"'{x}', {x}" for x in cell_lines])} ) as (cellLineName, cellLineData)""" # Windowing over the cell-line/gene-pair pairs: w = Window.partitionBy(["id", "cellLineName"]).orderBy( f.asc("geneticInteractionPValue") ) return ( res_df.select( # Create a consistent id column: f.regexp_replace(f.col("Gene_Pair"), ";", "~").alias("id"), # Unpivot: f.expr(unpivot_expression), ) # Extracting and renaming bliss statistical values: .select( "id", f.split(f.col("cellLineName"), "_")[0].alias("cellLineName"), f.split(f.col("cellLineName"), "_")[1].alias("experimentId"), f.col("cellLineData.zscore").cast(t.FloatType()).alias("zscore"), ) .filter(f.col("zscore") != "NaN") # BLISS data is not aggregated for cell lines, instead we have data for each replicates. 
# To allow averaging, data needs to be grouped by gene pair and cell line: .groupby("id", "cellLineName") # Averaging z-scores: .agg( f.sum(f.col("zscore")).alias("sumzscore"), f.count(f.col("zscore")).alias("experiment_count"), ) .withColumn( "geneticInteractionScore", f.col("sumzscore") / f.sqrt(f.col("experiment_count")), ) # Calculating p-value for the averaged z-score: .withColumn( "geneticInteractionPValue", f.udf( lambda zscore: float(st.norm.sf(abs(zscore)) * 2) if zscore else None, t.FloatType(), )(f.col("geneticInteractionScore")), ) .select( "id", "cellLineName", f.lit("bliss").alias("statisticalMethod"), "geneticInteractionPValue", "geneticInteractionScore", # Based on the z-score we can tell the nature of the interaction: # - positive z-score means synergictic # - negative z-score means antagonistics interaction f.when(f.col("geneticInteractionScore") > 0, "synergistic") .otherwise("antagonistic") .alias("geneInteractionType"), ) ) def get_gemini_data(self, gemini_file: str) -> DataFrame: """Parsing cooperative effects calculated by gemini method The structure of the file similar to the lfc file, the process follows a similar fashion, except it slightly different. Different enough to get a separate function. """ # Fixed statistical field names: stats_fields = ["score", "pval", "FDR"] # Reading the data into a single dataframe: gemini_df = self.spark.read.csv(gemini_file, sep="\t", header=True) # Collect the cell lines from the lfc file header: cell_lines = set( [ "_".join(x.split("_")[:-1]) for x in gemini_df.columns[4:] if x.startswith("SID") ] ) # There are some problems in joining gemini files on Encore side. It causes a serious issues: # 1. Multiple Gene_Pair columns in the file -> these will be indexed in the pyspark dataframe # 2. Some columns for some cell lines will be missing eg. pvalue for SIDM00049_CSID1053 # # To mitigate these issue we have to check for gene pair header and remove cell lines with incomplete data. 
if "Gene_Pair" in gemini_df.columns: gene_column = "Gene_Pair" elif "Gene_Pair0" in gemini_df.columns: gene_column = "Gene_Pair0" else: raise ValueError( f'No \'Gene_Pair\' column in Gemini data: {",".join(gemini_df.columns)}' ) # We check if all stats columns available for all cell lines (this coming from a data joing bug at encore): missing_columns = [ f"{cell}_{stat}" for cell in cell_lines for stat in stats_fields if f"{cell}_{stat}" not in gemini_df.columns ] cells_to_drop = set(["_".join(x.split("_")[:-1]) for x in missing_columns]) # If there are missingness, the relevant cell lines needs to be removed from the analysis: if missing_columns: logging.warn(f'Missing columns: {", ".join(missing_columns)}') logging.warn(f'Dropping cell_lines: {", ".join(cells_to_drop)}') # Removing missing cell lines: cell_lines = [x for x in cell_lines if x not in cells_to_drop] # Generating struct for each cell lines: expressions = map( lambda cell: ( cell, f.struct([f.col(f"{cell}_{x}").alias(x) for x in stats_fields]), ), cell_lines, ) # Applying map on the dataframe: res_df = reduce(lambda DF, value: DF.withColumn(*value), expressions, gemini_df) # Stack the previously generated columns: unpivot_expression = f"""stack({len(cell_lines)}, {", ".join([f"'{x}', {x}" for x in cell_lines])} ) as (cellLineName, cellLineData)""" return ( res_df # Create a consistent id column: .withColumn("id", f.regexp_replace(f.col(gene_column), ";", "~")) # Unpivot: .select("id", f.expr(unpivot_expression)) # Extracting and renaming gemini statistical values: .select( "id", f.regexp_replace("cellLineName", "_strong", "").alias("cellLineName"), f.col("cellLineData.score") .cast(t.FloatType()) .alias("geneticInteractionScore"), f.col("cellLineData.pval") .cast(t.FloatType()) .alias("geneticInteractionPValue"), f.col("cellLineData.FDR") .cast(t.FloatType()) .alias("geneticInteractionFDR"), f.lit("gemini").alias("statisticalMethod"), ) ) def parse_experiment(self, parameters: dict) -> DataFrame: """Parsing experiments based on the experimental descriptions. Args: parameters: Dictionary of experimental parameters. The following keys are required: - dataset: Name of the dataset eg. COLO1- referring to the first libraryset of colo. - diseaseFromSource: Name of the disease model of the experiment. - diseaseFromSourceMappedId: EFO ID of the disease model of the experiment. - logFoldChangeFile: File path to the log fold change file. - geminiFile: File path to the gemini file. - blissFile: File path to the bliss file. Returns: A pyspark dataframe of experiment data. Process: - Reading all files. - Joining files. - Applying filters. - Apply filter on lf change - Apply filter on cooperativity - Apply filter on target role (keeping only library genes, controls are dropped) - Adding additional columns + finalizing evidence model """ disease_from_source = parameters["diseaseFromSource"] disease_from_source_mapped_id = parameters["diseaseFromSourceMappedId"] dataset = parameters["dataset"] log_fold_change_file = parameters["logFoldChangeFile"] # gemini_file = parameters["geminiFile"] # <= Removed as genini method is dropped for now. blissFile = parameters["blissFile"] # Testing if experiment needs to be skipped: if parameters["skipStudy"] is True: logging.info(f"Skipping study: {dataset}") return None logging.info(f"Parsing experiment: {dataset}") # if no log fold change file is provided, we will not generate any evidence. 
if log_fold_change_file is None: logging.warning(f"No log fold change file provided for {dataset}.") return None # Reading lfc data: lfc_file = f"{self.data_folder}/{log_fold_change_file}" lfc_df = self.get_lfc_data(lfc_file) logging.info( f'Number of gene pairs in the log(fold change) dataset: {lfc_df.select("id").distinct().count()}' ) logging.info( f'Number cell lines in the log(fold change) dataset: {lfc_df.select("cellLineName").distinct().count()}' ) # A decistion was made to exclude gemini asessments, for now. # # Reading gemini data: # gemini_file = f"{self.data_folder}/{gemini_file}" # gemini_df = self.get_gemini_data(gemini_file) # logging.info( # f'Number of gene pairs in the gemini dataset: {gemini_df.select("id").distinct().count()}' # ) # logging.info( # f'Number cell lines in the gemini dataset: {gemini_df.select("cellLineName").distinct().count()}' # ) bliss_file = f"{self.data_folder}/{blissFile}" bliss_df = self.get_bliss_data(bliss_file) # Merging lfc + gemini: merged_dataset = ( lfc_df # Data is joined by the gene-pair and cell line: .join(bliss_df, how="inner", on=["id", "cellLineName"]) # Applying filters on logFoldChange + interaction p-value thresholds: .filter( ( f.col("phenotypicConsequencePValue") <= self.log_fold_change_cutoff_p_val ) & (f.col("phenotypicConsequenceFDR") <= self.log_fold_change_cutoff_FDR) & (f.col("geneticInteractionPValue") <= self.interaction_cutoff_p_val) ) # Cleaning the cell line annotation: .withColumn("cellId", f.split(f.col("cellLineName"), "_").getItem(0)) # Joining with cell passport data containing diseaseCellLine and biomarkers info: .join( self.cell_passport_df.select( f.col("id").alias("cellId"), "diseaseCellLine", "biomarkerList" ), on="cellId", how="left", ) ) logging.info( f'Number of gene pairs in the merged dataset: {merged_dataset.select("id").count()}' ) logging.info( f'Number of cell lines in the merged dataset: {merged_dataset.select("cellLineName").distinct().count()}' ) evidence_df = ( merged_dataset # Parsing and exploding gene names and target roles: .withColumn("id", self.parse_targets(f.col("id"), f.col("Note1"))) .select("*", f.explode(f.col("id")).alias("genes")) .select( # Adding target releated fields: f.col("genes.*"), # Adding cell line specific annotation: f.array(f.col("diseaseCellLine")).alias("diseaseCellLines"), "biomarkerList", # Adding evidence specific stats: "phenotypicConsequenceLogFoldChange", "phenotypicConsequencePValue", "phenotypicConsequenceFDR", "statisticalMethod", "geneticInteractionPValue", "geneticInteractionScore", "geneInteractionType", # Adding disease information: f.lit(disease_from_source_mapped_id).alias("diseaseFromSourceMappedId"), f.lit(disease_from_source).alias("diseaseFromSource"), # Adding release releated fields: f.lit(self.release_version).alias("releaseVersion"), f.lit(self.release_date).alias("releaseDate"), # Data source specific fields: f.lit("ot_partner").alias("datatypeId"), f.lit("encore").alias("datasourceId"), f.lit("OTAR2062").alias("projectId"), f.lit("Encore project").alias("projectDescription"), ) # Dropping all evidence for anchor and control genes: .filter(f.col("targetRole") == "library") .distinct() ) # If there's a warning message for the sudy, add that: if "warningMessage" in parameters and parameters["warningMessage"] is not None: evidence_df = evidence_df.withColumn( "warningMessage", f.lit(parameters["warningMessage"]) ) return evidence_df def main(outputFile: str, parameters: dict, cellPassportFile: str) -> None: # Initialize spark session spark = 
initialize_sparksession() # Opening and parsing the cell passport data from Sanger: cell_passport_df = GenerateDiseaseCellLines(cellPassportFile, spark).get_mapping() logging.info(f"Cell passport dataframe has {cell_passport_df.count()} rows.") logging.info("Parsing experiment data...") # Initialising evidence generator: evidenceGenerator = EncoreEvidenceGenerator( spark, cell_passport_df, parameters["sharedMetadata"] ) # Create evidence for all experiments. Dataframes are collected in a list: evidence_dfs = [] for experiment in parameters["experiments"]: evidence_dfs.append(evidenceGenerator.parse_experiment(experiment)) # Filter out None values, so only dataframes with evidence are kept: evidence_dfs = list(filter(None, evidence_dfs)) # combine all evidence dataframes into one: combined_evidence_df = reduce(lambda df1, df2: df1.union(df2), evidence_dfs) # The combined evidence is written to a file: write_evidence_strings(combined_evidence_df, outputFile) if __name__ == "__main__": # Reading output file name from the command line: parser = argparse.ArgumentParser( description="This script parses ENCORE data and generates disease-target evidence." ) parser.add_argument( "--output_file", "-o", type=str, help="Output file. gzipped JSON", required=True ) parser.add_argument( "--parameter_file", "-p", type=str, help="A JSON file describing exeriment metadata", required=True, ) parser.add_argument( "--data_folder", "-d", type=str, help="A folder in which the data files are located", required=True, ) parser.add_argument( "--log_file", type=str, help="File into which the logs are saved", required=False, ) parser.add_argument( "--cell_passport_file", "-c", type=str, help="File containing cell passport data", required=True, ) args = parser.parse_args() # If no logfile is specified, logs are written to the standard error: logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) if args.log_file: logging.config.fileConfig(filename=args.log_file) else: logging.StreamHandler(sys.stderr) # Validating data folder: if not os.path.isdir(args.data_folder): logging.error(f"Data folder {args.data_folder} does not exist.") raise ValueError(f"Data folder {args.data_folder} does not exist.") # Reading parameter json: parameters = read_ppp_config(args.parameter_file) # Updating parameters: parameters["sharedMetadata"]["data_folder"] = args.data_folder # Passing all the required arguments: main(args.output_file, parameters, args.cell_passport_file) |
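The replicate-level BLISS aggregation above amounts to a Stouffer-style combination: per-replicate z-scores for a gene pair and cell line are summed, divided by the square root of the number of replicates, and converted to a two-sided p-value, with the sign of the combined z-score deciding between a synergistic and an antagonistic interaction. The standalone check below uses invented z-scores:

from math import sqrt

from scipy import stats as st

# Hypothetical z-scores for one gene pair in one cell line, across three replicates:
replicate_zscores = [2.1, 1.8, 2.5]

combined_z = sum(replicate_zscores) / sqrt(len(replicate_zscores))
p_value = float(st.norm.sf(abs(combined_z)) * 2)
interaction_type = 'synergistic' if combined_z > 0 else 'antagonistic'

print(combined_z, p_value, interaction_type)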
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 | import argparse from datetime import datetime import gzip import json import logging import sys import pandas as pd # The statisticalTestTail is inferred by the column name which is being filtered on: FILTER_COLUMN_MAP = { "pos|fdr": "upper tail", "neg|fdr": "lower tail", "neg|p-value": "lower tail", "pos|p-value": "upper tail", } class OTAR_CRISPR_study_parser(object): def __init__(self, study_url: str) -> None: self.study_file = study_url # Store the study dataframe after dropping problematic studies: study_df = ( pd.read_json(study_url) # drop rows with no study id or data file: .loc[lambda df: df.studyId.notna() & df.dataFiles.notna()] ) # Test and warn if multiple studies have the same study id: duplicated_study_ids = study_df.loc[ lambda df: df.studyId.duplicated() ].studyId.tolist() assert ( len(duplicated_study_ids) == 0 ), f'Multiple studies have the same study id: {", ".join(duplicated_study_ids)}' # Splitting studies if study_df = ( study_df # Exploding filter columns: .assign(filterColumn=lambda df: df.filterColumn.str.split(",")).explode( "filterColumn" ) ) logging.info(f"Number of studies processed: {len(study_df.studyId.unique())}") projects = study_df.projectId.unique() logging.info(f'Number of projects: {len(projects)} ({", ".join(projects)})') self.study_df = study_df def generate_evidence(self, data_folder: str) -> None: # Looping through the studies and generating evidence: # Reading all data files and filter for significant hits: study_columns = [ "releaseDate", "releaseVersion", "studyId", "dataFiles", "dataFileType", "filterColumn", "threshold", "projectId", "ControlDataset", "projectDescription", ] # hits is a pd.Series with pd.DataFrames as values. hits = ( self.study_df[study_columns] .explode("dataFiles") .assign( dataFile=lambda df: df.apply( lambda x: f'{data_folder}/{x["projectId"]}/{x["dataFiles"]}', axis=1 ) ) .assign( ControlDataset=lambda df: df.apply( lambda x: f'{data_folder}/{x["projectId"]}/{x["ControlDataset"]}' if pd.notna(x["ControlDataset"]) else None, axis=1, ) ) # TODO: parsing the data files should be file type dependent! 
            # The following apply returns pd.DataFrames:
            .apply(self.parse_MAGeCK_file, axis=1)
        )

        # Concatenate all hits into one single dataframe:
        hits_df = (
            pd.concat(hits.to_list()).reset_index(drop=True)
            # Cleaning gene id column:
            .assign(
                targetFromSourceId=lambda df: df.targetFromSourceId.apply(
                    self.cleaning_gene_id
                )
            )
        )

        # Merging:
        evidence_fields = [
            "targetFromSourceId",
            "diseaseFromSourceMappedId",
            "projectDescription",
            "projectId",
            "studyId",
            "studyOverview",
            "contrast",
            "crisprScreenLibrary",
            "cellType",
            "cellLineBackground",
            "geneticBackground",
            "statisticalTestTail",
            "resourceScore",
            "log2FoldChangeValue",
            "releaseDate",
            "releaseVersion",
        ]

        self.merged_dataset = (
            self.study_df.assign(
                statisticalTestTail=lambda df: df.filterColumn.map(FILTER_COLUMN_MAP)
            )
            .merge(hits_df, on=["studyId", "filterColumn"], how="inner")
            .explode("diseaseFromSourceMappedId")
            .filter(items=evidence_fields)
            .assign(datasourceId="ot_crispr", datatypeId="ot_partner")
        )

    @staticmethod
    def cleaning_gene_id(gene_id: str) -> str:
        """Expandable set of string processing steps to clean gene identifiers.

        Examples:
            >>> cleaning_gene_id("ENSG00000187123_LYPD6")
            'ENSG00000187123'
        """
        # ENSG00000187123_LYPD6 -> ENSG00000187123
        gene_id = gene_id.split("_")[0]

        return gene_id

    @staticmethod
    def parse_MAGeCK_file(row: pd.Series) -> pd.DataFrame:
        """Return a pandas dataframe with the parsed data file and properly named columns."""
        datafile = row["dataFile"]
        filterColumn = row["filterColumn"]
        threshold = float(row["threshold"])
        studyId = row["studyId"]
        controlDataFile = row["ControlDataset"]

        # Which end of the distribution are we looking at? - "neg" or "pos"?
        side = filterColumn.split("|")[0]

        # Read data, filter and rename columns:
        mageck_df = (
            pd.read_csv(datafile, sep="\t")
            .rename(
                columns={
                    filterColumn: "resourceScore",
                    "id": "targetFromSourceId",
                    # Extracting log fold change for the relevant direction:
                    f"{side}|lfc": "log2FoldChangeValue",
                }
            )
            .loc[lambda df: df.resourceScore <= threshold][
                ["targetFromSourceId", "resourceScore", "log2FoldChangeValue"]
            ]
            .assign(studyId=studyId, filterColumn=filterColumn)
        )

        # Applying control if present:
        if pd.isna(controlDataFile):
            logging.info(f"Number of genes reaching the threshold: {len(mageck_df)}")
            return mageck_df

        # Read control data, filter and rename columns:
        logging.info(f"Reading control data file: {controlDataFile}")
        controlHits = (
            pd.read_csv(controlDataFile, sep="\t")
            .rename(columns={filterColumn: "resourceScore", "id": "targetFromSourceId"})
            .loc[lambda df: df.resourceScore <= threshold]["targetFromSourceId"]
            .tolist()
        )

        # Excluding control genes:
        mageck_df = mageck_df.loc[lambda df: ~df.targetFromSourceId.isin(controlHits)]
        logging.info(f"Number of genes reaching the threshold: {len(mageck_df)}")

        return mageck_df

    def write_evidence(self, output_file: str) -> None:
        """Write the merged evidence to file."""
        json_list = [
            json.dumps(row.dropna().to_dict())
            for _, row in self.merged_dataset.iterrows()
        ]

        with gzip.open(output_file, "wt") as f:
            f.write("\n".join(json_list) + "\n")


def main(study_table, output_file, data_folder) -> None:
    # Parsing study table:
    parser = OTAR_CRISPR_study_parser(study_table)

    # Read data:
    parser.generate_evidence(data_folder)

    # Save data:
    parser.write_evidence(output_file)


if __name__ == "__main__":
    # Parsing arguments:
    parser = argparse.ArgumentParser(
        description="Script to parse internally generated datasets for OTAR projects."
    )
    parser.add_argument(
        "-s",
        "--study_table",
        type=str,
        required=True,
        help="A JSON file with study level metadata.",
    )
    parser.add_argument(
        "-o",
        "--output",
        required=False,
        type=str,
        help="Output json.gz file with the evidence.",
    )
    parser.add_argument(
        "-l",
        "--log_file",
        required=False,
        type=str,
        help="Logs are saved into this file.",
    )
    parser.add_argument(
        "-d",
        "--data_folder",
        required=True,
        type=str,
        help="Folder with the data files.",
    )
    args = parser.parse_args()

    study_table = args.study_table
    log_file = args.log_file
    data_folder = args.data_folder

    if not args.output:
        output_file = datetime.today().strftime("otar_crispr-%Y-%m-%d.json.gz")
    else:
        output_file = args.output

    # Configure logger:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if log_file:
        logging.config.fileConfig(filename=log_file)
    else:
        logging.StreamHandler(sys.stderr)

    # Report input data:
    logging.info(f"Study information is read from: {study_table}")
    logging.info(f"Evidence saved to: {output_file}")
    logging.info(f"Data files read from: {data_folder}")

    # Process data:
    main(study_table, output_file, data_folder)
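To make the per-study filtering concrete, here is a minimal, self-contained pandas sketch of the logic parse_MAGeCK_file applies to a single data file. The gene identifiers, scores, the neg|fdr column and the 0.05 threshold are illustrative assumptions, not values from any real dataset.

# Illustrative sketch only: made-up MAGeCK-style results for two genes.
import pandas as pd

FILTER_COLUMN_MAP = {"neg|fdr": "lower tail", "pos|fdr": "upper tail"}

mageck = pd.DataFrame(
    {
        "id": ["ENSG00000141510_TP53", "ENSG00000146648_EGFR"],
        "neg|fdr": [0.01, 0.4],
        "neg|lfc": [-2.1, -0.3],
    }
)

filter_column, threshold = "neg|fdr", 0.05
side = filter_column.split("|")[0]  # "neg" -> the lower tail of the distribution

hits = (
    mageck.rename(
        columns={
            filter_column: "resourceScore",
            "id": "targetFromSourceId",
            f"{side}|lfc": "log2FoldChangeValue",
        }
    )
    # Keep only rows whose score passes the study-specific threshold:
    .loc[lambda df: df.resourceScore <= threshold]
    # Annotate which tail of the distribution the filter column refers to:
    .assign(statisticalTestTail=FILTER_COLUMN_MAP[filter_column])
)
print(hits)  # only the TP53 row passes the 0.05 FDR cut-off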
import argparse
import logging
import sys
from functools import reduce

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql.dataframe import DataFrame

from common.evidence import (
    write_evidence_strings,
    initialize_sparksession,
    read_ppp_config,
)

# Datasource-wide constants:
VALIDATION_LAB_DATASOURCE_ID = "ot_crispr_validation"
VALIDATION_LAB_DATATYPE_ID = "ot_validation_lab"

# This map provides a recipe to generate the biomarker objects.
# If a value cannot be found in the map, the value will be returned.
BIOMARKERMAPS = {
    "PAN": {
        "direct_mapping": {
            "CO": {"name": "PAN-CO", "description": "Pan-colorectal carcinoma"}
        }
    },
    "MS_status": {
        "direct_mapping": {
            "MSI": {"name": "MSI", "description": "Microsatellite instable"},
            "MSS": {"name": "MSS", "description": "Microsatellite stable"},
        }
    },
    "CRIS_subtype": {
        "direct_mapping": {
            "A": {
                "name": "CRIS-A",
                "description": "mucinous, glycolytic, enriched for microsatellite instability or KRAS mutations.",
            },
            "B": {
                "name": "CRIS-B",
                "description": "TGF-β pathway activity, epithelial-mesenchymal transition, poor prognosis.",
            },
            "C": {
                "name": "CRIS-C",
                "description": "elevated EGFR signalling, sensitivity to EGFR inhibitors.",
            },
            "D": {
                "name": "CRIS-D",
                "description": "WNT activation, IGF2 gene overexpression and amplification.",
            },
            "E": {
                "name": "CRIS-E",
                "description": "Paneth cell-like phenotype, TP53 mutations.",
            },
            "?": {"name": "CRIS-?", "description": "CRIS subtype undetermined."},
        }
    },
    "KRAS_status": {
        "description": "KRAS mutation status: ",
        "name": "KRAS-",
    },
    "TP53_status": {
        "description": "TP53 mutation status: ",
        "name": "TP53-",
    },
    "APC_status": {
        "description": "APC mutation status: ",
        "name": "APC-",
    },
    "BRAF_status": {"description": "BRAF mutation status: ", "name": "BRAF-"},
}


class ParseHypotheses:
    def __init__(self, spark) -> None:
        self.spark = spark

    def parse_hypotheses(self, expectedFile: str, observedFile: str) -> DataFrame:
        """
        The hypothesis is parsed from two files describing the expected and observed results.
        This function reads both files, compares them and parses each hypothesis as a biomarker plus its status.

        Args:
            expectedFile: file with the expected results
            observedFile: file with the observed results

        Returns:
            DataFrame with the following schema:
                |-- gene: string (nullable = true)
                |-- hypotheses: array (nullable = false)
                |    |-- element: struct (containsNull = false)
                |    |    |-- name: string (nullable = true)
                |    |    |-- description: string (nullable = true)
                |    |    |-- status: string (nullable = false)
        """
        # The observed and expected results follow the same schema and are parsed the same way:
        expected_df = self.read_hypothesis_data(expectedFile, "expected")
        observed_df = self.read_hypothesis_data(observedFile, "observed")

        return (
            expected_df
            # Joining expected vs observed hypothesis tables:
            .join(observed_df, on=["gene", "hypothesis"], how="inner")
            # Filter hypotheses where at least one was True:
            # .filter(col('expected') | col('observed'))
            # From the hypothesis column (e.g. CRIS_subtype-B) extract the type (CRIS_subtype) and the call (B):
            .withColumn(
                "hypothesis_type", f.element_at(f.split(f.col("hypothesis"), "-"), 1)
            )
            .withColumn(
                "hypothesis_call", f.element_at(f.split(f.col("hypothesis"), "-"), 2)
            )
            # Using the biomarker parser, generate a struct similar to the biomarker object:
            .withColumn(
                "hypothesis",
                get_biomarker(f.col("hypothesis_type"), f.col("hypothesis_call")),
            )
            # Besides the annotation we add the status of the hypothesis:
            .withColumn(
                "status",
                f.when(f.col("expected") & f.col("observed"), "observed and expected")
                .when(f.col("expected"), "expected but not observed")
                .when(f.col("observed"), "observed but not expected")
                .otherwise("not expected and not observed"),
            )
            .withColumn("hypothesis", f.struct("hypothesis.*", "status"))
            # Collect all hypotheses for each gene:
            .groupBy("gene")
            .agg(f.collect_set("hypothesis").alias("validationHypotheses"))
            .persist()
        )

    def read_hypothesis_data(self, file: str, call: str) -> DataFrame:
        """Parsing the hypothesis file.
        Args:
            file: hypothesis file, a TSV with genes in rows and biomarkers in columns; the hypotheses are boolean.
            call: label used for the boolean column in the returned dataframe ("expected" or "observed").

        Returns:
            DataFrame with the following columns: gene, hypothesis, call (true/false)
        """
        hypothesis_df = (
            self.spark.read.csv(file, sep="\t", header=True).withColumnRenamed(
                "Gene", "gene"
            )
            # The gene names are manually typed; there are lower and upper case names:
            .withColumn("gene", f.upper(f.col("gene")))
        )

        # The first column is the gene name, the rest are the hypotheses:
        hypothesis_columns = hypothesis_df.columns[1:]

        unpivot_expression = f"""stack({len(hypothesis_columns)},
            {", ".join([f"'{x}', `{x}`" for x in hypothesis_columns])}
        ) as (hypothesis, {call})"""

        return (
            hypothesis_df.select("Gene", f.expr(unpivot_expression))
            .withColumn(call, f.col(call).cast(t.BooleanType()))
            .persist()
        )


@f.udf(
    t.StructType(
        [
            t.StructField("name", t.StringType(), False),
            t.StructField("description", t.StringType(), False),
        ]
    )
)
def get_biomarker(column_name, biomarker):
    """Return a struct with the biomarker name and description."""

    # If the biomarker has a direct mapping:
    if "direct_mapping" in BIOMARKERMAPS[column_name]:
        try:
            return BIOMARKERMAPS[column_name]["direct_mapping"][biomarker]
        except KeyError:
            logging.warning(
                f"Could not find direct mapping for {column_name}:{biomarker}"
            )
            return None

    # If the value needs to be parsed:
    if biomarker == "wt":
        return {
            "name": BIOMARKERMAPS[column_name]["name"] + biomarker,
            "description": BIOMARKERMAPS[column_name]["description"] + "wild type",
        }
    elif biomarker == "mut":
        return {
            "name": BIOMARKERMAPS[column_name]["name"] + biomarker,
            "description": BIOMARKERMAPS[column_name]["description"] + "mutant",
        }
    else:
        logging.warning(f"Could not find direct mapping for {column_name}: {biomarker}")
        return None


def get_cell_passport_data(spark: SparkSession, cell_passport_file: str) -> DataFrame:
    # Loading cell line annotation data from Sanger:
    return (
        spark.read.option("multiline", True)
        .csv(cell_passport_file, header=True, sep=",", quote='"')
        .select(
            f.regexp_replace(f.col("model_name"), r"-", "").alias("cellName"),
            f.col("model_id").alias("cellId"),
            f.col("tissue"),
        )
        .persist()
    )


def parse_experiment(
    spark: SparkSession, parameters: dict, cellPassportDf: DataFrame, data_folder: str
) -> DataFrame:
    """
    Parse experiment data from a file.

    Args:
        spark: Spark session.
        parameters: Dictionary of experimental parameters.
        cellPassportDf: Dataframe of cell passport data.
        data_folder: Location of the input data files.

    Returns:
        A dataframe of experiment data.
""" # Extracting parameters: experiment_file = f"{data_folder}/{parameters['experimentData']}" contrast = parameters["contrast"] studyOverview = parameters["studyOverview"] projectId = parameters["projectId"] projectDescription = parameters["projectDescription"] diseaseFromSource = parameters["diseaseFromSource"] diseaseFromSourceMapId = parameters["diseaseFromSourceMappedId"] confidenceCutoff = parameters["confidenceCutoff"] cell_line_file = f"{data_folder}/{parameters['cellLineFile']}" tissue_id = parameters["tissueId"] # The hypothesis is defined by two datasets: hypothesis_expected_file = f"{data_folder}/{parameters['hypothesisFileExpected']}" hypothesis_observed_file = f"{data_folder}/{parameters['hypothesisFileObserved']}" # Reading cell metadata from validation lab: validation_lab_cell_lines = ( spark.read.csv(cell_line_file, sep="\t", header=True) # Renaming columns: .withColumnRenamed("cell_line", "cellName") .drop("tissue") # Joining dataset with cell model data read downloaded from Sanger website: .join(cellPassportDf, on="cellName", how="left") # Adding UBERON code to tissues (it's constant colon) .withColumn("tissueID", f.lit(tissue_id)) # generating disease cell lines object: .withColumn( "diseaseCellLines", f.array( f.struct( f.col("cellName").alias("name"), f.col("cellId").alias("id"), f.col("tissue"), f.lit(tissue_id).alias("tissueId"), ) ), ) .persist() ) logging.info( f"Validation lab cell lines has {validation_lab_cell_lines.count()} cell types." ) # Defining how to process biomarkers: # 1. Looping through all possible biomarker - from biomarkerMaps.keys() # 2. The biomakers are then looked up in the map and process based on how the map defines. # 3. Description is also added read from the map. biomarkers_in_data = [ biomarker for biomarker in BIOMARKERMAPS.keys() if biomarker in validation_lab_cell_lines.columns ] expressions = map( # Function to process biomarker: lambda biomarker: ( biomarker, get_biomarker(f.lit(biomarker), f.col(biomarker)), ), # Iterator to apply the function over: biomarkers_in_data, ) # Applying the full map on the dataframe one-by-one: biomarkers = reduce( lambda DF, value: DF.withColumn(*value), expressions, validation_lab_cell_lines ) # The biomarker columns are unstacked into one single 'biomarkers' column: biomarker_unstack = f"""stack({len(biomarkers_in_data)}, {", ".join([f"'{x}', {x}" for x in biomarkers_in_data])}) as (biomarker_name, biomarkers)""" validation_lab_cell_lines = ( biomarkers # Selecting cell line name, cell line annotation and applyting the stacking expression: .select( f.col("cellName"), "diseaseCellLines", f.expr(biomarker_unstack), ) # Filter out all null biomarkers: .filter( (f.col("biomarkers").isNotNull()) & # Following the request of the validation lab, we are removing CRIS biomarker annotation: (f.col("biomarker_name") != "CRIS_subtype") ) # Grouping data by cell lines again: .groupBy("cellName") .agg( f.collect_list("biomarkers").alias("biomarkers"), f.first(f.col("diseaseCellLines")).alias("diseaseCellLines"), ) .persist() ) # Reading and processing hypothesis data: hypothesis_generator = ParseHypotheses(spark) validation_hypotheses_df = hypothesis_generator.parse_hypotheses( expectedFile=hypothesis_expected_file, observedFile=hypothesis_observed_file ) # Reading experiment data from validation lab: evidence = ( # Reading evidence: spark.read.csv(experiment_file, sep="\t", header=True) .withColumnRenamed("cell_line", "cellName") # Genes need to be uppercase: .withColumn("gene", f.upper(f.col("gene"))) # Joining 
        .join(validation_hypotheses_df, on="gene", how="left")
        # Joining with cell line data:
        .join(validation_lab_cell_lines, on="cellName", how="left")
        # Selecting all columns:
        .select(
            f.col("gene").alias("targetFromSourceId"),
            f.col("validationHypotheses"),
            f.when(
                f.col("effect_size").cast("double") > 0,
                f.col("effect_size").cast("double"),
            )
            .otherwise(0)
            .alias("resourceScore"),
            f.when(
                f.col("effect_size").cast("double") >= confidenceCutoff,
                f.lit("significant"),
            )
            .otherwise(f.lit("not significant"))
            .alias("confidence"),
            f.when(f.col("expected_to_pass") == "TRUE", f.lit("significant"))
            .otherwise(f.lit("not significant"))
            .alias("expectedConfidence"),
            f.lit("upper tail").alias("statisticalTestTail"),
            f.lit(contrast).alias("contrast"),
            f.lit(studyOverview).alias("studyOverview"),
            f.lit(diseaseFromSourceMapId).alias("diseaseFromSourceMappedId"),
            f.lit(diseaseFromSource).alias("diseaseFromSource"),
            f.lit(projectId).alias("projectId"),
            f.lit(projectDescription).alias("projectDescription"),
            f.col("biomarkers").alias("biomarkerList"),
            f.col("diseaseCellLines"),
            f.lit(VALIDATION_LAB_DATATYPE_ID).alias("datatypeId"),
            f.lit(VALIDATION_LAB_DATASOURCE_ID).alias("datasourceId"),
        )
        .persist()
    )

    logging.info(f"Evidence count: {evidence.count()}.")

    return evidence


def main(
    config_file: str, output_file: str, cell_passport_file: str, data_folder: str
) -> None:
    # Initialize spark session:
    spark = initialize_sparksession()

    # Parse experimental parameters:
    parameters = read_ppp_config(config_file)

    # Opening and parsing the cell passport data from Sanger:
    cell_passport_df = get_cell_passport_data(spark, cell_passport_file)

    logging.info(f"Cell passport dataframe has {cell_passport_df.count()} rows.")
    logging.info("Parsing experiment data...")

    # Create evidence for all experiments:
    evidence_dfs = []
    for experiment in parameters["experiments"]:
        evidence_dfs.append(
            parse_experiment(spark, experiment, cell_passport_df, data_folder)
        )

    # Combine all evidence dataframes into one:
    combined_evidence_df = reduce(lambda df1, df2: df1.union(df2), evidence_dfs)

    # Save the combined evidence dataframe:
    write_evidence_strings(combined_evidence_df, output_file)


if __name__ == "__main__":
    # Reading output file name from the command line:
    parser = argparse.ArgumentParser(
        description="This script parses validation lab data and generates disease-target evidence."
    )
    parser.add_argument(
        "--output_file", "-o", type=str, help="Output file (gzipped JSON)", required=True
    )
    parser.add_argument(
        "--parameter_file",
        "-i",
        type=str,
        help="A JSON file describing experiment metadata",
        required=True,
    )
    parser.add_argument(
        "--log_file",
        type=str,
        help="File into which the logs are saved",
        required=False,
    )
    parser.add_argument(
        "--cell_passport_file", type=str, help="Cell passport file", required=True
    )
    parser.add_argument(
        "--data_folder",
        type=str,
        help="Location of input files to process.",
        required=True,
    )
    args = parser.parse_args()

    # If no logfile is specified, logs are written to the standard error:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if args.log_file:
        logging.config.fileConfig(filename=args.log_file)
    else:
        logging.StreamHandler(sys.stderr)

    # Passing all the required arguments:
    main(
        args.parameter_file, args.output_file, args.cell_passport_file, args.data_folder
    )
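Both read_hypothesis_data and the biomarker processing above rely on Spark SQL's stack() function to unpivot wide, per-gene columns into (name, value) rows. The following self-contained sketch demonstrates the same pattern on a made-up two-column hypothesis table; the gene and biomarker values are illustrative only.

# Minimal demonstration of the stack()-based unpivot used above; the input values are invented.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame(
    [("EGFR", "TRUE", "FALSE")], ["gene", "MS_status-MSI", "KRAS_status-mut"]
)

# Build the same kind of stack() expression as read_hypothesis_data does:
hypothesis_columns = df.columns[1:]
unpivot_expression = f"""stack({len(hypothesis_columns)},
    {", ".join([f"'{x}', `{x}`" for x in hypothesis_columns])}
) as (hypothesis, expected)"""

(
    df.select("gene", f.expr(unpivot_expression))
    .withColumn("expected", f.col("expected").cast(t.BooleanType()))
    .show(truncate=False)
)
# Expected rows: (EGFR, MS_status-MSI, true) and (EGFR, KRAS_status-mut, false).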
shell:
    "sed -n 's/^rule//p' {input}"
run:
    # The way this works is that remote filenames are actually represented by Snakemake as pseudo-local files.
    # Snakemake will catch the "os.rename" (the same way it would have caught a "mv" call) and proceed with
    # uploading the files.
    for local_filename, remote_filename in zip(input, output[:-1]):
        os.rename(local_filename, remote_filename)
    # Compress, timestamp, and upload the logs.
    with tarfile.open(output[-1], 'w:gz') as archive:
        for filename in os.listdir('log'):
            archive.add(os.path.join('log', filename))
shell:
    """
    # In this and the following rules, the exec call redirects the output of all subsequent commands (both STDOUT
    # and STDERR) to the specified log file.
    exec &> {log}
    python modules/cancerBiomarkers.py \
        --biomarkers_table {input.biomarkers_table} \
        --source_table {input.source_table} \
        --disease_table {input.disease_table} \
        --drug_index {input.drug_index} \
        --output_file {output}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    python modules/ChEMBL.py \
        --chembl_evidence {input.evidenceFile} \
        --predictions {input.stopReasonCategories} \
        --output {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    # Download directly because HTTP RemoteProvider does not handle retries correctly.
    wget -q -O clingen_summary.csv {params.summaryTableWeb}
    # Retain the original summary table and store that in GCS.
    cp clingen_summary.csv {output.summaryTable}
    python modules/ClinGen.py \
        --input_file clingen_summary.csv \
        --output_file {output.evidenceFile} \
        --cache_dir {params.cacheDir} \
        --local
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/CRISPR.py \
        --evidence_file {input.evidenceFile} \
        --descriptions_file {input.descriptionsFile} \
        --cell_types_file {input.cellTypesFile} \
        --output_file {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/GeneBurden.py \
        --az_binary_data {input.azPhewasBinary} \
        --az_quant_data {input.azPhewasQuant} \
        --curated_data {input.curation} \
        --genebass_data {input.genebass} \
        --output {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    # Retain the inputs. They will be later uploaded to GCS.
    cp {input.ddPanel} {output.ddBucket}
    cp {input.eyePanel} {output.eyeBucket}
    cp {input.skinPanel} {output.skinBucket}
    cp {input.cancerPanel} {output.cancerBucket}
    cp {input.cardiacPanel} {output.cardiacBucket}
    python modules/Gene2Phenotype.py \
        --dd_panel {input.ddPanel} \
        --eye_panel {input.eyePanel} \
        --skin_panel {input.skinPanel} \
        --cancer_panel {input.cancerPanel} \
        --cardiac_panel {input.cardiacPanel} \
        --output_file {output.evidenceFile} \
        --cache_dir {params.cacheDir} \
        --local
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/IntOGen.py \
        --inputGenes {input.inputGenes} \
        --inputCohorts {input.inputCohorts} \
        --diseaseMapping {input.diseaseMapping} \
        --outputFile {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/Orphanet.py \
        --input_file {input} \
        --output_file {output} \
        --cache_dir {params.cacheDir}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    python modules/PanelApp.py \
        --input-file {input.inputFile} \
        --output-file {output.evidenceFile} \
        --cache_dir {params.cacheDir}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/IMPC.py \
        --cache-dir {params.cacheDir} \
        --output-evidence {output.evidenceFile} \
        --output-mouse-phenotypes {output.mousePhenotypes} \
        --score-cutoff 41
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/PROGENY.py \
        --inputFile {input.inputFile} \
        --diseaseMapping {input.diseaseMapping} \
        --pathwayMapping {input.pathwayMapping} \
        --outputFile {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/SLAPEnrich.py \
        --inputFile {input.inputFile} \
        --diseaseMapping {input.diseaseMapping} \
        --outputFile {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/SystemsBiology.py \
        --evidenceFile {input.evidenceFile} \
        --studyFile {input.studyFile} \
        --outputFile {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python modules/TEP.py \
        --output_file {output}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    python modules/crispr_screens.py \
        --crispr_brain_mapping {params.crispr_brain_mapping} \
        --output {output}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    python modules/TargetSafety.py \
        --adverse_events {params.ae} \
        --safety_risk {params.sr} \
        --toxcast {input.toxcast} \
        --aopwiki {input.aopwiki} \
        --output {output}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    # Retain the inputs. They will be later uploaded to GCS.
    cp {input.rawProbesExcel} {output.rawProbesExcel}
    cp {input.probesXrefsTable} {output.probesXrefsTable}
    python modules/chemicalProbes.py \
        --probes_excel_path {input.rawProbesExcel} \
        --probes_mappings_path {input.probesXrefsTable} \
        --output {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
shell:
    """
    exec &> {log}
    python partner_preview_scripts/ot_crispr.py \
        --study_table {params.study_table} \
        --data_folder {params.data_folder} \
        --output {output}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    python partner_preview_scripts/encore_parser.py \
        --output_file {output} \
        --parameter_file {params.config} \
        --data_folder {params.data_folder} \
        --cell_passport_file {input.cell_passport_table}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    python partner_preview_scripts/ValidationLab.py \
        --parameter_file {params.config} \
        --data_folder {params.data_folder} \
        --cell_passport_file {input.cell_passport_table} \
        --output_file {output}
    opentargets_validator --schema {params.schema} {output}
    """
run:
    for source in PPP_sources:
        os.rename(source[0], source[1])
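PPP_sources itself is defined earlier in the Snakefile and is not shown here; judging only from the run block above, each element behaves like an indexable (local path, destination path) pair. A purely hypothetical sketch of that shape:

# Hypothetical illustration only: the real PPP_sources list is built elsewhere in the Snakefile.
PPP_sources = [
    ("local/ot_crispr.json.gz", "destination/ot_crispr.json.gz"),
    ("local/encore.json.gz", "destination/encore.json.gz"),
]
for source in PPP_sources:
    print(f"{source[0]} -> {source[1]}")  # the rule itself calls os.rename(source[0], source[1])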