diff --git a/validate.py b/validate.py index c94ce16..241390b 100644 --- a/validate.py +++ b/validate.py @@ -8,6 +8,7 @@ import pyBigWig import json import traceback +import math from logger import log @@ -41,28 +42,35 @@ def validate(bw_file, window_size=25): log.info('Opening bigwig file...') bw = pyBigWig.open(bw_file.strip("'")) - + all_msg = '' + try: valid = True # print chrsz - log.info('Validating chromosome sizes...') - log.info('==== Template ====') - print(json.dumps({k: CHRSZ[k] for k in sorted(CHRSZ)}, indent=4)) - log.info('==== Your submission ====') - print(json.dumps({k: bw.chroms()[k] for k in sorted(bw.chroms())}, indent=4)) + # log.info('Validating chromosome sizes...') + # log.info('==== Template ====') + # print(json.dumps({k: CHRSZ[k] for k in sorted(CHRSZ)}, indent=4)) + # log.info('==== Your submission ====') + # print(json.dumps({k: bw.chroms()[k] for k in sorted(bw.chroms())}, indent=4)) # check number of chrs if len(bw.chroms()) != len(CHRSZ): - print('Invalid number of chromosome {}. It should match with {}'.format( - len(bw.chroms()), len(CHRSZ))) + msg = 'Invalid number of chromosome {}. It should match with {}'.format( + len(bw.chroms()), len(CHRSZ)) + print(msg) + all_msg += msg + '\n' valid = False # check each chrsz for k, v in bw.chroms().items(): if k not in CHRSZ: - print('Invalid chromosome {}'.format(k)) + msg = 'Invalid chromosome {}'.format(k) + print(msg) + all_msg += msg + '\n' valid = False continue if v != CHRSZ[k]: - print('Invalid size {} for chromosome {}'.format(v, k)) + msg = 'Invalid size {} for chromosome {}'.format(v, k) + print(msg) + all_msg += msg + '\n' valid = False continue @@ -70,34 +78,51 @@ def validate(bw_file, window_size=25): for c, s in CHRSZ.items(): log.info('Validating chromosome {}...'.format(c)) if bw.intervals(c) is None: - print('No intervals found for chromosome {}. '.format(c)) + msg = 'No intervals found for chromosome {}. '.format(c) + print(msg) + all_msg += msg + '\n' valid = False continue for start, end, v in bw.intervals(c): if end == s: continue if end-start != window_size: - print('Invalid window size for chromosome {}. ' - 'start: {}, end: {}, value: {}'.format( - c, start, end, v)) + msg = 'Invalid window size for chromosome {}. '\ + 'start: {}, end: {}, value: {}'.format( + c, start, end, v) + print(msg) + all_msg += msg + '\n' valid = False if end > s: - print('Invalid end interval in chromosome {}. ' - 'End must be equal to or smaller than chrom size. ' - 'start: {}, end: {}, value: {}, chrsz: {}'.format( - c, start, end, v, s)) + msg = 'Invalid end interval in chromosome {}. '\ + 'End must be equal to or smaller than chrom size. ' + 'start: {}, end: {}, value: {}, chrsz: {}'.format( + c, start, end, v, s) + print(msg) + all_msg += msg + '\n' + valid = False + if math.isnan(v) or v == float('inf') or v == float('-inf'): + msg = 'Found NaN or Inf. '\ + 'start: {}, end: {}, value: {}, chrsz: {}'.format( + c, start, end, v, s) + print(msg) + all_msg += msg + '\n' valid = False except Exception as e: - traceback.print_exc() + st = StringIO() + traceback.print_exc(file=st) + msg = st.getvalue() + print(msg) + all_msg += msg + '\n' valid = False if valid: log.info('Validation done successfully.') - return 0 + return True, all_msg else: log.info('Validation failed.') - return 1 + return False, all_msg def parse_arguments(): import argparse @@ -115,8 +140,11 @@ def main(): # read params args = parse_arguments() - return validate(args.bw, args.window_size) - + valid, _ = validate(args.bw, args.window_size) + if valid: + return 0 + else: + return 1 if __name__ == '__main__': main() diff --git a/validate_round2.py b/validate_round2.py new file mode 100644 index 0000000..172b10a --- /dev/null +++ b/validate_round2.py @@ -0,0 +1,289 @@ +#!/usr/bin/env python3 +"""Imputation challenge validation script for round2 +Author: + Jin Lee (leepc12@gmail.com) +""" + +import os +import re +import time +import shutil +import math +#import gc +import traceback +import synapseclient +import multiprocessing +from validate import validate +from io import StringIO +from score_leaderboard import mkdir_p, send_message +from score_leaderboard import WIKI_TEMPLATE_SUBMISSION_STATUS, RE_PATTERN_SUBMISSION_FNAME +from logger import log + + +BIG_INT = 99999999 # for multiprocessing + + +ROUND2_VALID_CELL_ASSAY = [ +'C05M17', +'C05M18', +'C05M20', +'C05M29', +'C06M16', +'C06M17', +'C06M18', +'C07M20', +'C07M29', +'C12M01', +'C12M02', +'C19M16', +'C19M17', +'C19M18', +'C19M20', +'C19M22', +'C19M29', +'C22M16', +'C22M17', +'C28M17', +'C28M18', +'C28M22', +'C28M29', +'C31M01', +'C31M02', +'C31M16', +'C31M29', +'C38M01', +'C38M02', +'C38M17', +'C38M18', +'C38M20', +'C38M22', +'C38M29', +'C39M16', +'C39M17', +'C39M18', +'C39M20', +'C39M22', +'C39M29', +'C40M16', +'C40M17', +'C40M18', +'C40M20', +'C40M22', +'C40M29', +'C51M16', +'C51M17', +'C51M18', +'C51M20', +'C51M29' +] + +def is_valid_round2_cell_assay(cell, assay): + return (cell + assay) in ROUND2_VALID_CELL_ASSAY + +def update_wiki_for_round2(syn, team_name_dict, args): + # calculate ranks and update round2 wiki + log.info('Updating wiki...') + wiki_id_map = { + k.split(':')[0]: k.split(':')[1] for k in args.round2_wiki_id.split(',') + } + for k, wiki_id in wiki_id_map.items(): + w = syn.getWiki(args.project_id, wiki_id) + + if k == 'submission_status': + title = 'Submission status' + markdown = WIKI_TEMPLATE_SUBMISSION_STATUS.format( + eval_queue_id=args.eval_queue_id) + + else: + raise Exception('invalid wiki type') + + w.markdown = markdown + w.title = title + w = syn.store(w) + + return None + +def parse_arguments(): + import argparse + import os + + py_path = os.path.dirname(os.path.realpath(__file__)) + + parser = argparse.ArgumentParser( + description='ENCODE Imputation Challenge round2 script. ') + parser.add_argument('eval_queue_id', + help='Synapse evaluation queue ID to retreive submissions from.') + p_score = parser.add_argument_group( + title='Scoring parameters') + p_score.add_argument('--window-size', default=25, type=int, + help='Window size for bigwig in bp') + p_score.add_argument('--update-wiki-only', action='store_true', + help='Update wiki based on DB file (--db-file) without ' + 'scoring submissions') + p_sys = parser.add_argument_group( + title='System and resource settings') + p_sys.add_argument('--nth', type=int, default=1, + help='Number of threads to parallelize scoring (per) submission') + p_sys.add_argument('--team-name-tsv', + help='TSV file with team_id/team_name (1st col/2nd col).') + p_syn = parser.add_argument_group( + title='Communitation with synapse') + p_syn.add_argument('--dry-run', action='store_true', + help='Do not update submission\'s status on synapse.') + p_syn.add_argument('--project-id', default='syn17083203', + help='Synapse project ID.') + p_syn.add_argument('--round2-wiki-id', + default='submission_status:594309', + help='Comma-delimited Synapse wiki ID for round2.') + p_syn.add_argument('--submission-dir', default='./submissions', + help='Download submissions here.') + p_syn.add_argument('--send-msg-to-admin', action='store_true', + help='Send message to admin.') + p_syn.add_argument('--send-msg-to-user', action='store_true', + help='Send message to user.') + p_syn.add_argument('--period', default=1800, + help='Time period in second to download submissions from synapse ' + 'and score them') + p_syn.add_argument('--admin-id', nargs='+', default=['3345120'], + help='Admin\'s Synapse ID (as string) ') + args = parser.parse_args() + + return args + +def validate_submission(submission, status, args, syn): + status['status'] = 'INVALID' + + submission_dir = os.path.join( + os.path.abspath(args.submission_dir), submission.id) + mkdir_p(submission_dir) + + metadata = { + 'id': submission.id, + 'team': 'undefined' + } + + try: + metadata['team'] = get_team_name(syn, None, submission.teamId) + + log.info('Downloading submission... {}'.format(submission.id)) + submission = syn.getSubmission( + submission, + downloadLocation=submission_dir, + ifcollision='overwrite.local' + ) + print() + submission_fname = submission.filePath + cell, assay = parse_submission_filename(submission_fname) + if not is_valid_round2_cell_assay(cell, assay): + raise Exception('Invalid cell/assay combination for ' + 'round2 round') + + log.info('Downloading done {}, {}, {}, {}, {}'.format( + submission_fname, submission.id, + submission.teamId, cell, assay)) + + # read pred npy (submission) + log.info('Validating bigwig...{}'.format(submission.id)) + valid, message = validate(submission_fname, args.window_size) + + if valid: + status['status'] = 'VALIDATED' + subject = 'Successfully validated submission %s %s %s:\n' % ( + submission.name, submission.id, submission.teamId) + else: + subject = 'Invalid submission %s %s %s:\n' % ( + submission.name, submission.id, submission.teamId) + # delete file + shutil.rmtree(submission_dir) + + log.info(subject + message) + + except Exception as ex1: + if 'teamId' in submission: + teamId = submission.teamId + else: + teamId = 'undefined' + subject = 'Error validating submission %s %s %s:\n' % ( + submission.name, submission.id, teamId) + st = StringIO() + traceback.print_exc(file=st) + message = st.getvalue() + log.error(subject + message) + # delete file + shutil.rmtree(submission_dir) + + # send message + users_to_send_msg = [] + if args.send_msg_to_user: + users_to_send_msg.append(submission.userId) + if args.send_msg_to_admin: + users_to_send_msg.extend(args.admin_id) + send_message(syn, users_to_send_msg, subject, message) + + if not args.dry_run: + status['annotations'] = synapseclient.annotations.to_submission_status_annotations( + metadata, is_private=False) + status = syn.store(status) + + return status + + +def main(): + log.info('Parsing arguments...') + + args = parse_arguments() + + if args.team_name_tsv is not None: + team_name_dict = parse_team_name_tsv(args.team_name_tsv) + else: + team_name_dict = None + print(team_name_dict) + + syn = synapseclient.login() + t0 = time.perf_counter() + + while True: + try: + if not args.update_wiki_only: + evaluation = syn.getEvaluation(args.eval_queue_id) + + # init multiprocessing + pool = multiprocessing.Pool(args.nth) + + # distribute jobs + ret_vals = [] + for submission, status in syn.getSubmissionBundles(evaluation, status='RECEIVED'): + ret_vals.append( + pool.apply_async(validate_submission, + (submission, status, args, syn, + gene_annotations, enh_annotations))) + # gather + for r in ret_vals: + r.get(BIG_INT) + + pool.close() + pool.join() + + update_wiki(syn, team_name_dict, args) + + except Exception as ex1: + st = StringIO() + traceback.print_exc(file=st) + message = st.getvalue() + + subject = 'Server error:' + if args.send_msg_to_admin: + send_message(syn, args.admin_id, subject, message) + log.error(message) + + log.info('Waiting for new submissions...') + while time.perf_counter() - t0 < args.period: + time.sleep(60) + t0 = time.perf_counter() + + log.info('All done') + + +if __name__ == '__main__': + main() +