Skip to content

Commit

Permalink
plugin: track resources used per-job
Browse files Browse the repository at this point in the history
Problem: The priority plugin does not keep track of the resources
allocated per-job when an association submits a job, which will be
needed when the plugin begins to enforce limits on how many resources
an association can use across all of their running jobs.

Add an increment/decrement of node+core counts in job.state.run and
job.state.inactive by unpacking the jobspec of the job and
calculating the number of nodes and cores assigned with the job.
Increment and decrement these values in the "cur_nodes" and "cur_cores"
attributes of the Association object.
  • Loading branch information
cmoussa1 committed Jan 9, 2025
1 parent d9962f0 commit b96a61e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/plugins/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ jobtapdir = \
$(fluxlibdir)/job-manager/plugins/

jobtap_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp jj.cpp
mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
99 changes: 95 additions & 4 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ extern "C" {

// custom bank_info class file
#include "accounting.hpp"
// custom job resource counting file
#include "jj.hpp"

// the plugin does not know about the association who submitted a job and will
// assign default values to the association until it receives information from
Expand Down Expand Up @@ -198,6 +200,42 @@ static void add_special_association (flux_plugin_t *p, flux_t *h, int userid)
}


/*
* Using the jobspec from a job, increment the cur_nodes and cur_cores counts
* for an association.
*/
static int increment_resources (Association *b, json_t *jobspec)
{
struct jj_counts counts;

if (jj_get_counts_json (jobspec, &counts) < 0)
return -1;

b->cur_nodes = b->cur_nodes + counts.nnodes;
b->cur_cores = b->cur_cores + (counts.nslots * counts.slot_size);

return 0;
}


/*
* Using the jobspec from a job, decrement the cur_nodes and cur_cores counts
* for an association.
*/
static int decrement_resources (Association *b, json_t *jobspec)
{
struct jj_counts counts;

if (jj_get_counts_json (jobspec, &counts) < 0)
return -1;

b->cur_nodes = b->cur_nodes - counts.nnodes;
b->cur_cores = b->cur_cores - (counts.nslots * counts.slot_size);

return 0;
}


/******************************************************************************
* *
* Callbacks *
Expand Down Expand Up @@ -883,6 +921,19 @@ static int run_cb (flux_plugin_t *p,
{
int userid;
Association *b;
json_t *jobspec = NULL;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:o}",
"jobspec", &jobspec) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
return -1;
}

b = static_cast<Association *>
(flux_jobtap_job_aux_get (p,
Expand All @@ -897,8 +948,27 @@ static int run_cb (flux_plugin_t *p,
return -1;
}

// increment the user's current running jobs count
// increment the user's current running jobs and resources counts
b->cur_run_jobs++;
if (jobspec == NULL) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"job.state.run: failed to unpack " \
"jobspec");
return -1;
} else {
if (increment_resources (b, jobspec) < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"job.state.run: failed to increment " \
"resource count");
return -1;
}
}

return 0;
}
Expand Down Expand Up @@ -1099,12 +1169,14 @@ static int inactive_cb (flux_plugin_t *p,
{
int userid;
Association *b;
json_t *jobspec = NULL;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i}",
"userid", &userid) < 0) {
"{s:i, s:o}",
"userid", &userid,
"jobspec", &jobspec) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
Expand All @@ -1131,8 +1203,27 @@ static int inactive_cb (flux_plugin_t *p,
return 0;

// this job was running, so decrement the current running jobs count
// and look to see if any held jobs can be released
// and the resources count and look to see if any held jobs can be released
b->cur_run_jobs--;
if (jobspec == NULL) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"job.state.inactive: failed to " \
"unpack jobspec");
return -1;
} else {
if (decrement_resources (b, jobspec) < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"job.state.inactive: failed to " \
"decrement resource count");
return -1;
}
}

// if the user/bank combo has any currently held jobs and the user is now
// under their max jobs limit, remove the dependency from first held job
Expand Down

0 comments on commit b96a61e

Please sign in to comment.