Skip to content

Commit

Permalink
Merge pull request #429 from cmoussa1/update.banks.plugin
Browse files Browse the repository at this point in the history
plugin: add support for updating the bank of a pending job
  • Loading branch information
mergify[bot] authored Mar 14, 2024
2 parents b2f42b6 + 2621f14 commit ad1d92b
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 14 deletions.
113 changes: 99 additions & 14 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,8 @@ static int run_cb (flux_plugin_t *p,


/*
* Apply an update on a job with regard to its queue once it has been
* validated.
* Apply an update on a job with regard to its queue or associated bank once
* it has been validated.
*/
static int job_updated (flux_plugin_t *p,
const char *topic,
Expand All @@ -744,15 +744,19 @@ static int job_updated (flux_plugin_t *p,
int userid;
char *bank = NULL;
char *updated_queue = NULL;
char *updated_bank = NULL;
Association *a;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}, s:{s?s}}",
"{s:i, s{s{s{s?s}}}, s:{s?s, s?s}}",
"userid", &userid,
"jobspec", "attributes", "system", "bank", &bank,
"jobspec", "attributes", "system", "bank",
&bank,
"updates",
"attributes.system.queue", &updated_queue) < 0)
"attributes.system.queue", &updated_queue,
"attributes.system.bank", &updated_bank) < 0)
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// grab Association object from job
Expand All @@ -768,17 +772,40 @@ static int job_updated (flux_plugin_t *p,
return -1;
}

// look up association
a = get_association (userid, bank, users, users_def_bank);
if (updated_bank != NULL && a->bank_name != std::string (updated_bank)) {
// the bank for the user has been updated, so we need to update
// the Association object for this job

if (a == nullptr)
flux_jobtap_raise_exception (p,
// get attributes of new bank
Association *a_new = get_association (userid,
updated_bank,
users,
users_def_bank);
if (a_new == nullptr) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"job.update: cannot find user/bank "
"or user/default bank entry for "
"uid: %i", userid);

return -1;
}

// update the active jobs count of the old bank
a->cur_active_jobs--;
// assign the new Association object to the original Association object
a = a_new;
// update the active jobs count of the updated bank
a->cur_active_jobs++;

// re-pack the updated Association object to the job
if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"cannot find user/bank or "
"user/default bank entry "
"for uid: %i", userid);
"mf_priority:bank_info",
a,
NULL) < 0)
flux_log_error (h, "flux_jobtap_job_aux_set");
}

if (updated_queue != NULL)
// the queue for the job has been updated, so fetch the priority
Expand Down Expand Up @@ -836,6 +863,63 @@ static int update_queue_cb (flux_plugin_t *p,
}


/*
* Check for an updated bank and validate it for a user/bank; if the
* user/bank does not have access to the bank they are trying to update
* their job for, reject the update and keep the job under its current bank.
*
* Also check the active jobs and running jobs limits for the new bank; if the
* new bank is currently at its max active jobs or max running jobs limit,
* reject the update and keep the job under its current bank.
*/
static int update_bank_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
int userid;
char *bank = NULL;
Association *a;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s:s}",
"userid", &userid,
"value", &bank) < 0) {
return flux_jobtap_error (p, args, "unable to unpack bank arg");
}

// look up association
a = get_association (userid, bank, users, users_def_bank);
if (a == nullptr)
return flux_jobtap_reject_job (p,
args,
"cannot find user/bank or "
"user/default bank entry "
"for uid: %i", userid);

if (a->max_active_jobs > 0 && a->cur_active_jobs >= a->max_active_jobs)
// new bank is already at its max active jobs limit; reject update
return flux_jobtap_error (p,
args,
"new bank is already at max-active-jobs "
"limit");

if (a->max_run_jobs > 0 && a->cur_run_jobs == a->max_run_jobs)
// jobs are held in DEPEND state when an association is at their max
// running jobs limit and there isn't a way to bring a job back to
// DEPEND, so just reject the update
return flux_jobtap_error (p,
args,
"updating to bank %s while it is already at "
"max-run-jobs limit is not allowed; try "
"again later",
bank);

return 0;
}


static int inactive_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand Down Expand Up @@ -909,6 +993,7 @@ static const struct flux_plugin_handler tab[] = {
{ "job.state.run", run_cb, NULL},
{ "plugin.query", query_cb, NULL},
{ "job.update.attributes.system.queue", update_queue_cb, NULL },
{ "job.update.attributes.system.bank", update_bank_cb, NULL },
{ 0 },
};

Expand Down
2 changes: 2 additions & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ TESTSCRIPTS = \
t1029-mf-priority-default-bank.t \
t1030-mf-priority-update-queue.t \
t1031-mf-priority-issue406.t \
t1032-mf-priority-update-bank.t \
t1033-mf-priority-update-job.t \
t5000-valgrind.t \
python/t1000-example.py

Expand Down
135 changes: 135 additions & 0 deletions t/t1032-mf-priority-update-bank.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#!/bin/bash

test_description='test updating the bank for a pending job in priority plugin'

. `dirname $0`/sharness.sh

mkdir -p conf.d

MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
DB_PATH=$(pwd)/FluxAccountingTest.db

export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job -o,--config-path=$(pwd)/conf.d

flux setattr log-stderr-level 1

test_expect_success 'create flux-accounting DB' '
flux account -p $(pwd)/FluxAccountingTest.db create-db
'

test_expect_success 'start flux-accounting service' '
flux account-service -p ${DB_PATH} -t
'

test_expect_success 'load multi-factor priority plugin' '
flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} &&
flux jobtap list | grep mf_priority
'

test_expect_success 'add some banks to the DB' '
flux account add-bank root 1 &&
flux account add-bank --parent-bank=root A 1 &&
flux account add-bank --parent-bank=root B 1 &&
flux account add-bank --parent-bank=root C 1
'

test_expect_success 'add a user to the DB' '
flux account add-user \
--username=user1 \
--userid=5001 \
--bank=A &&
flux account add-user \
--username=user1 \
--userid=5001 \
--bank=B \
--max-active-jobs=3 \
--max-running-jobs=2
'

test_expect_success 'send flux-accounting DB information to the plugin' '
flux account-priority-update -p $(pwd)/FluxAccountingTest.db
'

test_expect_success 'submit one job under default bank, two under non-default bank' '
job1=$(flux python ${SUBMIT_AS} 5001 --urgency=0 sleep 30) &&
job2=$(flux python ${SUBMIT_AS} 5001 --setattr=bank=B --urgency=0 sleep 30) &&
job3=$(flux python ${SUBMIT_AS} 5001 --setattr=bank=B --urgency=0 sleep 30)
'

test_expect_success 'update of bank of pending job works' '
flux update $job1 bank=B &&
flux job wait-event -t 30 $job1 priority &&
flux job eventlog $job1 > eventlog.out &&
grep "attributes.system.bank=\"B\"" eventlog.out
'

test_expect_success 'trying to update to a bank user does not have access to fails in job.validate' '
test_must_fail flux update $job1 bank=C > invalid_bank.out 2>&1 &&
test_debug "cat invalid_bank.out" &&
grep "cannot find user/bank or user/default bank entry for uid:" invalid_bank.out
'

test_expect_success 'trying to update to a bank that does not exist fails in job.validate' '
test_must_fail flux update $job1 bank=foo > nonexistent_bank.out 2>&1 &&
test_debug "cat nonexistent_bank.out" &&
grep "cannot find user/bank or user/default bank entry for uid:" nonexistent_bank.out
'

test_expect_success 'update a job to another bank that is at max-active-jobs limit' '
job4=$(flux python ${SUBMIT_AS} 5001 --urgency=0 sleep 30) &&
test_must_fail flux update $job4 bank=B > max_active_jobs.out 2>&1 &&
test_debug "cat max_active_jobs.out" &&
grep "new bank is already at max-active-jobs limit" max_active_jobs.out &&
flux cancel $job4
'

test_expect_success 'cancel one of the jobs so bank is not at max-active-jobs limit' '
flux cancel $job3
'

test_expect_success 'update urgency of held jobs so they transition to RUN' '
flux job urgency $job1 default &&
flux job urgency $job2 default &&
flux job wait-event -t 10 $job1 alloc &&
flux job wait-event -t 10 $job2 alloc
'

test_expect_success 'update a job to another bank that is at max-run-jobs limit' '
job5=$(flux python ${SUBMIT_AS} 5001 --urgency=0 sleep 30) &&
test_must_fail flux update $job5 bank=B > max_run_jobs.out 2>&1 &&
test_debug "cat max_run_jobs.out" &&
grep "already at max-run-jobs limit is not allowed" max_run_jobs.out &&
flux cancel $job5
'

test_expect_success 'cancel jobs' '
flux cancel $job1 &&
flux cancel $job2
'

test_expect_success 'submit job under non-default bank' '
job6=$(flux python ${SUBMIT_AS} 5001 --setattr=bank=B --urgency=0 sleep 30)
'

test_expect_success 'updating job to default bank works' '
flux update $job6 bank=A &&
flux job wait-event -t 30 $job6 priority &&
flux job eventlog $job6 > eventlog.out &&
grep "attributes.system.bank=\"A\"" eventlog.out
'

test_expect_success 'check that plugin also sees the job update' '
flux jobtap query mf_priority.so > query.json &&
jq -e ".mf_priority_map[0].banks[0].bank_name == \"A\"" <query.json &&
jq -e ".mf_priority_map[0].banks[0].cur_active_jobs == 1" <query.json &&
jq -e ".mf_priority_map[0].banks[1].bank_name == \"B\"" <query.json &&
jq -e ".mf_priority_map[0].banks[1].cur_active_jobs == 0" <query.json
'

test_expect_success 'shut down flux-accounting service' '
flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()"
'

test_done
Loading

0 comments on commit ad1d92b

Please sign in to comment.