Skip to content

Commit

Permalink
Merge pull request #399 from cmoussa1/plugin.queue.updates
Browse files Browse the repository at this point in the history
plugin: add callback specific for validating an updated queue
  • Loading branch information
mergify[bot] authored Nov 6, 2023
2 parents 9fef06e + a14133a commit 1a84215
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 11 deletions.
83 changes: 74 additions & 9 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,10 @@ static int run_cb (flux_plugin_t *p,
}


/*
* apply an update on a job with regard to its queue once it has been
* validated.
*/
static int job_updated (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand All @@ -1087,12 +1091,12 @@ static int job_updated (flux_plugin_t *p,

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s, s?s}}}}",
"{s:i, s{s{s{s?s}}}, s:{s?s}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank, "queue", &queue) < 0) {
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}
"jobspec", "attributes", "system", "bank", &bank,
"updates",
"attributes.system.queue", &queue) < 0)
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// grab bank_info struct for user/bank (if any)
b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
Expand Down Expand Up @@ -1129,10 +1133,70 @@ static int job_updated (flux_plugin_t *p,
bank_it = lookup_result.second;
}

// fetch the priority of the validated queue; assign it to the bank_info
// struct associated with the job
int queue_factor = get_queue_info (queue, bank_it);
b->queue_factor = queue_factor;
// if the queue for the job has been updated, fetch the priority of the
// validated queue and assign it to the associated bank_info struct
if (queue != NULL) {
int queue_factor = get_queue_info (queue, bank_it);
b->queue_factor = queue_factor;
}

return 0;
}


/*
* check for an updated queue and validate it for a user/bank; if the
* user/bank does not have access to the queue they are trying to update
* their job for, reject the update and keep the job in its current queue.
*/
static int update_queue_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
std::map<std::string, struct bank_info>::iterator bank_it;
int userid;
char *bank = NULL;
char *queue = NULL;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s:s, s{s{s{s?s}}}}",
"userid", &userid,
"value", &queue,
"jobspec", "attributes", "system", "bank",
&bank) < 0)
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// look up user/bank info based on unpacked information
bank_info_result lookup_result = get_bank_info (userid, bank);

if (lookup_result.first == BANK_USER_NOT_FOUND) {
return flux_jobtap_error (p,
args,
"mf_priority: cannot find info for user ",
userid);
} else if (lookup_result.first == BANK_INVALID) {
return flux_jobtap_error (p,
args,
"mf_priority: not a member of %s",
bank);
} else if (lookup_result.first == BANK_NO_DEFAULT) {
return flux_jobtap_error (p,
args,
"mf_priority: user/default bank entry does "
"not exist");
} else if (lookup_result.first == BANK_SUCCESS) {
bank_it = lookup_result.second;

// validate the updated queue and make sure the user/bank has
// access to it; if not, reject the update
if (get_queue_info (queue, bank_it) == INVALID_QUEUE)
return flux_jobtap_error (p,
args,
"mf_priority: queue not valid for user: %s",
queue);
}

return 0;
}
Expand Down Expand Up @@ -1210,6 +1274,7 @@ static const struct flux_plugin_handler tab[] = {
{ "job.update", job_updated, NULL},
{ "job.state.run", run_cb, NULL},
{ "plugin.query", query_cb, NULL},
{ "job.update.attributes.system.queue", update_queue_cb, NULL },
{ 0 },
};

Expand Down
34 changes: 32 additions & 2 deletions t/t1030-mf-priority-update-queue.t
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ test_expect_success 'load multi-factor priority plugin' '

test_expect_success 'add some banks to the DB' '
flux account add-bank root 1 &&
flux account add-bank --parent-bank=root A 1
flux account add-bank --parent-bank=root A 1 &&
flux account add-bank --parent-bank=root B 1
'

test_expect_success 'add some queues to the DB' '
Expand All @@ -44,6 +45,10 @@ test_expect_success 'add a user to the DB' '
flux account add-user --username=user5001 \
--userid=5001 \
--bank=A \
--queues="bronze,silver" &&
flux account add-user --username=user5001 \
--userid=5001 \
--bank=B \
--queues="bronze,silver"
'

Expand Down Expand Up @@ -78,13 +83,38 @@ test_expect_success 'update of queue of pending job works' '
test_expect_success 'updating a job using a queue the user does not belong to fails' '
test_must_fail flux update $jobid1 queue=gold > unavail_queue.out 2>&1 &&
test_debug "cat unavail_queue.out" &&
grep "ERROR: Queue not valid for user: gold" unavail_queue.out
grep "ERROR: mf_priority: queue not valid for user: gold" unavail_queue.out
'

test_expect_success 'cancel job' '
flux job cancel $jobid1
'

test_expect_success 'submit job for testing under non-default bank' '
jobid2=$(flux python ${SUBMIT_AS} 5001 --setattr=bank=B --queue=bronze sleep 30) &&
flux job wait-event -f json $jobid2 priority \
| jq '.context.priority' > job2_bronze.test &&
grep 1050000 job2_bronze.test
'

test_expect_success 'update of queue of pending job under a non-default bank works' '
flux update $jobid2 queue=silver &&
flux job wait-event -f json $jobid2 priority &&
flux job eventlog $jobid2 > eventlog.out &&
grep "attributes.system.queue=\"silver\"" eventlog.out &&
grep 2050000 eventlog.out
'

test_expect_success 'updating a job under non-default bank using a queue the user does not belong to fails' '
test_must_fail flux update $jobid2 queue=gold > unavail_queue.out 2>&1 &&
test_debug "cat unavail_queue.out" &&
grep "ERROR: mf_priority: queue not valid for user: gold" unavail_queue.out
'

test_expect_success 'cancel job' '
flux job cancel $jobid2
'

test_expect_success 'shut down flux-accounting service' '
flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()"
'
Expand Down

0 comments on commit 1a84215

Please sign in to comment.