Custom serializers #22

Open · wants to merge 6 commits into master
7 changes: 7 additions & 0 deletions tasktiger/__init__.py
@@ -132,10 +132,17 @@ def __init__(self, connection=None, config=None, setup_structlog=False):

             # If non-empty, a worker only processes the given queues.
             'ONLY_QUEUES': [],
+
+            'SERIALIZER': json.dumps,
+
+            'DESERIALIZER': json.loads

Member: Would be nice to have a quick comment just like the other settings.

         }
         if config:
             self.config.update(config)

+        self._serialize_data = self.config['SERIALIZER']
+        self._deserialize_data = self.config['DESERIALIZER']
+
         self.connection = connection or redis.Redis(decode_responses=True)
         self.scripts = RedisScripts(self.connection)

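For context, a minimal sketch of how the new settings are consumed (the config keys come from this diff; the rest is illustrative):

```python
import json
import redis
from tasktiger import TaskTiger

# Custom (de)serializers are passed through the new config keys; the
# values shown here are the defaults this PR installs when the keys
# are omitted.
conn = redis.Redis(decode_responses=True)
tiger = TaskTiger(connection=conn, config={
    'SERIALIZER': json.dumps,
    'DESERIALIZER': json.loads,
})
```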
15 changes: 7 additions & 8 deletions tasktiger/task.py
@@ -1,5 +1,4 @@
 import datetime
-import json
 import redis
 import time

@@ -12,7 +11,7 @@ class Task(object):
     def __init__(self, tiger, func=None, args=None, kwargs=None, queue=None,
                  hard_timeout=None, unique=None, lock=None, lock_key=None,
                  retry=None, retry_on=None, retry_method=None,
-                 _data=None, _state=None, _ts=None, _executions=None):
+                 _data=None, _state=None, _ts=None, _executions=None, deserialize=json.loads, serialize=json.dumps):

Member: I think we can remove these.


"""
Queues a task. See README.rst for an explanation of the options.
"""
Expand Down Expand Up @@ -269,7 +268,7 @@ def delay(self, when=None):

         # When using ALWAYS_EAGER, make sure we have serialized the task to
         # ensure there are no serialization errors.
-        serialized_task = json.dumps(self._data)
+        serialized_task = self.tiger._serialize_data(self._data)

         if tiger.config['ALWAYS_EAGER'] and state == QUEUED:
             return self.execute()
@@ -330,8 +329,8 @@ def from_id(self, tiger, queue, state, task_id, load_executions=0):
             serialized_executions = []
         # XXX: No timestamp for now
         if serialized_data:
-            data = json.loads(serialized_data)
-            executions = [json.loads(e) for e in serialized_executions if e]
+            data = tiger._deserialize_data(serialized_data)
+            executions = [tiger._deserialize_data(e) for e in serialized_executions if e]
             return Task(tiger, queue=queue, _data=data, _state=state,
                         _executions=executions)
         else:
@@ -369,8 +368,8 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000,
             results = pipeline.execute()

             for serialized_data, serialized_executions, ts in zip(results[0], results[1:], tss):
-                data = json.loads(serialized_data)
-                executions = [json.loads(e) for e in serialized_executions if e]
+                data = tiger._deserialize_data(serialized_data)
+                executions = [tiger._deserialize_data(e) for e in serialized_executions if e]

                 task = Task(tiger, queue=queue, _data=data, _state=state,
                             _ts=ts, _executions=executions)
@@ -379,7 +378,7 @@
         else:
             data = tiger.connection.mget([tiger._key('task', item[0]) for item in items])
             for serialized_data, ts in zip(data, tss):
-                data = json.loads(serialized_data)
+                data = tiger._deserialize_data(serialized_data)
                 task = Task(tiger, queue=queue, _data=data, _state=state,
                             _ts=ts)
                 tasks.append(task)
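An aside on the ALWAYS_EAGER pre-serialization above: serializing at delay() time surfaces type errors immediately instead of later in a worker. A quick illustration with the default JSON serializer (illustrative, not part of this diff):

```python
import datetime
import json

# datetime objects are not JSON serializable, so the eager
# pre-serialization step fails fast at enqueue time:
try:
    json.dumps({'args': [datetime.datetime(2016, 1, 1)]})
except TypeError as exc:
    print('caught at enqueue time:', exc)
```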
7 changes: 3 additions & 4 deletions tasktiger/worker.py
@@ -1,6 +1,5 @@
 from collections import OrderedDict
 import errno
-import json
 import os
 import random
 import select

@@ -242,7 +241,7 @@ def _execute_forked(self, tasks, log):
             ''.join(traceback.format_exception(*exc_info)) if exc_info != (None, None, None) else None
         execution['success'] = success
         execution['host'] = socket.gethostname()
-        serialized_execution = json.dumps(execution)
+        serialized_execution = self.tiger._serialize_data(execution)
         for task in tasks:
             self.connection.rpush(self._key('task', task.id, 'executions'),
                                   serialized_execution)
@@ -359,7 +358,7 @@ def _process_from_queue(self, queue):
         tasks = []
         for task_id, serialized_task in zip(task_ids, serialized_tasks):
             if serialized_task:
-                task_data = json.loads(serialized_task)
+                task_data = self.tiger._deserialize_data(serialized_task)
             else:
                 task_data = {}
             task = Task(self.tiger, queue=queue, _data=task_data,
@@ -492,7 +491,7 @@ def _mark_done():
                 self._key('task', task.id, 'executions'), -1)

             if execution:
-                execution = json.loads(execution)
+                execution = self.tiger._deserialize_data(execution)

                 if execution.get('retry'):
                     if 'retry_method' in execution:
19 changes: 18 additions & 1 deletion tests/__init__.py
@@ -1,3 +1,4 @@
+import pickle

Member: I'd prefer not to mention pickle in this package, since it suggests it would be a sane choice. A better example would be JSON with a custom JSONEncoder/JSONDecoder. That would also ensure the given serialization is actually being used, since the default encoder would throw an error. (A sketch of that approach follows this file's diff.)

 import datetime
 import json
 from multiprocessing import Pool

@@ -32,7 +33,7 @@ def _ensure_queue(typ, data):
             for name, n in data.items():
                 task_ids = self.conn.zrange('t:%s:%s' % (typ, name), 0, -1)
                 self.assertEqual(len(task_ids), n)
-                ret[name] = [json.loads(self.conn.get('t:task:%s' % task_id))
+                ret[name] = [self.tiger._deserialize_data(self.conn.get('t:task:%s' % task_id))
                              for task_id in task_ids]
                 self.assertEqual(list(task['id'] for task in ret[name]),
                                  task_ids)
@@ -46,6 +47,22 @@ def _ensure_queue(typ, data):
             'scheduled': _ensure_queue('scheduled', scheduled),
         }

+
+class CustomSerializerTestCase(BaseTestCase):
+    def setUp(self):
+        self.tiger = get_tiger(SERIALIZER=lambda x: pickle.dumps(x).decode('latin1'), DESERIALIZER=lambda x: pickle.loads(x.encode('latin1')))
+        self.conn = self.tiger.connection
+        self.conn.flushdb()
+
+    def test_simple_task(self):
+        self.tiger.delay(simple_task, queue='custom_ser')
+        queues = self._ensure_queues(queued={'custom_ser': 1})
+        task = queues['queued']['custom_ser'][0]
+        Worker(self.tiger).run(once=True)
+        self._ensure_queues(queued={'custom_ser': 0})
+        self.assertFalse(self.conn.exists('t:task:%s' % task['id']))
+
+
 class TestCase(BaseTestCase):
     """
     TaskTiger main test cases.
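Two notes on this test, neither part of the diff itself. First, the latin1 round-trip in the pickle lambdas exists because the test connection uses decode_responses=True: Redis returns str while pickle produces and consumes bytes, and latin1 maps every byte 0-255 to the same code point, so the conversion is lossless. Second, here is a sketch of the JSONEncoder/JSONDecoder approach suggested in the review comment above; the class and hook names are hypothetical:

```python
import datetime
import json

class CustomJSONEncoder(json.JSONEncoder):
    # Encode datetimes as tagged ISO-8601 strings. Unknown types still
    # fall through to the base encoder, which raises TypeError, so a
    # passing test proves the custom serializer is actually in use.
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return {'__datetime__': obj.isoformat()}
        return json.JSONEncoder.default(self, obj)

def custom_object_hook(dct):
    # Reverse of the encoder: turn tagged strings back into datetimes.
    if '__datetime__' in dct:
        for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
            try:
                return datetime.datetime.strptime(dct['__datetime__'], fmt)
            except ValueError:
                continue
    return dct

# Wired into the settings this PR introduces, e.g. in a test:
#   get_tiger(SERIALIZER=lambda d: json.dumps(d, cls=CustomJSONEncoder),
#             DESERIALIZER=lambda d: json.loads(d, object_hook=custom_object_hook))
```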
8 changes: 5 additions & 3 deletions tests/utils.py
@@ -5,7 +5,7 @@

 from .config import *

-def get_tiger():
+def get_tiger(**kwargs):
     """
     Sets up logging and returns a new tasktiger instance.
     """
@@ -15,7 +15,7 @@ def get_tiger():
     )
     logging.basicConfig(format='%(message)s')
     conn = redis.Redis(db=TEST_DB, decode_responses=True)
-    tiger = TaskTiger(connection=conn, config={
+    config = {
         # We need this 0 here so we don't pick up scheduled tasks when
         # doing a single worker run.
         'SELECT_TIMEOUT': 0,
@@ -27,7 +27,9 @@ def get_tiger():
         'BATCH_QUEUES': {
             'batch': 3,
         }
-    })
+    }
+    config.update(kwargs)
+    tiger = TaskTiger(connection=conn, config=config)
     tiger.log.setLevel(logging.CRITICAL)
     return tiger
