Skip to content

Commit

Permalink
start writing metadata patch endpoint
Browse files Browse the repository at this point in the history
[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

add policy

Missing awaits and return results

adjust returned output
  • Loading branch information
ryuwd committed Feb 7, 2025
1 parent 19913ec commit 1f2de58
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion diracx-routers/src/diracx/routers/jobs/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
import logging
from datetime import datetime
from http import HTTPStatus
from typing import Annotated
from typing import Annotated, Any

from fastapi import BackgroundTasks, HTTPException, Query

from diracx.core.models import (
JobStatusUpdate,
SetJobStatusReturn,
)
from diracx.db.sql.job.db import _get_columns
from diracx.db.sql.job.schema import Jobs
from diracx.db.sql.utils.job import (
remove_jobs,
reschedule_jobs_bulk,
Expand All @@ -21,6 +23,7 @@
Config,
JobDB,
JobLoggingDB,
JobParametersDB,
SandboxMetadataDB,
TaskQueueDB,
)
Expand Down Expand Up @@ -135,3 +138,50 @@ async def reschedule_bulk_jobs(
# self.__sendJobsToOptimizationMind(validJobList)

return resched_jobs


@router.patch("/metadata")
async def set_job_parameters_or_attributes(
updates: dict[int, dict[str, Any]],
job_db: JobDB,
job_parameters_db: JobParametersDB,
check_permissions: CheckWMSPolicyCallable,
):
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=updates)
possible_attribute_columns = [
name.lower() for name in _get_columns(Jobs.__table__, None)
]

attr_updates = {}
param_updates = {}

for job_id, metadata in updates.items():
# check if this is setting an attribute in the JobDB
attr_updates[job_id] = {
pname: pvalue
for pname, pvalue in metadata.items()
if pname.lower() in possible_attribute_columns
}
# else set elastic parameters DB
param_updates[job_id] = [
(pname, pvalue)
for pname, pvalue in metadata.items()
if pname.lower() not in possible_attribute_columns
]
# bulk set job attributes
await job_db.set_job_attributes_bulk(attr_updates)

# TODO: can we upsert to multiple documents?
for job_id, updates in param_updates.items():
await job_parameters_db.upsert(
int(job_id),
updates,
)

return {
job_id: {
"attributes": attr_updates[job_id],
"parameters": param_updates[job_id],
}
for job_id in updates
}

0 comments on commit 1f2de58

Please sign in to comment.