Skip to content

Commit

Permalink
Merge pull request #6642 from garlick/bulk_cleanup
Browse files Browse the repository at this point in the history
libsubprocess: cosmetic cleanup
  • Loading branch information
mergify[bot] authored Feb 14, 2025
2 parents 934fcd8 + ac5dd98 commit 820dd72
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/common/libsubprocess/bulk-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ static int exec_start_cmd (struct bulk_exec *exec,
/* Set the unit name for the "sdexec" service. This is done here
* for each rank instead of once in bulk_exec_push_cmd() to ensure
* the name is unique when there are multiple brokers per node.
* Ex: shell-0-fTE9HHdZvi3.service, imp-kill-1-fTE9HHdZvi3.service.
* Ex: shell-0-fTE9HHdZvi3.service.
* (N.B. systemd doesn't like "ƒ" in the unit name hence f58plain).
*/
if (streq (exec->service, "sdexec")) {
Expand Down
6 changes: 0 additions & 6 deletions src/common/libsubprocess/bulk-exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ int bulk_exec_push_cmd (struct bulk_exec *exec,
int bulk_exec_start (flux_t *h, struct bulk_exec *exec);

/* Send signal to ranks. Set ranks=NULL for all.
* If an IMP path has been set then bulk_exec_imp_kill() will be used.
*/
flux_future_t * bulk_exec_kill (struct bulk_exec *exec,
const struct idset *ranks,
Expand All @@ -80,11 +79,6 @@ flux_future_t * bulk_exec_kill (struct bulk_exec *exec,
*/
void bulk_exec_kill_log_error (flux_future_t *f, flux_jobid_t id);

flux_future_t *bulk_exec_imp_kill (struct bulk_exec *exec,
const char *imp_path,
const struct idset *ranks,
int signal);

int bulk_exec_cancel (struct bulk_exec *exec);

/* Returns max wait status returned from all exited processes */
Expand Down
28 changes: 18 additions & 10 deletions src/common/libsubprocess/test/bulk-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,22 @@ void on_error (struct bulk_exec *exec, flux_subprocess_t *p, void *arg)
{
if (p) {
flux_subprocess_state_t state = flux_subprocess_state (p);
log_msg ("%d: pid %ju: %s", flux_subprocess_rank (p),
(uintmax_t) flux_subprocess_pid (p),
flux_subprocess_state_string (state));
log_msg ("%d: pid %ju: %s",
flux_subprocess_rank (p),
(uintmax_t)flux_subprocess_pid (p),
flux_subprocess_state_string (state));
}
flux_future_t *f = bulk_exec_kill (exec, NULL, 9);
if (flux_future_get (f, NULL) < 0)
log_err_exit ("bulk_exec_kill");
}

void on_output (struct bulk_exec *exec, flux_subprocess_t *p,
const char *stream, const char *data,
int data_len, void *arg)
void on_output (struct bulk_exec *exec,
flux_subprocess_t *p,
const char *stream,
const char *data,
int data_len,
void *arg)
{
int rank = flux_subprocess_rank (p);
FILE *fp = streq (stream, "stdout") ? stdout : stderr;
Expand All @@ -81,8 +85,10 @@ static void kill_cb (flux_future_t *f, void *arg)
flux_future_destroy (f);
}

static void signal_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void signal_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct bulk_exec *exec = arg;
int signum = flux_signal_watcher_get_signum (w);
Expand All @@ -94,8 +100,10 @@ static void signal_cb (flux_reactor_t *r, flux_watcher_t *w,
flux_watcher_stop (w);
}

static void check_cancel_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void check_cancel_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct bulk_exec *exec = arg;
if (cancel_after && bulk_exec_started_count (exec) >= cancel_after) {
Expand Down

0 comments on commit 820dd72

Please sign in to comment.