Skip to content

Commit

Permalink
Merge pull request #138 from Koed00/dev
Browse files Browse the repository at this point in the history
Adds task result update
  • Loading branch information
Koed00 committed Jan 24, 2016
2 parents 5b3e69d + 9741efd commit b8521f6
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 22 deletions.
35 changes: 22 additions & 13 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,19 +408,28 @@ def save_task(task, broker):
try:
if task['success'] and 0 < Conf.SAVE_LIMIT <= Success.objects.count():
Success.objects.last().delete()
Task.objects.update_or_create(id=task['id'],
name=task['name'],
defaults={
'func': task['func'],
'hook': task.get('hook'),
'args': task['args'],
'kwargs': task['kwargs'],
'started': task['started'],
'stopped': task['stopped'],
'result': task['result'],
'group': task.get('group'),
'success': task['success']}
)
# check if this task has previous results
if Task.objects.filter(id=task['id'], name=task['name']).exists():
existing_task = Task.objects.get(id=task['id'], name=task['name'])
# only update the result if it hasn't succeeded yet
if not existing_task.success:
existing_task.stopped = task['stopped']
existing_task.result = task['result']
existing_task.success = task['success']
existing_task.save()
else:
Task.objects.create(id=task['id'],
name=task['name'],
func=task['func'],
hook=task.get('hook'),
args=task['args'],
kwargs=task['kwargs'],
started=task['started'],
stopped=task['stopped'],
result=task['result'],
group=task.get('group'),
success=task['success']
)
except Exception as e:
logger.error(e)

Expand Down
54 changes: 50 additions & 4 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import sys
from multiprocessing import Queue, Event, Value
import threading
from multiprocessing import Queue, Event, Value
from time import sleep
import os
from django.utils import timezone

import os
import pytest

myPath = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, myPath + '/../')

from django_q.cluster import Cluster, Sentinel, pusher, worker, monitor
from django_q.humanhash import DEFAULT_WORDLIST
from django_q.cluster import Cluster, Sentinel, pusher, worker, monitor, save_task
from django_q.humanhash import DEFAULT_WORDLIST, uuid
from django_q.tasks import fetch, fetch_group, async, result, result_group, count_group, delete_group, queue_size
from django_q.models import Task, Success
from django_q.conf import Conf
Expand Down Expand Up @@ -340,6 +341,51 @@ def test_bad_secret(broker, monkeypatch):
broker.delete_queue()


@pytest.mark.django_db
def test_update_failed(broker):
tag = uuid()
task = {'id': tag[1],
'name': tag[0],
'func': 'math.copysign',
'args': (1, -1),
'kwargs': {},
'started': timezone.now(),
'stopped': timezone.now(),
'success': False,
'result': None}
# initial save - no success
save_task(task, broker)
assert Task.objects.filter(id=task['id']).exists()
saved_task = Task.objects.get(id=task['id'])
assert saved_task.success is False
sleep(0.5)
# second save - no success
old_stopped = task['stopped']
task['stopped']=timezone.now()
save_task(task, broker)
saved_task = Task.objects.get(id=task['id'])
assert saved_task.stopped > old_stopped
# third save - success
task['stopped']=timezone.now()
task['result']='result'
task['success']=True
save_task(task, broker)
saved_task = Task.objects.get(id=task['id'])
assert saved_task.success is True
# fourth save - no success
task['result'] = None
task['success'] = False
task['stopped'] = old_stopped
save_task(task, broker)
# should not overwrite success
saved_task = Task.objects.get(id=task['id'])
assert saved_task.success is True
assert saved_task.result == 'result'





@pytest.mark.django_db
def assert_result(task):
assert task is not None
Expand Down
7 changes: 5 additions & 2 deletions docs/brokers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ Some pointers:
* Don't set the :ref:`retry` timer to a lower or equal number than the task timeout.
* Retry time includes time the task spends waiting in the clusters internal queue.
* Don't set the :ref:`queue_limit` so high that tasks time out while waiting to be processed.
* In case a task is worked on twice, you will see a duplicate key error in the cluster logs.
* Duplicate tasks do generate additional receipt messages, but the result is discarded in favor of the first result.
* In case a task is worked on twice, the task result will be updated with the latest results.
* In some rare cases a non-atomic broker will re-queue a task after it has been acknowledged.
* If a task runs twice and a previous run has succeeded, the new result wil be discarded.
* Limiting the number of retries is handled globally in your actual broker's settings.


Support for more brokers is being worked on.

Expand Down
5 changes: 2 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@
arrow==0.7.0
blessed==1.14.1
boto3==1.2.3
botocore==1.3.20 # via boto3
botocore==1.3.22 # via boto3
django-picklefield==0.3.2
django-redis==4.3.0
docutils==0.12 # via botocore
future==0.15.2
futures==3.0.4 # via boto3
hiredis==0.2.0
iron-core==1.2.0 # via iron-mq
iron-mq==0.8
jmespath==0.9.0 # via boto3, botocore
psutil==3.4.1
psutil==3.4.2
pymongo==3.2
python-dateutil==2.4.2 # via arrow, botocore, iron-core
redis==2.10.5
Expand Down

0 comments on commit b8521f6

Please sign in to comment.