Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: improve job.update/job.update...queue callbacks #423

Merged
merged 4 commits into from
Feb 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 42 additions & 113 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,6 @@
// the association does not have permission to run jobs under
#define INVALID_QUEUE -6

// different codes to return as a result of looking up user/bank information:
//
// BANK_SUCCESS: we found an entry for the passed-in user/bank
// BANK_USER_NOT_FOUND: the user could not be found in the plugin map
// BANK_INVALID: the user specified a bank they don't belong to
// BANK_NO_DEFAULT: the user does not have a default bank in the plugin map
enum bank_info_codes {
BANK_SUCCESS,
BANK_USER_NOT_FOUND,
BANK_INVALID,
BANK_NO_DEFAULT
};

typedef std::pair<bank_info_codes, std::map<std::string, Association>::iterator> bank_info_result;

std::map<int, std::map<std::string, Association>> users;
std::map<std::string, struct queue_info> queues;
std::map<int, std::string> users_def_bank;
Expand Down Expand Up @@ -250,38 +235,6 @@
}


// Given a userid and an optional bank, locate the associated information in
// the plugin's internal users map. The return value is a pair: the first value
// is a return code to indicate success or the type of failure, and the second
// value is an iterator that points to the appropriate user/bank's information
// associated with the submitted job
static bank_info_result get_bank_info (int userid, char *bank)
{
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

it = users.find (userid);
if (it == users.end ()) {
return {BANK_USER_NOT_FOUND, bank_it};
}

// make sure user belongs to bank they specified; if no bank was passed in,
// look up their default bank
if (bank != NULL) {
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ())
return {BANK_INVALID, bank_it};
} else {
bank = const_cast<char*> (users_def_bank[userid].c_str ());
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ())
return {BANK_NO_DEFAULT, bank_it};
}

return {BANK_SUCCESS, bank_it};
}


/******************************************************************************
* *
* Callbacks *
Expand Down Expand Up @@ -890,128 +843,104 @@


/*
* apply an update on a job with regard to its queue once it has been
* Apply an update on a job with regard to its queue once it has been
* validated.
*/
static int job_updated (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
std::map<std::string, Association>::iterator bank_it;
int userid;
char *bank = NULL;
char *queue = NULL;
Association *b;
char *updated_queue = NULL;
Association *a;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}, s:{s?s}}",
"userid", &userid,
"jobspec", "attributes", "system", "bank", &bank,
"updates",
"attributes.system.queue", &queue) < 0)
"attributes.system.queue", &updated_queue) < 0)
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// grab bank_info struct for user/bank (if any)
b = static_cast<Association *> (flux_jobtap_job_aux_get (
// grab Association object from job
a = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));

if (b == NULL) {
if (a == NULL) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority",
0, "job.update: bank info is missing");

return -1;
}

// look up user/bank info based on unpacked information
bank_info_result lookup_result = get_bank_info (userid, bank);

if (lookup_result.first == BANK_USER_NOT_FOUND) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"job.update: cannot find info for user: ",
userid);
} else if (lookup_result.first == BANK_INVALID) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"job.update: not a member of %s",
bank);
} else if (lookup_result.first == BANK_NO_DEFAULT) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"job.update: user/default bank "
"entry does not exist");
} else if (lookup_result.first == BANK_SUCCESS) {
bank_it = lookup_result.second;
}
// look up association
a = get_association (userid, bank, users, users_def_bank);

// if the queue for the job has been updated, fetch the priority of the
// validated queue and assign it to the associated bank_info struct
if (queue != NULL) {
int queue_factor = get_queue_info (queue, bank_it->second.queues);
b->queue_factor = queue_factor;
}
if (a == nullptr)
flux_jobtap_raise_exception (p,

Check warning on line 885 in src/plugins/mf_priority.cpp

View check run for this annotation

Codecov / codecov/patch

src/plugins/mf_priority.cpp#L885

Added line #L885 was not covered by tests
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"cannot find user/bank or "
"user/default bank entry "
"for uid: %i", userid);

if (updated_queue != NULL)
// the queue for the job has been updated, so fetch the priority
// associated with this queue and assign it to the Association object
// associated with the job
a->queue_factor = get_queue_info (updated_queue, a->queues);

return 0;
}


/*
* check for an updated queue and validate it for a user/bank; if the
* user/bank does not have access to the queue they are trying to update
* Check for an updated queue and validate it for an association; if the
* association does not have access to the queue they are trying to update
* their job for, reject the update and keep the job in its current queue.
*/
static int update_queue_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
std::map<std::string, Association>::iterator bank_it;
int userid;
char *bank = NULL;
char *queue = NULL;
char *updated_queue = NULL;
Association *a;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s:s, s{s{s{s?s}}}}",
"userid", &userid,
"value", &queue,
"value", &updated_queue,
"jobspec", "attributes", "system", "bank",
&bank) < 0)
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// look up user/bank info based on unpacked information
bank_info_result lookup_result = get_bank_info (userid, bank);
// look up association
a = get_association (userid, bank, users, users_def_bank);

if (lookup_result.first == BANK_USER_NOT_FOUND) {
return flux_jobtap_error (p,
args,
"mf_priority: cannot find info for user ",
userid);
} else if (lookup_result.first == BANK_INVALID) {
return flux_jobtap_error (p,
args,
"mf_priority: not a member of %s",
bank);
} else if (lookup_result.first == BANK_NO_DEFAULT) {
if (a == nullptr)
return flux_jobtap_reject_job (p,

Check warning on line 931 in src/plugins/mf_priority.cpp

View check run for this annotation

Codecov / codecov/patch

src/plugins/mf_priority.cpp#L931

Added line #L931 was not covered by tests
args,
"cannot find user/bank or "
"user/default bank entry "
"for uid: %i", userid);

// validate the updated queue and make sure the user/bank has access to it;
if (get_queue_info (updated_queue, a->queues) == INVALID_QUEUE)
// user/bank does not have access to this queue; reject the update
return flux_jobtap_error (p,
args,
"mf_priority: user/default bank entry does "
"not exist");
} else if (lookup_result.first == BANK_SUCCESS) {
bank_it = lookup_result.second;

// validate the updated queue and make sure the user/bank has
// access to it; if not, reject the update
if (get_queue_info (queue, bank_it->second.queues) == INVALID_QUEUE)
return flux_jobtap_error (p,
args,
"mf_priority: queue not valid for user: %s",
queue);
}
"mf_priority: queue not valid for user: %s",
updated_queue);

return 0;
}
Expand Down
Loading