Skip to content

Commit

Permalink
Merge pull request #427 from cmoussa1/issue#426
Browse files Browse the repository at this point in the history
plugin: move flux-accounting-specific helper functions, remove unused ones
  • Loading branch information
mergify[bot] authored Mar 6, 2024
2 parents 4573715 + e8d8b66 commit b2f42b6
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 113 deletions.
47 changes: 47 additions & 0 deletions src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,50 @@ void split_string_and_push_back (const char *list,
vec.push_back (substr);
}
}


int get_queue_info (char *queue,
const std::vector<std::string> &permissible_queues,
const std::map<std::string, Queue> &queues)
{
if (queue != NULL) {
// check #1) the queue passed in exists in the queues map;
// if the queue cannot be found, this means that flux-accounting
// does not know about the queue, and thus should return a default
// factor
auto q_it = queues.find (queue);
if (q_it == queues.end ())
return UNKNOWN_QUEUE;

// check #2) the queue passed in is valid for the association
auto vect_it = std::find (permissible_queues.begin (),
permissible_queues.end (),
queue);
if (vect_it == permissible_queues.end ())
// the queue passed in is not valid for the association
return INVALID_QUEUE;

try {
return queues.at (queue).priority;
} catch (const std::out_of_range &e) {
return UNKNOWN_QUEUE;
}
}

// no queue was specified, so just use a default queue factor
return NO_QUEUE_SPECIFIED;
}


bool check_map_for_dne_only (std::map<int, std::map<std::string, Association>>
&users,
std::map<int, std::string> &users_def_bank)
{
for (const auto& entry : users) {
auto it = users_def_bank.find(entry.first);
if (it != users_def_bank.end() && it->second != "DNE")
return false;
}

return true;
}
34 changes: 34 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern "C" {
#include <map>
#include <iterator>
#include <sstream>
#include <algorithm>

// all attributes are per-user/bank
class Association {
Expand All @@ -46,6 +47,26 @@ class Association {
json_t* to_json () const; // convert object to JSON string
};

// - UKNOWN_QUEUE: a queue is specified for a submitted job that flux-accounting
// does not know about
// - NO_QUEUE_SPECIFIED: no queue was specified for this job
// - INVALID_QUEUE: the association does not have permission to run jobs under
// this queue
#define UNKNOWN_QUEUE 0
#define NO_QUEUE_SPECIFIED 0
#define INVALID_QUEUE -6

// min_nodes_per_job, max_nodes_per_job, and max_time_per_job are not
// currently used or enforced in this plugin, so their values have no
// effect in queue limit enforcement.
class Queue {
public:
int min_nodes_per_job;
int max_nodes_per_job;
int max_time_per_job;
int priority;
};

// get an Association object that points to user/bank in the users map;
// return nullptr on failure
Association* get_association (int userid,
Expand All @@ -62,4 +83,17 @@ json_t* convert_map_to_json (std::map<int, std::map<std::string, Association>>
void split_string_and_push_back (const char *list,
std::vector<std::string> &vec);

// validate a potentially passed-in queue by an association and return the
// integer priority associated with the queue
int get_queue_info (char *queue,
const std::vector<std::string> &permissible_queues,
const std::map<std::string, Queue> &queues);

// check the contents of the users map to see if every user's bank is a
// temporary "DNE" value; if it is, the plugin is still waiting on
// flux-accounting data
bool check_map_for_dne_only (std::map<int, std::map<std::string, Association>>
&users,
std::map<int, std::string> &users_def_bank);

#endif // ACCOUNTING_H
123 changes: 11 additions & 112 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,10 @@ extern "C" {
// flux-accounting
#define BANK_INFO_MISSING 999

// a queue is specified for a submitted job that flux-accounting does not know
// about
#define UNKNOWN_QUEUE 0

// no queue is specified for a submitted job
#define NO_QUEUE_SPECIFIED 0

// a queue was specified for a submitted job that flux-accounting knows about and
// the association does not have permission to run jobs under
#define INVALID_QUEUE -6

std::map<int, std::map<std::string, Association>> users;
std::map<std::string, struct queue_info> queues;
std::map<std::string, Queue> queues;
std::map<int, std::string> users_def_bank;

// min_nodes_per_job, max_nodes_per_job, and max_time_per_job are not
// currently used or enforced in this plugin, so their values have no
// effect in queue limit enforcement.
struct queue_info {
int min_nodes_per_job;
int max_nodes_per_job;
int max_time_per_job;
int priority;
};

/******************************************************************************
* *
* Helper Functions *
Expand Down Expand Up @@ -125,88 +104,6 @@ int64_t priority_calculation (flux_plugin_t *p,
}


static int get_queue_info (char *queue,
std::vector<std::string> permissible_queues)
{
std::map<std::string, struct queue_info>::iterator q_it;

// make sure that if a queue is passed in, it is a valid queue for the
// user to run jobs in
if (queue != NULL) {
// check #1) the queue passed in exists in the queues map;
// if the queue cannot be found, this means that flux-accounting
// does not know about the queue, and thus should return a default
// factor
q_it = queues.find (queue);
if (q_it == queues.end ())
return UNKNOWN_QUEUE;

// check #2) the queue passed in is a valid option to pass for user
std::vector<std::string>::iterator vect_it;
vect_it = std::find (permissible_queues.begin (),
permissible_queues.end (), queue);

if (vect_it == permissible_queues.end ())
return INVALID_QUEUE;
else
// add priority associated with the passed in queue to bank_info
return queues[queue].priority;
} else {
// no queue was specified, so just use a default queue factor
return NO_QUEUE_SPECIFIED;
}
}


static void split_string (char *queues, Association *b)
{
std::stringstream s_stream;

s_stream << queues; // create string stream from string
while (s_stream.good ()) {
std::string substr;
getline (s_stream, substr, ','); // get string delimited by comma
b->queues.push_back (substr);
}
}


int check_queue_factor (flux_plugin_t *p,
int queue_factor,
char *queue,
char *prefix = (char *) "")
{
if (queue_factor == INVALID_QUEUE) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"%sQueue not valid for user: %s",
prefix, queue);
return -1;
}

return 0;
}


// Scan the users map and look at each user's default bank to see if any one
// of them have a valid bank (i.e one that is not "DNE"; if any of the users do
// do have a valid bank, it will return false)
static bool check_map_for_dne_only ()
{
// the users map iterated through in this for-loop, along with the
// users_def_bank map used to look up a user's default bank, are
// both global variables
for (const auto& entry : users) {
auto def_bank_it = users_def_bank.find(entry.first);
if (def_bank_it != users_def_bank.end() &&
def_bank_it->second != "DNE")
return false;
}

return true;
}


/*
* Update the jobspec with the default bank the association used to
* submit their job under.
Expand Down Expand Up @@ -382,7 +279,7 @@ static void rec_q_cb (flux_t *h,
"priority", &priority) < 0)
flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text);

struct queue_info *q;
Queue *q;
q = &queues[queue];

q->min_nodes_per_job = min_nodes_per_job;
Expand Down Expand Up @@ -467,7 +364,7 @@ static int priority_cb (flux_plugin_t *p,
users,
users_def_bank);
if (assoc == nullptr) {
if (check_map_for_dne_only () == true)
if (check_map_for_dne_only (users, users_def_bank) == true)
// the plugin is still waiting on flux-accounting data to be
// loaded in; keep the job in PRIORITY
return flux_jobtap_priority_unavail (p, args);
Expand All @@ -488,7 +385,9 @@ static int priority_cb (flux_plugin_t *p,
return flux_jobtap_priority_unavail (p, args);

// fetch priority of the associated queue
assoc->queue_factor = get_queue_info (queue, assoc->queues);
assoc->queue_factor = get_queue_info (queue,
assoc->queues,
queues);
if (assoc->queue_factor == INVALID_QUEUE)
// the queue the association specified is invalid
return -1;
Expand Down Expand Up @@ -613,7 +512,7 @@ static int validate_cb (flux_plugin_t *p,
// the assocation could not be found in the plugin's internal map,
// so perform a check to see if the map has any loaded
// flux-accounting data before rejecting the job
bool only_dne_data = check_map_for_dne_only ();
bool only_dne_data = check_map_for_dne_only (users, users_def_bank);

if (users.empty () || only_dne_data) {
add_missing_bank_info (p, h, userid);
Expand All @@ -632,7 +531,7 @@ static int validate_cb (flux_plugin_t *p,
return flux_jobtap_reject_job (p, args, "user/bank entry has been "
"disabled from flux-accounting DB");

if (get_queue_info (queue, a->queues) == INVALID_QUEUE)
if (get_queue_info (queue, a->queues, queues) == INVALID_QUEUE)
// the user/bank specified a queue that they do not belong to;
// reject the job
return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s",
Expand Down Expand Up @@ -714,7 +613,7 @@ static int new_cb (flux_plugin_t *p,
}

// assign priority associated with validated queue
b->queue_factor = get_queue_info (queue, b->queues);
b->queue_factor = get_queue_info (queue, b->queues, queues);

max_run_jobs = b->max_run_jobs;
cur_active_jobs = b->cur_active_jobs;
Expand Down Expand Up @@ -885,7 +784,7 @@ static int job_updated (flux_plugin_t *p,
// the queue for the job has been updated, so fetch the priority
// associated with this queue and assign it to the Association object
// associated with the job
a->queue_factor = get_queue_info (updated_queue, a->queues);
a->queue_factor = get_queue_info (updated_queue, a->queues, queues);

return 0;
}
Expand Down Expand Up @@ -926,7 +825,7 @@ static int update_queue_cb (flux_plugin_t *p,
"for uid: %i", userid);

// validate the updated queue and make sure the user/bank has access to it;
if (get_queue_info (updated_queue, a->queues) == INVALID_QUEUE)
if (get_queue_info (updated_queue, a->queues, queues) == INVALID_QUEUE)
// user/bank does not have access to this queue; reject the update
return flux_jobtap_error (p,
args,
Expand Down
Loading

0 comments on commit b2f42b6

Please sign in to comment.