Adding endpoint to get job output (#18)
* add an endpoint to get job output
* add support for the same endpoints to the flux-restful-cli Python client
* ensure the log test is added to the authenticated test set
* add a submit jobs form to the UI
* finish early work on the jobs table and info pages
* finish tweaks to the Python client

Signed-off-by: vsoch <[email protected]>
vsoch authored Nov 13, 2022
1 parent 02ea81d commit 37d84b4
Showing 45 changed files with 1,593 additions and 203 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
@@ -17,4 +17,5 @@ jobs:
- name: Run tests
run: |
flux start pytest -xs tests/test_api.py
flux start pytest -xs tests/test_api_auth.py
export TEST_AUTH=true
flux start pytest -xs tests/test_api.py
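
The workflow now runs the same test module twice, exporting TEST_AUTH=true for the second pass instead of keeping a separate tests/test_api_auth.py. As a rough sketch of how a pytest module might branch on that variable (the contents of tests/test_api.py are not shown in this diff, so the marker and assertion below are assumptions):

import os

import pytest

# Hypothetical flag; the real test module may read TEST_AUTH differently.
auth_enabled = os.environ.get("TEST_AUTH") == "true"


@pytest.mark.skipif(not auth_enabled, reason="TEST_AUTH not set; skipping authenticated checks")
def test_log_endpoint_requires_auth():
    # Placeholder standing in for a real request against the authenticated API.
    assert auth_enabled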
32 changes: 31 additions & 1 deletion app/core/config.py
@@ -3,13 +3,43 @@
from pydantic import BaseSettings


def get_int_envar(key, default=None):
"""
Get (and parse) an integer environment variable
"""
value = os.environ.get(key)
if not value:
value = default
try:
value = int(value)
return value
except Exception:
return default


def get_bool_envar(key, default=False):
"""
    Get a boolean from the environment: return the default if the variable is unset, otherwise flip it (setting the variable toggles the default).
"""
return default if not os.environ.get(key) else not default


class Settings(BaseSettings):
"""
    Basic settings and defaults for the Flux RESTful API
"""

app_name: str = "Flux RESTFul API"

# These map to envars, e.g., FLUX_USER
has_gpus: bool = get_bool_envar("FLUX_HAS_GPUS")

# Assume there is at least one node!
flux_nodes: int = get_int_envar("FLUX_NUMBER_NODES", 1)

flux_user: str = os.environ.get("FLUX_USER")
flux_token: str = os.environ.get("FLUX_TOKEN")
require_auth: bool = False if not os.environ.get("FLUX_REQUIRE_AUTH") else True
require_auth: bool = get_bool_envar("FLUX_REQUIRE_AUTH")


settings = Settings()
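
As a brief illustration of how these settings are read from the environment (a sketch; the values below are placeholders, and the variables must be exported before the settings module is imported):

import os

# Hypothetical values for illustration only.
os.environ["FLUX_USER"] = "fluxuser"
os.environ["FLUX_TOKEN"] = "12345"
os.environ["FLUX_REQUIRE_AUTH"] = "true"
os.environ["FLUX_NUMBER_NODES"] = "4"

from app.core.config import settings

assert settings.require_auth       # set, so the False default is flipped
assert settings.flux_nodes == 4    # parsed from FLUX_NUMBER_NODES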
58 changes: 58 additions & 0 deletions app/forms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from typing import List, Optional

from fastapi import Request

import app.library.flux as flux_cli


class SubmitForm:
def __init__(self, request: Request):
self.request: Request = request
self.errors: List = []
self.command: str
self.workdir: Optional[str] = None
self.num_tasks: Optional[int] = None
self.num_nodes: Optional[int] = None
self.runtime: Optional[int] = None
self.cores_per_task: Optional[int] = None
self.gpus_per_task: Optional[int] = None
self.exclusive: Optional[bool] = False

# STOPPED HERE - serialize in jquery from form, submit as application/json.
async def load_data(self):
form = await self.request.form()
self.command = form.get("command")
self.workdir = form.get("workdir") or None
self.num_tasks = form.get("num_tasks") or 1
self.num_nodes = form.get("num_nodes") or 1
self.runtime = form.get("runtime") or 0
self.cores_per_task = form.get("cores_per_task") or None
self.gpus_per_task = form.get("gpus_per_task") or None
self.exclusive = form.get("exclusive") or False

@property
def kwargs(self):
"""
Prepared key value dictionary of items.
"""
kwargs = {}
for key in [
"command",
"num_tasks",
"num_nodes",
"cores_per_task",
"gpys_per_task",
"exclusive",
]:
if getattr(self, key, None) is not None:
kwargs[key] = getattr(self, key)
return kwargs

def is_valid(self):
"""
Determine if the form is valid (devoid of errors)
"""
self.errors = flux_cli.validate_submit_kwargs(self.kwargs, runtime=self.runtime)
if not self.errors:
return True
return False
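
A minimal sketch of how this form class is typically consumed from a FastAPI view (the actual route lives in the views router and is not shown in this diff, so the path and response handling here are assumptions):

from fastapi import APIRouter, Request

from app.forms import SubmitForm

router = APIRouter()


@router.post("/jobs/submit")
async def submit_job(request: Request):
    form = SubmitForm(request)
    await form.load_data()
    if not form.is_valid():
        # form.errors is populated by validate_submit_kwargs
        return {"errors": form.errors}
    # form.kwargs collects the JobspecV1 parameters that were set
    return {"submitted": form.kwargs}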
190 changes: 190 additions & 0 deletions app/library/flux.py
@@ -0,0 +1,190 @@
import json
import os
import re
import shlex

import flux
import flux.job

from app.core.config import settings


def validate_submit_kwargs(kwargs, envars=None, runtime=None):
"""
Shared function to validate submit, from API or web UI.
Kwargs are expected to be given to JobspecV1, and
everything else is added to the fluxjob.
"""
errors = []
if "command" not in kwargs or not kwargs["command"]:
errors.append("'command' is required.")

# We can't ask for more nodes than available!
num_nodes = kwargs.get("num_nodes")
if num_nodes and num_nodes > settings.flux_nodes:
errors.append(
f"The server only has {settings.flux_nodes} nodes, you requested {num_nodes}"
)

# If the user asks for gpus and we don't have any, no go
if "gpus_per_task" in kwargs and not settings.has_gpus:
errors.append("This server does not support gpus: gpus_per_task cannot be set.")

# Minimum value of zero
if runtime and runtime < 0:
errors.append(f"Runtime must be >= 0, found {runtime}")

# Minimum values of 1
for key in ["cpus_per_task", "gpus_per_task"]:
if key in kwargs and kwargs[key] < 1:
errors.append(f"Parameter {key} must be >= 1")

if envars and not isinstance(envars, dict):
errors.append("Environment variables must be key/value pairs (dict)")
return errors


def prepare_job(kwargs, runtime=0, workdir=None, envars=None):
"""
After validation, prepare the job (shared function).
"""
envars = envars or {}

# Generate the flux job
command = kwargs["command"]
if isinstance(command, str):
command = shlex.split(command)

    # Delete command from the kwargs (it is required and validated above, but is not a JobspecV1 keyword)
del kwargs["command"]
fluxjob = flux.job.JobspecV1.from_command(command, **kwargs)

if workdir is not None:
fluxjob.workdir = workdir

# A duration of zero (the default) means unlimited
fluxjob.duration = runtime

# Additional envars in the payload?
environment = dict(os.environ)
environment.update(envars)
fluxjob.environment = environment
return fluxjob


def query_job(jobinfo, query):
"""
    This would be better suited to a database, but should work for a small number of jobs.
"""
searchstr = "".join([str(x) for x in list(jobinfo.values())])
return re.search(query, searchstr)


def query_jobs(contenders, query):
"""
Wrapper to query more than one job.
"""
jobs = []
for contender in contenders:
if not query_job(contender, query):
continue
jobs.append(contender)
return jobs


def get_job_output(jobid):
"""
Given a jobid, get the output.
"""
lines = []
from app.main import app

    # If the submit is too close to the log request, the file handle cannot be found yet.
    # It could also be that the jobid does not exist.
try:
for line in flux.job.event_watch(app.handle, jobid, "guest.output"):
if "data" in line.context:
lines.append(line.context["data"])
except Exception:
pass
return lines


def list_jobs_detailed(limit=None, query=None):
"""
Get a detailed listing of jobs.
"""
listing = list_jobs()
ids = listing.get()["jobs"]
jobs = {}
for job in ids:

# Stop if a limit is defined and we have hit it!
if limit is not None and len(jobs) >= limit:
break

try:
jobinfo = get_job(job["id"])

# Best effort hack to do a query
if query and not query_job(jobinfo, query):
continue
jobs[job["id"]] = jobinfo
except Exception:
pass
return jobs


def list_jobs():
"""
Get a simple listing of jobs (just the ids)
"""
from app.main import app

return flux.job.job_list(app.handle)


def get_simple_job(jobid):
"""
Not used - an original (simpler) implementation.
"""
from app.main import app

info = flux.job.job_list_id(app.handle, jobid, attrs=["all"])
return json.loads(info.get_str())["job"]


def get_job(jobid):
"""
Get details for a job
"""
from app.main import app

payload = {"id": int(jobid), "attrs": ["all"]}
rpc = flux.job.list.JobListIdRPC(app.handle, "job-list.list-id", payload)
try:
jobinfo = rpc.get()

# The job does not exist!
except FileNotFoundError:
return None

jobinfo = jobinfo["job"]

# User friendly string from integer
state = jobinfo["state"]
jobinfo["state"] = flux.job.info.statetostr(state)

# Get job info to add to result
info = rpc.get_jobinfo()
jobinfo["nnodes"] = info._nnodes
jobinfo["result"] = info.result
jobinfo["returncode"] = info.returncode
jobinfo["runtime"] = info.runtime
jobinfo["priority"] = info._priority
jobinfo["waitstatus"] = info._waitstatus
jobinfo["nodelist"] = info._nodelist
jobinfo["nodelist"] = info._nodelist
jobinfo["exception"] = info._exception.__dict__
return jobinfo
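
Putting the helpers together, a minimal sketch of the submit-and-read-output flow they support. This runs outside the app, so it creates its own flux.Flux() handle rather than using app.handle, and the command and resource values are placeholders:

import flux
import flux.job

from app.library.flux import prepare_job, validate_submit_kwargs

# Hypothetical job parameters; in the API these arrive in the request payload.
kwargs = {"command": "echo hello world", "num_tasks": 1, "cores_per_task": 1}

errors = validate_submit_kwargs(kwargs)
if not errors:
    jobspec = prepare_job(kwargs, runtime=60)
    handle = flux.Flux()
    jobid = flux.job.submit(handle, jobspec)

    # Read the output eventlog, mirroring what get_job_output does with app.handle
    for event in flux.job.event_watch(handle, jobid, "guest.output"):
        if "data" in event.context:
            print(event.context["data"], end="")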
20 changes: 20 additions & 0 deletions app/library/helpers.py
@@ -4,6 +4,26 @@
import markdown


def has_boolean_arg(payload, key):
"""
    A helper to determine if a payload has a key whose value is some representation of True (True or "true").
"""
return key in payload and payload.get(key) in [True, "true"]


def get_int_arg(payload, key):
"""
    Attempt to get (and parse) an integer argument, falling back to None.
"""
arg = payload.get(key)
if arg:
try:
arg = int(arg)
except ValueError:
arg = None
return arg


def read_json(filename):
with open(filename, "r") as fd:
content = json.loads(fd.read())
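
A quick sketch of how these helpers behave on a request payload (the payload values are invented for illustration):

from app.library.helpers import get_int_arg, has_boolean_arg

payload = {"exclusive": "true", "num_nodes": "2", "runtime": "oops"}

has_boolean_arg(payload, "exclusive")   # True ("true" counts as set)
has_boolean_arg(payload, "missing")     # False
get_int_arg(payload, "num_nodes")       # 2
get_int_arg(payload, "runtime")         # None (not parseable as an integer)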
22 changes: 1 addition & 21 deletions app/main.py
@@ -3,15 +3,12 @@
import sys

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from app.core.logging import init_loggers
from app.routers import api, views

from .library.helpers import openfile

init_loggers()
log = logging.getLogger("flux-restful")

@@ -28,6 +25,7 @@
app.mount("/data", StaticFiles(directory=data_root), name="data")

app.include_router(views.router)
app.include_router(views.auth_views_router)
app.include_router(api.router)

try:
@@ -53,21 +51,3 @@ async def load_app_data(request: Request, call_next):
"Cannot find flux instance! Ensure you have run flux start or similar."
)
return await call_next(request)


@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
data = openfile("index.md")
return templates.TemplateResponse(
"index.html",
{
"request": request,
"data": data,
},
)


@app.get("/page/{page_name}", response_class=HTMLResponse)
async def show_page(request: Request, page_name: str):
data = openfile(page_name + ".md")
return templates.TemplateResponse("page.html", {"request": request, "data": data})
