Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: flux-framework/flux-accounting
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: d9fd90c33ccb50ebcfb1059dc8a8e50173f67c54
Choose a base ref
..
head repository: flux-framework/flux-accounting
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 8e62574d6bb7b809d14ff8814746f2872663a46d
Choose a head ref
3 changes: 2 additions & 1 deletion src/bindings/python/fluxacct/accounting/__init__.py.in
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
DB_DIR = "@X_LOCALSTATEDIR@/lib/flux/"
DB_PATH = "@X_LOCALSTATEDIR@/lib/flux/FluxAccounting.db"
DB_SCHEMA_VERSION = 23
DB_SCHEMA_VERSION = 24

# flux-accounting DB table column names
ASSOCIATION_TABLE = [
@@ -40,6 +40,7 @@ JOBS_TABLE = [
"R",
"jobspec",
"project",
"bank",
]

__all__ = [
50 changes: 23 additions & 27 deletions src/bindings/python/fluxacct/accounting/bank_subcommands.py
Original file line number Diff line number Diff line change
@@ -10,9 +10,11 @@
# SPDX-License-Identifier: LGPL-3.0
###############################################################
import sqlite3
import json

import fluxacct.accounting
from fluxacct.accounting import user_subcommands as u
from fluxacct.accounting import formatter as fmt
from fluxacct.accounting import sql_util as sql

###############################################################
# #
@@ -422,45 +424,39 @@ def edit_bank(
def list_banks(
conn,
inactive=False,
fields=None,
cols=None,
table=False,
):
"""
List all banks in the bank_table in JSON format.
List all banks in bank_table.
Args:
inactive: whether to include inactive banks. By default, only banks that are
active will be included in the output.
fields: a list of fields to include in the output. By default, all fields are
included.
active will be included in the output.
cols: a list of columns from the table to include in the output. By default, all
columns are included.
table: output data in bank_table in table format. By default, the format of any
returned data is in JSON.
"""
default_fields = {"bank_id", "bank", "active", "parent_bank", "shares", "job_usage"}
# if fields is None, just use the default fields
fields = fields or default_fields
# use all column names if none are passed in
cols = cols or fluxacct.accounting.BANK_TABLE

try:
cur = conn.cursor()

# validate the fields passed in
invalid_fields = [field for field in fields if field not in default_fields]
if invalid_fields:
raise ValueError(f"invalid fields: {', '.join(invalid_fields)}")

sql.validate_columns(cols, fluxacct.accounting.BANK_TABLE)
# construct SELECT statement
select_fields = ", ".join(fields)
select_stmt = f"SELECT {select_fields} FROM bank_table"
select_stmt = f"SELECT {', '.join(cols)} FROM bank_table"
if not inactive:
select_stmt += " WHERE active=1"

cur.execute(select_stmt)
result = cur.fetchall()

# create individual object for each row in the query result
banks = [
{field: row[idx] for idx, field in enumerate(fields)} for row in result
]

json_string = json.dumps(banks, indent=2)
return json_string
# initialize AccountingFormatter object
formatter = fmt.AccountingFormatter(cur)
if table:
return formatter.as_table()
return formatter.as_json()
except sqlite3.Error as err:
raise sqlite3.Error(f"an sqlite3.Error occurred: {err}")
raise sqlite3.Error(f"list-banks: an sqlite3.Error occurred: {err}")
except ValueError as exc:
raise ValueError(f"list-banks: {exc}")
3 changes: 2 additions & 1 deletion src/bindings/python/fluxacct/accounting/create_db.py
Original file line number Diff line number Diff line change
@@ -213,7 +213,8 @@ def create_db(
ranks text NOT NULL,
R text NOT NULL,
jobspec text NOT NULL,
project text
project text,
bank text
);"""
)
logging.info("Created jobs table successfully")
39 changes: 32 additions & 7 deletions src/bindings/python/fluxacct/accounting/jobs_table_subcommands.py
Original file line number Diff line number Diff line change
@@ -36,7 +36,16 @@ class JobRecord:
"""

def __init__(
self, userid, jobid, t_submit, t_run, t_inactive, nnodes, resources, project
self,
userid,
jobid,
t_submit,
t_run,
t_inactive,
nnodes,
resources,
project,
bank,
):
self.userid = userid
self.username = get_username(userid)
@@ -47,6 +56,7 @@ def __init__(
self.nnodes = nnodes
self.resources = resources
self.project = project
self.bank = bank

@property
def elapsed(self):
@@ -73,6 +83,7 @@ def write_records_to_file(job_records, output_file):
"Nodes",
"R",
"Project",
"Bank",
)
)
for record in job_records:
@@ -87,6 +98,7 @@ def write_records_to_file(job_records, output_file):
str(record.nnodes),
str(record.resources),
str(record.project),
str(record.bank),
)
)

@@ -98,7 +110,7 @@ def convert_to_str(job_records):
"""
job_record_str = []
job_record_str.append(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20}".format(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20} {:<20}".format(
"UserID",
"Username",
"JobID",
@@ -107,11 +119,12 @@ def convert_to_str(job_records):
"T_Inactive",
"Nodes",
"Project",
"Bank",
)
)
for record in job_records:
job_record_str.append(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20}".format(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20} {:<20}".format(
record.userid,
record.username,
record.jobid,
@@ -120,6 +133,7 @@ def convert_to_str(job_records):
record.t_inactive,
record.nnodes,
record.project,
record.bank,
)
)

@@ -151,6 +165,7 @@ def convert_to_obj(rows):
nnodes=job_nnodes,
resources=row[6],
project=row[8] if row[8] is not None else "",
bank=row[9] if row[9] is not None else "",
)
job_records.append(job_record)

@@ -215,21 +230,28 @@ def get_jobs(conn, **kwargs):
- jobs that started after a certain time
- jobs that completed before a certain time
- jobid
- project
- bank
The function will execute a SQL query and return a list of jobs. If no
jobs are found, an empty list is returned.
"""
# find out which args were passed and place them in a dict
valid_params = {"user", "after_start_time", "before_end_time", "jobid", "project"}
valid_params = {
"user",
"after_start_time",
"before_end_time",
"jobid",
"project",
"bank",
}
params = {
key: val
for key, val in kwargs.items()
if val is not None and key in valid_params
}

select_stmt = (
"SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec,project FROM jobs"
)
select_stmt = "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec,project,bank FROM jobs"
where_clauses = []
params_list = []

@@ -249,6 +271,9 @@ def get_jobs(conn, **kwargs):
if "project" in params:
where_clauses.append("project = ?")
params_list.append(params["project"])
if "bank" in params:
where_clauses.append("bank = ?")
params_list.append(params["bank"])

if where_clauses:
select_stmt += " WHERE " + " AND ".join(where_clauses)
8 changes: 5 additions & 3 deletions src/cmd/flux-account-fetch-job-records.py
Original file line number Diff line number Diff line change
@@ -100,8 +100,9 @@ def fetch_new_jobs(last_timestamp=0.0):
# single_record["project"] to None if it can't be found
accounting_attributes = jobspec.get("attributes", {}).get("system", {})
single_record["project"] = accounting_attributes.get("project")
single_record["bank"] = accounting_attributes.get("bank")
except json.JSONDecodeError as exc:
# the job does not have a project in jobspec, so don't add it
# the job's jobspec can't be decoded; don't add any of its elements
# to the job dictionary
continue

@@ -138,8 +139,8 @@ def insert_jobs_in_db(conn, job_records):
cur.execute(
"""
INSERT OR IGNORE INTO jobs
(id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec, project)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
(id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec, project, bank)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
single_job["id"],
@@ -153,6 +154,7 @@ def insert_jobs_in_db(conn, job_records):
single_job["project"]
if single_job.get("project") is not None
else "",
single_job["bank"] if single_job.get("bank") is not None else "",
),
)
except KeyError:
24 changes: 13 additions & 11 deletions src/cmd/flux-account-service.py
Original file line number Diff line number Diff line change
@@ -171,16 +171,16 @@ def view_user(self, handle, watcher, msg, arg):
def add_user(self, handle, watcher, msg, arg):
try:
val = u.add_user(
self.conn,
msg.payload["username"],
msg.payload["bank"],
msg.payload["userid"],
msg.payload["shares"],
msg.payload["max_running_jobs"],
msg.payload["max_active_jobs"],
msg.payload["max_nodes"],
msg.payload["queues"],
msg.payload["projects"],
conn=self.conn,
username=msg.payload["username"],
bank=msg.payload["bank"],
uid=msg.payload["userid"],
shares=msg.payload["shares"],
max_running_jobs=msg.payload["max_running_jobs"],
max_active_jobs=msg.payload["max_active_jobs"],
max_nodes=msg.payload["max_nodes"],
queues=msg.payload["queues"],
projects=msg.payload["projects"],
)

payload = {"add_user": val}
@@ -331,7 +331,8 @@ def list_banks(self, handle, watcher, msg, arg):
val = b.list_banks(
self.conn,
msg.payload["inactive"],
msg.payload["fields"].split(","),
msg.payload["fields"].split(",") if msg.payload.get("fields") else None,
msg.payload["table"],
)

payload = {"list_banks": val}
@@ -359,6 +360,7 @@ def view_job_records(self, handle, watcher, msg, arg):
before_end_time=msg.payload["before_end_time"],
after_start_time=msg.payload["after_start_time"],
project=msg.payload["project"],
bank=msg.payload["bank"],
)

payload = {"view_job_records": val}
15 changes: 14 additions & 1 deletion src/cmd/flux-account.py
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ def add_add_user_arg(subparsers):
"--username",
help="username",
metavar="USERNAME",
required=True,
)
subparser_add_user.add_argument(
"--userid",
@@ -81,6 +82,7 @@ def add_add_user_arg(subparsers):
"--bank",
help="bank to charge jobs against",
metavar="BANK",
required=True,
)
subparser_add_user.add_argument(
"--shares",
@@ -239,6 +241,11 @@ def add_view_job_records_arg(subparsers):
help="project",
metavar="PROJECT",
)
subparser_view_job_records.add_argument(
"--bank",
help="bank",
metavar="BANK",
)


def add_create_db_arg(subparsers):
@@ -369,9 +376,15 @@ def add_list_banks_arg(subparsers):
"--fields",
type=str,
help="list of fields to include in JSON output",
default="bank_id,bank,parent_bank,shares,job_usage",
default=None,
metavar="BANK_ID,BANK,ACTIVE,PARENT_BANK,SHARES,JOB_USAGE",
)
subparser_list_banks.add_argument(
"--table",
action="store_const",
const=True,
help="list all banks in table format",
)


def add_update_usage_arg(subparsers):
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ TESTSCRIPTS = \
t1040-mf-priority-projects.t \
t1041-view-jobs-by-project.t \
t1042-issue508.t \
t1043-view-jobs-by-bank.t \
t5000-valgrind.t \
python/t1000-example.py \
python/t1001_db.py \
2 changes: 1 addition & 1 deletion t/expected/job_usage/no_jobs.expected
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UserID Username JobID T_Submit T_Run T_Inactive Nodes Project
UserID Username JobID T_Submit T_Run T_Inactive Nodes Project Bank
Binary file added t/expected/test_dbs/FluxAccountingv0-39-0.db
Binary file not shown.
10 changes: 10 additions & 0 deletions t/t1007-flux-account-users.t
Original file line number Diff line number Diff line change
@@ -50,6 +50,16 @@ test_expect_success 'add some queues to the DB' '
flux account add-queue special --priority=99999
'

test_expect_success 'call add-user without specifying a username' '
test_must_fail flux account add-user --bank=A > error.out 2>&1 &&
grep "add-user: error: the following arguments are required: --username" error.out
'

test_expect_success 'call add-user without specifying a bank' '
test_must_fail flux account add-user --username=user5011 > error.out 2>&1 &&
grep "add-user: error: the following arguments are required: --bank" error.out
'

test_expect_success 'trying to add an association that already exists should raise an IntegrityError' '
test_must_fail flux account add-user --username=user5011 --userid=5011 --bank=A > already_exists.out 2>&1 &&
grep "association user5011,A already active in association_table" already_exists.out
Loading