From add0eee9b2f8b880e905c5066370d0d9980c8208 Mon Sep 17 00:00:00 2001
From: Idan Gabdank
Date: Thu, 7 Dec 2017 13:12:18 -0800
Subject: [PATCH] CHK-4 allow running on dnanexus (#6)

---
 .gitignore                                    |   4 +
 README.md                                     |  49 +++-
 checkfiles.py                                 | 240 +++++++++++-------
 checkfiles/Readme.developer.md                |  31 +++
 checkfiles/Readme.md                          |  21 ++
 checkfiles/dxapp.json                         | 154 +++++++++++
 .../resources/home/dnanexus/checkfiles.py     |   1 +
 checkfiles/src/checkfiles.py                  |  86 +++++++
 checkfiles/test/test.py                       |  57 +++++
 checkfiles_asset/.DS_Store                    | Bin 0 -> 6148 bytes
 checkfiles_asset/Makefile                     |  13 +
 checkfiles_asset/dxasset.json                 |   9 +
 12 files changed, 570 insertions(+), 95 deletions(-)
 create mode 100644 checkfiles/Readme.developer.md
 create mode 100644 checkfiles/Readme.md
 create mode 100644 checkfiles/dxapp.json
 create mode 120000 checkfiles/resources/home/dnanexus/checkfiles.py
 create mode 100755 checkfiles/src/checkfiles.py
 create mode 100755 checkfiles/test/test.py
 create mode 100644 checkfiles_asset/.DS_Store
 create mode 100644 checkfiles_asset/Makefile
 create mode 100644 checkfiles_asset/dxasset.json

diff --git a/.gitignore b/.gitignore
index 5287056..3d27376 100644
--- a/.gitignore
+++ b/.gitignore
@@ -107,3 +107,7 @@ ENV/
 /lib/
 pip-selfcheck.json
 pyvenv.cfg
+
+checkfiles/.DS_Store
+
+.DS_Store
diff --git a/README.md b/README.md
index 616df57..aec7ea8 100644
--- a/README.md
+++ b/README.md
@@ -27,9 +27,54 @@ Install required packages for running deploy::
     pyvenv .
     bin/pip install -r requirements-deploy.txt
 
-Deploy
-------
+Deploy to AWS
+-------------
 
 Supply arguments for checkfiles after a ``--`` separator::
 
     bin/python deploy.py -- --username ACCESS_KEY_ID --password SECRET_ACCESS_KEY --bot-token SLACK-BOT-TOKEN https://www.encodeproject.org
+
+Run on DNAnexus
+---------------
+
+Prerequisites:
+
+* DNAnexus login
+* dx toolkit
+* Access to the DNAnexus checkfiles project (everyone in DNAnexus org-cherrylab should have this)
+* A DNAnexus path of a file to check, in the form project-name:/dir/subdir/filename
+
+To run on a DNAnexus file with --dry-run::
+
+    dx run -i dry_run=t -i dx_file="project-name:/dir/subdir/filename" --watch --yes checkfiles:checkfiles
+
+To capture the output and error streams to files that will be saved to the current DNAnexus project, and send those to Slack::
+
+    dx run -i dx_file="project-name:/dir/subdir/filename" -i bot_token="mybot-token" -i out="myoutfile" -i err="myerrfile" --watch --yes checkfiles:checkfiles
+
+To see the full syntax and options::
+
+    dx run checkfiles:checkfiles --help
+
+NOTE: stdout and stderr are currently sent to the DNAnexus log. Saving those streams to files is not yet supported.
+
+Deploy to DNAnexus
+------------------
+
+You only need to do this if you have changed the code.
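+
+To confirm what is currently deployed before rebuilding (an optional sanity
+check; ``dx describe`` is a standard dx toolkit command)::
+
+    dx describe checkfiles:checkfiles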
+
+If you don't already have a DNAnexus login token::
+
+    dx login
+
+Select the checkfiles project on DNAnexus::
+
+    dx select checkfiles
+
+Only if you have changed the checkfiles asset Makefile, rebuild the asset from the checkfiles repo root::
+
+    dx build_asset checkfiles_asset
+
+Build the new applet::
+
+    dx build -a checkfiles
+
diff --git a/checkfiles.py b/checkfiles.py
index fbb3c91..c285852 100644
--- a/checkfiles.py
+++ b/checkfiles.py
@@ -7,6 +7,7 @@
 --output check_files.log https://www.encodeproject.org
 """
 import datetime
+import time
 import os.path
 import json
 import sys
@@ -702,12 +703,14 @@ def check_file(config, session, url, job):
     if job.get('skip'):
         return job
 
-    no_file_flag = item.get('no_file_available')
-    if no_file_flag:
-        return job
-
-    upload_url = job['upload_url']
-    local_path = os.path.join(config['mirror'], upload_url[len('s3://'):])
+    local_path = job.get('local_file')
+    if not local_path:
+        no_file_flag = item.get('no_file_available')
+        if no_file_flag:
+            return job
+        else:
+            upload_url = job['upload_url']
+            local_path = os.path.join(config['mirror'], upload_url[len('s3://'):])
     # boolean standing for local .bed file creation
     is_local_bed_present = False
     if item['file_format'] == 'bed':
@@ -838,8 +841,10 @@ def remove_local_file(path_to_the_file, errors):
     except NameError:
         pass
 
+def extract_accession(file_path):
+    return file_path.split('/')[-1].split('.')[0]
 
-def fetch_files(session, url, search_query, out, include_unexpired_upload=False, file_list=None):
+def fetch_files(session, url, search_query, out, include_unexpired_upload=False, file_list=None, local_file=None):
     graph = []
     # checkfiles using a file with a list of file accessions to be checked
     if file_list:
@@ -858,6 +863,15 @@ def fetch_files(session, url, search_query, out, include_unexpired_upload=False,
             local = copy.deepcopy(r.json()['@graph'])
             graph.extend(local)
     # checkfiles using a query
+    elif local_file:
+        r = session.get(
+            urljoin(url, '/search/?field=@id&limit=all&type=File&accession=' + extract_accession(local_file)))
+        try:
+            r.raise_for_status()
+        except requests.HTTPError:
+            return
+        else:
+            graph = r.json()['@graph']
     else:
         r = session.get(
             urljoin(url, '/search/?field=@id&limit=all&type=File&' + search_query))
@@ -903,6 +917,9 @@ def fetch_files(session, url, search_query, out, include_unexpired_upload=False,
             # Probably a transient error
             job['skip'] = True
 
+        if local_file:
+            job['local_file'] = local_file
+
         yield job
 
 
@@ -970,8 +987,17 @@ def patch_file(session, url, job):
                 'was {} and now is {}.'.format(job['item'].get('status', 'UNKNOWN'), etag_r.json()['status'])
     return
 
-def run(out, err, url, username, password, encValData, mirror, search_query, file_list=None, bot_token=None,
-        processes=None, include_unexpired_upload=False, dry_run=False, json_out=False):
+
+def wait_until_indexed(session, url):
+    # Poll the portal's _indexer endpoint once a minute until it reports
+    # 'waiting', i.e. indexing has finished.
+    response = 'indexing'
+    while response != 'waiting':
+        response = session.get(urljoin(url, '_indexer')).json()['status']
+        time.sleep(60)
+
+
+def run(out, err, url, username, password, encValData, mirror, search_query, file_list=None,
+        bot_token=None, local_file=None, processes=None, include_unexpired_upload=False,
+        dry_run=False, json_out=False):
     import functools
     import multiprocessing
@@ -992,7 +1018,7 @@ def run(out, err, url, username, password, encValData, mirror, search_query, fil
     except multiprocessing.NotImplmentedError:
         nprocesses = 1
 
-    version = '1.18'
+    version = '1.19'
 
     try:
         ip_output = subprocess.check_output(
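
The wait_until_indexed helper introduced above gates every run on the portal
having finished indexing. A minimal standalone sketch of the same polling
pattern, runnable outside the applet (assuming only the requests library; the
_indexer endpoint and its 'indexing'/'waiting' statuses are those used in this
patch)::

    import time
    import requests
    from urllib.parse import urljoin

    session = requests.Session()
    status = 'indexing'
    while status != 'waiting':
        # _indexer reports 'indexing' while busy and 'waiting' when idle
        status = session.get(
            urljoin('https://www.encodeproject.org/', '_indexer')).json()['status']
        time.sleep(60)

Note that, as in the patch, the loop sleeps once more after the status flips
to 'waiting', so a run can start up to a minute late.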
@@ -1001,93 +1027,118 @@ def run(out, err, url, username, password, encValData, mirror, search_query, fil
     except subprocess.CalledProcessError as e:
         ip = ''
 
-
-    initiating_run = 'STARTING Checkfiles version ' + \
-        '{} ({}) ({}): with {} processes {} on {} at {}'.format(
-            version, url, search_query, nprocesses, dr, ip, datetime.datetime.now())
-    if bot_token:
-        sc = SlackClient(bot_token)
-        sc.api_call(
-            "chat.postMessage",
-            channel="#bot-reporting",
-            text=initiating_run,
-            as_user=True
-        )
-
-    out.write(initiating_run + '\n')
-    if processes == 0:
-        # Easier debugging without multiprocessing.
-        imap = map
+    # Wait for the site to be fully indexed before starting the checkfiles run.
+    try:
+        wait_until_indexed(session, url)
+    except requests.exceptions.RequestException as e:
+        error_waiting = 'STARTING Checkfiles version ' \
+                        '{} ({}) ({}): with {} processes {} ' \
+                        'on {} at {}. Error while waiting for ' \
+                        'the site to be fully indexed: {}'.format(
+                            version,
+                            url,
+                            search_query,
+                            nprocesses,
+                            dr,
+                            ip,
+                            datetime.datetime.now(),
+                            str(e))
+        if bot_token:
+            sc = SlackClient(bot_token)
+            sc.api_call(
+                "chat.postMessage",
+                channel="#bot-reporting",
+                text=error_waiting,
+                as_user=True
+            )
+        out.write('ERROR while waiting for the site to be fully indexed\n' + str(e) + '\n')
+        out.flush()
     else:
-        pool = multiprocessing.Pool(processes=processes)
-        imap = pool.imap_unordered
-
-    jobs = fetch_files(session, url, search_query, out, include_unexpired_upload, file_list)
-    if not json_out:
-        headers = '\t'.join(['Accession', 'Lab', 'Errors', 'Aliases', 'Upload URL',
-                             'Upload Expiration'])
-        out.write(headers + '\n')
+        initiating_run = 'STARTING Checkfiles version ' + \
+            '{} ({}) ({}): with {} processes {} on {} at {}'.format(
+                version, url, search_query, nprocesses, dr, ip, datetime.datetime.now())
+        if bot_token:
+            sc = SlackClient(bot_token)
+            sc.api_call(
+                "chat.postMessage",
+                channel="#bot-reporting",
+                text=initiating_run,
+                as_user=True
+            )
+
+        out.write(initiating_run + '\n')
         out.flush()
-        err.write(headers + '\n')
-        err.flush()
-    for job in imap(functools.partial(check_file, config, session, url), jobs):
-        if not dry_run:
-            patch_file(session, url, job)
-
-        if not job.get('skip'):
-            errors_string = str(job.get('errors', {'errors': None}))
-        else:
-            errors_string = str({'errors': 'status have not been changed, the file check was skipped due to the file unavailability on S3'})
-        tab_report = '\t'.join([
-            job['item'].get('accession', 'UNKNOWN'),
-            job['item'].get('lab', 'UNKNOWN'),
-            errors_string,
-            str(job['item'].get('aliases', ['n/a'])),
-            job.get('upload_url', ''),
-            job.get('upload_expiration', ''),
-        ])
-        if json_out:
-            out.write(json.dumps(job) + '\n')
-            out.flush()
-            if job['errors']:
-                err.write(json.dumps(job) + '\n')
-                err.flush()
+        if processes == 0:
+            # Easier debugging without multiprocessing.
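+            # The plain built-in map runs every job in this one process, so
+            # breakpoints and tracebacks behave normally.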
+            imap = map
         else:
-            out.write(tab_report + '\n')
+            pool = multiprocessing.Pool(processes=processes)
+            imap = pool.imap_unordered
+
+        jobs = fetch_files(session, url, search_query, out, include_unexpired_upload, file_list, local_file)
+        if not json_out:
+            headers = '\t'.join(['Accession', 'Lab', 'Errors', 'Aliases', 'Upload URL',
+                                 'Upload Expiration'])
+            out.write(headers + '\n')
             out.flush()
-            if job['errors']:
-                err.write(tab_report + '\n')
-                err.flush()
-
-    finishing_run = 'FINISHED Checkfiles at {}'.format(datetime.datetime.now())
-    out.write(finishing_run + '\n')
-    out.flush()
-    output_filename = out.name
-    out.close()
-    error_filename = err.name
-    err.close()
-
-    if bot_token:
-        with open(output_filename, 'r') as output_file:
-            x = sc.api_call("files.upload",
-                            title=output_filename,
-                            channels='#bot-reporting',
-                            content=output_file.read(),
-                            as_user=True)
-
-        with open(error_filename, 'r') as output_file:
-            x = sc.api_call("files.upload",
-                            title=error_filename,
-                            channels='#bot-reporting',
-                            content=output_file.read(),
-                            as_user=True)
-
-        sc.api_call(
-            "chat.postMessage",
-            channel="#bot-reporting",
-            text=finishing_run,
-            as_user=True
-        )
+            err.write(headers + '\n')
+            err.flush()
+        for job in imap(functools.partial(check_file, config, session, url), jobs):
+            if not dry_run:
+                patch_file(session, url, job)
+
+            if not job.get('skip'):
+                errors_string = str(job.get('errors', {'errors': None}))
+            else:
+                errors_string = str({'errors': 'status has not been changed; the file check was skipped because the file is unavailable on S3'})
+            tab_report = '\t'.join([
+                job['item'].get('accession', 'UNKNOWN'),
+                job['item'].get('lab', 'UNKNOWN'),
+                errors_string,
+                str(job['item'].get('aliases', ['n/a'])),
+                job.get('upload_url', ''),
+                job.get('upload_expiration', ''),
+            ])
+            if json_out:
+                out.write(json.dumps(job) + '\n')
+                out.flush()
+                if job['errors']:
+                    err.write(json.dumps(job) + '\n')
+                    err.flush()
+            else:
+                out.write(tab_report + '\n')
+                out.flush()
+                if job['errors']:
+                    err.write(tab_report + '\n')
+                    err.flush()
+
+        finishing_run = 'FINISHED Checkfiles at {}'.format(datetime.datetime.now())
+        out.write(finishing_run + '\n')
+        out.flush()
+        output_filename = out.name
+        out.close()
+        error_filename = err.name
+        err.close()
+
+        if bot_token:
+            with open(output_filename, 'r') as output_file:
+                x = sc.api_call("files.upload",
+                                title=output_filename,
+                                channels='#bot-reporting',
+                                content=output_file.read(),
+                                as_user=True)
+
+            with open(error_filename, 'r') as output_file:
+                x = sc.api_call("files.upload",
+                                title=error_filename,
+                                channels='#bot-reporting',
+                                content=output_file.read(),
+                                as_user=True)
+
+            sc.api_call(
+                "chat.postMessage",
+                channel="#bot-reporting",
+                text=finishing_run,
+                as_user=True
+            )
 
 def main():
     import argparse
@@ -1127,6 +1178,9 @@ def main():
     parser.add_argument(
         '--file-list', default='',
         help="list of file accessions to check")
+    parser.add_argument(
+        '--local-file', default='',
+        help="path to local file to check")
     parser.add_argument('url', help="server to post to")
     args = parser.parse_args()
     run(**vars(args))
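
With the new flag wired through, a one-off local check looks like this
(a sketch: the file name and credentials are placeholders, and because
extract_accession parses the accession out of the file name, the local file
must be named <accession>.<extension>)::

    bin/python checkfiles.py --local-file ENCFF000ABC.fastq.gz --dry-run --username ACCESS_KEY_ID --password SECRET_ACCESS_KEY https://www.encodeproject.org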
diff --git a/checkfiles/Readme.developer.md b/checkfiles/Readme.developer.md
new file mode 100644
index 0000000..635231a
--- /dev/null
+++ b/checkfiles/Readme.developer.md
@@ -0,0 +1,31 @@
+# checkfiles Developer Readme
+
+
+
+## Running this app with additional computational resources
+
+This app has the following entry points:
+
+* main
+
+When running this app, you can override the instance type to be used by
+providing the ``systemRequirements`` field to ```/applet-XXXX/run``` or
+```/app-XXXX/run```, as follows:
+
+    {
+      systemRequirements: {
+        "main": {"instanceType": "mem2_hdd2_x2"}
+      },
+      [...]
+    }
+
+See the Run Specification section of the DNAnexus API documentation for more
+information about the available instance types.
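
When launching by hand, the dx CLI exposes the same override without building
the JSON payload yourself (same applet path as in the top-level README)::

    dx run checkfiles:checkfiles --instance-type mem2_hdd2_x2 -i dry_run=t -i dx_file="project-name:/dir/subdir/filename" --watch --yes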
diff --git a/checkfiles/Readme.md b/checkfiles/Readme.md
new file mode 100644
index 0000000..9fe0448
--- /dev/null
+++ b/checkfiles/Readme.md
@@ -0,0 +1,21 @@
+
+# ENCODEd checkfiles (DNAnexus Platform App)
+
+Applies validation checks to files in "uploading" status on ENCODEd.
+
+This is the source code for an app that runs on the DNAnexus Platform.
+For more information about how to run or modify it, see
+https://wiki.dnanexus.com/.
+
+
+
+
+
diff --git a/checkfiles/dxapp.json b/checkfiles/dxapp.json
new file mode 100644
index 0000000..c40fe4e
--- /dev/null
+++ b/checkfiles/dxapp.json
@@ -0,0 +1,154 @@
+{
+    "name": "checkfiles",
+    "title": "ENCODEd checkfiles",
+    "summary": "Applies validation checks to files in ENCODEd.",
+    "dxapi": "1.0.0",
+    "version": "0.0.1",
+    "inputSpec": [
+        {
+            "name": "mirror",
+            "class": "string",
+            "optional": true,
+            "help": ""
+        },
+        {
+            "name": "encValData",
+            "class": "string",
+            "optional": true,
+            "help": "encValData location"
+        },
+        {
+            "name": "bot_token",
+            "class": "string",
+            "optional": true,
+            "help": "Slack bot token"
+        },
+        {
+            "name": "key",
+            "class": "string",
+            "optional": true,
+            "help": "key into DCC Credentials or default for logged-in user"
+        },
+        {
+            "name": "out",
+            "class": "string",
+            "optional": true,
+            "help": "Filename for output (default stdout)"
+        },
+        {
+            "name": "err",
+            "class": "string",
+            "optional": true,
+            "help": "Filename for errors (default stderr)"
+        },
+        {
+            "name": "processes",
+            "class": "int",
+            "optional": true,
+            "help": "defaults to cpu count, use 0 for debugging in a single process"
+        },
+        {
+            "name": "include_unexpired_upload",
+            "class": "boolean",
+            "optional": true,
+            "help": "include files whose upload credentials have not yet expired (may be replaced!)"
+        },
+        {
+            "name": "dry_run",
+            "class": "boolean",
+            "optional": true,
+            "help": "Don't update status, just check"
+        },
+        {
+            "name": "search_query",
+            "class": "string",
+            "optional": true,
+            "help": "override the file search query, e.g. 'accession=ENCFF000ABC'"
+        },
+        {
+            "name": "file_list",
+            "class": "array:string",
+            "optional": true,
+            "help": "list of file accessions to check"
+        },
+        {
+            "name": "local_file",
+            "class": "string",
+            "optional": true,
+            "help": "path to local file to check"
+        },
+        {
+            "name": "dx_file",
+            "class": "file",
+            "optional": true,
+            "help": "DNAnexus file to check"
+        },
+        {
+            "name": "url",
+            "class": "string",
+            "optional": true,
+            "help": "server to post to"
+        }
+    ],
+    "outputSpec": [
+        {
+            "name": "out",
+            "label": "Output",
+            "class": "file",
+            "patterns": [
+                "*"
+            ],
+            "help": "File to store the output stream",
+            "optional": true
+        },
+        {
+            "name": "err",
+            "label": "Error log",
+            "class": "file",
+            "patterns": [
+                "*"
+            ],
+            "help": "File to store the error log",
+            "optional": true
+        }
+    ],
+    "runSpec": {
+        "timeoutPolicy": {
+            "*": {
+                "hours": 48
+            }
+        },
+        "interpreter": "python2.7",
+        "file": "src/checkfiles.py",
+        "systemRequirements": {
+            "*": {
+                "instanceType": "mem1_ssd1_x4"
+            }
+        },
+        "distribution": "Ubuntu",
+        "release": "14.04",
+        "assetDepends": [
+            {
+                "name": "common_asset",
+                "project": "project-BKpvFg00VBPV975PgJ6Q03v6",
+                "version": "0.0.1",
+                "folder": "/ChIP-seq/assets/"
+            },
+            {
+                "name": "checkfiles_asset",
+                "project": "project-F8YYqF00YY7vPF4Q6P7y7zQ3",
+                "version": "0.0.1",
+                "folder": "/"
+            }
+        ],
+        "execDepends": [
+        ]
+    },
+    "access": {
+        "project": "CONTRIBUTE",
+        "allProjects": "VIEW",
+        "network": [
+            "*"
+        ]
+    }
+}
diff --git a/checkfiles/resources/home/dnanexus/checkfiles.py b/checkfiles/resources/home/dnanexus/checkfiles.py
new file mode 120000
index 0000000..5710d35
--- /dev/null
+++ b/checkfiles/resources/home/dnanexus/checkfiles.py
@@ -0,0 +1 @@
+../../../../checkfiles.py
\ No newline at end of file
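
The 120000 mode entry above is a relative symlink rather than a copy: the
applet's resources tree ships the repository's top-level checkfiles.py by
reference, so the script that lands in /home/dnanexus on the worker cannot
drift from the repo root. To check that the link resolves after a fresh
checkout (plain POSIX, run from the repo root)::

    readlink checkfiles/resources/home/dnanexus/checkfiles.py
    # -> ../../../../checkfiles.py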
diff --git a/checkfiles/src/checkfiles.py b/checkfiles/src/checkfiles.py
new file mode 100755
index 0000000..2f4c2a4
--- /dev/null
+++ b/checkfiles/src/checkfiles.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python2
+# checkfiles 0.0.1
+
+import common
+import os
+import sys
+import shlex
+import subprocess
+import dxpy
+from pprint import pprint
+from copy import deepcopy
+import logging
+
+logger = logging.getLogger(__name__)
+logger.addHandler(dxpy.DXLogHandler())
+logger.propagate = False
+logger.setLevel(logging.INFO)
+
+DCC_CREDENTIALS_PROJECT = 'project-F30FzF0048K9JZKxPvB3Y563'
+DCC_CREDENTIALS_FOLDER = '/credentials'
+KEYFILE = 'keypairs.json'
+
+
+class PortalCredentialsError(Exception):
+    pass
+
+
+@dxpy.entry_point('main')
+def main(**kwargs):
+
+    dxpy.download_folder(
+        DCC_CREDENTIALS_PROJECT, '.', folder=DCC_CREDENTIALS_FOLDER)
+    if 'key' in kwargs:
+        key = '-'.join([dxpy.api.system_whoami()['id'], kwargs.pop('key')])
+    else:
+        key = dxpy.api.system_whoami()['id']
+    key_tuple = common.processkey(key, KEYFILE)
+    if not key_tuple:
+        logger.error("Key %s is not found in the keyfile %s" % (key, KEYFILE))
+        raise PortalCredentialsError("Supply a valid keypair ID")
+    authid, authpw, server = key_tuple
+    if 'url' in kwargs:
+        server = kwargs.pop('url')
+    keypair = (authid, authpw)
+
+    tokens = ['python3 checkfiles.py']
+    # NOTE: only boolean, string, and int inputs are forwarded here;
+    # list-valued inputs (e.g. file_list) are silently skipped.
+    for k, v in kwargs.iteritems():
+        if isinstance(v, bool):
+            if v:
+                tokens.append("--"+k.replace('_', '-'))
+            continue
+        if isinstance(v, str) or isinstance(v, unicode) or isinstance(v, int):
+            tokens.append(' '.join(["--"+k.replace('_', '-'), str(v)]))
+
+    if 'dx_file' in kwargs:
+        dxfile = dxpy.DXFile(kwargs.get('dx_file'))
+        dxpy.download_dxfile(dxfile, dxfile.name)
+        tokens.append("--local-file %s" % (dxfile.name))
+
+    # this is just to get a command string to print that has no secrets
+    tokens_safe = deepcopy(tokens)
+    tokens_safe.append("--username %s --password %s" % ("."*len(authid), "."*len(authpw)))
+    tokens_safe.append(server)
+    logger.info(' '.join(tokens_safe))
+
+    tokens.append("--username %s --password %s" % (authid, authpw))
+    # this needs to be the last token
+    tokens.append(server)
+
+    checkfiles_command = ' '.join(tokens)
+    subprocess.check_call(shlex.split(checkfiles_command))
+
+    output = {}
+    outfilename = kwargs.get('out')
+    errfilename = kwargs.get('err')
+    if outfilename:
+        out = dxpy.upload_local_file(outfilename)
+        output.update({'out': dxpy.dxlink(out)})
+    if errfilename:
+        err = dxpy.upload_local_file(errfilename)
+        output.update({'err': dxpy.dxlink(err)})
+
+    return output
+
+
+dxpy.run()
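
Putting the wrapper together: it fetches DCC credentials, optionally downloads
the DNAnexus file, and then shells out to the top-level script. With the
README's example inputs, the composed command (as logged, with credentials
masked) comes out roughly like this; the values are placeholders and flag
order follows dict iteration::

    python3 checkfiles.py --bot-token mybot-token --out myoutfile --err myerrfile --local-file filename --username ........ --password ........ https://www.encodeproject.org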
diff --git a/checkfiles/test/test.py b/checkfiles/test/test.py
new file mode 100755
index 0000000..a71ca6a
--- /dev/null
+++ b/checkfiles/test/test.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+# checkfiles 0.0.1 test suite
+# Generated by dx-app-wizard.
+
+import json, os, time, unittest
+
+import dxpy
+import dxpy.app_builder
+
+from dxpy.exceptions import DXAPIError
+
+src_dir = os.path.join(os.path.dirname(__file__), "..")
+test_resources_dir = os.path.join(src_dir, "test", "resources")
+
+def makeInputs():
+    # Please fill in this method to generate default inputs for your app.
+    return {}
+
+class Testcheckfiles(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        # Upload the app to the Platform.
+        cls.base_input = makeInputs()
+        bundled_resources = dxpy.app_builder.upload_resources(src_dir)
+        try:
+            app_name = os.path.basename(os.path.abspath(src_dir)) + "_test"
+        except OSError:
+            app_name = "test_app"
+        applet_basename = app_name + "_" + str(int(time.time()))
+        cls.applet_id, _ignored_applet_spec = dxpy.app_builder.upload_applet(src_dir, bundled_resources, override_name=applet_basename)
+
+    @classmethod
+    def tearDownClass(cls):
+        # Clean up by removing the app we created.
+        try:
+            dxpy.api.container_remove_objects(dxpy.WORKSPACE_ID, {"objects": [cls.applet_id]})
+        except DXAPIError as e:
+            print "Error removing %s during cleanup; ignoring." % (cls.applet_id,)
+            print e
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def test_base_input(self):
+        """
+        Tests the app with a basic input.
+        """
+        job = dxpy.DXApplet(self.applet_id).run(self.base_input)
+        print "Waiting for %s to complete" % (job.get_id(),)
+        job.wait_on_done()
+        print json.dumps(job.describe()["output"])
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/checkfiles_asset/.DS_Store b/checkfiles_asset/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..3f67873522c1128766520a943b420741e69ab0dc
GIT binary patch
literal 6148
zcmeHKF-`L~8@vCn@yj%e{ACD7WPl9(GX`|k
?YkX5D(=>k&$GKWq1~aGm|rmk0)6oaz`)Ltt7>$6G97+7usGT(HebVmd
GVBiDhC@o$9
literal 0
HcmV?d00001

diff --git a/checkfiles_asset/Makefile b/checkfiles_asset/Makefile
new file mode 100644
index 0000000..bd603a4
--- /dev/null
+++ b/checkfiles_asset/Makefile
@@ -0,0 +1,13 @@
+SHELL=/bin/bash -e
+
+all:
+	# Trust the signing key for this repo. Reference: http://cran.rstudio.com/bin/linux/ubuntu/README.html
+	sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E084DAB9
+	sudo rm -f /etc/apt/apt.conf.d/99dnanexus
+	sudo apt-get update
+	sudo apt-get -y install python3-pip
+	sudo pip3 install slackclient
+	sudo curl -sS -L -o /usr/local/bin/validateFiles http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/validateFiles
+	sudo chmod +x /usr/local/bin/validateFiles
+	sudo mkdir /opt/encValData
+	sudo git clone --depth 1 https://github.com/ENCODE-DCC/encValData /opt/encValData
diff --git a/checkfiles_asset/dxasset.json b/checkfiles_asset/dxasset.json
new file mode 100644
index 0000000..ac9367f
--- /dev/null
+++ b/checkfiles_asset/dxasset.json
@@ -0,0 +1,9 @@
+{
+    "name": "checkfiles_asset",
+    "title": "checkfiles Asset",
+    "description": "checkfiles dependencies",
+    "version": "0.0.1",
+    "distribution": "Ubuntu",
+    "instanceType": "mem3_hdd2_x2",
+    "release": "14.04"
+}
\ No newline at end of file
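
The validateFiles binary installed by the asset Makefile above is the UCSC
tool that the top-level checkfiles.py shells out to for format validation. A
quick smoke test on a built worker (hypothetical file name; run the tool with
no arguments for its full usage text)::

    validateFiles -type=fastq ENCFF000ABC.fastq.gz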