Skip to content

Commit

Permalink
Merge pull request #9519 from rickard-green/rickard/prio-msg/OTP-19198
Browse files Browse the repository at this point in the history
Priority message fixes
  • Loading branch information
rickard-green authored Mar 3, 2025
2 parents 3ff68a3 + 5f5cffa commit d0e1e29
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 79 deletions.
113 changes: 71 additions & 42 deletions erts/emulator/beam/bif.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,47 @@ BIF_RETTYPE spawn_3(BIF_ALIST_3)

/**********************************************************************/

/* Utility to add a new link between processes p and another internal
* process (rpid). Process p must be the currently executing process.
/*
* link/1 and link/2
*/

static ERTS_INLINE int
link_modify_flags(Uint32 *flagsp, Uint32 add_flags, Uint32 rm_flags)
{
Uint32 fs, fs_before;

fs = fs_before = *flagsp;

fs |= add_flags;
fs &= ~rm_flags;

*flagsp = fs;

if (!(fs_before & ERTS_ML_FLG_PRIO_ML) & !!(fs & ERTS_ML_FLG_PRIO_ML))
return 1;

if (!!(fs_before & ERTS_ML_FLG_PRIO_ML) & !(fs & ERTS_ML_FLG_PRIO_ML))
return -1;

return 0;
}

/* create a link to the process */
static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
{
BIF_RETTYPE ret_val;
int prio_change = 0;
Uint32 add_flags = 0, rm_flags = 0;

if (ERTS_IS_P_TRACED_FL(c_p, F_TRACE_PROCS)) {
trace_proc(c_p, ERTS_PROC_LOCK_MAIN, c_p, am_link, other);
}

if (is_not_nil(opts)) {
add_flags = erts_link_opts(opts, &rm_flags);
if (add_flags == (Uint32) ~0) {
c_p->fvalue = am_badopt;
ERTS_BIF_PREP_ERROR(ret_val, c_p, BADARG | EXF_HAS_EXT_INFO);
return ret_val;
}
add_flags = erts_link_opts(opts, &rm_flags);
if (add_flags == (Uint32) ~0) {
c_p->fvalue = am_badopt;
ERTS_BIF_PREP_ERROR(ret_val, c_p, BADARG | EXF_HAS_EXT_INFO);
goto done;
}

ERTS_BIF_PREP_RET(ret_val, am_true); /* Prepare for success... */
Expand All @@ -132,7 +152,7 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
ErtsLink *lnk, *rlnk;

if (c_p->common.id == other)
return ret_val;
goto done;

if (!erts_proc_lookup(other) && !(c_p->flags & F_TRAP_EXIT))
goto res_no_proc;
Expand All @@ -141,12 +161,11 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
&created,
ERTS_LNK_TYPE_PROC,
other);
lnk->flags |= add_flags;
prio_change = link_modify_flags(&lnk->flags, add_flags, rm_flags);
if (!created) {
ErtsILink *ilnk = (ErtsILink *) lnk;
if (!ilnk->unlinking) {
lnk->flags &= ~rm_flags;
return ret_val;
goto done;
}
ilnk->unlinking = 0;
}
Expand All @@ -158,7 +177,7 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
erts_proc_sig_send_link_exit(NULL, other, rlnk, am_noproc, NIL);
}

return ret_val;
goto done;
}

if (is_internal_port(other)) {
Expand All @@ -177,12 +196,11 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
&created,
ERTS_LNK_TYPE_PORT,
other);
lnk->flags |= add_flags;
prio_change = link_modify_flags(&lnk->flags, add_flags, rm_flags);
if (!created) {
ErtsILink *ilnk = (ErtsILink *) lnk;
if (!ilnk->unlinking) {
lnk->flags &= ~rm_flags;
return ret_val;
goto done;
}
ilnk->unlinking = 0;
}
Expand All @@ -205,7 +223,7 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
default:
break;
}
return ret_val;
goto done;
}
else if (is_external_port(other)
&& external_port_dist_entry(other) == erts_this_dist_entry) {
Expand All @@ -220,20 +238,19 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
ERTS_LNK_TYPE_DIST_PORT,
c_p->common.id,
other);
lnk->flags |= add_flags;
prio_change = link_modify_flags(&lnk->flags, add_flags, rm_flags);
if (!created) {
ErtsELink *elnk = erts_link_to_elink(lnk);
if (!elnk->unlinking) {
lnk->flags &= ~rm_flags;
return ret_val;
goto done;
}
elnk->unlinking = 0;
}

erts_proc_sig_send_dist_link_exit(erts_this_dist_entry, other,
c_p->common.id, NULL, NULL,
am_noproc, NIL);
return ret_val;
goto done;
}

if (is_external_pid(other)) {
Expand All @@ -253,16 +270,15 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
ERTS_LNK_TYPE_DIST_PROC,
c_p->common.id,
other);
lnk->flags |= add_flags;
prio_change = link_modify_flags(&lnk->flags, add_flags, rm_flags);
elnk = erts_link_to_elink(lnk);

if (dep == erts_this_dist_entry) {
lnk->flags &= ~rm_flags;
elnk->unlinking = 0;
erts_proc_sig_send_dist_link_exit(erts_this_dist_entry, other,
c_p->common.id, NULL, NULL,
am_noproc, NIL);
return ret_val;
goto done;
}

if (created) {
Expand All @@ -272,8 +288,7 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)
}
else {
if (!elnk->unlinking) {
lnk->flags &= ~rm_flags;
return ret_val; /* Already present... */
goto done; /* Already present... */
}
/*
* We need to replace the link if the connection has changed.
Expand Down Expand Up @@ -368,11 +383,20 @@ static BIF_RETTYPE link_opt(Process *c_p, Eterm other, Eterm opts)

elnk->unlinking = 0;

return ret_val;
goto done;
}

ERTS_BIF_PREP_ERROR(ret_val, c_p, BADARG);

done:

if (prio_change) {
if (prio_change > 0)
erts_proc_sig_prio_item_added(c_p, ERTS_PRIO_ITEM_TYPE_LINK);
else
erts_proc_sig_prio_item_deleted(c_p, ERTS_PRIO_ITEM_TYPE_LINK);
}

return ret_val;

res_no_proc: {
Expand Down Expand Up @@ -1145,6 +1169,11 @@ BIF_RETTYPE spawn_request_abandon_1(BIF_ALIST_1)

ASSERT(erts_monitor_is_origin(omon));

if (omon->flags & ERTS_ML_FLG_PRIO_ML) {
omon->flags &= ~ERTS_ML_FLG_PRIO_ML;
erts_proc_sig_prio_item_deleted(BIF_P, ERTS_PRIO_ITEM_TYPE_MONITOR);
}

if (omon->flags & ERTS_ML_FLG_SPAWN_LINK) {
/* Leave it for reply... */
omon->flags |= ERTS_ML_FLG_SPAWN_ABANDONED;
Expand All @@ -1164,19 +1193,20 @@ BIF_RETTYPE spawn_request_abandon_1(BIF_ALIST_1)


/**********************************************************************/

static ERTS_INLINE void
unlink_clear_prio(Process *c_p, Uint32 *flagsp)
{
if ((*flagsp) & ERTS_ML_FLG_PRIO_ML) {
*flagsp &= ~ERTS_ML_FLG_PRIO_ML;
erts_proc_sig_prio_item_deleted(c_p, ERTS_PRIO_ITEM_TYPE_LINK);
}
}

/* remove a link from a process */
BIF_RETTYPE unlink_1(BIF_ALIST_1)
{

#define ERTS_UNLINK_CLEAR_PRIO__(C_P, FS) \
do { \
int prio__ = !!((FS) & ERTS_ML_FLG_PRIO_ML); \
(FS) &= ~ERTS_ML_FLG_PRIO_ML; \
if (prio__) \
erts_proc_sig_prio_item_deleted((C_P), \
ERTS_PRIO_ITEM_TYPE_LINK); \
} while (0)

if (ERTS_IS_P_TRACED_FL(BIF_P, F_TRACE_PROCS)) {
trace_proc(BIF_P, ERTS_PROC_LOCK_MAIN,
BIF_P, am_unlink, BIF_ARG_1);
Expand All @@ -1192,7 +1222,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
Uint64 id = erts_proc_sig_send_unlink(&BIF_P->common,
BIF_P->common.id,
&ilnk->link);
ERTS_UNLINK_CLEAR_PRIO__(BIF_P, ilnk->link.flags);
unlink_clear_prio(BIF_P, &ilnk->link.flags);
if (id)
ilnk->unlinking = id;
else {
Expand All @@ -1216,7 +1246,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
ErtsPortOpResult res = ERTS_PORT_OP_DROPPED;
Port *prt;

ERTS_UNLINK_CLEAR_PRIO__(BIF_P, ilnk->link.flags);
unlink_clear_prio(BIF_P, &ilnk->link.flags);

/* Send unlink signal */
prt = erts_port_lookup(BIF_ARG_1, ERTS_PORT_SFLGS_DEAD);
Expand Down Expand Up @@ -1267,7 +1297,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
if (elnk->unlinking)
BIF_RET(am_true);

ERTS_UNLINK_CLEAR_PRIO__(BIF_P, lnk->flags);
unlink_clear_prio(BIF_P, &lnk->flags);

unlink_id = erts_proc_sig_new_unlink_id(&BIF_P->common);
elnk->unlinking = unlink_id;
Expand Down Expand Up @@ -1304,6 +1334,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
ErtsLink *lnk = erts_link_tree_lookup(ERTS_P_LINKS(BIF_P), BIF_ARG_1);
if (lnk) {
ErtsELink *elnk;
unlink_clear_prio(BIF_P, &lnk->flags);
erts_link_to_other(lnk, &elnk);
erts_link_release_both(&elnk->ld);
}
Expand All @@ -1312,8 +1343,6 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
/* Links to Remote ports not supported... */

BIF_ERROR(BIF_P, BADARG);

#undef ERTS_UNLINK_CLEAR_PRIO__
}

BIF_RETTYPE hibernate_3(BIF_ALIST_3)
Expand Down
3 changes: 2 additions & 1 deletion erts/emulator/beam/dist.c
Original file line number Diff line number Diff line change
Expand Up @@ -6030,7 +6030,8 @@ BIF_RETTYPE erts_internal_dist_spawn_request_4(BIF_ALIST_4)
Eterm *tp = tuple_val(car);
if (am_reply_tag == tp[1]
|| am_reply == tp[1]
|| am_monitor == tp[1]) {
|| am_monitor == tp[1]
|| am_link == tp[1]) {
rm_cnt++;
/* skip option */
if (rm_cnt == rm_opts) {
Expand Down
Loading

0 comments on commit d0e1e29

Please sign in to comment.