Skip to content

Commit

Permalink
validate_cb (): use new helper function
Browse files Browse the repository at this point in the history
Problem: There are now helper functions available for the plugin to make
use of to help with lookups and access of the internal map for users and
banks, but the plugin does not make use of them.

Restructure the validate_cb () function to make use of the new
get_user_info () helper function.

Change the error message for when a user/bank cannot be found and adjust
the respective tests that look for this error message.

Improve the comments in this function/add new ones to help with some of
the clarity in this function.
  • Loading branch information
cmoussa1 committed Dec 4, 2023
1 parent 48714be commit fafd79e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 53 deletions.
89 changes: 39 additions & 50 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,21 @@ static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid)


/*
* Look up the userid of the submitted job in the multimap; if user is not found
* in the map, reject the job saying the user wasn't found in the
* flux-accounting database.
* Perform basic validation of a user/bank's submitted job. If a bank or
* queue is specified on submission, ensure that the user is allowed to
* submit a job under them. Check the active job limits for the user/bank
* on submission as well to make sure that they are under this limit when
* the job is submitted.
*
* This callback will also make sure that the user/bank belongs to
* the flux-accounting DB; there are two behaviors supported here:
*
* if the plugin has SOME data about users/banks and the user does not have
* an entry in the plugin, the job will be rejected.
*
* if the plugin has NO data about users/banks and the user does not have an
* entry in the plugin, the job will be held until data is received by the
* plugin.
*/
static int validate_cb (flux_plugin_t *p,
const char *topic,
Expand All @@ -781,11 +793,10 @@ static int validate_cb (flux_plugin_t *p,
int max_run_jobs, cur_active_jobs, max_active_jobs, queue_factor = 0;
double fairshare = 0.0;
bool only_dne_data;
user_bank_info *user_bank;

std::map<int, std::map<std::string, user_bank_info>>::iterator it;
std::map<std::string, user_bank_info>::iterator bank_it;
std::map<std::string, user_bank_info>::iterator q_it;

// unpack the attributes of the user/bank's submitted job when it
// enters job.validate and place them into their respective variables
flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -797,67 +808,45 @@ static int validate_cb (flux_plugin_t *p,
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

// make sure user belongs to flux-accounting DB; there are two behaviors
// supported in this plugin:
//
// if the plugin has SOME data about users/banks and the user does not
// have an entry in the plugin, the job will be rejected.
//
// if the plugin has NO data about users/banks and the user does not have
// an entry in the plugin, the job will be held until data is received by
// the plugin.
it = users.find (userid);
if (it == users.end ()) {
// check if the map only contains DNE entries
// perform a lookup in the users map of the unpacked user/bank
user_bank = get_user_info (userid, bank);

if (user_bank == NULL) {
// the user/bank could not be found in the plugin's internal map,
// so perform a check to see if the map has any loaded
// flux-accounting data before rejecting the job
bool only_dne_data = check_map_for_dne_only ();

if (users.empty () || only_dne_data) {
add_missing_bank_info (p, h, userid);
return 0;
} else {
return flux_jobtap_reject_job (p, args,
"no bank found for user: %i", userid);
return flux_jobtap_reject_job (p,
args,
"cannot find user/bank or "
"user/default bank entry "
"for: %i", userid);
}
}

// make sure user belongs to bank they specified; if no bank was passed in,
// look up their default bank
if (bank != NULL) {
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ())
return flux_jobtap_reject_job (p, args,
"user does not belong to specified bank");
} else {
bank = const_cast<char*> (users_def_bank[userid].c_str ());
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ())
return flux_jobtap_reject_job (p, args,
"user/default bank entry does not exist");
}

// if user/bank entry was disabled, reject job with a message saying the
// entry has been disabled
if (bank_it->second.active == 0)
if (user_bank->active == 0)
// the user/bank entry was disabled; reject the job
return flux_jobtap_reject_job (p, args, "user/bank entry has been "
"disabled from flux-accounting DB");

// validate the queue if one is passed in; if the user does not have access
// to the queue they specified, reject the job
queue_factor = get_queue_info (queue, bank_it);

if (queue_factor == INVALID_QUEUE)
if (get_queue_info (queue, user_bank->queues) == INVALID_QUEUE)
// the user/bank specified a queue that they do not belong to;
// reject the job
return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s",
queue);

max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
max_active_jobs = bank_it->second.max_active_jobs;
cur_active_jobs = user_bank->cur_active_jobs;
max_active_jobs = user_bank->max_active_jobs;

// if a user/bank has reached their max_active_jobs limit, subsequently
// submitted jobs will be rejected
if (state == FLUX_JOB_STATE_NEW) {
if (max_active_jobs > 0 && cur_active_jobs >= max_active_jobs)
// the user/bank is already at their max_active_jobs limit;
// reject the job
return flux_jobtap_reject_job (p,
args,
"user has max active jobs");
Expand Down
2 changes: 1 addition & 1 deletion t/t1001-mf-priority-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ test_expect_success 'submit a job using default bank' '
test_expect_success 'submit a job using a bank the user does not belong to' '
test_must_fail flux submit --setattr=system.bank=account1 -n1 hostname > bad_bank.out 2>&1 &&
test_debug "cat bad_bank.out" &&
grep "user does not belong to specified bank" bad_bank.out
grep "cannot find user/bank or user/default bank entry for:" bad_bank.out
'

test_expect_success 'reject job when invalid bank format is passed in' '
Expand Down
2 changes: 1 addition & 1 deletion t/t1022-mf-priority-issue346.t
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ test_expect_success 'cancel job' '

test_expect_success 'submit a job to plugin while not having an entry in the plugin' '
test_must_fail flux python ${SUBMIT_AS} 1003 hostname > no_user_entry.out 2>&1 &&
grep "no bank found for user" no_user_entry.out
grep "cannot find user/bank or user/default bank entry for:" no_user_entry.out
'

test_expect_success 'shut down flux-accounting service' '
Expand Down
2 changes: 1 addition & 1 deletion t/t1028-mf-priority-issue385.t
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ test_expect_success 'check that jobs transition to RUN' '
test_expect_success 'submitting a job under invalid user while plugin has data fails' '
test_must_fail flux python ${SUBMIT_AS} 9999 hostname > invalid_user.out 2>&1 &&
test_debug "cat invalid_user.out" &&
grep "flux-job: no bank found for user: 9999" invalid_user.out
grep "cannot find user/bank or user/default bank entry for: 9999" invalid_user.out
'

test_expect_success 'cancel running jobs' '
Expand Down

0 comments on commit fafd79e

Please sign in to comment.