database: add the ability to remove old job records from jobs table #459

Merged: 4 commits, Jun 27, 2024
30 changes: 30 additions & 0 deletions doc/guide/accounting-guide.rst
@@ -171,6 +171,36 @@ The scripts should be run by :core:man1:`flux-cron`:

30 * * * * bash -c "flux account-fetch-job-records; flux account update-usage; flux account-update-fshare; flux account-priority-update"

Periodically fetching and storing job records in the flux-accounting database
can cause the DB to grow large. Because job records eventually stop
contributing to job usage and fair-share calculations, you can run
``flux account scrub-old-jobs`` to remove old job records. If no argument is
passed, the command deletes every record for a job that completed more than
6 months (26 weeks) ago. This can be tuned by specifying the number of weeks
to go back when determining which records to remove. The example below
removes every job record more than 4 weeks old:

.. code-block:: console

   $ flux account scrub-old-jobs 4

By default, the disk space occupied by a SQLite database file does not shrink
when records are deleted from it. If space is still an issue after scrubbing
old job records from the flux-accounting database, the ``VACUUM`` command
reclaims the space previously occupied by the deleted records. You can run
this command by connecting to the flux-accounting database in a SQLite shell:

.. code-block:: console

   $ sqlite3 FluxAccounting.db
   sqlite> VACUUM;

Note that ``VACUUM`` can take minutes to run and requires an exclusive lock on
the database; it will fail if the database has a pending SQL statement or an
open transaction.
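
The effect described above can be observed with Python's standard ``sqlite3`` module. The following is a minimal sketch, not part of this PR: the table is a simplified, hypothetical stand-in for the real ``jobs`` schema, and the function name ``db_sizes_around_vacuum`` is invented for illustration. It assumes SQLite's default setting in which ``auto_vacuum`` is off.

```python
import os
import sqlite3
import tempfile


def db_sizes_around_vacuum(num_rows=2000):
    """Return (size_full, size_after_delete, size_after_vacuum) in bytes."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "example.db")
        conn = sqlite3.connect(path)
        # Hypothetical, simplified table; NOT the real flux-accounting schema.
        conn.execute(
            "CREATE TABLE jobs (id INTEGER PRIMARY KEY, t_inactive REAL, jobspec TEXT)"
        )
        conn.executemany(
            "INSERT INTO jobs (t_inactive, jobspec) VALUES (?, ?)",
            ((float(i), "x" * 1024) for i in range(num_rows)),
        )
        conn.commit()
        size_full = os.path.getsize(path)

        # Delete every row older than the cutoff (here: all of them).
        conn.execute("DELETE FROM jobs WHERE t_inactive < ?", (float(num_rows),))
        conn.commit()
        size_after_delete = os.path.getsize(path)

        # VACUUM cannot run inside an open transaction, so it comes after
        # the commit above.
        conn.execute("VACUUM")
        size_after_vacuum = os.path.getsize(path)
        conn.close()
    return size_full, size_after_delete, size_after_vacuum


if __name__ == "__main__":
    full, after_delete, after_vacuum = db_sizes_around_vacuum()
    print(f"full: {full} B, after DELETE: {after_delete} B, after VACUUM: {after_vacuum} B")
```

Running the script prints the three file sizes; the point of interest is that the file only shrinks once ``VACUUM`` has run.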

***********************
Database Administration
***********************
15 changes: 15 additions & 0 deletions src/bindings/python/fluxacct/accounting/job_archive_interface.py
@@ -576,3 +576,18 @@ def update_job_usage(acct_conn, pdhl=1):
    acct_conn.commit()

    return 0


# Scrub jobs from the flux-accounting "jobs" table by removing any
# record that is more than num_weeks old. If no number of weeks is
# specified, remove any record that is more than 6 months (26 weeks) old.
def scrub_old_jobs(conn, num_weeks=26):
    cur = conn.cursor()
    # calculate total amount of time to go back (in terms of seconds)
    # (there are 604,800 seconds in a week)
    cutoff_time = time.time() - (num_weeks * 604800)

    # delete all jobs that finished before this time
    delete_stmt = "DELETE FROM jobs WHERE t_inactive < ?"
    cur.execute(delete_stmt, (cutoff_time,))
    conn.commit()
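
To make the cutoff arithmetic concrete: the default of 26 weeks corresponds to 26 × 604,800 = 15,724,800 seconds. The sketch below replays the same ``DELETE`` against an in-memory database holding the four simulated records used by the test script in this PR. It is an illustration under assumptions, not the code in the PR: the two-column table is a hypothetical simplification of the real ``jobs`` table, ``now`` is passed in explicitly (the PR calls ``time.time()``) so the result is deterministic, and returning ``cur.rowcount`` is an addition made only for the demonstration.

```python
import sqlite3

SECONDS_PER_WEEK = 604800


def scrub_old_jobs(conn, now, num_weeks=26):
    """Delete jobs that went inactive more than num_weeks before `now`."""
    cutoff_time = now - num_weeks * SECONDS_PER_WEEK
    cur = conn.cursor()
    cur.execute("DELETE FROM jobs WHERE t_inactive < ?", (cutoff_time,))
    conn.commit()
    return cur.rowcount  # number of rows removed (illustration only)


def make_db(now):
    """Build an in-memory DB with four simulated job records."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical two-column table; the real jobs table has more columns.
    conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, t_inactive REAL)")
    rows = [
        ("1", now - 60),            # finished a minute ago
        ("2", now - 604861 * 2),    # just over two weeks ago
        ("3", now - 604861 * 2),    # just over two weeks ago
        ("4", now - 604861 * 27),   # more than six months ago
    ]
    conn.executemany("INSERT INTO jobs VALUES (?, ?)", rows)
    conn.commit()
    return conn


def count_jobs(conn):
    return conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]


if __name__ == "__main__":
    now = 1_700_000_000.0  # arbitrary fixed timestamp
    conn = make_db(now)
    for weeks in (26, 2, 0):
        removed = scrub_old_jobs(conn, now, weeks)
        print(f"num_weeks={weeks}: removed {removed}, remaining {count_jobs(conn)}")
```

Passing progressively smaller values of ``num_weeks`` removes the oldest record first, then the two-week-old records, then everything that remains.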
15 changes: 15 additions & 0 deletions src/cmd/flux-account-service.py
@@ -104,6 +104,7 @@ def __init__(self, flux_handle, conn):
"edit_queue",
"add_project",
"delete_project",
"scrub_old_jobs",
"shutdown_service",
]

@@ -483,6 +484,20 @@ def delete_project(self, handle, watcher, msg, arg):
                msg, 0, f"a non-OSError exception was caught: {str(exc)}"
            )

    def scrub_old_jobs(self, handle, watcher, msg, arg):
        try:
            val = jobs.scrub_old_jobs(self.conn, msg.payload["num_weeks"])

            payload = {"scrub_old_jobs": val}

            handle.respond(msg, payload)
        except KeyError as exc:
            handle.respond_error(msg, 0, f"missing key in payload: {exc}")
        except Exception as exc:
            handle.respond_error(
                msg, 0, f"a non-OSError exception was caught: {str(exc)}"
            )


LOGGER = logging.getLogger("flux-uri")

25 changes: 25 additions & 0 deletions src/cmd/flux-account.py
@@ -484,6 +484,24 @@ def add_delete_project_arg(subparsers):
    )


def add_scrub_job_records_arg(subparsers):
    subparser = subparsers.add_parser(
        "scrub-old-jobs",
        help="clean job-archive of old job records",
        formatter_class=flux.util.help_formatter(),
    )

    subparser.set_defaults(func="scrub_old_jobs")
    subparser.add_argument(
        "num_weeks",
        help="delete jobs that have finished more than NUM_WEEKS ago",
        type=int,
        nargs="?",
        metavar="NUM_WEEKS",
        default=26,
    )
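
The combination of ``nargs="?"`` and ``default=26`` is what makes ``NUM_WEEKS`` an optional positional argument. A minimal, standalone sketch of that behavior follows, using only ``argparse`` from the standard library; the ``build_parser`` helper and the ``flux-account-sketch`` program name are hypothetical, and the Flux-specific ``formatter_class`` is omitted so the example runs without Flux installed.

```python
import argparse


def build_parser():
    """Build a tiny parser mirroring the scrub-old-jobs subcommand."""
    parser = argparse.ArgumentParser(prog="flux-account-sketch")
    subparsers = parser.add_subparsers(dest="command")

    subparser = subparsers.add_parser(
        "scrub-old-jobs",
        help="remove old job records",
    )
    subparser.set_defaults(func="scrub_old_jobs")
    subparser.add_argument(
        "num_weeks",
        help="delete jobs that have finished more than NUM_WEEKS ago",
        type=int,
        nargs="?",       # zero or one value may be supplied
        metavar="NUM_WEEKS",
        default=26,      # used when the value is omitted
    )
    return parser


if __name__ == "__main__":
    parser = build_parser()
    print(parser.parse_args(["scrub-old-jobs"]).num_weeks)
    print(parser.parse_args(["scrub-old-jobs", "4"]).num_weeks)
```

With no positional value the parser falls back to the default; with a value it converts the string to an ``int`` before the RPC payload is built.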


def add_arguments_to_parser(parser, subparsers):
    add_path_arg(parser)
    add_output_file_arg(parser)
@@ -505,6 +523,7 @@ def add_arguments_to_parser(parser, subparsers):
    add_add_project_arg(subparsers)
    add_view_project_arg(subparsers)
    add_delete_project_arg(subparsers)
    add_scrub_job_records_arg(subparsers)


def set_db_location(args):
@@ -670,6 +689,12 @@ def select_accounting_function(args, output_file, parser):
"project": args.project,
        }
        return_val = flux.Flux().rpc("accounting.delete_project", data).get()
    elif args.func == "scrub_old_jobs":
        data = {
            "path": args.path,
            "num_weeks": args.num_weeks,
        }
        return_val = flux.Flux().rpc("accounting.scrub_old_jobs", data).get()
    else:
        print(parser.print_usage())
        return
1 change: 1 addition & 0 deletions t/Makefile.am
@@ -35,6 +35,7 @@ TESTSCRIPTS = \
t1032-mf-priority-update-bank.t \
t1033-mf-priority-update-job.t \
t1034-mf-priority-config.t \
t1035-flux-account-scrub-old-jobs.t \
t5000-valgrind.t \
python/t1000-example.py

97 changes: 97 additions & 0 deletions t/scripts/insert_jobs.py
@@ -0,0 +1,97 @@
#!/usr/bin/env python3
###############################################################
# Copyright 2024 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################
import sqlite3
import sys
import time


def main():
    if len(sys.argv) < 2:
        sys.exit("Usage: insert_jobs DATABASE_PATH")

    db_uri = sys.argv[1]

    try:
        conn = sqlite3.connect(db_uri, uri=True)
        cur = conn.cursor()
    except sqlite3.OperationalError as exc:
        print(f"Unable to open database file: {db_uri}", file=sys.stderr)
        print(exc)
        sys.exit(1)

    userid = 9999
    t_submit = t_run = 0
    t_inactive_recent = time.time()  # job that just finished
    t_inactive_two_weeks = time.time() - (604861 * 2)  # more than 2 weeks old
    t_inactive_old = time.time() - (604861 * 27)  # more than six months old
    ranks = r = jobspec = ""
    insert_stmt = "INSERT INTO jobs VALUES (?, ?, ?, ?, ?, ?, ?, ?)"

    cur.execute(
        insert_stmt,
        (
            "1",
            userid,
            t_submit,
            t_run,
            t_inactive_recent,
            ranks,
            r,
            jobspec,
        ),
    )
    cur.execute(
        insert_stmt,
        (
            "2",
            userid,
            t_submit,
            t_run,
            t_inactive_two_weeks,
            ranks,
            r,
            jobspec,
        ),
    )
    cur.execute(
        insert_stmt,
        (
            "3",
            userid,
            t_submit,
            t_run,
            t_inactive_two_weeks,
            ranks,
            r,
            jobspec,
        ),
    )
    cur.execute(
        insert_stmt,
        (
            "4",
            userid,
            t_submit,
            t_run,
            t_inactive_old,
            ranks,
            r,
            jobspec,
        ),
    )

    conn.commit()
    conn.close()


if __name__ == "__main__":
    main()
9 changes: 9 additions & 0 deletions t/t1026-flux-account-perms.t
@@ -128,6 +128,15 @@ test_expect_success 'delete-project should not be accessible by all users' '
)
'

test_expect_success 'scrub-old-jobs should not be accessible by all users' '
newid=$(($(id -u)+1)) &&
( export FLUX_HANDLE_ROLEMASK=0x2 &&
export FLUX_HANDLE_USERID=$newid &&
test_must_fail flux account scrub-old-jobs > no_access_scrub_old_jobs.out 2>&1 &&
grep "Request requires owner credentials" no_access_scrub_old_jobs.out
)
'

test_expect_success 'remove flux-accounting DB' '
rm $(pwd)/FluxAccountingTest.db
'
83 changes: 83 additions & 0 deletions t/t1035-flux-account-scrub-old-jobs.t
@@ -0,0 +1,83 @@
#!/bin/bash

test_description='test removing old job records from the flux-accounting database'
Review comments on this line:

Member: I think a test for the default (no args == 26 weeks) is missing? Maybe should stick one more test record in there for something > 6 months old.

Member: Also, probably should cover the path argument too.

Member Author: Good point, thanks. I just pushed up a test that does both - specifying a path to the DB as well as the default for scrub-old-jobs (which should remove a simulated job that "finished" over six months ago).


. `dirname $0`/sharness.sh
DB_PATH=$(pwd)/FluxAccountingTest.db
QUERYCMD="flux python ${SHARNESS_TEST_SRCDIR}/scripts/query.py"
INSERT_JOBS="flux python ${SHARNESS_TEST_SRCDIR}/scripts/insert_jobs.py"

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job

flux setattr log-stderr-level 1

# get job records from jobs table
# arg1 - database path
get_job_records() {
local dbpath=$1
local i=0
local row_count=0
query="select count(*) from jobs;"

row_count=$(${QUERYCMD} -t 100 ${dbpath} "${query}" | awk -F' = ' '{print $2}')
echo $row_count
}

test_expect_success 'create flux-accounting DB' '
flux account -p $(pwd)/FluxAccountingTest.db create-db
'

test_expect_success 'start flux-accounting service' '
flux account-service -p ${DB_PATH} -t
'

# insert_jobs.py inserts four fake job records into the jobs table in the
# flux-accounting database:
#
# Two of the jobs have a simulated time of finishing just over two weeks ago.
# One of the jobs has a simulated time of finishing very recently.
# One of the jobs has a simulated time of finishing over six months ago.
test_expect_success 'populate DB with four job records' '
${INSERT_JOBS} ${DB_PATH}
'

test_expect_success 'ensure the jobs table has four records in it' '
get_job_records ${DB_PATH} > result.out &&
test $(cat result.out) -eq 4
'

test_expect_success 'do not pass an argument to scrub-old-jobs (should remove the oldest job)' '
flux account -p ${DB_PATH} scrub-old-jobs &&
get_job_records ${DB_PATH} > result.out &&
test $(cat result.out) -eq 3
'

# Passing 0 for num_weeks is saying "Remove all records older than 0 weeks
# old," or rather, remove all jobs in the table.
test_expect_success 'if we pass 0 for num_weeks, all jobs will be removed' '
flux account scrub-old-jobs 0 &&
get_job_records ${DB_PATH} > result.out &&
test $(cat result.out) -eq 0
'

# If num_weeks == 2, all jobs that have finished more than 2 weeks ago will be
# removed. In our testsuite, that should leave just the job that finished
# "recently".
test_expect_success 'only remove job records older than 2 weeks old' '
${INSERT_JOBS} ${DB_PATH} &&
flux account scrub-old-jobs 2 &&
get_job_records ${DB_PATH} > result.out &&
test $(cat result.out) -eq 1
'

test_expect_success 'remove flux-accounting DB' '
rm $(pwd)/FluxAccountingTest.db
'

test_expect_success 'shut down flux-accounting service' '
flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()"
'

test_done