Skip to content

Commit

Permalink
Merge pull request #537 from dongahn/shared_ptr
Browse files Browse the repository at this point in the history
Add smart pointer support and misc. cleanup
  • Loading branch information
trws authored Nov 19, 2019
2 parents 8342b7d + fd9207a commit b54ff48
Show file tree
Hide file tree
Showing 29 changed files with 963 additions and 803 deletions.
58 changes: 30 additions & 28 deletions qmanager/modules/qmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct qmanager_ctx_t {
flux_t *h;
qmanager_args_t args;
schedutil_t *schedutil;
queue_policy_base_t *queue;
std::shared_ptr<queue_policy_base_t> queue;
};


Expand All @@ -65,7 +65,7 @@ struct qmanager_ctx_t {
* *
******************************************************************************/

static int post_sched_loop (qmanager_ctx_t *ctx)
static int post_sched_loop (std::shared_ptr<qmanager_ctx_t> &ctx)
{
int rc = -1;
std::shared_ptr<job_t> job = nullptr;
Expand Down Expand Up @@ -106,7 +106,8 @@ extern "C" int jobmanager_hello_cb (flux_t *h,

{
int rc = -1;
qmanager_ctx_t *ctx = (qmanager_ctx_t *)arg;
std::shared_ptr<qmanager_ctx_t> ctx = nullptr;
ctx = *(static_cast<std::shared_ptr<qmanager_ctx_t> *>(arg));
std::shared_ptr<job_t> running_job
= std::make_shared<job_t> (job_state_kind_t::
RUNNING, id, uid, prio, ts, R);
Expand All @@ -124,7 +125,8 @@ extern "C" int jobmanager_hello_cb (flux_t *h,
extern "C" void jobmanager_alloc_cb (flux_t *h, const flux_msg_t *msg,
const char *jobspec, void *arg)
{
qmanager_ctx_t *ctx = (qmanager_ctx_t *)arg;
std::shared_ptr<qmanager_ctx_t> ctx = nullptr;
ctx = *(static_cast<std::shared_ptr<qmanager_ctx_t> *>(arg));
std::shared_ptr<job_t> job = std::make_shared<job_t> ();

if (schedutil_alloc_request_decode (msg, &job->id, &job->priority,
Expand All @@ -150,7 +152,8 @@ extern "C" void jobmanager_free_cb (flux_t *h, const flux_msg_t *msg,
const char *R, void *arg)
{
flux_jobid_t id;
qmanager_ctx_t *ctx = (qmanager_ctx_t *)arg;
std::shared_ptr<qmanager_ctx_t> ctx = nullptr;
ctx = *(static_cast<std::shared_ptr<qmanager_ctx_t> *>(arg));

if (schedutil_free_request_decode (msg, &id) < 0) {
flux_log_error (h, "%s: schedutil_free_request_decode", __FUNCTION__);
Expand All @@ -176,19 +179,21 @@ extern "C" void jobmanager_free_cb (flux_t *h, const flux_msg_t *msg,
}

static void jobmanager_exception_cb (flux_t *h, flux_jobid_t id,
const char *t, int s, void *a)
const char *type, int severity, void *arg)
{
std::shared_ptr<job_t> job;
qmanager_ctx_t *ctx = (qmanager_ctx_t *)a;
std::shared_ptr<qmanager_ctx_t> ctx = nullptr;
ctx = *(static_cast<std::shared_ptr<qmanager_ctx_t> *>(arg));

if (s > 0 || (job = ctx->queue->lookup (id)) == nullptr
if (severity > 0 || (job = ctx->queue->lookup (id)) == nullptr
|| !job->is_pending ())
return;
if (ctx->queue->remove (id) < 0) {
flux_log_error (h, "%s: remove job (%jd)", __FUNCTION__, (intmax_t)id);
return;
}
std::string note = std::string ("alloc aborted due to exception type=") + t;
std::string note = std::string ("alloc aborted due to exception type=")
+ type;
if (schedutil_alloc_respond_denied (ctx->schedutil,
job->msg,
note.c_str ()) < 0) {
Expand All @@ -198,7 +203,8 @@ static void jobmanager_exception_cb (flux_t *h, flux_jobid_t id,
flux_log (h, LOG_DEBUG, "%s (id=%jd)", note.c_str (), (intmax_t)id);
}

static int process_args (qmanager_ctx_t *ctx, int argc, char **argv)
static int process_args (std::shared_ptr<qmanager_ctx_t> &ctx,
int argc, char **argv)
{
int rc = 0;
qmanager_args_t &args = ctx->args;
Expand Down Expand Up @@ -233,7 +239,7 @@ static void set_default_args (qmanager_args_t &args)
args.policy_params = "";
}

static int handshake_jobmanager (qmanager_ctx_t *ctx)
static int handshake_jobmanager (std::shared_ptr<qmanager_ctx_t> &ctx)
{
int rc = -1;
int queue_depth = 0; /* Not implemented in job-manager */
Expand All @@ -243,11 +249,11 @@ static int handshake_jobmanager (qmanager_ctx_t *ctx)
jobmanager_alloc_cb,
jobmanager_free_cb,
jobmanager_exception_cb,
ctx))) {
&ctx))) {
flux_log_error (ctx->h, "%s: schedutil_create", __FUNCTION__);
goto out;
}
if (schedutil_hello (ctx->schedutil, jobmanager_hello_cb, ctx) < 0) {
if (schedutil_hello (ctx->schedutil, jobmanager_hello_cb, &ctx) < 0) {
flux_log_error (ctx->h, "%s: schedutil_hello", __FUNCTION__);
goto out;
}
Expand All @@ -260,7 +266,7 @@ static int handshake_jobmanager (qmanager_ctx_t *ctx)
return rc;
}

static int enforce_queue_policy (qmanager_ctx_t *ctx)
static int enforce_queue_policy (std::shared_ptr<qmanager_ctx_t> &ctx)
{
int rc = -1;
ctx->queue = create_queue_policy (ctx->args.queue_policy, "module");
Expand Down Expand Up @@ -291,23 +297,22 @@ static int enforce_queue_policy (qmanager_ctx_t *ctx)
return rc;
}

static qmanager_ctx_t *qmanager_new (flux_t *h)
static std::shared_ptr<qmanager_ctx_t> qmanager_new (flux_t *h)
{
qmanager_ctx_t *ctx = NULL;

if (!(ctx = new (std::nothrow) qmanager_ctx_t ())) {
std::shared_ptr<qmanager_ctx_t> ctx = nullptr;
try {
ctx = std::make_shared<qmanager_ctx_t> ();
ctx->h = h;
ctx->schedutil = NULL;
set_default_args (ctx->args);
} catch (std::bad_alloc &e) {
errno = ENOMEM;
goto out;
}
ctx->h = h;
ctx->schedutil = NULL;
set_default_args (ctx->args);

out:
return ctx;
}

static void qmanager_destroy (qmanager_ctx_t *ctx)
static void qmanager_destroy (std::shared_ptr<qmanager_ctx_t> &ctx)
{
if (ctx) {
int saved_errno = errno;
Expand All @@ -316,10 +321,7 @@ static void qmanager_destroy (qmanager_ctx_t *ctx)
flux_respond_error (ctx->h, job->msg, ENOSYS, "unloading");
while ((job = ctx->queue->complete_pop ()) != nullptr)
flux_respond_error (ctx->h, job->msg, ENOSYS, "unloading");
delete ctx->queue;
ctx->queue = NULL;
schedutil_destroy (ctx->schedutil);
delete (ctx);
errno = saved_errno;
}
}
Expand All @@ -335,7 +337,7 @@ extern "C" int mod_main (flux_t *h, int argc, char **argv)
{
int rc = -1;
try {
qmanager_ctx_t *ctx = NULL;
std::shared_ptr<qmanager_ctx_t> ctx = nullptr;
if (!(ctx = qmanager_new (h))) {
flux_log_error (h, "%s: qmanager_new", __FUNCTION__);
return rc;
Expand Down
5 changes: 4 additions & 1 deletion qmanager/policies/queue_policy_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
#ifndef QUEUE_POLICY_FACTORY_HPP
#define QUEUE_POLICY_FACTORY_HPP

#include <memory>
#include <string>

namespace Flux {
namespace queue_manager {

bool known_queue_policy (const std::string &policy);
queue_policy_base_t *create_queue_policy (const std::string &policy);
std::shared_ptr<queue_policy_base_t> create_queue_policy (
const std::string &policy,
const std::string &reapi);

} // namespace Flux::queue_manager
} // namespace Flux
Expand Down
72 changes: 34 additions & 38 deletions qmanager/policies/queue_policy_factory_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,50 +58,46 @@ bool known_queue_policy (const std::string &policy)
return rc;
}

queue_policy_base_t *create_queue_policy (const std::string &policy,
const std::string &reapi)
std::shared_ptr<queue_policy_base_t> create_queue_policy (
const std::string &policy,
const std::string &reapi)
{
queue_policy_base_t *p = NULL;
if (policy == "fcfs") {
if (reapi == "module") {
p = (queue_policy_base_t *)
new (std::nothrow)queue_policy_fcfs_t<reapi_module_t> ();
}
else if (reapi == "cli") {
p = (queue_policy_base_t *)
new (std::nothrow)queue_policy_fcfs_t<reapi_cli_t> ();
}
}
else if (policy == "easy") {
if (reapi == "module") {
p = (queue_policy_base_t *)
new (std::nothrow)queue_policy_easy_t<reapi_module_t> ();
}
else if (reapi == "cli") {
p = (queue_policy_base_t *)
new (std::nothrow)queue_policy_easy_t<reapi_cli_t> ();
}
}
else if (policy == "hybrid") {
if (reapi == "module") {
p = (queue_policy_base_t *)
new (std::nothrow)queue_policy_hybrid_t<reapi_module_t> ();
std::shared_ptr<queue_policy_base_t> p = nullptr;

try {
if (policy == "fcfs") {
if (reapi == "module")
p = std::make_shared<queue_policy_fcfs_t<reapi_module_t>> ();
else if (reapi == "cli")
p = std::make_shared<queue_policy_fcfs_t<reapi_cli_t>> ();
}
else if (reapi == "cli") {
p = (queue_policy_base_t *)
new (std::nothrow)queue_policy_hybrid_t<reapi_cli_t> ();
else if (policy == "easy") {
if (reapi == "module")
p = std::make_shared<queue_policy_easy_t<reapi_module_t>> ();
else if (reapi == "cli")
p = std::make_shared<queue_policy_easy_t<reapi_cli_t>> ();
}
}
else if (policy == "conservative") {
if (reapi == "module") {
p = (queue_policy_base_t *) new (std::nothrow)
queue_policy_conservative_t<reapi_module_t> ();
else if (policy == "hybrid") {
if (reapi == "module")
p = std::make_shared<queue_policy_hybrid_t<reapi_module_t>> ();
else if (reapi == "cli")
p = std::make_shared<queue_policy_hybrid_t<reapi_cli_t>> ();
}
else if (reapi == "cli") {
p = (queue_policy_base_t *) new (std::nothrow)
queue_policy_conservative_t<reapi_cli_t> ();
else if (policy == "conservative") {
if (reapi == "module") {
p = std::make_shared<queue_policy_conservative_t<
reapi_module_t>> ();
}
else if (reapi == "cli") {
p = std::make_shared<queue_policy_conservative_t<
reapi_cli_t>> ();
}
}
} catch (std::bad_alloc &e) {
errno = ENOMEM;
p = nullptr;
}

return p;
}

Expand Down
Loading

0 comments on commit b54ff48

Please sign in to comment.