Skip to content

Commit

Permalink
plugin: add unpack of queue priorities
Browse files Browse the repository at this point in the history
Problem: The priority plugin cannot unpack any defined integer
priorities for queues in a configuration file, but there is at least
one use case to be able to set these priorities in a TOML config file.

Add an unpack of a "accounting.queue-priorities" table in the callback
for conf.update. The table will have any queues and their associated
integer priorities. If the queue already exists in the plugin's
internal queues map, update the associated priority of the queue. If
it does not exist, add the queue to the plugin's internal map.
  • Loading branch information
cmoussa1 committed Nov 25, 2024
1 parent 1e6b306 commit 72f91df
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,26 @@ static void add_special_association (flux_plugin_t *p, flux_t *h, int userid)

/*
* Get config information about the various priority factor weights
* and assign them in the priority_weights map.
* and assign them in the priority_weights map. Update the queues map with
* any defined integer priorities.
*/
static int conf_update_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
int fshare_weight = -1, queue_weight = -1;
json_t *queue_priorities = nullptr;
flux_t *h = flux_jobtap_get_flux (p);

// unpack the various factors to be used in job priority calculation
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s?{s?{s?{s?i, s?i}}}}",
"{s?{s?{s?{s?i, s?i}, s?o}}}",
"conf", "accounting", "factor-weights",
"fairshare", &fshare_weight,
"queue", &queue_weight) < 0) {
"queue", &queue_weight,
"queue-priorities", &queue_priorities) < 0) {
flux_log_error (flux_jobtap_get_flux (p),
"mf_priority: conf.update: flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
Expand All @@ -234,6 +237,37 @@ static int conf_update_cb (flux_plugin_t *p,
if (queue_weight != -1)
priority_weights["queue"] = queue_weight;

if (queue_priorities) {
const char *key;
json_t *value;

json_object_foreach (queue_priorities, key, value) {
if (!key || !json_is_integer (value)) {
flux_log_error (h, "mf_priority: invalid data in queue-priorities");
continue;
}

const std::string queue_name (key);
int priority = json_integer_value (value);

auto it = queues.find (queue_name);
if (it != queues.end ()) {
// update the priority for the existing queue
it->second.priority = priority;
} else {
// queue does not exist; create a new queue and insert it
// and set the limits to their default values defined in
// the flux-accounting DB
Queue new_queue;
new_queue.min_nodes_per_job = 1;
new_queue.max_nodes_per_job = 1;
new_queue.max_time_per_job = 60;
new_queue.priority = priority;
queues[queue_name] = new_queue;
}
}
}

return 0;
}

Expand Down

0 comments on commit 72f91df

Please sign in to comment.