-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcelery_tasks.py
118 lines (79 loc) · 2.76 KB
/
celery_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import time
from multiprocessing import Value
from typing import Tuple, List
from celery import Celery
from logic.main import check_imdb_user_watchlist, check_medialist
from logic.config import default_config
from logic.datatypes import ResultElement
# Celery connection settings are read from the project configuration object.
broker_url = default_config.CELERY_BROKER_URL
result_url = default_config.CELERY_RESULT_URL
broker_pool_limit = default_config.CELERY_BROKER_POOL_LIMIT

# Only the first 20 characters of the broker URL are printed.
print("Using %s for broker." % broker_url[:20])

# The Celery constructor takes the broker URL through its ``broker`` argument.
# A ``broker_url=`` keyword is not one of its parameters and would not be
# applied to the app configuration.
app = Celery('celery_tasks', broker=broker_url, backend=result_url)
app.conf.update(broker_pool_limit=int(broker_pool_limit))
class ProgressTracker:
    """Publish human-readable progress messages for a task.

    Every change is forwarded to the wrapped task through
    ``task.update_state(state="PROGRESS", meta={"message": ...})``.
    """

    def __init__(self, task):
        """Bind the tracker to *task*.

        Args:
            task: object providing ``update_state(state=..., meta=...)``.
        """
        self._task = task
        self._progress = None  # shared counter, created by start_work()
        self._total = None  # number of expected work items, if any
        self._message = ""

    def start_work(self, message: str, total: int):
        """Begin a counted phase and publish its initial state.

        Args:
            message (str): text shown in front of the counter
            total (int): number of expected work items
        """
        self._progress = Value('i', 0)
        self._message = message
        self._total = total
        self.update()

    def info(self, message: str):
        """Publish a plain message without a counter.

        Args:
            message (str): text to publish
        """
        self._message = message
        # Clearing the total makes update() omit the "done/total" suffix.
        self._total = None
        self.update()

    def progress(self, i: int):
        """Advance the counter by *i* and publish the new state.

        Must be called after ``start_work``, which creates the counter.

        Args:
            i (int): number of work items completed since the last call
        """
        # ``+=`` on a multiprocessing Value is a separate read and write,
        # so the increment is done while holding the value's own lock.
        with self._progress.get_lock():
            self._progress.value += i
        self.update()

    def update(self):
        """Send the current message (plus counter, if a total is set)."""
        message = self._message
        if self._total:
            message = "%s %d/%d" % (self._message, self._progress.value, self._total)
        self._task.update_state(state="PROGRESS", meta={"message": message})
@app.task(bind=True)
def run_imdb_user_watchlist_check(self, url: str, location_code: str) -> dict:
    """Run the IMDb user watchlist check as a bound Celery task.

    Args:
        url (str): watchlist URL; surrounding whitespace is stripped
        location_code (str): passed through to ``check_imdb_user_watchlist``

    Returns:
        dict: ``{"result": <value returned by check_imdb_user_watchlist>}``
    """
    tracker = ProgressTracker(self)
    tracker.info("Starting ...")
    watchlist_result = check_imdb_user_watchlist(url.strip(), location_code, tracker)
    tracker.info("Finalizing ...")
    return {"result": watchlist_result}
@app.task(bind=True)
def check_medialist_task(self, name: str, location_code: str) -> dict:
    """Run the media list check as a bound Celery task.

    Args:
        name (str): passed through to ``check_medialist``
        location_code (str): passed through to ``check_medialist``

    Returns:
        dict: ``{"result": <value returned by check_medialist>}``
    """
    tracker = ProgressTracker(self)
    tracker.info("Starting ...")
    medialist_result = check_medialist(name, location_code, tracker)
    tracker.info("Finalizing ...")
    return {"result": medialist_result}
def get_state_by_id(id: str) -> Tuple[str, str]:
    """Return current task state by id.

    Args:
        id (str): task id

    Returns:
        Tuple[str, str]: state and message. The message is the progress
        text while the state is "PROGRESS", the stringified error when the
        state is "FAILURE", and an empty string otherwise.
    """
    job = app.AsyncResult(id)
    state = job.state
    message = ""
    if state == "PROGRESS":
        info = job.info
        # ``job.state`` and ``job.info`` are two separate reads. If the task
        # has moved on in between, ``info`` may no longer be the progress
        # dict, so only read "message" when it is actually available.
        if isinstance(info, dict):
            message = info.get("message", "")
    elif state == "FAILURE":
        message = str(job.info)
    return state, message
def get_result_by_id(id: str) -> List[ResultElement]:
    """Fetch the result of the task with the given id.

    Waits up to 10 seconds for the result to become available.

    Args:
        id (str): task id

    Returns:
        Whatever the task returned.
        NOTE(review): the tasks in this file return a dict or a str, not a
        list of ResultElement -- confirm the annotation against callers.
    """
    async_result = app.AsyncResult(id)
    print("Trying to get results from '%s'" % id)
    fetched = async_result.get(timeout=10)
    return fetched
# small example for Celery
@app.task
def wait(seconds: int):
    """Sleep for *seconds* and return a confirmation message.

    Args:
        seconds (int): how long to sleep

    Returns:
        str: "Waited for <n> seconds!" with the duration truncated to an int
    """
    time.sleep(seconds)
    whole_seconds = int(seconds)
    return "Waited for %d seconds!" % whole_seconds