Skip to content

Commit

Permalink
fix pep8 issues
Browse files Browse the repository at this point in the history
  • Loading branch information
alanhamlett committed Aug 14, 2018
1 parent cfd2eaa commit 18a11c9
Showing 1 changed file with 37 additions and 13 deletions.
50 changes: 37 additions & 13 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,26 @@
import redis
import time

from ._internal import *
from ._internal import (
ACTIVE,
QUEUED,
ERROR,
TaskImportError,
SCHEDULED,
g,
gen_id,
gen_unique_id,
get_timestamp,
import_attribute,
serialize_func_name,
serialize_retry_method,
)
from .exceptions import TaskNotFound


__all__ = ['Task']


class Task(object):
def __init__(self, tiger, func=None, args=None, kwargs=None, queue=None,
hard_timeout=None, unique=None, lock=None, lock_key=None,
Expand Down Expand Up @@ -209,27 +224,36 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None):
if not when:
when = time.time()
if mode:
scripts.zadd(_key(to_state, queue), when, self.id,
mode, client=pipeline)
scripts.zadd(
_key(to_state, queue),
when,
self.id,
mode,
client=pipeline,
)
else:
pipeline.zadd(_key(to_state, queue), self.id, when)
pipeline.sadd(_key(to_state), queue)
pipeline.zrem(_key(from_state, queue), self.id)

if not to_state: # Remove the task if necessary
if not to_state: # Remove the task if necessary
if self.unique:
# Only delete if it's not in any other queue
check_states = set([ACTIVE, QUEUED, ERROR, SCHEDULED])
check_states.remove(from_state)
# TODO: Do the following two in one call.
scripts.delete_if_not_in_zsets(_key('task', self.id, 'executions'),
self.id, [
_key(state, queue) for state in check_states
], client=pipeline)
scripts.delete_if_not_in_zsets(_key('task', self.id),
self.id, [
_key(state, queue) for state in check_states
], client=pipeline)
scripts.delete_if_not_in_zsets(
_key('task', self.id, 'executions'),
self.id,
[_key(state, queue) for state in check_states],
client=pipeline,
)
scripts.delete_if_not_in_zsets(
_key('task', self.id),
self.id,
[_key(state, queue) for state in check_states],
client=pipeline,
)
else:
# Safe to remove
pipeline.delete(_key('task', self.id, 'executions'))
Expand Down Expand Up @@ -370,7 +394,7 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000,
key = tiger._key(state, queue)
pipeline = tiger.connection.pipeline()
pipeline.zcard(key)
pipeline.zrange(key, -limit-skip, -1-skip, withscores=True)
pipeline.zrange(key, -limit - skip, -1 - skip, withscores=True)
n, items = pipeline.execute()

tasks = []
Expand Down

0 comments on commit 18a11c9

Please sign in to comment.