Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into extension-template
Browse files Browse the repository at this point in the history
Conflicts:
	setup.py
  • Loading branch information
David Read committed Oct 27, 2015
2 parents ebca07d + 2560b46 commit 3181876
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 85 deletions.
5 changes: 2 additions & 3 deletions ckanext/archiver/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,13 @@ def update(self):
if res.state == 'active']
self.log.info('Queuing dataset %s (%s resources)',
package.name, len(pkg_resources))
for resource in pkg_resources:
plugin.create_archiver_task(resource, self.options.queue)
plugin.create_archiver_package_task(package, self.options.queue)
time.sleep(0.1) # to try to avoid Redis getting overloaded

for resource in resources:
package = resource.resource_group.package
self.log.info('Queuing resource %s/%s', package.name, resource.id)
plugin.create_archiver_task(resource, self.options.queue)
plugin.create_archiver_resource_task(resource, self.options.queue)
time.sleep(0.05) # to try to avoid Redis getting overloaded

self.log.info('Completed queueing')
Expand Down
38 changes: 16 additions & 22 deletions ckanext/archiver/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,26 @@

log = logging.getLogger(__name__)


class ArchiverPlugin(p.SingletonPlugin):
"""
Registers to be notified whenever CKAN resources are created or their URLs
change, and will create a new ckanext.archiver celery task to archive the
resource.
"""
p.implements(p.IDomainObjectModification, inherit=True)
p.implements(p.IResourceUrlChange)
p.implements(IReport)
p.implements(p.IConfigurer, inherit=True)

# IDomainObjectModification / IResourceUrlChange
# IDomainObjectModification

def notify(self, entity, operation=None):
if not isinstance(entity, model.Resource):
if not isinstance(entity, model.Package):
return

log.debug('Notified of resource event: %s %s', entity.id, operation)

if operation:
# Only interested in 'new resource' events. Note that once this
# occurs, in tasks.py it will update the resource with the new
# cache_url, that will cause a 'change resource' notification,
# which we nee to ignore here.
if operation == model.DomainObjectOperation.new:
create_archiver_task(entity, 'priority')
else:
log.debug('Ignoring resource event because operation is: %s',
operation)
else:
# if operation is None, resource URL has been changed, as the
# notify function in IResourceUrlChange only takes 1 parameter
create_archiver_task(entity, 'priority')
log.debug('Notified of package event: %s %s', entity.id, operation)

create_archiver_package_task(entity, 'priority')

# IReport

Expand All @@ -57,15 +44,22 @@ def register_reports(self):
def update_config(self, config):
p.toolkit.add_template_directory(config, 'templates')

def create_archiver_task(resource, queue):
def create_archiver_resource_task(resource, queue):
from pylons import config
package = resource.resource_group.package
task_id = '%s/%s/%s' % (package.name, resource.id[:4], make_uuid()[:4])
ckan_ini_filepath = os.path.abspath(config.__file__)
celery.send_task('archiver.update', args=[ckan_ini_filepath, resource.id, queue],
ckan_ini_filepath = os.path.abspath(config['__file__'])
celery.send_task('archiver.update_resource', args=[ckan_ini_filepath, resource.id, queue],
task_id=task_id, queue=queue)
log.debug('Archival of resource put into celery queue %s: %s/%s url=%r', queue, package.name, resource.id, resource.url)

def create_archiver_package_task(package, queue):
from pylons import config
task_id = '%s/%s' % (package.name, make_uuid()[:4])
ckan_ini_filepath = os.path.abspath(config['__file__'])
celery.send_task('archiver.update_package', args=[ckan_ini_filepath, package.id, queue],
task_id=task_id, queue=queue)
log.debug('Archival of package put into celery queue %s: %s', queue, package.name)

class TestIPipePlugin(p.SingletonPlugin):
"""
Expand Down
87 changes: 66 additions & 21 deletions ckanext/archiver/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from requests.packages import urllib3

from ckan.lib.celery_app import celery
from ckan import plugins
try:
from ckanext.archiver import settings
except ImportError:
Expand Down Expand Up @@ -77,21 +76,22 @@ class CkanError(ArchiverError):
pass


@celery.task(name="archiver.update")
def update(ckan_ini_filepath, resource_id, queue='bulk'):
@celery.task(name="archiver.update_resource")
def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
'''
Archive a resource.
'''
log = update.get_logger()
log.info('Starting update task: res_id=%r queue=%s', resource_id, queue)
log = update_resource.get_logger()
log.info('Starting update_resource task: res_id=%r queue=%s', resource_id, queue)

# HACK because of race condition #1481
time.sleep(2)

# Do all work in a sub-routine since it can then be tested without celery.
# Also put try/except around it since we don't trust celery to log errors well.
# Also put try/except around it is easier to monitor ckan's log rather than
# celery's task status.
try:
result = _update(ckan_ini_filepath, resource_id, queue)
result = _update_resource(ckan_ini_filepath, resource_id, queue)
return result
except Exception, e:
if os.environ.get('DEBUG'):
Expand All @@ -101,7 +101,42 @@ def update(ckan_ini_filepath, resource_id, queue='bulk'):
e, resource_id)
raise

def _update(ckan_ini_filepath, resource_id, queue):
@celery.task(name="archiver.update_package")
def update_package(ckan_ini_filepath, package_id, queue='bulk'):
'''
Archive a package.
'''
from ckan import model
from ckan.logic import get_action

load_config(ckan_ini_filepath)
register_translator()

log = update_package.get_logger()
log.info('Starting update_package task: package_id=%r queue=%s', package_id, queue)

# Do all work in a sub-routine since it can then be tested without celery.
# Also put try/except around it is easier to monitor ckan's log rather than
# celery's task status.
try:
context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
package = get_action('package_show')(context_, {'id': package_id})

for resource in package['resources']:
resource_id = resource['id']
_update_resource(ckan_ini_filepath, resource_id, queue)
except Exception, e:
if os.environ.get('DEBUG'):
raise
# Any problem at all is logged and reraised so that celery can log it too
log.error('Error occurred during archiving package: %s\nPackage: %r %r',
e, package_id, package['name'] if package in dir() else '')
raise

notify_package(package, queue, ckan_ini_filepath)


def _update_resource(ckan_ini_filepath, resource_id, queue):
"""
Link check and archive the given resource.
If successful, updates the archival table with the cache_url & hash etc.
Expand All @@ -123,7 +158,7 @@ def _update(ckan_ini_filepath, resource_id, queue):
}
If not successful, returns None.
"""
log = update.get_logger()
log = update_resource.get_logger()

load_config(ckan_ini_filepath)
register_translator()
Expand All @@ -147,9 +182,10 @@ def _save(status_id, exception, resource, url_redirected_to=None,
reason, url_redirected_to,
download_result, archive_result,
log)
notify(resource,
queue,
archive_result.get('cache_filename') if archive_result else None)
notify_resource(
resource,
queue,
archive_result.get('cache_filename') if archive_result else None)

# Download
log.info("Attempting to download resource: %s" % resource['url'])
Expand Down Expand Up @@ -240,8 +276,7 @@ def download(context, resource, url_timeout=30,
Returns a dict of results of a successful download:
mimetype, size, hash, headers, saved_file, url_redirected_to
'''

log = update.get_logger()
log = update_resource.get_logger()

url = resource['url']

Expand Down Expand Up @@ -332,7 +367,7 @@ def get_content():
return {'mimetype': mimetype,
'size': length,
'hash': hash,
'headers': res.headers,
'headers': dict(res.headers),
'saved_file': saved_file_path,
'url_redirected_to': url_redirected_to,
'request_type': method}
Expand Down Expand Up @@ -385,14 +420,24 @@ def archive_resource(context, resource, log, result=None, url_timeout=30):
'cache_url': cache_url}


def notify(resource, queue, cache_filepath):
def notify_resource(resource, queue, cache_filepath):
'''
Broadcasts a notification that an archival has taken place (or at least
Broadcasts a notification that an resource archival has taken place (or at least
the archival object is changed somehow). e.g. ckanext-qa listens for this
'''
archiver_interfaces.IPipe.send_data('archived',
resource_id=resource['id'],
queue=queue,
queue=queue,
cache_filepath=cache_filepath)

def notify_package(package, queue, cache_filepath):
'''
Broadcasts a notification that a package archival has taken place (or at least
the archival object is changed somehow). e.g. ckanext-packagezip listens for this
'''
archiver_interfaces.IPipe.send_data('package-archived',
package_id=package['id'],
queue=queue,
cache_filepath=cache_filepath)


Expand Down Expand Up @@ -616,7 +661,7 @@ def api_request(context, resource):
and get a valid response. If it does it returns the response, otherwise
Archives the response and stores what sort of request elicited it.
'''
log = update.get_logger()
log = update_resource.get_logger()
# 'resource' holds the results of the download and will get saved. Only if
# an API request is successful do we want to save the details of it.
# However download() gets altered for these API requests. So only give
Expand Down Expand Up @@ -691,7 +736,7 @@ def link_checker(context, data):
Returns a json dict of the headers of the request
"""
log = update.get_logger()
log = update_resource.get_logger()
data = json.loads(data)
url_timeout = data.get('url_timeout', 30)

Expand Down Expand Up @@ -731,6 +776,6 @@ def link_checker(context, data):
error_message = 'Server returned HTTP error status: %s %s' % \
(res.status_code, res.reason)
raise LinkHeadRequestError(error_message)
return json.dumps(headers)
return json.dumps(dict(headers))


2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
'kombu==2.1.3',
'kombu-sqlalchemy==1.1.0',
'SQLAlchemy>=0.6.6',
'requests==1.1.0',
'requests>=1.1.0',
'flask==0.8' # flask needed for tests
],

Expand Down
2 changes: 2 additions & 0 deletions test.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use = egg:Paste#http
host = 0.0.0.0
port = 5000

[app:celery]
CELERY_ALWAYS_EAGER = True

[app:main]
use = config:test-core.ini
Expand Down
Loading

0 comments on commit 3181876

Please sign in to comment.