Skip to content

Commit

Permalink
Add job queue name to response (#380)
Browse files Browse the repository at this point in the history
* add job queue name to response

* lint

* Update src/actinia_core/models/response_models.py

Co-authored-by: Markus Neteler <[email protected]>

* black

* undo-black

Co-authored-by: Markus Neteler <[email protected]>
  • Loading branch information
mmacata and neteler authored Sep 22, 2022
1 parent b648b0e commit 60331d5
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/actinia_core/core/common/redis_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def enqueue_job(timeout, func, *args, queue_type_overwrite=None):
global job_queues, redis_conn
num_queues = global_config.NUMBER_OF_WORKERS
queue_type = global_config.QUEUE_TYPE
queue_name = "local"
if queue_type_overwrite:
queue_type = global_config.QUEUE_TYPE_OVERWRITE

Expand All @@ -136,6 +137,7 @@ def enqueue_job(timeout, func, *args, queue_type_overwrite=None):
__create_job_queue(queue_name)
for i in job_queues:
if i.name == queue_name:
args[0].set_queue_name(queue_name)
__enqueue_job_redis(i, timeout, func, *args)

elif queue_type == "redis":
Expand All @@ -147,10 +149,12 @@ def enqueue_job(timeout, func, *args, queue_type_overwrite=None):
# to chose for each job a different queue
num = redis_conn.incr("actinia_worker_count", 1)
current_queue = num % num_queues
args[0].set_queue_name(job_queues[current_queue].name)
__enqueue_job_redis(job_queues[current_queue], timeout, func, *args)

elif queue_type == "local":
# __enqueue_job_local(timeout, func, *args)
args[0].set_queue_name(queue_name)
enqueue_job_local(timeout, func, *args)
return
# Just in case the current process queue does not work
Expand Down
3 changes: 3 additions & 0 deletions src/actinia_core/core/resource_data_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ def set_user_data(self, user_data):
def set_request_data(self, request_data):
self.request_data = request_data

def set_queue_name(self, queue_name):
self.queue = queue_name

def set_storage_model_to_file(self):
self.storage_model = "file"

Expand Down
7 changes: 7 additions & 0 deletions src/actinia_core/models/response_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ class ProcessingResponseModel(Schema):
"type": "string",
"description": "The unique resource id",
},
"queue": {
"type": "string",
"description": "The name of the queue in which the job is queued",
},
"process_log": {
"type": "array",
"items": ProcessLogModel,
Expand Down Expand Up @@ -1186,6 +1190,7 @@ def create_response_from_model(
status=None,
user_id=None,
resource_id=None,
queue=None,
iteration=None,
process_log=None,
progress=None,
Expand Down Expand Up @@ -1220,6 +1225,7 @@ def create_response_from_model(
status (str): One of: accepted, running, finished, error
user_id (str): The user id
resource_id (str): The resource id
queue (str): The name of the queue
iteration (int): The iteration of the job
process_log (dict, str, list): The log from the running GRASS module
progress (ProgressInfoModel): Progress information
Expand Down Expand Up @@ -1258,6 +1264,7 @@ def create_response_from_model(
status=status,
user_id=user_id,
resource_id=resource_id,
queue=str(queue),
# iteration=iteration,
accept_timestamp=orig_time,
accept_datetime=orig_datetime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ def _send_resource_update(self, message, results=None):
status="running",
user_id=self.user_id,
resource_id=self.resource_id,
queue=self.rdc.queue,
iteration=self.iteration,
# process_log=self.module_output_log,
progress=self.progress,
Expand Down Expand Up @@ -319,6 +320,7 @@ def _send_resource_finished(self, message, results=None):
status="finished",
user_id=self.user_id,
resource_id=self.resource_id,
queue=self.rdc.queue,
iteration=self.iteration,
process_log=self.module_output_log,
progress=self.progress,
Expand Down Expand Up @@ -348,6 +350,7 @@ def _send_resource_terminated(self, message, results=None):
status="terminated",
user_id=self.user_id,
resource_id=self.resource_id,
queue=self.rdc.queue,
iteration=self.iteration,
process_log=self.module_output_log,
progress=self.progress,
Expand Down Expand Up @@ -376,6 +379,7 @@ def _send_resource_time_limit_exceeded(self, message, results=None):
status="terminated",
user_id=self.user_id,
resource_id=self.resource_id,
queue=self.rdc.queue,
iteration=self.iteration,
process_log=self.module_output_log,
progress=self.progress,
Expand Down Expand Up @@ -404,6 +408,7 @@ def _send_resource_error(self, message, results=None, exception=None):
status="error",
user_id=self.user_id,
resource_id=self.resource_id,
queue=self.rdc.queue,
iteration=self.iteration,
process_log=self.module_output_log,
progress=self.progress,
Expand Down
12 changes: 12 additions & 0 deletions src/actinia_core/rest/base/resource_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ def __init__(self, resource_id=None, iteration=None, post_url=None):
self.resource_id = resource_id
self.request_id = self.generate_request_id_from_resource_id()

if global_config.QUEUE_TYPE == "per_job":
self.queue = "%s_%s" % (
global_config.WORKER_QUEUE_PREFIX,
self.resource_id,
)
elif global_config.QUEUE_TYPE == "redis":
self.queue = "%s_%s" % (global_config.WORKER_QUEUE_PREFIX, "count")
else:
self.queue = "local"

# set iteration and post_url
self.iteration = iteration
self.post_url = post_url
Expand Down Expand Up @@ -186,6 +196,7 @@ def create_error_response(self, message, status="error", http_code=400):
status=status,
user_id=self.user_id,
resource_id=self.resource_id,
queue=self.queue,
iteration=self.iteration,
process_log=None,
results={},
Expand Down Expand Up @@ -360,6 +371,7 @@ def preprocess(
status="accepted",
user_id=self.user_id,
resource_id=self.resource_id,
queue=self.queue,
iteration=self.iteration,
process_log=None,
results={},
Expand Down

0 comments on commit 60331d5

Please sign in to comment.