A Snakemake pipeline to prepare, impute and process binary PLINK files on the Michigan/TOPMed Imputation Servers.
Requirements:

- Anaconda or Miniconda Python
- Singularity
- Python 3 (the future is now!)
- Snakemake

All other requirements will be installed locally by the pipeline at runtime.
Usage:

Configuration:

`config/config.yaml` contains settings for QC and preparation, submission and download, and post-imputation processing. Please review and edit the configuration file before starting.
Data-file location:

By default, the pipeline looks for PLINK filesets in the base directory of your installation. You can choose a different location by editing `directory`. Cohort names are inferred from the stems of the PLINK filesets.

If you do not want all filesets in `directory` to be processed and imputed, you can include `COHORT:` or `SAMPLES:` and a list of cohorts in the config file like this:

COHORT:
  - group1
  - study2

or

COHORT: [group1, study2]
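Putting the two settings together, a minimal sketch of this part of `config/config.yaml` (the path is illustrative):

directory: /path/to/plink/filesets
COHORT: [group1, study2]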
Output directory:

The output files for the pipeline will be stored in the directory specified by `out_dir`. This directory must exist.
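For example (the directory name is illustrative; create it before running):

out_dir: imputed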
Output file choices

Output file choices can be specified under `outputs`, and you can delete or comment out any list item you do not want (an example follows the list). The options are:

- `stat_report`: a report of imputation quality
- `vcf_bycohort`: bgzipped VCF files for each cohort
- `vcf_merged`: a bgzipped VCF with all cohorts
- `bgen_bycohort`: binary Oxford filesets for each cohort
- `bgen_merged`: a binary Oxford fileset with all cohorts
- `plink_bycohort`: binary PLINK filesets for each cohort
- `plink_merged`: a binary PLINK fileset with all cohorts
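For example, to produce only the quality report and per-cohort VCFs, comment out the other entries:

outputs:
  - stat_report
  - vcf_bycohort
  # - vcf_merged
  # - bgen_bycohort
  # - bgen_merged
  # - plink_bycohort
  # - plink_merged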
Imputation configuration

This pipeline can impute using the NIH (TOPMed) Imputation Server and the Michigan Imputation Server. You must have an account and API key for any server you plan to use.

Imputation settings are stored under `imputation:` as a hash/dictionary of cohort names, with a hash of settings underneath. You can provide default settings with the hash key `default`, and/or override those settings under your individual cohort names.

The hash of settings follows the API specifications of the Michigan and TOPMed servers, in addition to your API key, which is stored under `token`. `files` and `password` are set automatically. Additionally, if you are planning to use HRC or TOPMed and you provide just the refpanel or server along with your API key, best-practice defaults will be used.

Options for `server` are `NIH` or `Michigan`. Case does not matter.
For each cohort specified, the pipeline overrides the defaults with that cohort's settings. Settings that are unchanged from the default do not need to be listed.
Here is an example:

imputation:
  default:
    token: token_here
    refpanel: topmed-r2
  study2:
    token: other_token_here
    server: Michigan
    refpanel: gasp-v2
    population: ASN

This will run study2 on the Michigan server using the GAsP panel with the ASN population, and all other cohorts on TOPMed using all populations.
Chromosome selection

Select the chromosomes you want to upload by editing `chroms`. Separate ranges with ":" and individually listed chromosomes with ","; you can combine both in the same string. Use M for the mitochondrial genome.

Options are 1:22,X,Y,M.
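For example, to upload the autosomes plus chromosome X (quoting keeps YAML from misreading the string):

chroms: '1:22,X'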
Reference and genome build
The pipeline will reference-align the input files before imputation using the fasta file specified under
ref:
. This fasta MUST be in the same genome build as the input. Input genome builds must match the builds supported by the chosen server, and you must specify the build under
imputation
if it does not match the default for the server.
This pipeline supports all builds for the downloaded imputed files. Be aware that TOPMed imputation returns GRCh38 genotypes.
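A sketch combining the two settings, assuming GRCh38 input (the FASTA path is a placeholder; the `build` values follow the servers' hg19/hg38 naming):

ref: /path/to/GRCh38_reference.fa
imputation:
  default:
    token: token_here
    refpanel: topmed-r2
    build: hg38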
QC

The pipeline can both perform QC in preparation for upload and filter the files provided by the servers, on a per-cohort basis. Pre-imputation QC is configured under `preqc:` and post-imputation QC under `postqc:`. The options are documented in the configuration file.
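An illustrative sketch only: the key names under `preqc:` shown here mirror the PLINK filters applied by the workflow rules (--geno, --mind, --hwe, plus a MAF filter), but the authoritative names, default values and the `postqc:` options are the ones documented in `config/config.yaml`:

preqc:
  geno: 0.05   # variant missingness threshold (assumed key name, illustrative value)
  mind: 0.05   # sample missingness threshold (assumed key name, illustrative value)
  hwe: 1e-10   # Hardy-Weinberg p-value threshold (assumed key name, illustrative value)
  maf: 0.01    # minor allele frequency threshold (assumed key name, illustrative value)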
Sample filtering to remove low-callrate subjects

The imputation server imputes in chunks (20 Mb in this pipeline's chunker), removing any chunk with a callrate below 50% in any subject. To avoid this, the pipeline can remove subjects so that either no chromosome has more than 20% missingness (`chr_callrate: True`) or no imputation chunk with at least 50 variants has more than 50% missingness (the chunk check). You can skip both checks by setting both options to false, but you can perform at most one of the checks. The chunk check is recommended.
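A hedged sketch of these settings: `chr_callrate` is named above, while the key for the chunk-based check is assumed here and should be verified against `config/config.yaml`:

chr_callrate: False
chunk_callrate: True   # assumed key name for the recommended chunk check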
Sample inclusion or exclusion by name

You can also include OR exclude samples by name using a list file with one of the following options:

- `include_samp:` filename of samples to include (plink --keep)
- `exclude_samp:` filename of samples to exclude (plink --remove)

`include_samp` accepts a space/tab-delimited text file with family IDs in the first column and within-family IDs in the second column, and removes all unlisted samples from the current analysis. `exclude_samp` does the same for all listed samples.
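For example (the file name is a placeholder; the file itself holds one FID/IID pair per line, as expected by plink --keep):

include_samp: keep_samples.txt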
Running
You must run the pipeline with Anaconda environments and Singularity enabled. You can either use a Snakemake profile to do so or run the pipeline with the following command:
snakemake --use-conda --use-singularity
Make sure your cluster supports the amount of memory required for report generation (528 GiB) and that the memory is being properly requested. If that is not the case, you can edit the resource requirements on the rule `stats` in the postImpute pipeline and modify the modularization lines in `workflow/Snakefile` in this pipeline. We have included an `lsf.yaml` file that ensures those resources are available within this pipeline.
Code Snippets
shell: 'vcftools --missing-indv --gzvcf {input.vcf} --out {params.out}'

script: '../scripts/process_imiss.R {params.dir}'

shell:
    '''
    plink --bfile {params.ins} --memory 4096 --real-ref-alleles \
        --recode vcf bgz --out {params.out}
    '''

shell:
    '''
    mkdir -p {params.tempdir}
    bcftools sort -Oz -o {output.vcf} \
        --max-mem 64000M -T {params.tempdir} {input}
    bcftools index -t {output.vcf}
    '''

script: '../scripts/fullchunker.py'

shell:
    '''
    vcftools --missing-indv --gzvcf {input.vcf} {params.ranges} --out {params.out}
    '''

script: '../scripts/process_chunk_imiss.R'

shell:
    '''
    plink --keep-allele-order --allow-extra-chr --chr 1-26,X,Y,XY,MT \
        --bfile {params.ins} --memory 8192 \
        --geno {params.geno} {params.maf} \
        --make-bed --out {params.out} --silent
    '''

shell:
    '''
    plink --keep-allele-order \
        --bfile {params.ins} --memory 8192 \
        --mind {params.mind} {params.keep_remove} \
        --make-bed --out {params.out} --silent
    '''

shell:
    '''
    plink --keep-allele-order \
        --bfile {params.ins} --memory 8192 \
        --hwe {params.hwe} 'midp' \
        --make-bed --out {params.out} --silent
    '''

shell:
    '''
    flippyr -o {params.out} --outputSuffix {params.suff} --plink \
        {input.fasta} {params.bim}
    '''

script: '../scripts/rename_chrom.py'

run:
    import json
    with open(input[0], 'r') as f:
        m = json.load(f)
    if int(m['build']) == 37 and wildcards.chrom == 'M':
        chr_ = 'MT'
    elif int(m['build']) == 37:
        chr_ = wildcards.chrom
    else:
        chr_ = m[wildcards.chrom]
    with open(output[0], 'w') as f:
        print(chr_, file=f)

shell:
    '''
    plink --bfile {params.ins} --bim {input.bim} --real-ref-alleles \
        --make-bed --out {params.out}
    '''

shell:
    '''
    plink --bfile {params.ins} --chr "$(cat {input.chrname})" \
        --memory 16000 --real-ref-alleles \
        --recode vcf bgz --out {params.out} --silent
    '''

shell:
    '''
    bcftools sort -Oz -o {output.vcf} {input}
    bcftools index -t {output.vcf}
    '''
import json
import requests
import datetime
import time
from urllib.parse import urlparse
import urllib
import sys
import os

# jsonfile = 'intermediate/pre-impute_prep/imputation/EUR_imputation.json'
# imputation = {'url': 'https://imputationserver.sph.umich.edu/api/v2',
#               'token': 'eyJjdHkiOiJ0ZXh0XC9wbGFpbiIsImFsZyI6IkhTMjU2In0.eyJtYWlsIjoiZnVsdG9uaDFAZ21haWwuY29tIiwiZXhwaXJlIjoxNjQxMDUyNDc0NjMxLCJuYW1lIjoiQnJpYW4gRSBGdWx0b24tSG93YXJkIiwiYXBpIjp0cnVlLCJ1c2VybmFtZSI6ImJlZmgifQ.IhGvMBoHG12qCpjuAndPldZwGnRwBklB35xpQl2UjAM',
#               'id': 'job-20211207-143135-505'}
# imputation['id'] = 'job-20171123-122635-549'


def progress(count, block_size, total_size):
    global start_time
    if count == 0:
        start_time = time.time()
        return
    duration = time.time() - start_time
    progress_size = int(count * block_size)
    speed = int(progress_size / (1024 * duration))
    percent = min(int(count * block_size * 100 / total_size), 100)
    sys.stdout.write('\r {}%, {} MB, {} KB/s, {} seconds passed'.format(
        percent, progress_size // (1024 * 1024), int(speed), int(duration)))


def jobinfo(settings, token=None):
    settings['token'] = token if token is not None else settings['token']
    r_check = requests.get(
        '{u}/jobs/{j}'.format(u=settings['url'], j=settings['id']),
        headers={'X-Auth-Token': settings['token']})
    if r_check.status_code != 200:
        token_config = snakemake.params['token']
        if r_check.status_code == 401:
            if token is not None or settings['token'] == token_config:
                raise ValueError('Bad or expired API token')
            else:
                return jobinfo(settings, token=token_config)
        else:
            raise Exception('GET /jobs/{}/status {}'.format(
                settings['id'], r_check.status_code))
    jinfo = r_check.json()
    jinfo['settings'] = settings
    return jinfo


def fmt_delta(start_time):
    delt = datetime.datetime.now() - start_time
    hours = delt.seconds // 3600
    mins = (delt.seconds % 3600) // 60
    sec = (delt.seconds % 3600) % 60
    ts = '{h:02d}:{m:02d}:{s:02d}'.format(h=hours, m=mins, s=sec)
    if delt.days > 0:
        return '{D} Days {t}'.format(D=delt.days, t=ts)
    return ts


jsonfile = snakemake.input[0]

with open(jsonfile, 'r') as j:
    imputation = json.load(j)

jinfo = jobinfo(imputation)

if jinfo['state'] < 4:
    print('Waiting for {} to complete.\n\n'.format(imputation['id']))
    submission = datetime.datetime.fromtimestamp(jinfo['submittedOn']/1000)

while jinfo['state'] < 4:
    print("\033[A                             \033[A")
    if jinfo['state'] == 1:
        print('{} waiting for {}.\n(Queue position {})'.format(
            imputation['id'], fmt_delta(submission),
            jinfo['positionInQueue']))
        time.sleep(60)  # wait for 1 minute
    elif jinfo['state'] == 2:
        start_time = datetime.datetime.fromtimestamp(jinfo['startTime']/1000)
        print('{} running for {}.'.format(
            imputation['id'], fmt_delta(start_time)))
        time.sleep(20)  # wait for 20 sec
    elif jinfo['state'] == 2:
        print('{} exporting.'.format(imputation['id']))
        time.sleep(20)  # wait for 20 sec
    else:
        raise Exception('{} died.'.format(imputation['id']))
    jinfo = jobinfo(jinfo['settings'])

url_dl = 'https://{fqdn}/share/results/{{hash}}/{{name}}'.format(
    fqdn=urlparse(imputation['url']).netloc)

outputs = [{**x, 'download': [{'url_dl': url_dl.format(**y),
                               'filename': y['name'],
                               'size': y['size']}
                              for y in x['files']]}
           for x in jinfo['outputParams']]

outputs_dict = {x['description']: x['download']
                for x in outputs if x['download']}

if jinfo['state'] not in [7, 10]:
    for desc, dl in outputs_dict.items():
        print('Downloading ' + desc)
        for file in dl:
            destfile = os.path.join(snakemake.params['outpath'],
                                    file['filename'])
            print('\nFile: {} ...'.format(file['filename']))
            urllib.request.urlretrieve(
                file['url_dl'], filename=destfile, reporthook=progress)
            print('\n')

with open(os.path.join(snakemake.params['outpath'], 'job.json'), 'w') as f:
    json.dump(jinfo, f)

if jinfo['state'] in [4, 8]:
    print('{} was successful.'.format(imputation['id']))
elif jinfo['state'] in [5, 6, 7, 9, 10]:
    state = {6: 'cancelled', 7: 'retired', 10: 'deleted',
             5: 'failed', 9: 'failed'}[jinfo['state']]
    state += '. Check logs' if jinfo['state'] in [5, 9] else '. Please rerun'
    raise Exception('{} was {}.'.format(imputation['id'], state))
else:
    raise Exception('{} status not recognized. Status = {}'.format(
        imputation['id'], jinfo['state']))


def human_readable_size(size, decimal_places=4):
    for unit in ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if size < 1024.0 or unit == 'PiB':
            break
        size /= 1024.0
    return f"{size:.{decimal_places}f} {unit}"
from chunks import chunker
import cyvcf2
import sys
import json


def contig_bounds(vcf):
    def getbounds(vcf, cname):
        contig = list(vcf(cname))
        bounds = [contig[x].POS for x in [0, -1]]
        return bounds
    cnames = vcf.seqnames
    return {x: getbounds(vcf, x) for x in cnames}


def chunk_contigs(bounds):
    chunks = []
    for contig, range in bounds.items():
        chunks += chunker(contig, 20000000, range[1], range[0], raw=True)
    return chunks


def make_chunk_json(fname, chunks):
    keys = ['chrom', 'from', 'through']
    chunkdict = {x: y for x, y in zip(keys, zip(*chunks))}
    with open(fname, 'w') as file:
        json.dump(chunkdict, file)


if __name__ == "__main__":
    in_name = snakemake.input[0]
    out_name = snakemake.output[0]
    vcf = cyvcf2.VCFReader(in_name)
    bounds = contig_bounds(vcf)
    chunks = chunk_contigs(bounds)
    make_chunk_json(out_name, chunks)
message("Loading Packages")

packload <- function(...) { # partially from pacman
  packages <- as.character(match.call(expand.dots = FALSE)[[2]])
  pl <- function(x) {
    suppressPackageStartupMessages(require(x, character.only = T))
  }
  no_output <- sapply(packages, pl)
}

packload(dplyr, readr, tibble, magrittr, stringr)

path <- path.expand(snakemake@params[["dir"]])
threshold <- as.double(snakemake@params[["threshold"]])
chunk_variant_count_min <- as.integer(
  snakemake@params[["chunk_variant_count_min"]])

f_list <- snakemake@input
#f_list <- list.files(path = path, pattern = "*.sample_missingness.imiss") %>%
#  paste0(path, "/", .)

message("Reading Missingness")

df <- tibble(INDV = character())
for (imiss in f_list) {
  chrom <- str_match(basename(imiss), "(chr.+)\\.sam")[1, 2]
  df2 <- read_tsv(imiss, col_types = cols(
    .default = "i", INDV = "c", F_MISS = "d"))
  n <- max(df2$N_DATA)
  if (n > chunk_variant_count_min) {
    df <- df2 %>%
      select(INDV, !!chrom := F_MISS) %>%
      left_join(df, by = "INDV")
  }
}

message("Processing Missingness")

df %<>%
  select_if(function(col) max(col) >= threshold) %>%
  mutate(maxs = apply(Filter(is.numeric, .), 1, max)) %>%
  filter(maxs >= threshold) %>%
  arrange(desc(maxs))

write_tsv(df, path = paste0(path, "/chrall.imiss"))
write_lines(df$INDV, paste0(path, "/chrall.irem"))
message("Loading Packages")

packload <- function(...) { # partially from pacman
  packages <- as.character(match.call(expand.dots = FALSE)[[2]])
  pl <- function(x) {
    suppressPackageStartupMessages(require(x, character.only = T))
  }
  no_output <- sapply(packages, pl)
}

packload(dplyr, readr, tibble, magrittr, stringr)

path <- path.expand(snakemake@params[["dir"]])
threshold <- as.double(snakemake@params[["threshold"]])

f_list <- snakemake@input
#f_list <- list.files(path = path, pattern = "*.sample_missingness.imiss") %>%
#  paste0(path, "/", .)

message("Reading Missingness")

df <- tibble(INDV = character())
for (imiss in f_list) {
  chrom <- str_match(basename(imiss), "(chr\\d+).")[2]
  df <- read_tsv(imiss, col_types = cols(
    .default = "i", INDV = "c", F_MISS = "d")) %>%
    select(INDV, !!chrom := F_MISS) %>%
    left_join(df, by = "INDV")
}

message("Processing Missingness")

df %<>%
  select_if(function(col) max(col) >= threshold ||
              str_detect(names(.), "INDV")) %>%
  mutate(maxs = apply(Filter(is.numeric, .), 1, max)) %>%
  filter(maxs >= threshold) %>%
  arrange(desc(maxs))

write_tsv(df, path = paste0(path, "/chrall.imiss"))
write_lines(df$INDV, paste0(path, "/chrall.irem"))
from pyfaidx import Fasta
import json

fasta = snakemake.input['fasta']
bim_in = snakemake.input['bim']
bim_out = snakemake.output['bim']
json_out = snakemake.output['json']

seqs = Fasta(fasta).keys()

if any(x in seqs for x in ['1', 'chrUn_gl000247', 'GL000247.1']):
    mapping = {'23': 'X', '25': 'X', 'XY': 'X', '24': 'Y',
               '26': 'MT', 'M': 'MT', '0M': 'MT',
               'chrM': 'MT', 'chrMT': 'MT'}
    mapping_chr = {'chr{}'.format(x): str(x) for x in range(1, 23)}
    json_ = {'build': 37}
else:
    mapping = {'23': 'chrX', '25': 'chrX', 'XY': 'chrX', '24': 'chrY',
               'X': 'chrX', 'Y': 'chrY', '26': 'chrM', 'M': 'chrM',
               'MT': 'chrM', '0M': 'chrM', 'chrMT': 'chrM'}
    mapping_chr = {str(x): 'chr{}'.format(x) for x in range(1, 23)}
    json_ = {'build': 38, 'map': {**mapping_chr, 'Y': 'chrY', 'X': 'chrX',
                                  'MT': 'chrM', 'M': 'chrM'}}

mapping.update(mapping_chr)

with open(json_out, "w") as f:
    json.dump(json_, f)

with open(bim_in, 'r') as f_in:
    with open(bim_out, 'w') as f_out:
        for line in f_in:
            rec = line.strip().split()
            update = mapping[rec[0]] if rec[0] in mapping else rec[0]
            updated = '\t'.join([update] + rec[1:])
            print(updated, file=f_out)
import requests
import json
import time
import secrets
import string
import datetime


def getjobs(url, token):
    r_jobs = requests.get(url + "/jobs", headers={'X-Auth-Token': token})
    if r_jobs.status_code != 200:
        raise Exception('GET /jobs/ {}'.format(r_jobs.status_code))
    return r_jobs.json()['data']


if snakemake.params['server'].lower() == 'michigan':
    url = 'https://imputationserver.sph.umich.edu/api/v2'
elif snakemake.params['server'].lower() == 'nih':
    url = 'https://imputation.biodatacatalyst.nhlbi.nih.gov/api/v2'
else:
    url = snakemake.params['server']

token = snakemake.params['token']

r = requests.get(url + "/jobs", headers={'X-Auth-Token': token})
if r.status_code == 401:
    raise ValueError('Bad or expired API token')
if r.status_code == 404:
    raise ValueError('Invalid Imputation Server API URL')
elif r.status_code != 200:
    raise Exception('Server Error: Status {}'.format(r_jobs.status_code))
else:
    try:
        r.json()
    except ValueError:
        raise ValueError('Invalid Imputation Server API URL or invalid response')

# get all jobs
jobs = getjobs(url, token)
incomplete = list(filter(lambda x: x['state'] < 4, jobs))

if len(incomplete) > 2:
    print("Three jobs are already queued or running on the server:\n\n")
    while len(incomplete) > 2:
        print("\033[A                             \033[A")  # erase line
        if max([x['state'] for x in incomplete]) == 1:  # queued only
            qpos = min([x['positionInQueue'] for x in incomplete])
            print("lowest queue position is {}.".format(qpos))
        else:
            running = len([x for x in incomplete if x['state'] > 1])
            print('{} jobs running.'.format(running))
        time.sleep(600)  # wait for 10 minutes
        jobs = getjobs(url, token)
        incomplete = list(filter(lambda x: x['state'] < 4, jobs))
    print("Job completed; ready to submit.")

# define password and job name, then remove extranious imputation params
data = snakemake.params['imp_settings']
data['password'] = ''.join(
    (secrets.choice(string.ascii_letters + string.digits) for i in range(48)))
data['job-name'] = '{}_submitted{}'.format(
    snakemake.wildcards['cohort'],
    datetime.datetime.now().strftime("%Y-%m-%d.%H%M"))

if 'token' in data:
    del data['token']
if 'server' in data:
    del data['server']

# rudimentary build check
with open(snakemake.input.contig_build, 'r') as f:
    contig_build = 'hg19' if json.load(f)['build'] == 37 else 'hg38'

if 'build' in data and data['build'] != contig_build:
    raise ValueError('Specified build does not match contig names.')
elif 'build' not in data and contig_build != 'hg19':
    raise ValueError('Build not specified but contig names not hg19.')

# submit new job
if url == 'https://imputation.biodatacatalyst.nhlbi.nih.gov/api/v2':
    submit = "/jobs/submit/imputationserver"
else:
    submit = "/jobs/submit/minimac4"

r_submission = requests.post(
    url + submit,
    files=[('files', open(x, 'rb')) for x in snakemake.input.vcf],
    data=data, headers={'X-Auth-Token': token})

if r_submission.status_code != 200:
    print(r.json()['message'])
    raise Exception('POST {} {}'.format(submit, r_submission.status_code))

json_submission = r_submission.json()

# print message
print(json_submission['message'])
print(json_submission['id'])

json_submission['password'] = data['password']
json_submission['job-name'] = data['job-name']
json_submission['url'] = url
json_submission['token'] = token

del data['password']
del data['job-name']
json_submission['settings'] = data

r_check = requests.get(
    '{u}/jobs/{j}/status'.format(u=url, j=json_submission['id']),
    headers={'X-Auth-Token': token})
if r_check.status_code != 200:
    raise Exception('GET /jobs/{}/status {}'.format(
        json_submission['id'], r_check.status_code))

print('Queue position : {}'.format(r_check.json()['positionInQueue']))

serverstats = requests.get(url + '/server/counters').json()
print('{} jobs currently running.'.format(serverstats['running']['runs']))

with open(snakemake.output[0], 'w') as jobinfo:
    json.dump(json_submission, jobinfo)
shell:
    '''
    bcftools sort --threads {threads} -Oz -o {output} {input}
    bcftools index -t {output}
    '''

shell:
    '''
    bcftools view --threads {threads} -S ^{input.irem} -Oz -o {output} {input.vcf}
    bcftools index -t {output}
    '''

script: 'scripts/submit_imputation.py'

script: 'scripts/download_imputation.py'

shell: '7za e {input.zip} -p$(jq -r .password {input.json}) -o{params.odir}'
Support
- Future updates