diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am
index 35576e03..4ea616e9 100644
--- a/src/plugins/Makefile.am
+++ b/src/plugins/Makefile.am
@@ -8,6 +8,6 @@ jobtapdir = \
 	$(fluxlibdir)/job-manager/plugins/
 
 jobtap_LTLIBRARIES = mf_priority.la
-mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp
+mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp jj.cpp
 mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
 mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp
index 36b5853c..3500ec8f 100644
--- a/src/plugins/mf_priority.cpp
+++ b/src/plugins/mf_priority.cpp
@@ -32,6 +32,8 @@ extern "C" {
 
 // custom bank_info class file
 #include "accounting.hpp"
+// custom job resource counting file
+#include "jj.hpp"
 
 // the plugin does not know about the association who submitted a job and will
 // assign default values to the association until it receives information from
@@ -198,6 +200,42 @@ static void add_special_association (flux_plugin_t *p, flux_t *h, int userid)
 }
 
 
+/*
+ * Using the jobspec from a job, increment the cur_nodes and cur_cores counts
+ * for an association.
+ */
+static int increment_resources (Association *b, json_t *jobspec)
+{
+    struct jj_counts counts;
+
+    if (jj_get_counts_json (jobspec, &counts) < 0)
+        return -1;
+
+    b->cur_nodes = b->cur_nodes + counts.nnodes;
+    b->cur_cores = b->cur_cores + (counts.nslots * counts.slot_size);
+
+    return 0;
+}
+
+
+/*
+ * Using the jobspec from a job, decrement the cur_nodes and cur_cores counts
+ * for an association.
+ */
+static int decrement_resources (Association *b, json_t *jobspec)
+{
+    struct jj_counts counts;
+
+    if (jj_get_counts_json (jobspec, &counts) < 0)
+        return -1;
+
+    b->cur_nodes = b->cur_nodes - counts.nnodes;
+    b->cur_cores = b->cur_cores - (counts.nslots * counts.slot_size);
+
+    return 0;
+}
+
+
 /******************************************************************************
  *                                                                            *
  *                                 Callbacks                                  *
@@ -883,6 +921,19 @@ static int run_cb (flux_plugin_t *p,
 {
     int userid;
     Association *b;
+    json_t *jobspec = NULL;
+
+    flux_t *h = flux_jobtap_get_flux (p);
+    if (flux_plugin_arg_unpack (args,
+                                FLUX_PLUGIN_ARG_IN,
+                                "{s:o}",
+                                "jobspec", &jobspec) < 0) {
+        flux_log (h,
+                  LOG_ERR,
+                  "flux_plugin_arg_unpack: %s",
+                  flux_plugin_arg_strerror (args));
+        return -1;
+    }
 
     b = static_cast<Association *>
         (flux_jobtap_job_aux_get (p,
@@ -897,8 +948,27 @@ static int run_cb (flux_plugin_t *p,
         return -1;
     }
 
-    // increment the user's current running jobs count
+    // increment the user's current running jobs and resources counts
     b->cur_run_jobs++;
+    if (jobspec == NULL) {
+        flux_jobtap_raise_exception (p,
+                                     FLUX_JOBTAP_CURRENT_JOB,
+                                     "mf_priority",
+                                     0,
+                                     "job.state.run: failed to unpack " \
+                                     "jobspec");
+        return -1;
+    } else {
+        if (increment_resources (b, jobspec) < 0) {
+            flux_jobtap_raise_exception (p,
+                                         FLUX_JOBTAP_CURRENT_JOB,
+                                         "mf_priority",
+                                         0,
+                                         "job.state.run: failed to increment " \
+                                         "resource count");
+            return -1;
+        }
+    }
 
     return 0;
 }
@@ -1099,12 +1169,14 @@ static int inactive_cb (flux_plugin_t *p,
 {
     int userid;
     Association *b;
+    json_t *jobspec = NULL;
 
     flux_t *h = flux_jobtap_get_flux (p);
     if (flux_plugin_arg_unpack (args,
                                 FLUX_PLUGIN_ARG_IN,
-                                "{s:i}",
-                                "userid", &userid) < 0) {
+                                "{s:i, s:o}",
+                                "userid", &userid,
+                                "jobspec", &jobspec) < 0) {
         flux_log (h,
                   LOG_ERR,
                   "flux_plugin_arg_unpack: %s",
@@ -1131,8 +1203,27 @@ static int inactive_cb (flux_plugin_t *p,
         return 0;
 
     // this job was running, so decrement the current running jobs count
-    // and look to see if any held jobs can be released
+    // and the resources count and look to see if any held jobs can be released
     b->cur_run_jobs--;
+    if (jobspec == NULL) {
+        flux_jobtap_raise_exception (p,
+                                     FLUX_JOBTAP_CURRENT_JOB,
+                                     "mf_priority",
+                                     0,
+                                     "job.state.inactive: failed to " \
+                                     "unpack jobspec");
+        return -1;
+    } else {
+        if (decrement_resources (b, jobspec) < 0) {
+            flux_jobtap_raise_exception (p,
+                                         FLUX_JOBTAP_CURRENT_JOB,
+                                         "mf_priority",
+                                         0,
+                                         "job.state.inactive: failed to " \
+                                         "decrement resource count");
+            return -1;
+        }
+    }
 
     // if the user/bank combo has any currently held jobs and the user is now
     // under their max jobs limit, remove the dependency from first held job