Skip to content

Commit

Permalink
Merge pull request nats-io#460 from nats-io/js_drain_del_cons_race
Browse files Browse the repository at this point in the history
Ensure that JS consumer is deleted before "drain complete" notification
  • Loading branch information
kozlovic authored Aug 26, 2021
2 parents 2f18dcb + 1ccda22 commit 92ac896
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 43 deletions.
48 changes: 44 additions & 4 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -1034,9 +1034,49 @@ jsSub_deleteConsumer(natsSubscription *sub)
return NATS_OK;

s = js_DeleteConsumer(js, stream, consumer, NULL, NULL);
if (s == NATS_NOT_FOUND)
s = nats_setError(s, "failed to delete consumer '%s': not found", consumer);
return NATS_UPDATE_ERR_STACK(s);
}

// Runs under the subscription lock, but lock will be released,
// the connection lock will be possibly acquired/released, then
// the subscription lock reacquired.
void
jsSub_deleteConsumerAfterDrain(natsSubscription *sub)
{
natsConnection *nc = NULL;
const char *consumer = NULL;
natsStatus s;

if ((sub->jsi == NULL) || !sub->jsi->dc)
return;

nc = sub->conn;
consumer = sub->jsi->consumer;

// Need to release sub lock since deletion of consumer
// will require the connection lock, etc..
natsSub_Unlock(sub);

s = jsSub_deleteConsumer(sub);
if (s != NATS_OK)
{
natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %d (%s)",
consumer, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
}
natsConn_Unlock(nc);
}

// Reacquire the lock before returning.
natsSub_Lock(sub);
}

static natsStatus
_copyString(char **new_str, const char *str, int l)
{
Expand Down Expand Up @@ -1301,23 +1341,23 @@ natsSubscription_GetSequenceMismatch(jsConsumerSequenceMismatch *csm, natsSubscr
if ((csm == NULL) || (sub == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsSub_Lock(sub);
natsSubAndLdw_Lock(sub);
if (sub->jsi == NULL)
{
natsSub_Unlock(sub);
natsSubAndLdw_Unlock(sub);
return nats_setError(NATS_INVALID_SUBSCRIPTION, "%s", jsErrNotAJetStreamSubscription);
}
jsi = sub->jsi;
if (jsi->dseq == jsi->ldseq)
{
natsSub_Unlock(sub);
natsSubAndLdw_Unlock(sub);
return NATS_NOT_FOUND;
}
memset(csm, 0, sizeof(jsConsumerSequenceMismatch));
csm->Stream = jsi->sseq;
csm->ConsumerClient = jsi->dseq;
csm->ConsumerServer = jsi->ldseq;
natsSub_Unlock(sub);
natsSubAndLdw_Unlock(sub);
return NATS_OK;
}

Expand Down
7 changes: 2 additions & 5 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1830,11 +1830,8 @@ js_DeleteConsumer(jsCtx *js, const char *stream, const char *consumer,

// If we got a response, check for error and success result.
IFOK(s, _unmarshalSuccessResp(&success, resp, errCode));
if ((s == NATS_NOT_FOUND) || ((s == NATS_OK) && !success))
{
const char *nferr = (s == NATS_NOT_FOUND ? ": not found" : "");
s = nats_setError(s, "failed to delete consumer '%s'%s", consumer,nferr);
}
if ((s == NATS_OK) && !success)
s = nats_setError(s, "failed to delete consumer '%s'", consumer);

NATS_FREE(subj);
natsMsg_Destroy(resp);
Expand Down
3 changes: 3 additions & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,9 @@ jsSub_free(jsSub *sub);
natsStatus
jsSub_deleteConsumer(natsSubscription *sub);

void
jsSub_deleteConsumerAfterDrain(natsSubscription *sub);

natsStatus
jsSub_trackSequences(jsSub *jsi, const char *reply);

Expand Down
59 changes: 31 additions & 28 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ natsSubAndLdw_Unlock(natsSubscription *sub)
natsMutex_Unlock(sub->mu);
}

// Runs under the subscription lock but will release it for a JS subscription
// if the JS consumer needs to be deleted.
static void
_setDrainCompleteState(natsSubscription *sub)
{
Expand All @@ -128,6 +130,20 @@ _setDrainCompleteState(natsSubscription *sub)
// switched to "drain complete", swith the state.
if (!natsSub_drainComplete(sub))
{
// For JS subscription we may need to delete the JS consumer, but
// we want to do so here ONLY if there was really a drain started.
// So need to check on drain started state. Also, note that if
// jsSub_deleteConsumerAfterDrain is invoked, the lock may be
// released/reacquired in that function.
if ((sub->jsi != NULL) && natsSub_drainStarted(sub) && sub->jsi->dc)
{
jsSub_deleteConsumerAfterDrain(sub);
// Check drainCompete state again, since another thread may have
// beat us to it while lock was released.
if (natsSub_drainComplete(sub))
return;
}

// If drain status is not already set (could be done in _flushAndDrain
// if flush fails, or timeout occurs), set it here to report if the
// connection or subscription has been closed prior to drain completion.
Expand Down Expand Up @@ -741,10 +757,7 @@ natsSub_nextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout, bool
removeSub = true;
}
if (removeSub)
{
_setDrainCompleteState(sub);
_retain(sub);
}
}
if ((s == NATS_OK) && natsMsg_IsNoResponders(msg))
{
Expand All @@ -764,6 +777,7 @@ natsSub_nextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout, bool

if (removeSub)
{
natsSub_setDrainCompleteState(sub);
natsConn_removeSubscription(nc, sub);
natsSub_release(sub);
}
Expand Down Expand Up @@ -913,19 +927,14 @@ _flushAndDrain(void *closure)
natsThread *t = NULL;
int64_t timeout = 0;
int64_t deadline = 0;
bool dc = false;
const char *consumer= NULL;
bool sync = false;
natsStatus s;

natsSub_Lock(sub);
nc = sub->conn;
t = sub->drainThread;
timeout = sub->drainTimeout;
if ((sub->jsi != NULL) && sub->jsi->dc && (sub->jsi->consumer != NULL))
{
dc = true;
consumer = sub->jsi->consumer;
}
sync = (sub->msgCb == NULL ? true : false);
natsSub_Unlock(sub);

// Make sure that negative value is considered no timeout.
Expand Down Expand Up @@ -957,26 +966,20 @@ _flushAndDrain(void *closure)
s = NATS_OK;
// Wait for drain to complete or deadline is reached.
natsSub_Lock(sub);
while ((s != NATS_TIMEOUT) && !natsSub_drainComplete(sub))
s = natsCondition_AbsoluteTimedWait(sub->cond, sub->mu, deadline);
natsSub_Unlock(sub);

if ((s == NATS_OK) && dc)
// For sync subs, it is possible that we get here and users have
// already called NextMsg() for all pending messages before the sub
// was marked as "draining", so if we detect this situation, we need
// to switch status to complete here.
if (sync && !natsSub_drainComplete(sub) && (sub->msgList.msgs == 0))
{
natsStatus ls = jsSub_deleteConsumer(sub);
if (ls != NATS_OK)
{
natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %d (%s)",
consumer, ls, natsStatus_GetText(ls));
natsAsyncCb_PostErrHandler(nc, sub, ls, NATS_STRDUP(tmp));
}
natsConn_Unlock(nc);
}
_setDrainCompleteState(sub);
}
else
{
while ((s != NATS_TIMEOUT) && !natsSub_drainComplete(sub))
s = natsCondition_AbsoluteTimedWait(sub->cond, sub->mu, deadline);
}
natsSub_Unlock(sub);

if (s != NATS_OK)
{
Expand Down
35 changes: 29 additions & 6 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -23523,6 +23523,29 @@ test_JetStreamSubscribe(void)
natsSubscription_Destroy(sub);
sub = NULL;

test("Create consumer: ");
jsSubOptions_Init(&so);
so.Config.Durable = "delcons2sync";
s = js_SubscribeSync(&sub, js, "foo", NULL, &so, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Drain deletes consumer: ");
s = natsSubscription_Drain(sub);
for (i=0; i<3; i++)
{
natsMsg *msg = NULL;
IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000));
IFOK(s, natsMsg_Ack(msg, NULL));
natsMsg_Destroy(msg);
msg = NULL;
}
IFOK(s, natsSubscription_WaitForDrainCompletion(sub, 1000));
IFOK(s, js_GetConsumerInfo(&ci, js, "TEST", "delcons2sync", NULL, &jerr));
testCond((s == NATS_NOT_FOUND) && (ci == NULL) && (jerr == JSConsumerNotFoundErr)
&& (nats_GetLastError(NULL) == NULL));
natsSubscription_Destroy(sub);
sub = NULL;

test("Create consumer: ");
jsSubOptions_Init(&so);
so.Config.Durable = "delcons3";
Expand Down Expand Up @@ -24468,9 +24491,9 @@ test_JetStreamSubscribeIdleHearbeat(void)

test("Check HB received: ");
nats_Sleep(300);
natsMutex_Lock(sub->mu);
natsSubAndLdw_Lock(sub);
s = (sub->jsi->dseq == 1 ? NATS_OK : NATS_ERR);
natsMutex_Unlock(sub->mu);
natsSubAndLdw_Unlock(sub);
testCond(s == NATS_OK);

test("Check HB is not given to app: ");
Expand Down Expand Up @@ -24543,9 +24566,9 @@ test_JetStreamSubscribeIdleHearbeat(void)
// Send real message so that all clears up
s = js_Publish(NULL, js, "foo", "msg3", 4, NULL, &jerr);
nats_Sleep(300);
natsMutex_Lock(sub->mu);
natsSubAndLdw_Lock(sub);
s = (sub->jsi->ssmn == false ? NATS_OK : NATS_ERR);
natsMutex_Unlock(sub->mu);
natsSubAndLdw_Unlock(sub);
testCond(s == NATS_OK);

test("Skip again: ");
Expand Down Expand Up @@ -24647,9 +24670,9 @@ test_JetStreamSubscribeIdleHearbeat(void)
// Send real message so that all clears up
s = js_Publish(NULL, js, "foo", "msg4", 4, NULL, &jerr);
nats_Sleep(300);
natsMutex_Lock(sub->mu);
natsSubAndLdw_Lock(sub);
s = (sub->jsi->ssmn == false && sub->jsi->sm == false ? NATS_OK : NATS_ERR);
natsMutex_Unlock(sub->mu);
natsSubAndLdw_Unlock(sub);
testCond(s == NATS_OK);

test("Skip again: ");
Expand Down

0 comments on commit 92ac896

Please sign in to comment.