diff --git a/src/Makefile.am b/src/Makefile.am index 0de6c18e..cdba7431 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -17,7 +17,8 @@ noinst_HEADERS = \ fairness/reader/data_reader_db.hpp \ fairness/writer/data_writer_base.hpp \ fairness/writer/data_writer_db.hpp \ - fairness/writer/data_writer_stdout.hpp + fairness/writer/data_writer_stdout.hpp \ + plugins/accounting.hpp fairness_libweighted_tree_la_SOURCES = \ fairness/account/account.cpp \ @@ -46,7 +47,8 @@ fairness_libweighted_tree_la_CXXFLAGS = \ TESTS = \ weighted_tree_test01.t \ data_reader_db_test01.t \ - data_writer_db_test01.t + data_writer_db_test01.t \ + accounting_test01.t check_PROGRAMS = $(TESTS) TEST_EXTENSIONS = .t @@ -93,6 +95,14 @@ data_writer_db_test01_t_LDADD = \ fairness/libweighted_tree.la \ common/libtap/libtap.la +accounting_test01_t_SOURCES = \ + plugins/test/accounting_test01.cpp \ + plugins/accounting.cpp \ + plugins/accounting.hpp +accounting_test01_t_CXXFLAGS = $(AM_CXXFLAGS) -I$(top_srcdir) +accounting_test01_t_LDADD = \ + common/libtap/libtap.la + noinst_PROGRAMS = \ cmd/flux-account-update-fshare \ cmd/flux-account-shares diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index f4ac138c..35576e03 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -8,5 +8,6 @@ jobtapdir = \ $(fluxlibdir)/job-manager/plugins/ jobtap_LTLIBRARIES = mf_priority.la -mf_priority_la_SOURCES = mf_priority.cpp +mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp +mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp new file mode 100644 index 00000000..a5576ed7 --- /dev/null +++ b/src/plugins/accounting.cpp @@ -0,0 +1,12 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#include "accounting.hpp" + diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp new file mode 100644 index 00000000..407bb96d --- /dev/null +++ b/src/plugins/accounting.hpp @@ -0,0 +1,36 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +// header file for the Accounting class + +#ifndef ACCOUNTING_H +#define ACCOUNTING_H + +#include +#include +#include +#include + +// all attributes are per-user/bank +class Association { +public: + std::string bank_name; // name of bank + double fairshare; // fair share value + int max_run_jobs; // max number of running jobs + int cur_run_jobs; // current number of running jobs + int max_active_jobs; // max number of active jobs + int cur_active_jobs; // current number of active jobs + std::vector held_jobs; // list of currently held job ID's + std::vector queues; // list of accessible queues + int queue_factor; // priority factor associated with queue + int active; // active status +}; + +#endif // ACCOUNTING_H diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index f9479bff..d14e8586 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -29,6 +29,9 @@ extern "C" { #include #include +// custom bank_info class file +#include "accounting.hpp" + // the plugin does not know about the association who submitted a job and will // assign default values to the association until it receives information from // flux-accounting @@ -58,24 +61,12 @@ enum bank_info_codes { BANK_NO_DEFAULT }; -typedef std::pair::iterator> bank_info_result; +typedef std::pair::iterator> bank_info_result; -std::map> users; +std::map> users; std::map queues; std::map users_def_bank; -struct bank_info { - double fairshare; - int max_run_jobs; - int cur_run_jobs; - int max_active_jobs; - int cur_active_jobs; - std::vector held_jobs; - std::vector queues; - int queue_factor; - int active; -}; - // min_nodes_per_job, max_nodes_per_job, and max_time_per_job are not // currently used or enforced in this plugin, so their values have no // effect in queue limit enforcement. @@ -112,7 +103,7 @@ int64_t priority_calculation (flux_plugin_t *p, double fshare_factor = 0.0, priority = 0.0; int queue_factor = 0; int fshare_weight, queue_weight; - struct bank_info *b; + Association *b; fshare_weight = 100000; queue_weight = 10000; @@ -123,7 +114,7 @@ int64_t priority_calculation (flux_plugin_t *p, if (urgency == FLUX_JOB_URGENCY_EXPEDITE) return FLUX_JOB_PRIORITY_MAX; - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -151,7 +142,7 @@ int64_t priority_calculation (flux_plugin_t *p, static int get_queue_info ( char *queue, - std::map::iterator bank_it) + std::map::iterator bank_it) { std::map::iterator q_it; @@ -183,7 +174,7 @@ static int get_queue_info ( } -static void split_string (char *queues, struct bank_info *b) +static void split_string (char *queues, Association *b) { std::stringstream s_stream; @@ -217,7 +208,7 @@ int check_queue_factor (flux_plugin_t *p, * Add held job IDs to a JSON array to be added to a bank_info JSON object. */ static json_t *add_held_jobs ( - const std::pair &b) + const std::pair &b) { json_t *held_jobs = NULL; @@ -248,7 +239,7 @@ static json_t *add_held_jobs ( * Create a JSON object for a bank that a user belongs to. */ static json_t *pack_bank_info_object ( - const std::pair &b) + const std::pair &b) { json_t *bank_info, *held_jobs = NULL; @@ -281,7 +272,7 @@ static json_t *pack_bank_info_object ( */ static json_t *banks_to_json ( flux_plugin_t *p, - std::pair> &u) + std::pair> &u) { json_t *bank_info, *banks = NULL; @@ -313,7 +304,7 @@ static json_t *banks_to_json ( */ static json_t *user_to_json ( flux_plugin_t *p, - std::pair> u) + std::pair> u) { json_t *user = json_object (); // JSON object for one user json_t *userid, *banks = NULL; @@ -372,7 +363,7 @@ static bool check_map_for_dne_only () static int update_jobspec_bank (flux_plugin_t *p, int userid) { char *bank = NULL; - std::map>::iterator it; + std::map>::iterator it; it = users.find (userid); if (it == users.end ()) { @@ -400,8 +391,8 @@ static int update_jobspec_bank (flux_plugin_t *p, int userid) // associated with the submitted job static bank_info_result get_bank_info (int userid, char *bank) { - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; it = users.find (userid); if (it == users.end ()) { @@ -521,9 +512,10 @@ static void rec_update_cb (flux_t *h, "active", &active) < 0) flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text); - struct bank_info *b; + Association *b; b = &users[uid][bank]; + b->bank_name = bank; b->fairshare = fshare; b->max_run_jobs = max_running_jobs; b->max_active_jobs = max_active_jobs; @@ -631,7 +623,7 @@ static int priority_cb (flux_plugin_t *p, char *bank = NULL; char *queue = NULL; int64_t priority; - struct bank_info *b; + Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -648,7 +640,7 @@ static int priority_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -660,8 +652,8 @@ static int priority_cb (flux_plugin_t *p, return -1; } - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; if (b->max_run_jobs == BANK_INFO_MISSING) { // try to look up user again @@ -750,11 +742,12 @@ static int priority_cb (flux_plugin_t *p, static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid) { - struct bank_info *b; + Association *b; b = &users[userid]["DNE"]; users_def_bank[userid] = "DNE"; + b->bank_name = "DNE"; b->fairshare = 0.1; b->max_run_jobs = BANK_INFO_MISSING; b->cur_run_jobs = 0; @@ -790,9 +783,9 @@ static int validate_cb (flux_plugin_t *p, double fairshare = 0.0; bool only_dne_data; - std::map>::iterator it; - std::map::iterator bank_it; - std::map::iterator q_it; + std::map>::iterator it; + std::map::iterator bank_it; + std::map::iterator q_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -885,10 +878,10 @@ static int new_cb (flux_plugin_t *p, char *queue = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; double fairshare = 0.0; - struct bank_info *b; + Association *b; - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -900,7 +893,7 @@ static int new_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1002,7 +995,7 @@ static int depend_cb (flux_plugin_t *p, { int userid; long int id; - struct bank_info *b; + Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -1016,7 +1009,7 @@ static int depend_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1054,9 +1047,9 @@ static int run_cb (flux_plugin_t *p, void *data) { int userid; - struct bank_info *b; + Association *b; - b = static_cast + b = static_cast (flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1085,11 +1078,11 @@ static int job_updated (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { - std::map::iterator bank_it; + std::map::iterator bank_it; int userid; char *bank = NULL; char *queue = NULL; - struct bank_info *b; + Association *b; if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, @@ -1101,7 +1094,7 @@ static int job_updated (flux_plugin_t *p, return flux_jobtap_error (p, args, "unable to unpack plugin args"); // grab bank_info struct for user/bank (if any) - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1156,7 +1149,7 @@ static int update_queue_cb (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { - std::map::iterator bank_it; + std::map::iterator bank_it; int userid; char *bank = NULL; char *queue = NULL; @@ -1210,9 +1203,9 @@ static int inactive_cb (flux_plugin_t *p, void *data) { int userid; - struct bank_info *b; - std::map>::iterator it; - std::map::iterator bank_it; + Association *b; + std::map>::iterator it; + std::map::iterator bank_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -1226,7 +1219,7 @@ static int inactive_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp new file mode 100644 index 00000000..650a69ff --- /dev/null +++ b/src/plugins/test/accounting_test01.cpp @@ -0,0 +1,100 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +extern "C" { +#if HAVE_CONFIG_H +#include "config.h" +#endif +} + +#include +#include +#include +#include +#include + +#include "src/plugins/accounting.hpp" +#include "src/common/libtap/tap.h" + +// define a test users map to run tests on +std::map> users; +std::map users_def_bank; + + +/* + * helper function to add a user/bank to the users map + */ +void add_user_to_map ( + std::map> &users, + int userid, + const std::string& bank, + Association a) +{ + // insert user to users map + users[userid][bank] = { + a.bank_name, + a.fairshare, + a.max_run_jobs, + a.cur_run_jobs, + a.max_active_jobs, + a.cur_active_jobs, + a.held_jobs, + a.queues, + a.queue_factor, + a.active + }; +} + + +/* + * helper function to add test users to the users map + */ +void initialize_map ( + std::map> &users) +{ + Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, {}, 0, 1}; + Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, {}, 0, 1}; + + add_user_to_map (users, 1001, "bank_A", user1); + users_def_bank[1001] = "bank_A"; + + // purposely do not add user2 to the def_bank_map + add_user_to_map (users, 1002, "bank_A", user2); +} + + +// ensure we can access a user/bank in the users map +static void test_direct_map_access ( + std::map> &users) +{ + ok (users[1001]["bank_A"].bank_name == "bank_A", + "a user/bank from users map can be accessed directly"); +} + + +int main (int argc, char* argv[]) +{ + // declare the number of tests that we plan to run + plan (1); + + // add users to the test map + initialize_map (users); + + test_direct_map_access (users); + + // indicate we are done testing + done_testing (); + + return EXIT_SUCCESS; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */