From 72f91dfa25c9c6b2c07ec2df37871e9ff0c4dfc0 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Mon, 25 Nov 2024 11:03:44 -0800 Subject: [PATCH] plugin: add unpack of queue priorities Problem: The priority plugin cannot unpack any defined integer priorities for queues in a configuration file, but there is at least one use case to be able to set these priorities in a TOML config file. Add an unpack of a "accounting.queue-priorities" table in the callback for conf.update. The table will have any queues and their associated integer priorities. If the queue already exists in the plugin's internal queues map, update the associated priority of the queue. If it does not exist, add the queue to the plugin's internal map. --- src/plugins/mf_priority.cpp | 40 ++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index e1825d04..6930b4cf 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -205,7 +205,8 @@ static void add_special_association (flux_plugin_t *p, flux_t *h, int userid) /* * Get config information about the various priority factor weights - * and assign them in the priority_weights map. + * and assign them in the priority_weights map. Update the queues map with + * any defined integer priorities. */ static int conf_update_cb (flux_plugin_t *p, const char *topic, @@ -213,15 +214,17 @@ static int conf_update_cb (flux_plugin_t *p, void *data) { int fshare_weight = -1, queue_weight = -1; + json_t *queue_priorities = nullptr; flux_t *h = flux_jobtap_get_flux (p); // unpack the various factors to be used in job priority calculation if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s?{s?{s?{s?i, s?i}}}}", + "{s?{s?{s?{s?i, s?i}, s?o}}}", "conf", "accounting", "factor-weights", "fairshare", &fshare_weight, - "queue", &queue_weight) < 0) { + "queue", &queue_weight, + "queue-priorities", &queue_priorities) < 0) { flux_log_error (flux_jobtap_get_flux (p), "mf_priority: conf.update: flux_plugin_arg_unpack: %s", flux_plugin_arg_strerror (args)); @@ -234,6 +237,37 @@ static int conf_update_cb (flux_plugin_t *p, if (queue_weight != -1) priority_weights["queue"] = queue_weight; + if (queue_priorities) { + const char *key; + json_t *value; + + json_object_foreach (queue_priorities, key, value) { + if (!key || !json_is_integer (value)) { + flux_log_error (h, "mf_priority: invalid data in queue-priorities"); + continue; + } + + const std::string queue_name (key); + int priority = json_integer_value (value); + + auto it = queues.find (queue_name); + if (it != queues.end ()) { + // update the priority for the existing queue + it->second.priority = priority; + } else { + // queue does not exist; create a new queue and insert it + // and set the limits to their default values defined in + // the flux-accounting DB + Queue new_queue; + new_queue.min_nodes_per_job = 1; + new_queue.max_nodes_per_job = 1; + new_queue.max_time_per_job = 60; + new_queue.priority = priority; + queues[queue_name] = new_queue; + } + } + } + return 0; }