From f9a98f9687ea0aaf292c9da159a9d4b597423d29 Mon Sep 17 00:00:00 2001 From: David Read Date: Fri, 5 Oct 2012 16:35:04 +0100 Subject: [PATCH] Since ckanext-qa (and several other extensions) are now relying on ckanext-archiver to do the downloading work, need to store more info about download failures. Tightened up error handling. So this is saved in the task_status table with key "status". Also get rid of filtered list of formats to download - download all we can. Tests fixed, but they are no longer thorough. --- README.rst | 4 +- ckanext/archiver/default_settings.py | 28 --- ckanext/archiver/tasks.py | 291 ++++++++++++++++++++------- test-core.ini | 56 ++++++ test.ini | 78 +++++++ tests/fake_ckan.py | 9 + tests/test_archiver.py | 7 + 7 files changed, 367 insertions(+), 106 deletions(-) create mode 100644 test-core.ini create mode 100644 test.ini diff --git a/README.rst b/README.rst index bbd2a68e..fe2569f9 100644 --- a/README.rst +++ b/README.rst @@ -73,7 +73,6 @@ Configuration * ARCHIVE_DIR: path to the directory that archived files will be saved to * MAX_CONTENT_LENGTH: the maximum size (in bytes) of files to archive - * DATA_FORMATS: the data formats that are archived Alternatively, if you are running CKAN with this patch: https://github.com/datagovuk/ckan/commit/83dcaf3d875d622ee0cd7f3c1f65ec27a970cd10 @@ -81,7 +80,6 @@ Configuration * ckanext-archiver.archive_dir * ckanext-archiver.max_content_length - * ckanext-archiver.data_formats (space separated) Using Archiver @@ -137,4 +135,4 @@ Tests should be run from the CKAN root directory (not the extension root). 
:: - (pyenv)~/pyenv/src/ckan$ nosetests --ckan ../ckanext-archiver/tests/ + (pyenv)~/pyenv/src/ckan$ nosetests --ckan ../ckanext-archiver/tests/ --with-pylons=../ckanext-archiver/test.ini diff --git a/ckanext/archiver/default_settings.py b/ckanext/archiver/default_settings.py index 013c5f5e..22a47b1d 100644 --- a/ckanext/archiver/default_settings.py +++ b/ckanext/archiver/default_settings.py @@ -7,31 +7,3 @@ # Max content-length of archived files, larger files will be ignored MAX_CONTENT_LENGTH = int(config.get('ckanext-archiver.max_content_length', 50000000)) - -# Only files with these mime-types or extensions will be archived. -# To archive all files, set DATA_FORMATS = 'all' -DEFAULT_DATA_FORMATS = [ - 'csv', - 'text/csv', - 'txt', - 'text/plain', - 'text/html', - 'html', - 'rdf', - 'text/rdf', - 'xml', - 'xls', - 'application/ms-excel', - 'application/vnd.ms-excel', - 'application/xls', - 'text/xml', - 'tar', - 'application/x-tar', - 'zip', - 'application/zip' - 'gz', - 'application/gzip', - 'application/x-gzip', - 'application/octet-stream' -] -DATA_FORMATS = config.get['ckanext-archiver.data_formats'].split() if 'ckan-archiver.data_formats' in config else DEFAULT_DATA_FORMATS diff --git a/ckanext/archiver/tasks.py b/ckanext/archiver/tasks.py index 9ef15532..328d417c 100644 --- a/ckanext/archiver/tasks.py +++ b/ckanext/archiver/tasks.py @@ -8,7 +8,7 @@ import tempfile import traceback import shutil -from datetime import datetime +import datetime from ckan.lib.celery_app import celery @@ -27,44 +27,21 @@ httplib.METHOD_NOT_ALLOWED: "405 Method Not Allowed" } -DEFAULT_DATA_FORMATS = [ - 'csv', - 'text/csv', - 'txt', - 'text/plain', - 'text/html', - 'html', - 'rdf', - 'text/rdf', - 'xml', - 'xls', - 'application/ms-excel', - 'application/vnd.ms-excel', - 'application/xls', - 'text/xml', - 'tar', - 'application/x-tar', - 'zip', - 'application/zip' - 'gz', - 'application/gzip', - 'application/x-gzip', - 'application/octet-stream' -] - class 
ArchiverError(Exception): pass -class DownloadError(Exception): +class DownloadError(ArchiverError): + pass +class ArchiveError(ArchiverError): pass -class ChooseNotToDownload(Exception): +class ChooseNotToDownload(ArchiverError): pass -class LinkCheckerError(Exception): +class LinkCheckerError(ArchiverError): pass class LinkInvalidError(LinkCheckerError): pass class LinkHeadRequestError(LinkCheckerError): pass -class CkanError(Exception): +class CkanError(ArchiverError): pass def _clean_content_type(ct): @@ -75,15 +52,28 @@ def _clean_content_type(ct): return ct def download(context, resource, url_timeout=30, - max_content_length=settings.MAX_CONTENT_LENGTH, - data_formats=DEFAULT_DATA_FORMATS): + max_content_length=settings.MAX_CONTENT_LENGTH): '''Given a resource, tries to download it. If the size or format is not acceptable for download then ChooseNotToDownload is raised. - If there is an error performing the download then - DownloadError is raised. + Params: + resource - dict of the resource + + Exceptions from link_checker may be propagated: + LinkInvalidError if the URL is invalid + LinkHeadRequestError if HEAD request fails + + If there is an error performing the download, raises: + DownloadError + + If download is not suitable (e.g. too large), raises: + ChooseNotToDownload + + Returns a dict of results of a successful download: + length, hash, headers, saved_file + Updates the resource values for: mimetype, size, hash ''' log = update.get_logger() @@ -128,14 +118,6 @@ def download(context, resource, url_timeout=30, raise ChooseNotToDownload("Content-length %s exceeds maximum allowed value %s" % (cl, max_content_length)) - # check that resource is a data file - if data_formats != 'all' and not (resource_format in data_formats or ct.lower() in data_formats): - if resource_changed: - _update_resource(context, resource, log) - log.info('Resource wrong type to download: %s / %s. 
Resource: %s %r', - resource_format, ct.lower(), resource['id'], url) - raise ChooseNotToDownload('Of content type "%s" which is not a recognised data file for download' % ct) - # get the resource and archive it try: res = requests.get(url, timeout=url_timeout) @@ -151,8 +133,10 @@ def download(context, resource, url_timeout=30, raise DownloadError('Error downloading: %s' % e) except Exception, e: raise DownloadError('Error with the download: %s' % e) + log.info('GET succeeded. Content starts: %r', res.content[:10]) - length, hash, saved_file = _save_resource(resource, res, max_content_length) + length, hash, saved_file_path = _save_resource(resource, res, max_content_length) + log.info('Resource saved. Length: %s File: %s', length, saved_file_path) # check if resource size changed if unicode(length) != resource.get('size'): @@ -193,12 +177,12 @@ def download(context, resource, url_timeout=30, pass log.info('Resource downloaded: id=%s url=%r cache_filename=%s length=%s hash=%s', - resource['id'], url, saved_file, length, hash) + resource['id'], url, saved_file_path, length, hash) return {'length': length, 'hash' : hash, 'headers': headers, - 'saved_file': saved_file} + 'saved_file': saved_file_path} @celery.task(name = "archiver.clean") @@ -212,7 +196,23 @@ def clean(): @celery.task(name = "archiver.update") def update(context, data): ''' - Archive a resource. The resource_dict is in the data. + Archive a resource. + + Params: + data - resource_dict + e.g. 
{ + "revision_id": "2bc8ed56-8900-431a-b556-2417f309f365", + "id": "842062b2-e146-4c5f-80e8-64d072ad758d"} + "content_length": "35731", + "hash": "", + "description": "", + "format": "", + "url": "http://www.justice.gov.uk/publications/companywindingupandbankruptcy.htm", + "openness_score_failure_count": "0", + "content_type": "text/html", + "openness_score": "1", + "openness_score_reason": "obtainable via web page", + "position": 0, ''' log = update.get_logger() log.info('Starting update task: %r', data) @@ -221,10 +221,8 @@ def update(context, data): context = json.loads(context) result = _update(context, data) return result - except ArchiverError, e: - log.error('Archive error during update: %s\nResource: %s', - e, data) except Exception, e: + # Any problem at all is recorded in task_status and then reraised log.error('Error occurred during archiving resource: %s\nResource: %r', e, data) update_task_status(context, { @@ -235,25 +233,35 @@ def update(context, data): 'value': unicode(update.request.id), 'error': '%s: %s' % (e.__class__.__name__, unicode(e)), 'stack': traceback.format_exc(), - 'last_updated': datetime.now().isoformat() + 'last_updated': datetime.datetime.now().isoformat() }, log) raise -def _update(context, data): +def _update(context, resource): """ Link check and archive the given resource. + Records result in the task_status key='status'. + If successful, updates the resource with the cache_url & hash etc. - Returns a JSON dict: + Params: + resource - resource dict + Should only raise on a fundamental error: + ArchiverError + CkanError + + Returns a JSON dict, ready to be returned from the celery task giving a + success status: { 'resource': the updated resource dict, 'file_path': path to archived file (if archive successful), or None } + If not successful, returns None. 
""" log = update.get_logger() - data.pop(u'revision_id', None) - if not data: + resource.pop(u'revision_id', None) + if not resource: raise ArchiverError('Resource not found') # check that archive directory exists @@ -261,32 +269,66 @@ def _update(context, data): log.info("Creating archive directory: %s" % settings.ARCHIVE_DIR) os.mkdir(settings.ARCHIVE_DIR) - if hasattr(settings, 'DATA_FORMATS') and settings.DATA_FORMATS: - data_formats = settings.DATA_FORMATS - else: - data_formats = DEFAULT_DATA_FORMATS - - log.info("Attempting to download resource: %s" % data['url']) + # Get current task_status + status = get_status(context, resource['id'], log) + def _save_status(has_passed, status_txt, exception, status, resource_id): + last_success = status.get('last_success', '') + first_failure = status.get('first_failure', '') + failure_count = status.get('failure_count', 0) + if has_passed: + last_success = datetime.datetime.now().isoformat() + first_failure = '' + failure_count = 0 + reason = '' + else: + if not first_failure: + first_failure = datetime.datetime.now().isoformat() + failure_count += 1 + reason = '%s %s' % (exception, exception.args) + save_status(context, resource_id, status_txt, + reason, + last_success, first_failure, + failure_count, log) + + log.info("Attempting to download resource: %s" % resource['url']) result = None try: - result = download(context, data, data_formats=data_formats) + result = download(context, resource) if result is None: raise ArchiverError("Download failed") - except DownloadError, downloaderr: - log.info('Download failed: %r, %r', downloaderr, downloaderr.args) + except LinkInvalidError, e: + log.info('URL invalid: %r, %r', e, e.args) + _save_status(False, 'URL invalid', e, status, resource['id']) + return + except LinkHeadRequestError, e: + log.info('Link head request error: %r, %r', e, e.args) + _save_status(False, 'URL request failed', e, status, resource['id']) + return + except DownloadError, e: + log.info('Download 
failed: %r, %r', e, e.args) + _save_status(False, 'Download error', e, status, resource['id']) return except ChooseNotToDownload, e: log.info('Download not carried out: %r, %r', e, e.args) + _save_status(False, 'Chose not to download', e, status, resource['id']) return except Exception, downloaderr: - log.info('Download failure: %r, %r', downloaderr, downloaderr.args) + log.error('Uncaught download failure: %r, %r', downloaderr, downloaderr.args) + _save_status(False, 'Download failure', e, status, resource['id']) return - log.info("Attempting to archive resource: %s" % data['url']) - file_path = archive_resource(context, data, log, result) + log.info('Attempting to archive resource') + try: + file_path = archive_resource(context, resource, log, result) + except ArchiveError, e: + log.error('System error during archival: %r, %r', e, e.args) + _save_status(False, 'System error during archival', e, status, resource['id']) + return + # Success + _save_status(True, 'Archived successfully', '', status, resource['id']) return json.dumps({ - 'resource': data, + 'resource': resource, 'file_path': file_path }) @@ -370,8 +412,16 @@ def link_checker(context, data): def archive_resource(context, resource, log, result=None, url_timeout=30): """ - Archive the given resource. Downloads the file and updates the - resource with the link to it. + Archive the given resource. Moves the file from the temporary location + given in download() and updates the resource with the link to it. + + Params: + result - result of the download(), containing keys: length, saved_file + + If there is a failure, raises ArchiveError. 
+ + Updates resource keys: cache_url, cache_last_updated, cache_filepath + Returns the path of the archived file. """ if result['length']: dir = os.path.join(settings.ARCHIVE_DIR, resource['id']) @@ -387,6 +437,7 @@ def archive_resource(context, resource, log, result=None, url_timeout=30): shutil.move(result['saved_file'], saved_file) os.chmod(saved_file, 0644) # allow other users to read it log.info('Archived resource as: %s', saved_file) + resource['cache_filepath'] = saved_file # update the resource object: set cache_url and cache_last_updated if context.get('cache_url_root'): @@ -395,14 +446,15 @@ def archive_resource(context, resource, log, result=None, url_timeout=30): ) if resource.get('cache_url') != cache_url: resource['cache_url'] = cache_url - resource['cache_last_updated'] = datetime.now().isoformat() + resource['cache_last_updated'] = datetime.datetime.now().isoformat() log.info('Updating resource with cache_url=%s', cache_url) _update_resource(context, resource, log) else: log.info('Not updating resource since cache_url is unchanged: %s', cache_url) else: - log.warning('Not updated resource because no value for cache_url_root') + log.warning('Not saved cache_url because no value for cache_url_root in config') + raise ArchiveError('No value for cache_url_root in config') return saved_file @@ -442,11 +494,15 @@ def _update_resource(context, resource, log): Use CKAN API to update the given resource. If cannot update, records this fact in the task_status table. + Params: + context - dict containing 'apikey' and 'site_url' + resource - dict of the resource to update + Returns the content of the response. 
""" api_url = urlparse.urljoin(context['site_url'], 'api/action') + '/resource_update' - resource['last_modified'] = datetime.now().isoformat() + resource['last_modified'] = datetime.datetime.now().isoformat() post_data = json.dumps(resource) res = requests.post( api_url, post_data, @@ -468,10 +524,17 @@ def _update_resource(context, resource, log): def update_task_status(context, data, log): """ - Use CKAN API to update the task status. The data parameter - should be a dict representing one row in the task_status table. + Use CKAN API to update the task status. + + Params: + context - dict containing 'site_url', 'site_user_apikey' + data - dict representing one row in the task_status table: + entity_id, entity_type, task_type, key, value, + error, stack, last_updated + + May raise CkanError if the request fails. - Returns the content of the response. + Returns the content of the response. """ api_url = urlparse.urljoin(context['site_url'], 'api/action') + '/task_status_update' post_data = json.dumps(data) @@ -491,4 +554,82 @@ def update_task_status(context, data, log): log.error('ckan failed to update task_status, status_code (%s), error %s. 
Maybe the API key or site URL are wrong?.\ncontext: %r\ndata: %r\nres: %r\nres.error: %r\npost_data: %r\napi_url: %r' % (res.status_code, content, context, data, res, res.error, post_data, api_url)) raise CkanError('ckan failed to update task_status, status_code (%s), error %s' % (res.status_code, content)) + log.info('Task status updated ok: %s=%s', key, value) + +def get_task_status(key, context, resource_id, log): + '''Gets a row from the task_status table as a dict including keys: + 'value', 'error', 'stack' + If the key isn\'t there, returns None.''' + api_url = urlparse.urljoin(context['site_url'], 'api/action') + '/task_status_show' + response = requests.post( + api_url, + json.dumps({'entity_id': resource_id, 'task_type': 'archiver', + 'key': key}), + headers={'Authorization': context['apikey'], + 'Content-Type': 'application/json'} + ) + if response.content: + try: + res_dict = json.loads(response.content) + except ValueError, e: + raise CkanError('CKAN response not JSON: %s', response.content) + else: + res_dict = {} + if response.status_code == 404 and res_dict['success'] == False: + return None + elif response.error: + log.error('Error getting %s. Error=%r\napi_url=%r\ncode=%r\ncontent=%r', + key, response.error, api_url, response.status_code, response.content) + raise CkanError('Error getting %s' % key) + elif res_dict['success']: + result = res_dict['result'] + else: + log.error('Error getting %s. Status=%r Error=%r\napi_url=%r', + key, response.status_code, response.content, api_url) + raise CkanError('Error getting %s' % key) + return result + +def get_status(context, resource_id, log): + '''Returns a dict of the current archiver 'status'. + (task status value where key='status') + May propagate CkanError if the request fails. 
+ ''' + task_status = get_task_status('status', context, resource_id, log) + if task_status: + status = json.loads(task_status['error']) \ + if task_status['error'] else {} + status['value'] = task_status['value'] + log.info('Previous status checked ok: %s', status) + else: + status = {'value': '', 'reason': '', + 'last_success': '', 'first_failure': '', 'failure_count': 0} + log.info('Previous status blank - using default: %s', status) + return status + +def save_status(context, resource_id, status, reason, + last_success, first_failure, failure_count, log): + '''Writes to the task status table the result of an attempt to download + the resource. + + May propagate a CkanError. + ''' + now = datetime.datetime.now().isoformat() + data = { + 'entity_id': resource_id, + 'entity_type': u'resource', + 'task_type': 'archiver', + 'key': u'status', + 'value': status, + 'error': json.dumps({ + 'reason': status, + 'last_success': last_success, + 'first_failure': first_failure, + 'failure_count': failure_count, + }), + 'last_updated': now + } + + update_task_status(context, data, log) + log.info('Saved status: %r reason=%r last_success=%r first_failure=%r failure_count=%r', + status, reason, last_success, first_failure, failure_count) diff --git a/test-core.ini b/test-core.ini new file mode 100644 index 00000000..422ddc9f --- /dev/null +++ b/test-core.ini @@ -0,0 +1,56 @@ +# +# ckan - Pylons testing environment configuration +# +# The %(here)s variable will be replaced with the parent directory of this file +# +[DEFAULT] +debug = true +# Uncomment and replace with the address which should receive any error reports +#email_to = you@yourdomain.com +smtp_server = localhost +error_email_from = paste@localhost + +[server:main] +use = egg:Paste#http +host = 0.0.0.0 +port = 5000 + + +[app:main] +use = config:../ckan/test-core.ini + +ckan.plugins = archiver +ckan.cache_url_root = http://localhost:50001/resources/ + +# Logging configuration +[loggers] +keys = root, ckan, sqlalchemy 
+ +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console + +[logger_ckan] +qualname = ckan +handlers = +level = INFO + +[logger_sqlalchemy] +handlers = +qualname = sqlalchemy.engine +level = WARN + +[handler_console] +class = StreamHandler +args = (sys.stdout,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s diff --git a/test.ini b/test.ini new file mode 100644 index 00000000..7c1094c4 --- /dev/null +++ b/test.ini @@ -0,0 +1,78 @@ +# +# ckan - Pylons testing environment configuration +# +# The %(here)s variable will be replaced with the parent directory of this file +# +[DEFAULT] +debug = true +# Uncomment and replace with the address which should receive any error reports +#email_to = you@yourdomain.com +smtp_server = localhost +error_email_from = paste@localhost + +[server:main] +use = egg:Paste#http +host = 0.0.0.0 +port = 5000 + + +[app:main] +use = config:test-core.ini +# Here we hard-code the database and a flag to make default tests +# run fast. +faster_db_test_hacks = True +sqlalchemy.url = sqlite:/// +# NB: other test configuration should go in test-core.ini, which is +# what the postgres tests use. 
+ +# Logging configuration +[loggers] +keys = root, activity, harvest, ckan, ckanext, sqlalchemy + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console + +[logger_ckan] +qualname = ckan +handlers = console +level = DEBUG +propagate = 0 + +[logger_ckanext] +qualname = ckanext +handlers = console +level = DEBUG +propagate = 0 + +[logger_sqlalchemy] +handlers = console +qualname = sqlalchemy.engine +level = WARN + +[logger_harvest] +level = WARNING +handlers = console +qualname = ckanext.harvest +propagate = 0 + +[logger_activity] +level = WARNING +handlers = console +qualname = ckan.lib.activity +propagate = 0 + +[handler_console] +class = StreamHandler +args = (sys.stdout,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s diff --git a/tests/fake_ckan.py b/tests/fake_ckan.py index 08e7c16a..04d68b83 100644 --- a/tests/fake_ckan.py +++ b/tests/fake_ckan.py @@ -19,6 +19,15 @@ def task_status_update(): }) return 'ok' +@app.route("/api/action/task_status_show", methods=['GET', 'POST']) +def task_status_show(): + request_store.append({ + "data": request.json, + "headers": dict(request.headers) + }) + return jsonify({'success': True, + 'result': {'value': '', 'error': '', 'stack': ''}}) + @app.route("/api/action/resource_update", methods=['GET', 'POST']) def resource_update(): request_store.append({ diff --git a/tests/test_archiver.py b/tests/test_archiver.py index 53074b5c..c251956d 100644 --- a/tests/test_archiver.py +++ b/tests/test_archiver.py @@ -1,3 +1,4 @@ +import logging import os import shutil import tempfile @@ -28,6 +29,11 @@ from mock_remote_server import MockEchoTestServer +# enable celery logging for when you run nosetests -s +log = logging.getLogger('ckanext.archiver.tasks') +def get_logger(): + return log +update.get_logger = get_logger def with_mock_url(url=''): """ @@ -148,6 +154,7 @@ def setup_class(cls): 'site_url': 
cls.fake_ckan_url, 'apikey': u'fake_api_key', 'site_user_apikey': u'fake_site_user_api_key', + 'cache_url_root': 'http://localhost:50001/resources/', } cls.fake_resource = { 'id': u'fake_resource_id',