Skip to content

Commit

Permalink
Handle retry_on failures (#111)
Browse files Browse the repository at this point in the history
Rather than crashing the worker, mark the task as failed.
  • Loading branch information
thomasst authored Jul 24, 2018
1 parent 789502d commit add21af
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
11 changes: 8 additions & 3 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,18 @@ def retry_method(self):
def retry_on(self):
return self._data.get('retry_on')

def should_retry_on(self, exception_class):
def should_retry_on(self, exception_class, logger=None):
"""
Whether this task should be retried when the given exception occurs.
"""
for n in (self.retry_on or []):
if issubclass(exception_class, import_attribute(n)):
return True
try:
if issubclass(exception_class, import_attribute(n)):
return True
except TaskImportError:
if logger:
logger.error('should_retry_on could not import class',
exception_name=n)
return False

@property
Expand Down
4 changes: 2 additions & 2 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def _worker_queue_expired_tasks(self):
self._did_work = True
try:
task = Task.from_id(self.tiger, queue, ACTIVE, task_id)
if task.should_retry_on(JobTimeoutException):
if task.should_retry_on(JobTimeoutException, logger=self.log):
self.log.info('queueing expired task',
queue=queue, task_id=task_id)

Expand Down Expand Up @@ -762,7 +762,7 @@ def _mark_done():
log.error('could not import exception',
exception_name=exception_name)
else:
if task.should_retry_on(exception_class):
if task.should_retry_on(exception_class, logger=log):
should_retry = True
else:
should_retry = True
Expand Down
15 changes: 15 additions & 0 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,21 @@ def test_retry_on_3(self):
scheduled={'default': 1},
error={'default': 0})

def test_retry_on_invalid(self):
"""
Ensure we handle exceptions that can't be imported.
"""
class CustomException(Exception):
"""
Since this is an inline exception, it's not possible to import it
via dotted path.
"""
self.tiger.delay(exception_task, retry_on=[CustomException])
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={'default': 0},
scheduled={'default': 0},
error={'default': 1})

def test_retry_method(self):
task = self.tiger.delay(exception_task,
retry_method=linear(DELAY, DELAY, 3))
Expand Down

0 comments on commit add21af

Please sign in to comment.