Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: track the resources used across all of an association's running jobs #561

Merged
merged 4 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ noinst_HEADERS = \
fairness/writer/data_writer_base.hpp \
fairness/writer/data_writer_db.hpp \
fairness/writer/data_writer_stdout.hpp \
plugins/accounting.hpp
plugins/accounting.hpp \
plugins/jj.hpp

fairness_libweighted_tree_la_SOURCES = \
fairness/account/account.cpp \
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ jobtapdir = \
$(fluxlibdir)/job-manager/plugins/

jobtap_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp jj.cpp
mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
5 changes: 4 additions & 1 deletion src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ json_t* Association::to_json () const

// 'o' steals the reference for both held_job_ids and user_queues
json_t *u = json_pack ("{s:s, s:f, s:i, s:i, s:i, s:i, s:o,"
" s:o, s:i, s:o, s:s, s:i, s:i, s:i}",
" s:o, s:i, s:o, s:s, s:i, s:i, s:i,"
" s:i, s:i}",
"bank_name", bank_name.c_str (),
"fairshare", fairshare,
"max_run_jobs", max_run_jobs,
Expand All @@ -100,6 +101,8 @@ json_t* Association::to_json () const
"def_project", def_project.c_str (),
"max_nodes", max_nodes,
"max_cores", max_cores,
"cur_nodes", cur_nodes,
"cur_cores", cur_cores,
"active", active);

if (!u)
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Association {
std::string def_project; // default project
int max_nodes; // max num nodes across all running jobs
int max_cores; // max num cores across all running jobs
int cur_nodes; // current number of used nodes
int cur_cores; // current number of used cores

// methods
json_t* to_json () const; // convert object to JSON string
Expand Down
165 changes: 165 additions & 0 deletions src/plugins/jj.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/************************************************************\
* Copyright 2014 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

extern "C" {
#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <errno.h>
#include <string.h>
#include <jansson.h>

#include "jj.hpp"

static int jj_read_level (json_t *o, int level, struct jj_counts *jj);

/* Parse one resource vertex `o` found at nesting depth `level`.
 *
 * The vertex must be an object with a string "type" and an integer "count";
 * "exclusive" (boolean) and "with" (child array) are optional.  The count is
 * stored in the field of `jj` that matches the type:
 *   "node" -> nnodes (and exclusive, when the flag is set)
 *   "slot" -> nslots
 *   "core" -> slot_size
 *   "gpu"  -> slot_gpus
 * If "with" is present it is parsed recursively as the next level.
 *
 * Returns 0 on success.  On failure returns -1 with errno set to EINVAL and
 * a description written to jj->error.
 */
static int jj_read_vertex (json_t *o, int level, struct jj_counts *jj)
{
    int count;
    const char *type = NULL;
    json_t *with = NULL;
    json_error_t error;
    int exclusive = 0;

    if (json_unpack_ex (o, &error, 0, "{ s:s s:i s?b s?o }",
                        "type", &type,
                        "count", &count,
                        "exclusive", &exclusive,
                        "with", &with) < 0) {
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "level %d: %s", level, error.text);
        errno = EINVAL;
        return -1;
    }
    if (count <= 0) {
        /* `type` comes straight from the jobspec and can be arbitrarily
         * long, so the write into the fixed-size buffer must be bounded. */
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "Invalid count %d for type '%s'", count, type);
        errno = EINVAL;
        return -1;
    }
    if (streq (type, "node")) {
        jj->nnodes = count;
        if (exclusive)
            jj->exclusive = true;
    }
    else if (streq (type, "slot"))
        jj->nslots = count;
    else if (streq (type, "core"))
        jj->slot_size = count;
    else if (streq (type, "gpu"))
        jj->slot_gpus = count;
    else {
        /* Bounded for the same reason as above: `type` is untrusted. */
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "Unsupported resource type '%s'", type);
        errno = EINVAL;
        return -1;
    }
    if (with)
        return jj_read_level (with, level+1, jj);
    return 0;
}

/* Parse one level of the resource graph.
 *
 * `o` must be a JSON array; each element is handed to jj_read_vertex()
 * together with the current `level`.  Parsing stops at the first vertex
 * that fails.
 *
 * Returns 0 on success.  On failure returns -1 with errno set to EINVAL and
 * a description in jj->error.
 */
static int jj_read_level (json_t *o, int level, struct jj_counts *jj)
{
    /* json_array_foreach() compares the index against json_array_size(),
     * which is a size_t, so the index is unsigned as well. */
    size_t i;
    json_t *v = NULL;

    if (!json_is_array (o)) {
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "level %d: must be an array", level);
        errno = EINVAL;
        return -1;
    }
    json_array_foreach (o, i, v) {
        if (jj_read_vertex (v, level, jj) < 0)
            return -1;
    }
    return 0;
}

/* Decode the JSON text `spec` and summarize its resource request in `jj`.
 *
 * The decoded object is passed to jj_get_counts_json() and released again
 * before returning.
 *
 * Returns 0 on success.  On failure returns -1 with errno set to EINVAL;
 * when `jj` is non-NULL, jj->error holds a description of the problem.
 */
int jj_get_counts (const char *spec, struct jj_counts *jj)
{
    json_t *o = NULL;
    json_error_t error;
    int rc;

    /* Reject a NULL result buffer up front: the decode-error path below
     * writes to jj->error before jj_get_counts_json() gets to check it. */
    if (!jj) {
        errno = EINVAL;
        return -1;
    }

    if ((o = json_loads (spec, 0, &error)) == NULL) {
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "JSON load: %s", error.text);
        errno = EINVAL;
        return -1;
    }

    rc = jj_get_counts_json (o, jj);
    json_decref (o);
    return rc;
}

/* Summarize the resource request of an already-decoded jobspec.
 *
 * `jj` is zeroed first, then filled from the jobspec: the version must be 1,
 * attributes.system.duration must be present, and the resources array must
 * yield a positive slot count and slot size.  When a node count was found,
 * the slot count is scaled by it.
 *
 * Returns 0 on success.  On failure returns -1 with errno set to EINVAL;
 * except when `jj` itself is NULL, jj->error describes the problem.
 */
int jj_get_counts_json (json_t *jobspec, struct jj_counts *jj)
{
    json_error_t err;
    json_t *res = NULL;
    int ver;

    if (jj == NULL)
        goto invalid;

    memset (jj, 0, sizeof (*jj));

    if (json_unpack_ex (jobspec, &err, 0, "{s:i s:o}",
                        "version", &ver,
                        "resources", &res) < 0) {
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "at top level: %s", err.text);
        goto invalid;
    }
    if (ver != 1) {
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "Invalid version: expected 1, got %d", ver);
        goto invalid;
    }

    /* N.B. attributes.system is generally optional, but
     * attributes.system.duration is required in jobspec version 1 */
    if (json_unpack_ex (jobspec, &err, 0, "{s:{s:{s:F}}}",
                        "attributes",
                        "system",
                        "duration", &jj->duration) < 0) {
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "at top level: getting duration: %s", err.text);
        goto invalid;
    }

    /* The resource walker reports its own error text and errno. */
    if (jj_read_level (res, 0, jj) < 0)
        return -1;

    if (jj->nslots <= 0) {
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "Unable to determine slot count");
        goto invalid;
    }
    if (jj->slot_size <= 0) {
        snprintf (jj->error, sizeof (jj->error) - 1,
                  "Unable to determine slot size");
        goto invalid;
    }

    /* Slots were counted per node when a node vertex was present. */
    if (jj->nnodes != 0)
        jj->nslots = jj->nslots * jj->nnodes;
    return 0;

invalid:
    errno = EINVAL;
    return -1;
}

}
/* vi: ts=4 sw=4 expandtab
*/
62 changes: 62 additions & 0 deletions src/plugins/jj.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/************************************************************\
* Copyright 2014 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

extern "C" {
#ifndef HAVE_JJ_H
#define HAVE_JJ_H 1

#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <jansson.h>
#include <stdbool.h>

#define JJ_ERROR_TEXT_LENGTH 256

/**
 * streq - Are two strings equal?
 * @a: first string
 * @b: second string
 *
 * This macro is arguably more readable than "!strcmp(a, b)".
 *
 * It expands to a strcmp() call; this header does not include <string.h>
 * itself, so the including file has to.
 *
 * Example:
 *	if (streq(somestring, ""))
 *		printf("String is empty!\n");
 */
#define streq(a,b) (strcmp((a),(b)) == 0)
jameshcorbett marked this conversation as resolved.
Show resolved Hide resolved

/* Summary of a jobspec resource request, filled in by jj_get_counts() and
 * jj_get_counts_json().  The parser zeroes the whole struct before filling
 * it, so fields for resource types absent from the jobspec stay 0/false.
 */
struct jj_counts {
int nnodes; /* count of the "node" resource, 0 if none was given */
int nslots; /* count of the "slot" resource, multiplied by nnodes when nnodes is nonzero */
int slot_size; /* count of the "core" resource (cores per slot) */
int slot_gpus; /* count of the "gpu" resource (gpus per slot) */

bool exclusive; /* true when a "node" resource carried a true "exclusive" flag */

double duration; /* attributes.system.duration from the jobspec */

char error[JJ_ERROR_TEXT_LENGTH]; /* On error, contains error description */
};

/* Parse jobspec from JSON string `spec` and return a resource request
 * summary in `counts` on success.
 * Returns 0 on success, or -1 on failure with errno set and
 * counts->error[] holding an error message string.
 */

int jj_get_counts (const char *spec, struct jj_counts *counts);

/* Identical to jj_get_counts, but takes an already-decoded json_t object
 * instead of a JSON string.  A NULL `counts` fails with errno EINVAL.
 */
int jj_get_counts_json (json_t *jobspec, struct jj_counts *counts);

#endif /* !HAVE_JJ_H */
}
Loading
Loading