From 5b3e2dd18e885c9af4e0d12afb60b95654a6fa1b Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 27 Sep 2023 15:58:20 -0700 Subject: [PATCH 1/8] plugin: add bank name to bank_info struct Problem: the bank_info struct does not contain the name of the bank in the struct, which can make it more tedious to find the name of the bank because you have to look at the key of the map in order to find it. It would be more convenient if the name was also in the struct. Add the bank name to the struct attached with each user/bank combination. --- src/plugins/mf_priority.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 8c457cf0d..75a6d9649 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -65,6 +65,7 @@ std::map queues; std::map users_def_bank; struct bank_info { + std::string bank_name; double fairshare; int max_run_jobs; int cur_run_jobs; @@ -524,6 +525,7 @@ static void rec_update_cb (flux_t *h, struct bank_info *b; b = &users[uid][bank]; + b->bank_name = bank; b->fairshare = fshare; b->max_run_jobs = max_running_jobs; b->max_active_jobs = max_active_jobs; @@ -753,6 +755,7 @@ static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid) b = &users[userid]["DNE"]; users_def_bank[userid] = "DNE"; + b->bank_name = "DNE"; b->fairshare = 0.1; b->max_run_jobs = BANK_INFO_MISSING; b->cur_run_jobs = 0; From a7be7e24a8b96ba460f46ed6a0d1df71cf494287 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 27 Sep 2023 16:07:20 -0700 Subject: [PATCH 2/8] plugin: add job.update for banks Problem: flux-accounting currently has no support for a user to update the bank of their job. Add a new callback to the plugin which looks for the "job.update.attributes.system.bank" topic string and validates the new bank that the user is trying to update their job to. If they have access to the bank they want to update their job to but it is currently at its max active jobs OR max running jobs limits, reject the update. Add a check for this update in the job.update callback which will look for an updated bank. If the updated bank has been validated, change the bank_info struct assigned to that job to reflect the attributes of the new bank. --- src/plugins/mf_priority.cpp | 119 ++++++++++++++++++++++++++++++++++-- 1 file changed, 114 insertions(+), 5 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 75a6d9649..000d94f7d 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -1078,8 +1078,8 @@ static int run_cb (flux_plugin_t *p, /* - * apply an update on a job with regard to its queue once it has been - * validated. + * apply an update on a job with regard to its queue or associated bank once + * the requested attribute has been validated. */ static int job_updated (flux_plugin_t *p, const char *topic, @@ -1089,16 +1089,20 @@ static int job_updated (flux_plugin_t *p, std::map::iterator bank_it; int userid; char *bank = NULL; + char *new_bank = NULL; char *queue = NULL; struct bank_info *b; + flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s{s{s{s?s}}}, s:{s?s}}", + "{s:i, s{s{s{s?s}}}, s:{s?s, s?s}}", "userid", &userid, - "jobspec", "attributes", "system", "bank", &bank, + "jobspec", "attributes", "system", "bank", + &bank, "updates", - "attributes.system.queue", &queue) < 0) + "attributes.system.queue", &queue, + "attributes.system.bank", &new_bank) < 0) return flux_jobtap_error (p, args, "unable to unpack plugin args"); // grab bank_info struct for user/bank (if any) @@ -1136,6 +1140,38 @@ static int job_updated (flux_plugin_t *p, bank_it = lookup_result.second; } + // if the user is updating the bank they are submitting the job under, we + // need to update the bank_info struct associated with the job + if (new_bank != NULL) { + // need to get attributes of new bank + bank_info_result new_bank_lookup = get_bank_info (userid, new_bank); + if (lookup_result.first != BANK_SUCCESS) + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, + "job.update: could not find bank " + "information for user"); + + bank_it = new_bank_lookup.second; + + // first, decrement the active jobs count of the old bank + b->cur_active_jobs--; + + // update the bank_info struct associated with the job to use + // the attributes of the updated bank + b = &bank_it->second; + + // next, update the current active jobs count for the updated bank + b->cur_active_jobs++; + + // finally, set new the updated bank_info struct to the job + if (flux_jobtap_job_aux_set (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority:bank_info", + b, + NULL) < 0) + flux_log_error (h, "flux_jobtap_job_aux_set"); + } + // if the queue for the job has been updated, fetch the priority of the // validated queue and assign it to the associated bank_info struct if (queue != NULL) { @@ -1205,6 +1241,78 @@ static int update_queue_cb (flux_plugin_t *p, } +/* + * check for an updated bank and validate it for a user/bank; if the + * user/bank does not have access to the bank they are trying to update + * their job for, reject the update and keep the job under its current bank. + * + * also check the active jobs and running jobs limits for the new bank; if the + * new bank is currently at its max active jobs or max running jobs limit, + * reject the update and keep the job under its current bank. + */ +static int update_bank_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + std::map::iterator bank_it; + int userid; + char *bank = NULL; + + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:i, s:s}", + "userid", &userid, + "value", &bank) < 0) { + return flux_jobtap_error (p, args, "unable to unpack bank arg"); + } + + // look up user/bank info based on unpacked information + bank_info_result lookup_result = get_bank_info (userid, bank); + + if (lookup_result.first == BANK_USER_NOT_FOUND) { + return flux_jobtap_error (p, + args, + "mf_priority: cannot find info for user ", + userid); + } else if (lookup_result.first == BANK_INVALID) { + return flux_jobtap_error (p, + args, + "mf_priority: not a member of %s", + bank); + } else if (lookup_result.first == BANK_NO_DEFAULT) { + return flux_jobtap_error (p, + args, + "mf_priority: user/default bank entry does " + "not exist"); + } else if (lookup_result.first == BANK_SUCCESS) { + bank_it = lookup_result.second; + + // need to check that the new bank is not already at the max + // active jobs limit; if so, reject the update + int max_active_jobs = bank_it->second.max_active_jobs; + int cur_active_jobs = bank_it->second.cur_active_jobs; + if (max_active_jobs > 0 && cur_active_jobs >= max_active_jobs) + return flux_jobtap_error (p, + args, + "mf_priority: new bank is already at " + "max-active-jobs limit"); + + // also need to check that new bank is not already at max run jobs + // limit; if so, reject the update + int max_run_jobs = bank_it->second.max_run_jobs; + int cur_run_jobs = bank_it->second.cur_run_jobs; + if (max_run_jobs > 0 && cur_run_jobs == max_run_jobs) + return flux_jobtap_error (p, + args, + "mf_priority: new bank is already at " + "max-run-jobs limit"); + } + + return 0; +} + + static int inactive_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *args, @@ -1278,6 +1386,7 @@ static const struct flux_plugin_handler tab[] = { { "job.state.run", run_cb, NULL}, { "plugin.query", query_cb, NULL}, { "job.update.attributes.system.queue", update_queue_cb, NULL }, + { "job.update.attributes.system.bank", update_bank_cb, NULL }, { 0 }, }; From b124558208570cb87870333dac7a67287ed50235 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 27 Sep 2023 16:08:14 -0700 Subject: [PATCH 3/8] t: add tests for updating bank Problem: flux-accounting has no tests for updating the bank of a pending job. Add some tests. --- t/Makefile.am | 1 + t/t1031-mf-priority-update-bank.t | 127 ++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100755 t/t1031-mf-priority-update-bank.t diff --git a/t/Makefile.am b/t/Makefile.am index a74f6b44c..5e106ea98 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -32,6 +32,7 @@ TESTSCRIPTS = \ t1028-mf-priority-issue385.t \ t1029-mf-priority-default-bank.t \ t1030-mf-priority-update-queue.t \ + t1031-mf-priority-update-bank.t \ t5000-valgrind.t \ python/t1000-example.py diff --git a/t/t1031-mf-priority-update-bank.t b/t/t1031-mf-priority-update-bank.t new file mode 100755 index 000000000..b2407aa9e --- /dev/null +++ b/t/t1031-mf-priority-update-bank.t @@ -0,0 +1,127 @@ +#!/bin/bash + +test_description='test updating the bank for a pending job in priority plugin' + +. `dirname $0`/sharness.sh + +mkdir -p conf.d + +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job -o,--config-path=$(pwd)/conf.d + +flux setattr log-stderr-level 1 + +test_expect_success 'create flux-accounting DB' ' + flux account -p $(pwd)/FluxAccountingTest.db create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} && + flux jobtap list | grep mf_priority +' + +test_expect_success 'add some banks to the DB' ' + flux account add-bank root 1 && + flux account add-bank --parent-bank=root A 1 && + flux account add-bank --parent-bank=root B 1 && + flux account add-bank --parent-bank=root C 1 +' + +test_expect_success 'add a user to the DB' ' + flux account add-user \ + --username=user1 \ + --userid=5001 \ + --bank=A && + flux account add-user \ + --username=user1 \ + --userid=5001 \ + --bank=B \ + --max-active-jobs=3 \ + --max-running-jobs=2 +' + +test_expect_success 'send flux-accounting DB information to the plugin' ' + flux account-priority-update -p $(pwd)/FluxAccountingTest.db +' + +test_expect_success 'submit one job under default bank, two under non-default bank' ' + job1=$(flux python ${SUBMIT_AS} 5001 --urgency=0 sleep 30) && + job2=$(flux python ${SUBMIT_AS} 5001 --setattr=bank=B --urgency=0 sleep 30) && + job3=$(flux python ${SUBMIT_AS} 5001 --setattr=bank=B --urgency=0 sleep 30) +' + +test_expect_success 'update of bank of pending job works' ' + flux update $job1 bank=B && + flux job wait-event -t 30 $job1 priority && + flux job eventlog $job1 > eventlog.out && + grep "attributes.system.bank=\"B\"" eventlog.out +' + +test_expect_success 'trying to update to a bank user does not have access to fails in job.validate' ' + test_must_fail flux update $job1 bank=C > invalid_bank.out 2>&1 && + test_debug "cat invalid_bank.out" && + grep "not a member of C" invalid_bank.out +' + +test_expect_success 'trying to update to a bank that does not exist fails in job.validate' ' + test_must_fail flux update $job1 bank=foo > nonexistent_bank.out 2>&1 && + test_debug "cat nonexistent_bank.out" && + grep "mf_priority: not a member of foo" nonexistent_bank.out +' + +test_expect_success 'update a job to another bank that is at max-active-jobs limit' ' + job4=$(flux python ${SUBMIT_AS} 5001 --urgency=0 sleep 30) && + test_must_fail flux update $job4 bank=B > max_active_jobs.out 2>&1 && + test_debug "cat max_active_jobs.out" && + grep "new bank is already at max-active-jobs limit" max_active_jobs.out && + flux job cancel $job4 +' + +test_expect_success 'cancel one of the jobs so bank is not at max-active-jobs limit' ' + flux job cancel $job3 +' + +test_expect_success 'update urgency of held jobs so they transition to RUN' ' + flux job urgency $job1 default && + flux job urgency $job2 default && + flux job wait-event -t 10 $job1 alloc && + flux job wait-event -t 10 $job2 alloc +' + +test_expect_success 'update a job to another bank that is at max-run-jobs limit' ' + job5=$(flux python ${SUBMIT_AS} 5001 --urgency=0 sleep 30) && + test_must_fail flux update $job5 bank=B > max_run_jobs.out 2>&1 && + test_debug "cat max_run_jobs.out" && + grep "new bank is already at max-run-jobs limit" max_run_jobs.out && + flux job cancel $job5 +' + +test_expect_success 'cancel jobs' ' + flux job cancel $job1 && + flux job cancel $job2 +' + +test_expect_success 'submit job under non-default bank' ' + job6=$(flux python ${SUBMIT_AS} 5001 --setattr=bank=B --urgency=0 sleep 30) +' + +test_expect_success 'updating job to default bank works' ' + flux update $job6 bank=A && + flux job wait-event -t 30 $job6 priority && + flux job eventlog $job6 > eventlog.out && + grep "attributes.system.bank=\"A\"" eventlog.out +' + +test_expect_success 'shut down flux-accounting service' ' + flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()" +' + +test_done From 64ad371b0711dbc2e9d3dbda330ca34ef51df8bb Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 6 Nov 2023 09:19:16 -0800 Subject: [PATCH 4/8] t: add tests for updating both queue and bank Problem: flux-accounting has no tests for updating both the queue and the bank of a job at the same time. Add a new test file which tests updating both the queue and the bank at the same time as well as some combinations where at least one attribute update is expected to fail (i.e an invalid bank or an invalid queue). In both of the cases where at least one attribute fails, the entire request will fail and the job will remain under its original attributes. --- t/Makefile.am | 1 + t/t1032-mf-priority-update-job.t | 115 +++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100755 t/t1032-mf-priority-update-job.t diff --git a/t/Makefile.am b/t/Makefile.am index 5e106ea98..f9e2be2e4 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -33,6 +33,7 @@ TESTSCRIPTS = \ t1029-mf-priority-default-bank.t \ t1030-mf-priority-update-queue.t \ t1031-mf-priority-update-bank.t \ + t1032-mf-priority-update-job.t \ t5000-valgrind.t \ python/t1000-example.py diff --git a/t/t1032-mf-priority-update-job.t b/t/t1032-mf-priority-update-job.t new file mode 100755 index 000000000..d2d2b7f8e --- /dev/null +++ b/t/t1032-mf-priority-update-job.t @@ -0,0 +1,115 @@ +#!/bin/bash + +test_description='test updating a combination of queue and bank for a pending job in priority plugin' + +. `dirname $0`/sharness.sh + +mkdir -p conf.d + +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_NO_JOB_EXEC=y +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job -o,--config-path=$(pwd)/conf.d + +flux setattr log-stderr-level 1 + +test_expect_success 'create flux-accounting DB' ' + flux account -p $(pwd)/FluxAccountingTest.db create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} && + flux jobtap list | grep mf_priority +' + +test_expect_success 'add some banks to the DB' ' + flux account add-bank root 1 && + flux account add-bank --parent-bank=root A 1 && + flux account add-bank --parent-bank=root B 1 +' + +test_expect_success 'add some queues to the DB' ' + flux account add-queue bronze --priority=100 && + flux account add-queue silver --priority=200 && + flux account add-queue gold --priority=300 +' + +test_expect_success 'add a user to the DB' ' + flux account add-user --username=user5001 \ + --userid=5001 \ + --bank=A \ + --queues="bronze,silver" && + flux account add-user --username=user5001 \ + --userid=5001 \ + --bank=B \ + --queues="bronze,silver" +' + +test_expect_success 'send flux-accounting DB information to the plugin' ' + flux account-priority-update -p $(pwd)/FluxAccountingTest.db +' + +test_expect_success 'configure flux with some queues' ' + cat >conf.d/queues.toml <<-EOT && + [queues.bronze] + [queues.silver] + [queues.gold] + EOT + flux config reload +' + +test_expect_success 'submit job for testing' ' + jobid=$(flux python ${SUBMIT_AS} 5001 --queue=bronze sleep 30) && + flux job wait-event -f json $jobid priority +' + +test_expect_success 'update both queue and bank successfully' ' + flux update $jobid queue=silver bank=B && + flux job wait-event -f json $jobid priority && + flux job eventlog $jobid > eventlog.out && + grep "attributes.system.queue=\"silver\"" eventlog.out && + grep 2050000 eventlog.out && + grep "attributes.system.bank=\"B\"" eventlog.out && + flux job cancel $jobid +' + +test_expect_success 'submit another job for testing' ' + jobid=$(flux python ${SUBMIT_AS} 5001 --queue=bronze sleep 30) && + flux job wait-event -f json $jobid priority +' + +test_expect_success 'update job with invalid combination (invalid bank)' ' + test_must_fail flux update $jobid queue=silver bank=foo > nonexistent_bank.out 2>&1 && + test_debug "cat nonexistent_bank.out" && + grep "mf_priority: not a member of foo" nonexistent_bank.out +' + +test_expect_success 'check that job is still in original queue' ' + flux job info $jobid jobspec > jobspec.out && + grep "\"queue\":\"bronze\"" jobspec.out +' + +test_expect_success 'update job with invalud combination (invalid queue)' ' + test_must_fail flux update $jobid bank=B queue=gold > unavail_queue.out 2>&1 && + test_debug "cat unavail_queue.out" && + grep "ERROR: mf_priority: queue not valid for user: gold" unavail_queue.out +' + +test_expect_success 'check that job is still under original bank' ' + flux job eventlog $jobid > eventlog.out && + grep "attributes.system.bank=\"A\"" eventlog.out && + flux job cancel $jobid +' + +test_expect_success 'shut down flux-accounting service' ' + flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()" +' + +test_done From fb22786e8de305f27414858216572b76481bb04e Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Thu, 16 Nov 2023 12:31:04 -0800 Subject: [PATCH 5/8] plugin: create new user_bank_info class Problem: The plugin uses its own bank_info struct to hold user/bank information and has a number of methods for access and modification of these structs. As the plugin's feature set has grown, so have the requirements for the bank_info struct, resulting in a very large and hard-to-parse piece code. Begin to clean up this plugin. Start by creating a new user_bank_info class and place it in a separate file that gets compiled with the plugin. Replace all instances of "struct bank_info" with the new "user_bank_info" class type. --- src/Makefile.am | 3 +- src/plugins/Makefile.am | 3 +- src/plugins/bank_info.cpp | 11 +++++ src/plugins/bank_info.hpp | 34 +++++++++++++ src/plugins/mf_priority.cpp | 96 +++++++++++++++++-------------------- 5 files changed, 92 insertions(+), 55 deletions(-) create mode 100644 src/plugins/bank_info.cpp create mode 100644 src/plugins/bank_info.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 0de6c18ef..35c193961 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -17,7 +17,8 @@ noinst_HEADERS = \ fairness/reader/data_reader_db.hpp \ fairness/writer/data_writer_base.hpp \ fairness/writer/data_writer_db.hpp \ - fairness/writer/data_writer_stdout.hpp + fairness/writer/data_writer_stdout.hpp \ + plugins/bank_info.hpp fairness_libweighted_tree_la_SOURCES = \ fairness/account/account.cpp \ diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index f4ac138ce..257892065 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -8,5 +8,6 @@ jobtapdir = \ $(fluxlibdir)/job-manager/plugins/ jobtap_LTLIBRARIES = mf_priority.la -mf_priority_la_SOURCES = mf_priority.cpp +mf_priority_la_SOURCES = mf_priority.cpp bank_info.cpp +mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module diff --git a/src/plugins/bank_info.cpp b/src/plugins/bank_info.cpp new file mode 100644 index 000000000..538c057a3 --- /dev/null +++ b/src/plugins/bank_info.cpp @@ -0,0 +1,11 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#include "bank_info.hpp" diff --git a/src/plugins/bank_info.hpp b/src/plugins/bank_info.hpp new file mode 100644 index 000000000..89d9690d7 --- /dev/null +++ b/src/plugins/bank_info.hpp @@ -0,0 +1,34 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +// header file for the bank_info class + +#ifndef BANK_INFO_H +#define BANK_INFO_H + +#include +#include + +// all attributes are per-user/bank +class user_bank_info { +public: + std::string bank_name; // name of bank + double fairshare; // fair share value + int max_run_jobs; // max number of running jobs + int cur_run_jobs; // current number of running jobs + int max_active_jobs; // max number of active jobs + int cur_active_jobs; // current number of active jobs + std::vector held_jobs; // list of currently held job ID's + std::vector queues; // list of accessible queues + int queue_factor; // priority factor associated with queue + int active; // active status +}; + +#endif // BANK_INFO_H diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 000d94f7d..46d4fc7e6 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -20,6 +20,9 @@ extern "C" { #include } +// custom bank_info class file +#include "bank_info.hpp" + #include #include #include @@ -58,25 +61,12 @@ enum bank_info_codes { BANK_NO_DEFAULT }; -typedef std::pair::iterator> bank_info_result; +typedef std::pair::iterator> bank_info_result; -std::map> users; +std::map> users; std::map queues; std::map users_def_bank; -struct bank_info { - std::string bank_name; - double fairshare; - int max_run_jobs; - int cur_run_jobs; - int max_active_jobs; - int cur_active_jobs; - std::vector held_jobs; - std::vector queues; - int queue_factor; - int active; -}; - // min_nodes_per_job, max_nodes_per_job, and max_time_per_job are not // currently used or enforced in this plugin, so their values have no // effect in queue limit enforcement. @@ -113,7 +103,7 @@ int64_t priority_calculation (flux_plugin_t *p, double fshare_factor = 0.0, priority = 0.0; int queue_factor = 0; int fshare_weight, queue_weight; - struct bank_info *b; + user_bank_info *b; fshare_weight = 100000; queue_weight = 10000; @@ -124,7 +114,7 @@ int64_t priority_calculation (flux_plugin_t *p, if (urgency == FLUX_JOB_URGENCY_EXPEDITE) return FLUX_JOB_PRIORITY_MAX; - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -152,7 +142,7 @@ int64_t priority_calculation (flux_plugin_t *p, static int get_queue_info ( char *queue, - std::map::iterator bank_it) + std::map::iterator bank_it) { std::map::iterator q_it; @@ -184,7 +174,7 @@ static int get_queue_info ( } -static void split_string (char *queues, struct bank_info *b) +static void split_string (char *queues, user_bank_info *b) { std::stringstream s_stream; @@ -218,7 +208,7 @@ int check_queue_factor (flux_plugin_t *p, * Add held job IDs to a JSON array to be added to a bank_info JSON object. */ static json_t *add_held_jobs ( - const std::pair &b) + const std::pair &b) { json_t *held_jobs = NULL; @@ -249,7 +239,7 @@ static json_t *add_held_jobs ( * Create a JSON object for a bank that a user belongs to. */ static json_t *pack_bank_info_object ( - const std::pair &b) + const std::pair &b) { json_t *bank_info, *held_jobs = NULL; @@ -282,7 +272,7 @@ static json_t *pack_bank_info_object ( */ static json_t *banks_to_json ( flux_plugin_t *p, - std::pair> &u) + std::pair> &u) { json_t *bank_info, *banks = NULL; @@ -314,7 +304,7 @@ static json_t *banks_to_json ( */ static json_t *user_to_json ( flux_plugin_t *p, - std::pair> u) + std::pair> u) { json_t *user = json_object (); // JSON object for one user json_t *userid, *banks = NULL; @@ -373,7 +363,7 @@ static bool check_map_for_dne_only () static int update_jobspec_bank (flux_plugin_t *p, int userid) { char *bank = NULL; - std::map>::iterator it; + std::map>::iterator it; it = users.find (userid); if (it == users.end ()) { @@ -401,8 +391,8 @@ static int update_jobspec_bank (flux_plugin_t *p, int userid) // associated with the submitted job static bank_info_result get_bank_info (int userid, char *bank) { - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; it = users.find (userid); if (it == users.end ()) { @@ -479,7 +469,7 @@ static int query_cb (flux_plugin_t *p, * Unpack a payload from an external bulk update service and place it in the * multimap datastructure. */ -static void rec_update_cb (flux_t *h, +static void rec_update_cb(flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) @@ -522,7 +512,7 @@ static void rec_update_cb (flux_t *h, "active", &active) < 0) flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text); - struct bank_info *b; + user_bank_info *b; b = &users[uid][bank]; b->bank_name = bank; @@ -633,7 +623,7 @@ static int priority_cb (flux_plugin_t *p, char *bank = NULL; char *queue = NULL; int64_t priority; - struct bank_info *b; + user_bank_info *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -650,7 +640,7 @@ static int priority_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -662,8 +652,8 @@ static int priority_cb (flux_plugin_t *p, return -1; } - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; if (b->max_run_jobs == BANK_INFO_MISSING) { // try to look up user again @@ -750,7 +740,7 @@ static int priority_cb (flux_plugin_t *p, static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid) { - struct bank_info *b; + user_bank_info *b; b = &users[userid]["DNE"]; users_def_bank[userid] = "DNE"; @@ -791,9 +781,9 @@ static int validate_cb (flux_plugin_t *p, double fairshare = 0.0; bool only_dne_data; - std::map>::iterator it; - std::map::iterator bank_it; - std::map::iterator q_it; + std::map>::iterator it; + std::map::iterator bank_it; + std::map::iterator q_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -886,10 +876,10 @@ static int new_cb (flux_plugin_t *p, char *queue = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; double fairshare = 0.0; - struct bank_info *b; + user_bank_info *b; - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -901,7 +891,7 @@ static int new_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1003,7 +993,7 @@ static int depend_cb (flux_plugin_t *p, { int userid; long int id; - struct bank_info *b; + user_bank_info *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -1017,7 +1007,7 @@ static int depend_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1055,9 +1045,9 @@ static int run_cb (flux_plugin_t *p, void *data) { int userid; - struct bank_info *b; + user_bank_info *b; - b = static_cast + b = static_cast (flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1086,12 +1076,12 @@ static int job_updated (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { - std::map::iterator bank_it; + std::map::iterator bank_it; int userid; char *bank = NULL; char *new_bank = NULL; char *queue = NULL; - struct bank_info *b; + user_bank_info *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -1106,7 +1096,7 @@ static int job_updated (flux_plugin_t *p, return flux_jobtap_error (p, args, "unable to unpack plugin args"); // grab bank_info struct for user/bank (if any) - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1193,7 +1183,7 @@ static int update_queue_cb (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { - std::map::iterator bank_it; + std::map::iterator bank_it; int userid; char *bank = NULL; char *queue = NULL; @@ -1255,7 +1245,7 @@ static int update_bank_cb (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { - std::map::iterator bank_it; + std::map::iterator bank_it; int userid; char *bank = NULL; @@ -1319,9 +1309,9 @@ static int inactive_cb (flux_plugin_t *p, void *data) { int userid; - struct bank_info *b; - std::map>::iterator it; - std::map::iterator bank_it; + user_bank_info *b; + std::map>::iterator it; + std::map::iterator bank_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -1335,7 +1325,7 @@ static int inactive_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); From 7fc053097b0b4b4c42b5ce94c1af79d16b038569 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 21 Nov 2023 14:27:48 -0800 Subject: [PATCH 6/8] bank_info: add helper functions for map lookup Problem: There is some functionality in the priority plugin (mf_priority.cpp) that is specific to accessing user_bank_info objects in a map data structure. These methods can be abstracted out to the class files to clean up some of the plugin code. Add two new functions to the bank_info.cpp class file that handle looking up a user/bank combo and fetching a user_bank_info object from the users map. --- src/plugins/bank_info.cpp | 40 +++++++++++++++++++++++++++++++++++++++ src/plugins/bank_info.hpp | 26 +++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/src/plugins/bank_info.cpp b/src/plugins/bank_info.cpp index 538c057a3..0368c15f2 100644 --- a/src/plugins/bank_info.cpp +++ b/src/plugins/bank_info.cpp @@ -9,3 +9,43 @@ \************************************************************/ #include "bank_info.hpp" + +int user_bank_lookup (int userid, char *bank) +{ + auto it = users.find (userid); + if (it == users.end ()) { + return BANK_USER_NOT_FOUND; + } + + // make sure user belongs to bank they specified; if no bank was passed in, + // look up their default bank + if (bank != NULL) { + auto bank_it = it->second.find (std::string (bank)); + if (bank_it == it->second.end ()) + return BANK_INVALID; + } else { + bank = const_cast (users_def_bank[userid].c_str ()); + auto bank_it = it->second.find (std::string (bank)); + if (bank_it == it->second.end ()) + return BANK_NO_DEFAULT; + } + + return BANK_SUCCESS; +} + + +user_bank_info get_user_info (int userid, char *bank) +{ + std::map::iterator bank_it; + + auto it = users.find (userid); + + if (bank != NULL) { + bank_it = it->second.find (std::string (bank)); + } else { + bank = const_cast (users_def_bank[userid].c_str ()); + bank_it = it->second.find (std::string (bank)); + } + + return bank_it->second; +} diff --git a/src/plugins/bank_info.hpp b/src/plugins/bank_info.hpp index 89d9690d7..3e95d4a06 100644 --- a/src/plugins/bank_info.hpp +++ b/src/plugins/bank_info.hpp @@ -15,6 +15,8 @@ #include #include +#include +#include // all attributes are per-user/bank class user_bank_info { @@ -31,4 +33,28 @@ class user_bank_info { int active; // active status }; +// different codes to return as a result of looking up user/bank information: +// +// BANK_SUCCESS: we found an entry for the passed-in user/bank +// BANK_USER_NOT_FOUND: the user could not be found in the plugin map +// BANK_INVALID: the user specified a bank they don't belong to +// BANK_NO_DEFAULT: the user does not have a default bank in the plugin map +enum bank_info_codes { + BANK_SUCCESS, + BANK_USER_NOT_FOUND, + BANK_INVALID, + BANK_NO_DEFAULT +}; + +// these data structures are defined in the priority plugin +extern std::map> users; +extern std::map users_def_bank; + +// check if a user has an entry in the users map +int user_bank_lookup (int userid, char *bank); + +// get a user_bank_info object that points to user/bank +// information in users map +user_bank_info get_user_info (int userid, char *bank); + #endif // BANK_INFO_H From 5c1f549b7f75aaca8f249680b84df5468c84fb7b Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 21 Nov 2023 14:35:17 -0800 Subject: [PATCH 7/8] plugin: use new helper functions in bank_info.cpp Problem: The plugin now has access to helper functions for accessing user/bank information in bank_info.cpp, but it does not make use of them. In validate_cb (), use the new helper functions to help with the validation of a user/bank job. --- src/plugins/mf_priority.cpp | 56 +++++++++++++------------------------ 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 46d4fc7e6..f173bdb66 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -48,19 +48,6 @@ extern "C" { // the association does not have permission to run jobs under #define INVALID_QUEUE -6 -// different codes to return as a result of looking up user/bank information: -// -// BANK_SUCCESS: we found an entry for the passed-in user/bank -// BANK_USER_NOT_FOUND: the user could not be found in the plugin map -// BANK_INVALID: the user specified a bank they don't belong to -// BANK_NO_DEFAULT: the user does not have a default bank in the plugin map -enum bank_info_codes { - BANK_SUCCESS, - BANK_USER_NOT_FOUND, - BANK_INVALID, - BANK_NO_DEFAULT -}; - typedef std::pair::iterator> bank_info_result; std::map> users; @@ -780,10 +767,9 @@ static int validate_cb (flux_plugin_t *p, int max_run_jobs, cur_active_jobs, max_active_jobs, queue_factor = 0; double fairshare = 0.0; bool only_dne_data; + user_bank_info user_bank; std::map>::iterator it; - std::map::iterator bank_it; - std::map::iterator q_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -805,8 +791,9 @@ static int validate_cb (flux_plugin_t *p, // if the plugin has NO data about users/banks and the user does not have // an entry in the plugin, the job will be held until data is received by // the plugin. - it = users.find (userid); - if (it == users.end ()) { + int lookup_result = user_bank_lookup(userid, bank); + + if (lookup_result == BANK_USER_NOT_FOUND) { // check if the map only contains DNE entries bool only_dne_data = check_map_for_dne_only (); @@ -817,26 +804,25 @@ static int validate_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "no bank found for user: %i", userid); } - } - - // make sure user belongs to bank they specified; if no bank was passed in, - // look up their default bank - if (bank != NULL) { - bank_it = it->second.find (std::string (bank)); - if (bank_it == it->second.end ()) - return flux_jobtap_reject_job (p, args, - "user does not belong to specified bank"); + } else if (lookup_result == BANK_INVALID) { + return flux_jobtap_reject_job (p, + args, + "user does not belong to specified " + "bank"); + } else if (lookup_result == BANK_NO_DEFAULT) { + return flux_jobtap_reject_job (p, + args, + "user/default bank entry does not " + "exist"); } else { - bank = const_cast (users_def_bank[userid].c_str ()); - bank_it = it->second.find (std::string (bank)); - if (bank_it == it->second.end ()) - return flux_jobtap_reject_job (p, args, - "user/default bank entry does not exist"); + // bank/default bank has been validated & found, so we can + // fetch their information + user_bank = get_user_info (userid, bank); } // if user/bank entry was disabled, reject job with a message saying the // entry has been disabled - if (bank_it->second.active == 0) + if (user_bank.active == 0) return flux_jobtap_reject_job (p, args, "user/bank entry has been " "disabled from flux-accounting DB"); @@ -848,10 +834,8 @@ static int validate_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s", queue); - max_run_jobs = bank_it->second.max_run_jobs; - fairshare = bank_it->second.fairshare; - cur_active_jobs = bank_it->second.cur_active_jobs; - max_active_jobs = bank_it->second.max_active_jobs; + cur_active_jobs = user_bank.cur_active_jobs; + max_active_jobs = user_bank.max_active_jobs; // if a user/bank has reached their max_active_jobs limit, subsequently // submitted jobs will be rejected From 74aefe0d1a2ec662bb285d14cd7b888776dad2fb Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 21 Nov 2023 14:36:44 -0800 Subject: [PATCH 8/8] plugin: change args to get_queue_info () Problem: The arguments to the get_queue_info () function include an iterator to the bank_info object, which is used in the function to access the user/bank's list of permissible queues for validation. This can be confusing to read and would be easier if just the list of permissible queues was passed to the function. Change the argument to the get_queue_info () function from an iterator to the user/bank object to just the user/bank's list of permissible queues. Change the calls to this function to account for the argument change. --- src/plugins/mf_priority.cpp | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index f173bdb66..801d5d376 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -127,9 +127,8 @@ int64_t priority_calculation (flux_plugin_t *p, } -static int get_queue_info ( - char *queue, - std::map::iterator bank_it) +static int get_queue_info (char *queue, + std::vector permissible_queues) { std::map::iterator q_it; @@ -146,10 +145,10 @@ static int get_queue_info ( // check #2) the queue passed in is a valid option to pass for user std::vector::iterator vect_it; - vect_it = std::find (bank_it->second.queues.begin (), - bank_it->second.queues.end (), queue); + vect_it = std::find (permissible_queues.begin (), + permissible_queues.end (), queue); - if (vect_it == bank_it->second.queues.end ()) + if (vect_it == permissible_queues.end ()) return INVALID_QUEUE; else // add priority associated with the passed in queue to bank_info @@ -675,7 +674,7 @@ static int priority_cb (flux_plugin_t *p, } // fetch priority associated with passed-in queue (or default queue) - bank_it->second.queue_factor = get_queue_info (queue, bank_it); + bank_it->second.queue_factor = get_queue_info (queue, bank_it->second.queues); if (check_queue_factor (p, bank_it->second.queue_factor, queue) < 0) @@ -828,9 +827,9 @@ static int validate_cb (flux_plugin_t *p, // validate the queue if one is passed in; if the user does not have access // to the queue they specified, reject the job - queue_factor = get_queue_info (queue, bank_it); + // queue_factor = get_queue_info (queue, user_bank.queues); - if (queue_factor == INVALID_QUEUE) + if (get_queue_info (queue, user_bank.queues) == INVALID_QUEUE) return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s", queue); @@ -936,7 +935,7 @@ static int new_cb (flux_plugin_t *p, // assign priority associated with validated queue to bank_info struct // associated with job - b->queue_factor = get_queue_info (queue, bank_it); + b->queue_factor = get_queue_info (queue, b->queues); // if a user/bank has reached their max_active_jobs limit, subsequently // submitted jobs will be rejected @@ -1149,7 +1148,7 @@ static int job_updated (flux_plugin_t *p, // if the queue for the job has been updated, fetch the priority of the // validated queue and assign it to the associated bank_info struct if (queue != NULL) { - int queue_factor = get_queue_info (queue, bank_it); + int queue_factor = get_queue_info (queue, bank_it->second.queues); b->queue_factor = queue_factor; } @@ -1204,7 +1203,7 @@ static int update_queue_cb (flux_plugin_t *p, // validate the updated queue and make sure the user/bank has // access to it; if not, reject the update - if (get_queue_info (queue, bank_it) == INVALID_QUEUE) + if (get_queue_info (queue, bank_it->second.queues) == INVALID_QUEUE) return flux_jobtap_error (p, args, "mf_priority: queue not valid for user: %s",