Skip to content

Commit

Permalink
WIP: Forward parent death to descendant processes (unix)
Browse files Browse the repository at this point in the history
If the uds_fd connection to the parent BEAM is broken or closed, react
by killing all spawned children.  When a spawned port is closed, kill
the associated OS process.

A concise demonstration of the problem being solved is to run the
following command with and without the patch, then kill the BEAM.
Without the patch, the "sleep" process will continue:

    erl -noshell -eval 'os:cmd("sleep 60")'

Keeps the mapping of all living child processes so that it's possible
to iterate over them during clean up.  Previously, child processes
were only stored in forker_hash if the :in bit was set.

A new forker command message is introduced, which allows it to kill
the child and clean up internal resources if the port is closed before
the process ends naturally.

TODO:
* Needs a decision made between killing the process or process group.
* Separate patch for win32
  • Loading branch information
adamwight committed Feb 27, 2025
1 parent f64ad0a commit f7eee72
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 79 deletions.
136 changes: 77 additions & 59 deletions erts/emulator/sys/unix/erl_child_setup.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@
# define FD_ZERO(FD_SET_PTR) memset(FD_SET_PTR, 0, sizeof(fd_set))
#endif

typedef struct exit_status {
HashBucket hb;
pid_t os_pid;
Eterm port_id;
bool want_exit_status;
} ErtsSysExitStatus;

static Hash *forker_hash;

static char abort_reason[200]; /* for core dump inspection */

static void ABORT(const char* fmt, ...)
Expand Down Expand Up @@ -174,8 +183,8 @@ static ssize_t write_all(int fd, const char *buff, size_t size) {
return pos;
}

static void add_os_pid_to_port_id_mapping(Eterm, pid_t);
static Eterm get_port_id(pid_t);
static void kill_child(pid_t os_pid);
static void kill_all_children(void);
static int forker_hash_init(void);

static int max_files = -1;
Expand Down Expand Up @@ -564,6 +573,7 @@ main(int argc, char *argv[])
tcsetattr(0,TCSANOW,&initial_tty_mode);
}
DEBUG_PRINT("erl_child_setup failed to read from uds: %d, %d", res, errno);
kill_all_children();
_exit(0);
}

Expand All @@ -572,97 +582,104 @@ main(int argc, char *argv[])
if (isatty(0) && isatty(1)) {
tcsetattr(0,TCSANOW,&initial_tty_mode);
}
kill_all_children();
_exit(0);
}
/* Since we use unix domain sockets and send the entire data in
one go we *should* get the entire payload at once. */
ASSERT(res == sizeof(proto));
ASSERT(proto.action == ErtsSysForkerProtoAction_Start);
if (proto.action == ErtsSysForkerProtoAction_Start) {
ErtsSysExitStatus es;

sys_sigblock(SIGCHLD);
sys_sigblock(SIGCHLD);

errno = 0;
errno = 0;

os_pid = fork();
if (os_pid == 0)
start_new_child(pipes);
os_pid = fork();
if (os_pid == 0)
start_new_child(pipes);

add_os_pid_to_port_id_mapping(proto.u.start.port_id, os_pid);
es.os_pid = os_pid;
es.port_id = proto.u.start.port_id;
es.want_exit_status = proto.u.start.want_exit_status;
hash_put(forker_hash, &es);

/* We write an ack here, but expect the reply on
the pipes[0] inside the fork */
proto.action = ErtsSysForkerProtoAction_Go;
proto.u.go.os_pid = os_pid;
proto.u.go.error_number = errno;
write_all(pipes[1], (char *)&proto, sizeof(proto));
/* We write an ack here, but expect the reply on
the pipes[0] inside the fork */
proto.action = ErtsSysForkerProtoAction_Go;
proto.u.go.os_pid = os_pid;
proto.u.go.error_number = errno;
write_all(pipes[1], (char *)&proto, sizeof(proto));

#ifdef FORKER_PROTO_START_ACK
proto.action = ErtsSysForkerProtoAction_StartAck;
write_all(uds_fd, (char *)&proto, sizeof(proto));
proto.action = ErtsSysForkerProtoAction_StartAck;
write_all(uds_fd, (char *)&proto, sizeof(proto));
#endif

sys_sigrelease(SIGCHLD);
close(pipes[0]);
close(pipes[1]);
close(pipes[2]);
sys_sigrelease(SIGCHLD);
close(pipes[0]);
close(pipes[1]);
close(pipes[2]);
} else if (proto.action == ErtsSysForkerProtoAction_Stop) {
ErtsSysExitStatus est, *es;
est.os_pid = proto.u.stop.os_pid;
es = hash_remove(forker_hash, &est);
if (es) {
kill_child(es->os_pid);
free(es);
}
} else {
#ifdef DEBUG
ABORT("Unknown command from parent: %d", proto.action);
#endif
}
}

if (FD_ISSET(sigchld_pipe[0], &read_fds)) {
int ibuff[2];
ErtsSysForkerProto proto;
ErtsSysExitStatus est, *es;
res = read_all(sigchld_pipe[0], (char *)ibuff, sizeof(ibuff));
if (res <= 0) {
ABORT("Failed to read from sigchld pipe: %d (%d)", res, errno);
}

proto.u.sigchld.port_id = get_port_id((pid_t)(ibuff[0]));

if (proto.u.sigchld.port_id == THE_NON_VALUE)
continue; /* exit status report not requested */

proto.action = ErtsSysForkerProtoAction_SigChld;
proto.u.sigchld.error_number = ibuff[1];
DEBUG_PRINT("send sigchld to %d (errno = %d)", uds_fd, ibuff[1]);
if (write_all(uds_fd, (char *)&proto, sizeof(proto)) < 0) {
/* The uds was close, which most likely means that the VM
has exited. This will be detected when we try to read
from the uds_fd. */
DEBUG_PRINT("Failed to write to uds: %d (%d)", uds_fd, errno);
est.os_pid = (pid_t)ibuff[0];
es = hash_remove(forker_hash, &est);

if (es && es->want_exit_status) {
proto.action = ErtsSysForkerProtoAction_SigChld;
proto.u.sigchld.port_id = es->port_id;
proto.u.sigchld.error_number = ibuff[1];
DEBUG_PRINT("send sigchld to %d (errno = %d)", uds_fd, ibuff[1]);
if (write_all(uds_fd, (char *)&proto, sizeof(proto)) < 0) {
/* The uds was close, which most likely means that the VM
has exited. This will be detected when we try to read
from the uds_fd. */
DEBUG_PRINT("Failed to write to uds: %d (%d)", uds_fd, errno);
}
free(es);
}
}
}
return 1;
}

typedef struct exit_status {
HashBucket hb;
pid_t os_pid;
Eterm port_id;
} ErtsSysExitStatus;

static Hash *forker_hash;
/* Kill child process groups on VM termination so they don't become orphaned. */

static void add_os_pid_to_port_id_mapping(Eterm port_id, pid_t os_pid)
{
if (port_id != THE_NON_VALUE) {
/* exit status report requested */
ErtsSysExitStatus es;
es.os_pid = os_pid;
es.port_id = port_id;
hash_put(forker_hash, &es);
static void kill_child(pid_t os_pid) {
if (os_pid > 0 && kill(os_pid, SIGTERM) != 0) {
DEBUG_PRINT("error killing process %d: %d", os_pid, errno);
}
}

static Eterm get_port_id(pid_t os_pid)
{
ErtsSysExitStatus est, *es;
Eterm port_id;
est.os_pid = os_pid;
es = hash_remove(forker_hash, &est);
if (!es) return THE_NON_VALUE;
port_id = es->port_id;
free(es);
return port_id;
static void fun_kill_foreach(ErtsSysExitStatus *es, void *unused) {
kill_child(es->os_pid);
}

static void kill_all_children(void) {
DEBUG_PRINT("cleaning up by killing all %d child processes", forker_hash->nobjs);
hash_foreach(forker_hash, (HFOREACH_FUN)fun_kill_foreach, NULL);
}

static int fcmp(void *a, void *b)
Expand Down Expand Up @@ -691,6 +708,7 @@ static void *falloc(void *e)
ErtsSysExitStatus *ne = malloc(sizeof(ErtsSysExitStatus));
ne->os_pid = se->os_pid;
ne->port_id = se->port_id;
ne->want_exit_status = se->want_exit_status;
return ne;
}

Expand Down
7 changes: 6 additions & 1 deletion erts/emulator/sys/unix/erl_child_setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ typedef struct ErtsSysForkerProto_ {
ErtsSysForkerProtoAction_StartAck,
ErtsSysForkerProtoAction_Go,
ErtsSysForkerProtoAction_SigChld,
ErtsSysForkerProtoAction_Ack
ErtsSysForkerProtoAction_Ack,
ErtsSysForkerProtoAction_Stop
} action;
union {
struct {
ErtsSysPortId port_id;
int fds[3];
bool want_exit_status;
} start;
struct {
pid_t os_pid;
Expand All @@ -71,6 +73,9 @@ typedef struct ErtsSysForkerProto_ {
ErtsSysPortId port_id;
int error_number;
} sigchld;
struct {
pid_t os_pid;
} stop;
} u;
} ErtsSysForkerProto;

Expand Down
55 changes: 36 additions & 19 deletions erts/emulator/sys/unix/sys_drivers.c
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ static ErlDrvData spawn_start(ErlDrvPort port_num, char* name,
proto->u.start.fds[0] = ofd[0];
proto->u.start.fds[1] = ifd[1];
proto->u.start.fds[2] = stderrfd;
proto->u.start.port_id = opts->exit_status ? erts_drvport2id(port_num) : THE_NON_VALUE;
proto->u.start.port_id = erts_drvport2id(port_num);
proto->u.start.want_exit_status = opts->exit_status;
if (erl_drv_port_control(forker_port, ERTS_FORKER_DRV_CONTROL_MAGIC_NUMBER,
(char*)proto, sizeof(*proto))) {
/* The forker port has been killed, we close both fd's which will
Expand Down Expand Up @@ -1046,6 +1047,16 @@ static void stop(ErlDrvData ev)
driver_select(prt, abs(dd->ofd->fd), ERL_DRV_USE, 0); /* close(ofd); */
}

if (dd->pid > 0) {
ErtsSysForkerProto *proto =
erts_alloc(ERTS_ALC_T_DRV_CTRL_DATA, sizeof(ErtsSysForkerProto));
memset(proto, 0, sizeof(ErtsSysForkerProto));
proto->action = ErtsSysForkerProtoAction_Stop;
proto->u.stop.os_pid = dd->pid;
erl_drv_port_control(forker_port, ERTS_FORKER_DRV_CONTROL_MAGIC_NUMBER,
(char*)proto, sizeof(*proto));
}

erts_free(ERTS_ALC_T_DRV_TAB, dd);
}

Expand Down Expand Up @@ -1814,29 +1825,35 @@ static ErlDrvSSizeT forker_control(ErlDrvData e, unsigned int cmd, char *buf,
first_call = 0;
}

driver_enq(port_num, buf, len);
if (driver_sizeq(port_num) > sizeof(*proto)) {
return 0;
}

if ((res = sys_uds_write(forker_fd, (char*)proto, sizeof(*proto),
proto->u.start.fds, 3, 0)) < 0) {
if (errno == ERRNO_BLOCK || errno == EINTR) {
driver_select(port_num, forker_fd, ERL_DRV_WRITE|ERL_DRV_USE, 1);
return 0;
} else if (errno == EMFILE) {
forker_sigchld(proto->u.start.port_id, errno);
forker_deq(port_num, proto);
if (proto->action == ErtsSysForkerProtoAction_Start) {
driver_enq(port_num, buf, len);
if (driver_sizeq(port_num) > sizeof(*proto)) {
return 0;
} else {
erts_exit(ERTS_DUMP_EXIT, "Failed to write to erl_child_setup: %d\n", errno);
}
}

if ((res = sys_uds_write(forker_fd, (char*)proto, sizeof(*proto),
proto->u.start.fds, 3, 0)) < 0) {
if (errno == ERRNO_BLOCK || errno == EINTR) {
driver_select(port_num, forker_fd, ERL_DRV_WRITE|ERL_DRV_USE, 1);
return 0;
} else if (errno == EMFILE) {
forker_sigchld(proto->u.start.port_id, errno);
forker_deq(port_num, proto);
return 0;
} else {
erts_exit(ERTS_DUMP_EXIT, "Failed to write to erl_child_setup: %d\n", errno);
}
}

#ifndef FORKER_PROTO_START_ACK
ASSERT(res == sizeof(*proto));
forker_deq(port_num, proto);
ASSERT(res == sizeof(*proto));
forker_deq(port_num, proto);
#endif
} else if (proto->action == ErtsSysForkerProtoAction_Stop) {
if ((res = write(forker_fd, (char*)proto, sizeof(*proto))) < 0) {
erts_exit(ERTS_DUMP_EXIT, "Failed to write stop to erl_child_setup: %d\n", errno);
}
}

return 0;
}
5 changes: 5 additions & 0 deletions erts/preloaded/src/erlang.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7475,6 +7475,11 @@ reported to the owning process using signals of the form
The maximum number of ports that can be open at the same time can be configured
by passing command-line flag [`+Q`](erl_cmd.md#max_ports) to [erl](erl_cmd.md).
When a port is closed or the VM shuts down, spawned executables are sent a
`SIGTERM` on unix. The child may still outlive the VM if it traps the signal.
Note that any processes started under a shell using `spawn` will not terminate
unless they respond to stdin or stdout being closed.
""".
-doc #{ category => ports }.
-spec open_port(PortName, PortSettings) -> port() when
Expand Down

0 comments on commit f7eee72

Please sign in to comment.