CHK-4 allow running on dnanexus (#6)
gabdank authored and caseylitton committed Dec 7, 2017
1 parent c18d30d commit add0eee
Showing 12 changed files with 570 additions and 95 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -107,3 +107,7 @@ ENV/
/lib/
pip-selfcheck.json
pyvenv.cfg

checkfiles/.DS_Store

.DS_Store
49 changes: 47 additions & 2 deletions README.md
@@ -27,9 +27,54 @@ Install required packages for running deploy::
pyvenv .
bin/pip install -r requirements-deploy.txt

Deploy
------
Deploy to AWS
-------------

Supply arguments for checkfiles after a ``--`` separator::

bin/python deploy.py -- --username ACCESS_KEY_ID --password SECRET_ACCESS_KEY --bot-token SLACK-BOT-TOKEN https://www.encodeproject.org

Run on DNAnexus
---------------

Prerequisites
* DNAnexus login
* dx toolkit
* Access to the DNAnexus checkfiles project (everyone in DNAnexus org-cherrylab should have this)
* A DNAnexus path of a file to check in the form project-name:/dir/subdir/filename

To run on a DNAnexus file with --dry-run::

dx run -i dry_run=t -i dx_file="project-name:/dir/subdir/filename" --watch --yes checkfiles:checkfiles

To capture the output and error streams to files that will be saved to the current DNAnexus project and send those to Slack::

dx run -i dx_file="project-name:/dir/subdir/filename" -i bot_token="mybot-token" -i out="myoutfile" -i err="myerrfile" --watch --yes checkfiles:checkfiles

To see full syntax and options::

dx run checkfiles:checkfiles --help

NOTE: stdout and stderr are currently sent to the DNAnexus log. Saving those streams to files is not yet supported.

Deploy to DNAnexus
------------------

You only need to do this if you have changed the code.

If you don't already have a DNAnexus login token::

dx login

Select the checkfiles project on DNAnexus::

dx select checkfiles

Only if you have changed the checkfiles asset Makefile, rebuild the asset from the checkfiles repo root::

dx build_asset checkfiles_asset

Build the new applet::

dx build -a checkfiles

240 changes: 147 additions & 93 deletions checkfiles.py
@@ -7,6 +7,7 @@
--output check_files.log https://www.encodeproject.org
"""
import datetime
import time
import os.path
import json
import sys
@@ -702,12 +703,14 @@ def check_file(config, session, url, job):
if job.get('skip'):
return job

no_file_flag = item.get('no_file_available')
if no_file_flag:
return job

upload_url = job['upload_url']
local_path = os.path.join(config['mirror'], upload_url[len('s3://'):])
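    # prefer an explicitly supplied local file (e.g. the path passed in by the
    # DNAnexus applet) over the mirrored copy derived from the S3 upload_url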
local_path = job.get('local_file')
if not local_path:
no_file_flag = item.get('no_file_available')
if no_file_flag:
return job
else:
upload_url = job['upload_url']
local_path = os.path.join(config['mirror'], upload_url[len('s3://'):])
    # flag marking whether a local .bed file has been created
is_local_bed_present = False
if item['file_format'] == 'bed':
@@ -838,8 +841,10 @@ def remove_local_file(path_to_the_file, errors):
except NameError:
pass

def extract_accession(file_path):
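    # e.g. 'dir/ENCFF123ABC.fastq.gz' -> 'ENCFF123ABC' (hypothetical file name)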
return file_path.split('/')[-1].split('.')[0]

def fetch_files(session, url, search_query, out, include_unexpired_upload=False, file_list=None):
def fetch_files(session, url, search_query, out, include_unexpired_upload=False, file_list=None, local_file=None):
graph = []
# checkfiles using a file with a list of file accessions to be checked
if file_list:
@@ -858,6 +863,15 @@ def fetch_files(session, url, search_query, out, include_unexpired_upload=False,
local = copy.deepcopy(r.json()['@graph'])
graph.extend(local)
    # checkfiles using a single local file
    elif local_file:
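        # look up the portal record for the accession parsed from the local file name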
r = session.get(
urljoin(url, '/search/?field=@id&limit=all&type=File&accession=' + extract_accession(local_file)))
try:
r.raise_for_status()
except requests.HTTPError:
return
else:
graph = r.json()['@graph']
    # checkfiles using a query
    else:
r = session.get(
urljoin(url, '/search/?field=@id&limit=all&type=File&' + search_query))
@@ -903,6 +917,9 @@ def fetch_files(session, url, search_query, out, include_unexpired_upload=False,
# Probably a transient error
job['skip'] = True

if local_file:
job['local_file'] = local_file

yield job


@@ -970,8 +987,17 @@ def patch_file(session, url, job):
'was {} and now is {}.'.format(job['item'].get('status', 'UNKNOWN'), etag_r.json()['status'])
return

def run(out, err, url, username, password, encValData, mirror, search_query, file_list=None, bot_token=None,
processes=None, include_unexpired_upload=False, dry_run=False, json_out=False):

def wait_until_indexed(session, url):
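    # poll the portal's _indexer endpoint once a minute and return once it
    # reports 'waiting', i.e. indexing has completed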
response = 'indexing'
while response != 'waiting':
response = session.get(urljoin(url, '_indexer')).json()['status']
time.sleep(60)


def run(out, err, url, username, password, encValData, mirror, search_query, file_list=None,
bot_token=None, local_file=None, processes=None, include_unexpired_upload=False,
dry_run=False, json_out=False):
import functools
import multiprocessing

@@ -992,7 +1018,7 @@ def run(out, err, url, username, password, encValData, mirror, search_query, fil
    except NotImplementedError:  # multiprocessing.cpu_count() may raise this
        nprocesses = 1

version = '1.18'
version = '1.19'

try:
ip_output = subprocess.check_output(
@@ -1001,93 +1027,118 @@ def run(out, err, url, username, password, encValData, mirror, search_query, fil
except subprocess.CalledProcessError as e:
ip = ''


initiating_run = 'STARTING Checkfiles version ' + \
'{} ({}) ({}): with {} processes {} on {} at {}'.format(
version, url, search_query, nprocesses, dr, ip, datetime.datetime.now())
if bot_token:
sc = SlackClient(bot_token)
sc.api_call(
"chat.postMessage",
channel="#bot-reporting",
text=initiating_run,
as_user=True
)

out.write(initiating_run + '\n')
if processes == 0:
# Easier debugging without multiprocessing.
imap = map
    # wait for the site to be fully indexed before starting the checkfiles run
try:
wait_until_indexed(session, url)
except requests.exceptions.RequestException as e:
error_waiting = 'STARTING Checkfiles version ' \
'{} ({}) ({}): with {} processes {} ' \
'on {} at {}. Error while waiting for ' \
'the site to be fully indexed : {}'.format(
version,
url,
search_query,
nprocesses,
dr,
ip,
datetime.datetime.now(),
str(e))
if bot_token:
sc = SlackClient(bot_token)
sc.api_call(
"chat.postMessage",
channel="#bot-reporting",
text=error_waiting,
as_user=True
)
        out.write('ERROR while waiting for the site to be fully indexed\n' + str(e) + '\n')
out.flush()
else:
pool = multiprocessing.Pool(processes=processes)
imap = pool.imap_unordered

jobs = fetch_files(session, url, search_query, out, include_unexpired_upload, file_list)
if not json_out:
headers = '\t'.join(['Accession', 'Lab', 'Errors', 'Aliases', 'Upload URL',
'Upload Expiration'])
out.write(headers + '\n')
initiating_run = 'STARTING Checkfiles version ' + \
'{} ({}) ({}): with {} processes {} on {} at {}'.format(
version, url, search_query, nprocesses, dr, ip, datetime.datetime.now())
if bot_token:
sc = SlackClient(bot_token)
sc.api_call(
"chat.postMessage",
channel="#bot-reporting",
text=initiating_run,
as_user=True
)

out.write(initiating_run + '\n')
out.flush()
err.write(headers + '\n')
err.flush()
for job in imap(functools.partial(check_file, config, session, url), jobs):
if not dry_run:
patch_file(session, url, job)

if not job.get('skip'):
errors_string = str(job.get('errors', {'errors': None}))
else:
            errors_string = str({'errors': 'status has not been changed; the file check was skipped because the file is unavailable on S3'})
tab_report = '\t'.join([
job['item'].get('accession', 'UNKNOWN'),
job['item'].get('lab', 'UNKNOWN'),
errors_string,
str(job['item'].get('aliases', ['n/a'])),
job.get('upload_url', ''),
job.get('upload_expiration', ''),
])
if json_out:
out.write(json.dumps(job) + '\n')
out.flush()
if job['errors']:
err.write(json.dumps(job) + '\n')
err.flush()
if processes == 0:
# Easier debugging without multiprocessing.
imap = map
else:
out.write(tab_report + '\n')
pool = multiprocessing.Pool(processes=processes)
imap = pool.imap_unordered

jobs = fetch_files(session, url, search_query, out, include_unexpired_upload, file_list, local_file)
if not json_out:
headers = '\t'.join(['Accession', 'Lab', 'Errors', 'Aliases', 'Upload URL',
'Upload Expiration'])
out.write(headers + '\n')
out.flush()
if job['errors']:
err.write(tab_report + '\n')
err.flush()

finishing_run = 'FINISHED Checkfiles at {}'.format(datetime.datetime.now())
out.write(finishing_run + '\n')
out.flush()
output_filename = out.name
out.close()
error_filename = err.name
err.close()

if bot_token:
with open(output_filename, 'r') as output_file:
x = sc.api_call("files.upload",
title=output_filename,
channels='#bot-reporting',
content=output_file.read(),
as_user=True)

with open(error_filename, 'r') as output_file:
x = sc.api_call("files.upload",
title=error_filename,
channels='#bot-reporting',
content=output_file.read(),
as_user=True)

sc.api_call(
"chat.postMessage",
channel="#bot-reporting",
text=finishing_run,
as_user=True
)
for job in imap(functools.partial(check_file, config, session, url), jobs):
if not dry_run:
patch_file(session, url, job)

if not job.get('skip'):
errors_string = str(job.get('errors', {'errors': None}))
else:
            errors_string = str({'errors': 'status has not been changed; the file check was skipped because the file is unavailable on S3'})
tab_report = '\t'.join([
job['item'].get('accession', 'UNKNOWN'),
job['item'].get('lab', 'UNKNOWN'),
errors_string,
str(job['item'].get('aliases', ['n/a'])),
job.get('upload_url', ''),
job.get('upload_expiration', ''),
])
if json_out:
out.write(json.dumps(job) + '\n')
out.flush()
if job['errors']:
err.write(json.dumps(job) + '\n')
err.flush()
else:
out.write(tab_report + '\n')
out.flush()
if job['errors']:
err.write(tab_report + '\n')
err.flush()

finishing_run = 'FINISHED Checkfiles at {}'.format(datetime.datetime.now())
out.write(finishing_run + '\n')
out.flush()
output_filename = out.name
out.close()
error_filename = err.name
err.close()

if bot_token:
with open(output_filename, 'r') as output_file:
x = sc.api_call("files.upload",
title=output_filename,
channels='#bot-reporting',
content=output_file.read(),
as_user=True)

with open(error_filename, 'r') as output_file:
x = sc.api_call("files.upload",
title=error_filename,
channels='#bot-reporting',
content=output_file.read(),
as_user=True)

sc.api_call(
"chat.postMessage",
channel="#bot-reporting",
text=finishing_run,
as_user=True
)

def main():
import argparse
@@ -1127,6 +1178,9 @@ def main():
parser.add_argument(
'--file-list', default='',
help="list of file accessions to check")
parser.add_argument(
'--local-file', default='',
help="path to local file to check")
parser.add_argument('url', help="server to post to")
args = parser.parse_args()
run(**vars(args))
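
With the new option in place, a local run can be sketched as follows. This is a hypothetical invocation, not taken from the commit: the file path and accession are made up, the accession is parsed from the file name (so the file must be named after its portal accession), --dry-run is assumed to map onto run()'s dry_run parameter, and the remaining arguments are assumed to keep their defaults::

    bin/python checkfiles.py --username ACCESS_KEY_ID --password SECRET_ACCESS_KEY --dry-run --local-file /path/to/ENCFF123ABC.bed.gz https://www.encodeproject.org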
(diff truncated: the remaining 9 changed files are not shown)