Skip to content

Commit

Permalink
Merge pull request #533 from cmoussa1/filter.jobs.by.bank
Browse files Browse the repository at this point in the history
`view-job-records`: add `--bank` filter option
  • Loading branch information
mergify[bot] authored Nov 12, 2024
2 parents a29e5a1 + 1c8ad78 commit 62e4e11
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/bindings/python/fluxacct/accounting/__init__.py.in
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
DB_DIR = "@X_LOCALSTATEDIR@/lib/flux/"
DB_PATH = "@X_LOCALSTATEDIR@/lib/flux/FluxAccounting.db"
DB_SCHEMA_VERSION = 23
DB_SCHEMA_VERSION = 24

# flux-accounting DB table column names
ASSOCIATION_TABLE = [
Expand Down Expand Up @@ -40,6 +40,7 @@ JOBS_TABLE = [
"R",
"jobspec",
"project",
"bank",
]

__all__ = [
Expand Down
3 changes: 2 additions & 1 deletion src/bindings/python/fluxacct/accounting/create_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ def create_db(
ranks text NOT NULL,
R text NOT NULL,
jobspec text NOT NULL,
project text
project text,
bank text
);"""
)
logging.info("Created jobs table successfully")
Expand Down
39 changes: 32 additions & 7 deletions src/bindings/python/fluxacct/accounting/jobs_table_subcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,16 @@ class JobRecord:
"""

def __init__(
self, userid, jobid, t_submit, t_run, t_inactive, nnodes, resources, project
self,
userid,
jobid,
t_submit,
t_run,
t_inactive,
nnodes,
resources,
project,
bank,
):
self.userid = userid
self.username = get_username(userid)
Expand All @@ -47,6 +56,7 @@ def __init__(
self.nnodes = nnodes
self.resources = resources
self.project = project
self.bank = bank

@property
def elapsed(self):
Expand All @@ -73,6 +83,7 @@ def write_records_to_file(job_records, output_file):
"Nodes",
"R",
"Project",
"Bank",
)
)
for record in job_records:
Expand All @@ -87,6 +98,7 @@ def write_records_to_file(job_records, output_file):
str(record.nnodes),
str(record.resources),
str(record.project),
str(record.bank),
)
)

Expand All @@ -98,7 +110,7 @@ def convert_to_str(job_records):
"""
job_record_str = []
job_record_str.append(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20}".format(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20} {:<20}".format(
"UserID",
"Username",
"JobID",
Expand All @@ -107,11 +119,12 @@ def convert_to_str(job_records):
"T_Inactive",
"Nodes",
"Project",
"Bank",
)
)
for record in job_records:
job_record_str.append(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20}".format(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20} {:<20}".format(
record.userid,
record.username,
record.jobid,
Expand All @@ -120,6 +133,7 @@ def convert_to_str(job_records):
record.t_inactive,
record.nnodes,
record.project,
record.bank,
)
)

Expand Down Expand Up @@ -151,6 +165,7 @@ def convert_to_obj(rows):
nnodes=job_nnodes,
resources=row[6],
project=row[8] if row[8] is not None else "",
bank=row[9] if row[9] is not None else "",
)
job_records.append(job_record)

Expand Down Expand Up @@ -215,21 +230,28 @@ def get_jobs(conn, **kwargs):
- jobs that started after a certain time
- jobs that completed before a certain time
- jobid
- project
- bank
The function will execute a SQL query and return a list of jobs. If no
jobs are found, an empty list is returned.
"""
# find out which args were passed and place them in a dict
valid_params = {"user", "after_start_time", "before_end_time", "jobid", "project"}
valid_params = {
"user",
"after_start_time",
"before_end_time",
"jobid",
"project",
"bank",
}
params = {
key: val
for key, val in kwargs.items()
if val is not None and key in valid_params
}

select_stmt = (
"SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec,project FROM jobs"
)
select_stmt = "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec,project,bank FROM jobs"
where_clauses = []
params_list = []

Expand All @@ -249,6 +271,9 @@ def get_jobs(conn, **kwargs):
if "project" in params:
where_clauses.append("project = ?")
params_list.append(params["project"])
if "bank" in params:
where_clauses.append("bank = ?")
params_list.append(params["bank"])

if where_clauses:
select_stmt += " WHERE " + " AND ".join(where_clauses)
Expand Down
8 changes: 5 additions & 3 deletions src/cmd/flux-account-fetch-job-records.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ def fetch_new_jobs(last_timestamp=0.0):
# single_record["project"] to None if it can't be found
accounting_attributes = jobspec.get("attributes", {}).get("system", {})
single_record["project"] = accounting_attributes.get("project")
single_record["bank"] = accounting_attributes.get("bank")
except json.JSONDecodeError as exc:
# the job does not have a project in jobspec, so don't add it
# the job's jobspec can't be decoded; don't add any of its elements
# to the job dictionary
continue

Expand Down Expand Up @@ -138,8 +139,8 @@ def insert_jobs_in_db(conn, job_records):
cur.execute(
"""
INSERT OR IGNORE INTO jobs
(id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec, project)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
(id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec, project, bank)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
single_job["id"],
Expand All @@ -153,6 +154,7 @@ def insert_jobs_in_db(conn, job_records):
single_job["project"]
if single_job.get("project") is not None
else "",
single_job["bank"] if single_job.get("bank") is not None else "",
),
)
except KeyError:
Expand Down
1 change: 1 addition & 0 deletions src/cmd/flux-account-service.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ def view_job_records(self, handle, watcher, msg, arg):
before_end_time=msg.payload["before_end_time"],
after_start_time=msg.payload["after_start_time"],
project=msg.payload["project"],
bank=msg.payload["bank"],
)

payload = {"view_job_records": val}
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/flux-account.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ def add_view_job_records_arg(subparsers):
help="project",
metavar="PROJECT",
)
subparser_view_job_records.add_argument(
"--bank",
help="bank",
metavar="BANK",
)


def add_create_db_arg(subparsers):
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ TESTSCRIPTS = \
t1040-mf-priority-projects.t \
t1041-view-jobs-by-project.t \
t1042-issue508.t \
t1043-view-jobs-by-bank.t \
t5000-valgrind.t \
python/t1000-example.py \
python/t1001_db.py \
Expand Down
2 changes: 1 addition & 1 deletion t/expected/job_usage/no_jobs.expected
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UserID Username JobID T_Submit T_Run T_Inactive Nodes Project
UserID Username JobID T_Submit T_Run T_Inactive Nodes Project Bank
Binary file added t/expected/test_dbs/FluxAccountingv0-39-0.db
Binary file not shown.
98 changes: 98 additions & 0 deletions t/t1043-view-jobs-by-bank.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/bin/bash

test_description='test viewing and filtering job records by bank'

. `dirname $0`/sharness.sh
MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
DB_PATH=$(pwd)/FluxAccountingTest.db

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job

flux setattr log-stderr-level 1

test_expect_success 'load multi-factor priority plugin' '
flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY}
'

test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'create flux-accounting DB' '
flux account -p ${DB_PATH} create-db
'

test_expect_success 'start flux-accounting service' '
flux account-service -p ${DB_PATH} -t
'

test_expect_success 'add banks to the DB' '
flux account add-bank root 1 &&
flux account add-bank --parent-bank=root bankA 1 &&
flux account add-bank --parent-bank=root bankB 1
'

test_expect_success 'add a user with a list of projects to the DB' '
flux account add-user --username=user1 --userid=5001 --bank=bankA &&
flux account add-user --username=user1 --userid=5001 --bank=bankB &&
flux account add-user --username=user2 --userid=5002 --bank=bankB
'

test_expect_success 'send flux-accounting DB information to the plugin' '
flux account-priority-update -p ${DB_PATH}
'

test_expect_success 'submit 2 jobs under bank A' '
job1=$(flux python ${SUBMIT_AS} 5001 hostname) &&
flux job wait-event -f json $job1 priority &&
job2=$(flux python ${SUBMIT_AS} 5001 hostname) &&
flux job wait-event -f json $job2 priority &&
flux cancel $job1 &&
flux cancel $job2
'

test_expect_success 'submit 2 jobs under bank B' '
job1=$(flux python ${SUBMIT_AS} 5002 hostname) &&
flux job wait-event -f json $job1 priority &&
job2=$(flux python ${SUBMIT_AS} 5002 hostname) &&
flux job wait-event -f json $job2 priority &&
flux cancel $job1 &&
flux cancel $job2
'

test_expect_success 'submit jobs under a secondary bank' '
job1=$(flux python ${SUBMIT_AS} 5001 --setattr=system.bank=bankB hostname) &&
flux job wait-event -f json $job1 priority &&
flux cancel $job1
'

test_expect_success 'run fetch-job-records script' '
flux account-fetch-job-records -p ${DB_PATH}
'

test_expect_success 'look at all jobs (will show 5 records in total)' '
flux account view-job-records > all_jobs.out &&
test $(grep -c "bankA" all_jobs.out) -eq 2 &&
test $(grep -c "bankB" all_jobs.out) -eq 3
'

test_expect_success 'filter jobs by bankA (will show 2 records in total)' '
flux account view-job-records --bank=bankA > bankA_jobs.out &&
test $(grep -c "5001" bankA_jobs.out) -eq 2 &&
test $(grep -c "5002" bankA_jobs.out) -eq 0
'

test_expect_success 'filter jobs by bankB (will show 3 records in total)' '
flux account view-job-records --bank=bankB > bankB_jobs.out &&
test $(grep -c "5001" bankB_jobs.out) -eq 1 &&
test $(grep -c "5002" bankB_jobs.out) -eq 2
'

test_expect_success 'shut down flux-accounting service' '
flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()"
'

test_done

0 comments on commit 62e4e11

Please sign in to comment.