diff --git a/diracx-routers/src/diracx/routers/jobs/status.py b/diracx-routers/src/diracx/routers/jobs/status.py index ab9048ee..e349d31d 100644 --- a/diracx-routers/src/diracx/routers/jobs/status.py +++ b/diracx-routers/src/diracx/routers/jobs/status.py @@ -3,7 +3,7 @@ import logging from datetime import datetime from http import HTTPStatus -from typing import Annotated +from typing import Annotated, Any from fastapi import BackgroundTasks, HTTPException, Query @@ -11,6 +11,8 @@ JobStatusUpdate, SetJobStatusReturn, ) +from diracx.db.sql.job.db import _get_columns +from diracx.db.sql.job.schema import Jobs from diracx.db.sql.utils.job import ( remove_jobs, reschedule_jobs_bulk, @@ -21,6 +23,7 @@ Config, JobDB, JobLoggingDB, + JobParametersDB, SandboxMetadataDB, TaskQueueDB, ) @@ -135,3 +138,50 @@ async def reschedule_bulk_jobs( # self.__sendJobsToOptimizationMind(validJobList) return resched_jobs + + +@router.patch("/metadata") +async def set_job_parameters_or_attributes( + updates: dict[int, dict[str, Any]], + job_db: JobDB, + job_parameters_db: JobParametersDB, + check_permissions: CheckWMSPolicyCallable, +): + await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=updates) + possible_attribute_columns = [ + name.lower() for name in _get_columns(Jobs.__table__, None) + ] + + attr_updates = {} + param_updates = {} + + for job_id, metadata in updates.items(): + # check if this is setting an attribute in the JobDB + attr_updates[job_id] = { + pname: pvalue + for pname, pvalue in metadata.items() + if pname.lower() in possible_attribute_columns + } + # else set elastic parameters DB + param_updates[job_id] = [ + (pname, pvalue) + for pname, pvalue in metadata.items() + if pname.lower() not in possible_attribute_columns + ] + # bulk set job attributes + await job_db.set_job_attributes_bulk(attr_updates) + + # TODO: can we upsert to multiple documents? + for job_id, updates in param_updates.items(): + await job_parameters_db.upsert( + int(job_id), + updates, + ) + + return { + job_id: { + "attributes": attr_updates[job_id], + "parameters": param_updates[job_id], + } + for job_id in updates + }