Skip to content

Commit

Permalink
shell: enable ranges from RFC14 also for slots
Browse files Browse the repository at this point in the history
Problem: slots are not included in the RFC 20 resource set R that is
returned by the scheduler, so if both slots and cores are ranges, one
must resolve them consistently (i.e. guess what the scheduler did)

Add infrastructure for parsing and looping over RFC 14 ranges and use
it to determine a consistent node/slot/core count combination
  • Loading branch information
sam-maloney committed Feb 14, 2025
1 parent 3a2f0f9 commit 216a80e
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 20 deletions.
6 changes: 4 additions & 2 deletions src/shell/info.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ struct jobspec *lookup_jobspec_get (flux_future_t *f, json_error_t *error)
* json_t * objects)
*/
if (json_unpack_ex (job->jobspec, error, 0,
"{s:i s:o s:o s:{s?{s?s s?O s?{s?O}}}}",
"{s:i s:o s:[{s:o s:o}] s:{s?{s?s s?O s?{s?O}}}}",
"version", &job->version,
"resources", &job->resources,
"tasks", &job->tasks,
"tasks",
"command", &job->command,
"count", &job->count,
"attributes",
"system",
"cwd", &job->cwd,
Expand Down
213 changes: 196 additions & 17 deletions src/shell/jobspec.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <limits.h>
#include <flux/core.h>
#include <flux/shell.h>
#include <jansson.h>
Expand All @@ -23,6 +24,14 @@
#include "info.h"
#include "rcalc.h"

struct range {
int min;
int max;
char operator;
int operand;
int current_value;
};

void set_error (json_error_t *error, const char *fmt, ...)
{
va_list ap;
Expand All @@ -45,11 +54,176 @@ void jobspec_destroy (struct jobspec *job)
}
}

struct range *create_range (json_t *json_range,
int lower,
int upper,
json_error_t *error)
{
struct range *range;
const char *operator = NULL;
json_error_t loc_error;

if (!(range = calloc (1, sizeof (*range)))) {
set_error (error, "create_range: Out of memory");
}
// set defaults for optional fields and initialize current_value
range->max = INT_MAX;
range->operator = '+';
range->operand = 1;
// allow single integer counts; just leads to a degenerate range
if (json_is_integer (json_range)) {
range->min = json_integer_value (json_range);
range->max = range->min;
goto out;
}
if (json_unpack_ex(json_range, &loc_error, 0,
"{s:i, s?i, s?s, s?i}",
"min", &range->min,
"max", &range->max,
"operator", &operator,
"operand", &range->operand) < 0) {
set_error (error, "create_range: %s", loc_error.text);
goto error;
}
// if no operator specified, then leave as default (assuming only min)
if (operator) {
range->operator = operator[0];
}
// check validity of operator/operand combination
switch (range->operator) {
case '+':
if (range->operand < 1) {
set_error (error, "create_range: operand must be >= 1 for addition '+'");
goto error;
}
break;
case '*':
if (range->operand < 2) {
set_error (error, "create_range: operand must be >= 2 for multiplication '*'");
goto error;
}
break;
case '^':
if (range->operand < 2) {
set_error (error, "create_range: operand must be >= 2 for exponentiation '^'");
goto error;
}
if (range->min < 2) {
set_error (error, "create_range: min must be >= 2 for exponentiation '^'");
goto error;
}
break;
default:
set_error (error, "create_range: unknown operator '%c'", range->operator);
goto error;
}
out:
// enforce limits
range->min = range->min < lower ? lower : range->min;
range->max = range->max > upper ? upper : range->max;
// validate final min/max combination
if (range->min < 1) {
set_error (error, "create_range: min must be >= 1");
goto error;
}
if (range->max < range->min) {
set_error (error, "create_range: max must be >= min");
goto error;
}
// start current_value at min, although range_begin will also handle this
range->current_value = range->min;
return range;
error:
free (range);
return NULL;
}

static int range_begin (struct range *range)
{
range->current_value = range->min;
return range->current_value;
}

static int range_end (struct range *range)
{
return range->current_value > range->max;
}

static int range_next (struct range *range)
{
switch (range->operator) {
case '+':
range->current_value += range->operand;
break;
case '*':
range->current_value *= range->operand;
break;
case '^':
int base = range->current_value;
for (int i = 1; i < range->operand; ++i) {
range->current_value *= base;
}
}
return range->current_value;
}

static int resolve_slot_range (struct shell_info *info,
json_t *slot_json,
json_t *core_json,
json_error_t *error)
{
struct range *crange = NULL;
struct range *srange = NULL;
int total_nodes = rcalc_total_nodes (info->rcalc);
int total_cores = rcalc_total_cores (info->rcalc);
int slot_count = -1;
int node_multiplier = 1;
int cores_per_node = total_nodes;

if (info->jobspec->node_count > 0) {
node_multiplier = info->jobspec->node_count;
cores_per_node = total_cores / node_multiplier;
}
// if the core count is just a single integer, the determination is simple
if (json_is_integer (core_json)) {
return cores_per_node / json_integer_value (core_json);
}
// create_range calls set_error, so just goto out on failure
if (!(crange = create_range (core_json, 1, cores_per_node, error))) {
goto out;
}
if (!(srange = create_range (slot_json,
total_nodes / node_multiplier,
cores_per_node / crange->min,
error))) {
goto out;
}
// loop over slot and core ranges until we find a combination that matches
for (range_begin(srange); !range_end(srange); range_next(srange)) {
for (range_begin(crange); !range_end(crange); range_next(crange)) {
int test_slot_count = node_multiplier * srange->current_value;
if (test_slot_count * crange->current_value == total_cores) {
slot_count = test_slot_count;
goto out;
}
}
}
// no specific error was detected, but no matching combination was found
if (slot_count < 1) {

Check warning

Code scanning / CodeQL

Comparison result is always the same Warning

Comparison is always true because slot_count <= -1.
set_error (error, "resolve_slot_range: unable to determine slot count");
}
out:
free (srange);
free (crange);
return slot_count;
}

static int recursive_parse_helper (struct shell_info *info,
json_t *curr_resource,
json_error_t *error,
int level,
int multiplier)
int multiplier,
json_t *slot_range)
{
struct jobspec *job = info->jobspec;
rcalc_t *r = info->rcalc;
Expand Down Expand Up @@ -108,12 +282,11 @@ static int recursive_parse_helper (struct shell_info *info,
set_error (error, "slot resource encountered after slot resource");
return -1;
}
if (!json_is_integer (count)) {
set_error (error, "count must be integer for slot resource");
return -1;
if (json_is_integer (count)) {
job->slot_count = multiplier * json_integer_value (count);
} else {
slot_range = count;
}

job->slot_count = multiplier * json_integer_value (count);

// Check if we already encountered the `node` resource
if (job->node_count > 0) {
Expand All @@ -124,7 +297,7 @@ static int recursive_parse_helper (struct shell_info *info,
job->slots_per_node = job->slot_count / job->node_count;
}
} else if (streq (type, "core")) {
if (job->slot_count < 1) {
if (job->slot_count < 1 && !slot_range) {
set_error (error, "core resource encountered before slot resource");
return -1;
}
Expand All @@ -133,6 +306,16 @@ static int recursive_parse_helper (struct shell_info *info,
return -1;
}

if (slot_range) {
job->slot_count = resolve_slot_range(info, slot_range, count, error);
if (job->slot_count < 1) {
// set_error should already have been called
return -1;
}
if (job->node_count > 0) {
job->slots_per_node = job->slot_count / job->node_count;
}
}
job->cores_per_slot = rcalc_total_cores (r) / job->slot_count;
// N.B.: despite having found everything we were looking for (i.e.,
// node, slot, and core resources), we have to keep recursing to
Expand All @@ -145,7 +328,8 @@ static int recursive_parse_helper (struct shell_info *info,
with,
error,
level+1,
multiplier)
multiplier,
slot_range)
< 0) {
return -1;
}
Expand Down Expand Up @@ -191,20 +375,18 @@ static int recursive_parse_jobspec_resources (struct shell_info *info,
job->slots_per_node = -1;
job->node_count = -1;

int rc = recursive_parse_helper (info, job->resources, error, 0, 1);
int rc = recursive_parse_helper (info, job->resources, error, 0, 1, NULL);

if ((rc == 0) && (job->cores_per_slot < 1)) {
set_error (error, "Missing core resource");
return -1;
}
return rc;

}

int jobspec_parse (struct shell_info *info, json_error_t *error)
{
struct jobspec *job = info->jobspec;
json_t *count;

if (job->version != 1) {
shell_warn ("Unsupported jobspec version: expected 1 got %d",
Expand All @@ -230,16 +412,13 @@ int jobspec_parse (struct shell_info *info, json_error_t *error)

/* Set job->task_count
*/
if (json_unpack_ex (job->tasks, error, 0, "{s:o}", "count", &count) < 0) {
goto error;
}
if (json_object_size (count) != 1) {
if (json_object_size (job->count) != 1) {
set_error (error, "tasks count must have exactly one key set");
goto error;
}
if (json_unpack (count, "{s:i}", "total", &job->task_count) < 0) {
if (json_unpack (job->count, "{s:i}", "total", &job->task_count) < 0) {
int per_slot;
if (json_unpack (count, "{s:i}", "per_slot", &per_slot) < 0) {
if (json_unpack (job->count, "{s:i}", "per_slot", &per_slot) < 0) {
set_error (error, "Unable to parse tasks count");
goto error;
}
Expand Down
2 changes: 1 addition & 1 deletion src/shell/jobspec.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ struct jobspec {
int slots_per_node; // number of slots per node (-1=unspecified)
int node_count; // number of nodes (-1=unspecified)
json_t *command;
json_t *count;
const char *cwd;
json_t *resources;
json_t *tasks;
json_t *environment;
json_t *options; // attributes.system.shell.options, if any
};
Expand Down

0 comments on commit 216a80e

Please sign in to comment.