diff --git a/src/js.c b/src/js.c index 1932222e4..3a0b787c4 100644 --- a/src/js.c +++ b/src/js.c @@ -1034,9 +1034,49 @@ jsSub_deleteConsumer(natsSubscription *sub) return NATS_OK; s = js_DeleteConsumer(js, stream, consumer, NULL, NULL); + if (s == NATS_NOT_FOUND) + s = nats_setError(s, "failed to delete consumer '%s': not found", consumer); return NATS_UPDATE_ERR_STACK(s); } +// Runs under the subscription lock, but the lock will be released, +// the connection lock may be acquired/released, and then +// the subscription lock is reacquired. +void +jsSub_deleteConsumerAfterDrain(natsSubscription *sub) +{ + natsConnection *nc = NULL; + const char *consumer = NULL; + natsStatus s; + + if ((sub->jsi == NULL) || !sub->jsi->dc) + return; + + nc = sub->conn; + consumer = sub->jsi->consumer; + + // Need to release sub lock since deletion of consumer + // will require the connection lock, etc. + natsSub_Unlock(sub); + + s = jsSub_deleteConsumer(sub); + if (s != NATS_OK) + { + natsConn_Lock(nc); + if (nc->opts->asyncErrCb != NULL) + { + char tmp[256]; + snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %d (%s)", + consumer, s, natsStatus_GetText(s)); + natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp)); + } + natsConn_Unlock(nc); + } + + // Reacquire the lock before returning. 
+ natsSub_Lock(sub); +} + static natsStatus _copyString(char **new_str, const char *str, int l) { @@ -1301,23 +1341,23 @@ natsSubscription_GetSequenceMismatch(jsConsumerSequenceMismatch *csm, natsSubscr if ((csm == NULL) || (sub == NULL)) return nats_setDefaultError(NATS_INVALID_ARG); - natsSub_Lock(sub); + natsSubAndLdw_Lock(sub); if (sub->jsi == NULL) { - natsSub_Unlock(sub); + natsSubAndLdw_Unlock(sub); return nats_setError(NATS_INVALID_SUBSCRIPTION, "%s", jsErrNotAJetStreamSubscription); } jsi = sub->jsi; if (jsi->dseq == jsi->ldseq) { - natsSub_Unlock(sub); + natsSubAndLdw_Unlock(sub); return NATS_NOT_FOUND; } memset(csm, 0, sizeof(jsConsumerSequenceMismatch)); csm->Stream = jsi->sseq; csm->ConsumerClient = jsi->dseq; csm->ConsumerServer = jsi->ldseq; - natsSub_Unlock(sub); + natsSubAndLdw_Unlock(sub); return NATS_OK; } diff --git a/src/jsm.c b/src/jsm.c index 7f54f7124..ab4d7016d 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -1830,11 +1830,8 @@ js_DeleteConsumer(jsCtx *js, const char *stream, const char *consumer, // If we got a response, check for error and success result. IFOK(s, _unmarshalSuccessResp(&success, resp, errCode)); - if ((s == NATS_NOT_FOUND) || ((s == NATS_OK) && !success)) - { - const char *nferr = (s == NATS_NOT_FOUND ? 
": not found" : ""); - s = nats_setError(s, "failed to delete consumer '%s'%s", consumer,nferr); - } + if ((s == NATS_OK) && !success) + s = nats_setError(s, "failed to delete consumer '%s'", consumer); NATS_FREE(subj); natsMsg_Destroy(resp); diff --git a/src/natsp.h b/src/natsp.h index 911e3fda6..1aef3e53f 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -772,6 +772,9 @@ jsSub_free(jsSub *sub); natsStatus jsSub_deleteConsumer(natsSubscription *sub); +void +jsSub_deleteConsumerAfterDrain(natsSubscription *sub); + natsStatus jsSub_trackSequences(jsSub *jsi, const char *reply); diff --git a/src/sub.c b/src/sub.c index 80b000377..13e70cf4a 100644 --- a/src/sub.c +++ b/src/sub.c @@ -120,6 +120,8 @@ natsSubAndLdw_Unlock(natsSubscription *sub) natsMutex_Unlock(sub->mu); } +// Runs under the subscription lock but will release it for a JS subscription +// if the JS consumer needs to be deleted. static void _setDrainCompleteState(natsSubscription *sub) { @@ -128,6 +130,20 @@ _setDrainCompleteState(natsSubscription *sub) // switched to "drain complete", swith the state. if (!natsSub_drainComplete(sub)) { + // For JS subscription we may need to delete the JS consumer, but + // we want to do so here ONLY if there was really a drain started. + // So need to check on drain started state. Also, note that if + // jsSub_deleteConsumerAfterDrain is invoked, the lock may be + // released/reacquired in that function. + if ((sub->jsi != NULL) && natsSub_drainStarted(sub) && sub->jsi->dc) + { + jsSub_deleteConsumerAfterDrain(sub); + // Check drainComplete state again, since another thread may have + // beaten us to it while the lock was released. + if (natsSub_drainComplete(sub)) + return; + } + // If drain status is not already set (could be done in _flushAndDrain // if flush fails, or timeout occurs), set it here to report if the // connection or subscription has been closed prior to drain completion. 
@@ -741,10 +757,7 @@ natsSub_nextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout, bool removeSub = true; } if (removeSub) - { - _setDrainCompleteState(sub); _retain(sub); - } } if ((s == NATS_OK) && natsMsg_IsNoResponders(msg)) { @@ -764,6 +777,7 @@ natsSub_nextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout, bool if (removeSub) { + natsSub_setDrainCompleteState(sub); natsConn_removeSubscription(nc, sub); natsSub_release(sub); } @@ -913,19 +927,14 @@ _flushAndDrain(void *closure) natsThread *t = NULL; int64_t timeout = 0; int64_t deadline = 0; - bool dc = false; - const char *consumer= NULL; + bool sync = false; natsStatus s; natsSub_Lock(sub); nc = sub->conn; t = sub->drainThread; timeout = sub->drainTimeout; - if ((sub->jsi != NULL) && sub->jsi->dc && (sub->jsi->consumer != NULL)) - { - dc = true; - consumer = sub->jsi->consumer; - } + sync = (sub->msgCb == NULL ? true : false); natsSub_Unlock(sub); // Make sure that negative value is considered no timeout. @@ -957,26 +966,20 @@ _flushAndDrain(void *closure) s = NATS_OK; // Wait for drain to complete or deadline is reached. natsSub_Lock(sub); - while ((s != NATS_TIMEOUT) && !natsSub_drainComplete(sub)) - s = natsCondition_AbsoluteTimedWait(sub->cond, sub->mu, deadline); - natsSub_Unlock(sub); - - if ((s == NATS_OK) && dc) + // For sync subs, it is possible that we get here and users have + // already called NextMsg() for all pending messages before the sub + // was marked as "draining", so if we detect this situation, we need + // to switch status to complete here. 
+ if (sync && !natsSub_drainComplete(sub) && (sub->msgList.msgs == 0)) { - natsStatus ls = jsSub_deleteConsumer(sub); - if (ls != NATS_OK) - { - natsConn_Lock(nc); - if (nc->opts->asyncErrCb != NULL) - { - char tmp[256]; - snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %d (%s)", - consumer, ls, natsStatus_GetText(ls)); - natsAsyncCb_PostErrHandler(nc, sub, ls, NATS_STRDUP(tmp)); - } - natsConn_Unlock(nc); - } + _setDrainCompleteState(sub); } + else + { + while ((s != NATS_TIMEOUT) && !natsSub_drainComplete(sub)) + s = natsCondition_AbsoluteTimedWait(sub->cond, sub->mu, deadline); + } + natsSub_Unlock(sub); if (s != NATS_OK) { diff --git a/test/test.c b/test/test.c index 3ffd66266..970f7d78b 100644 --- a/test/test.c +++ b/test/test.c @@ -23523,6 +23523,29 @@ test_JetStreamSubscribe(void) natsSubscription_Destroy(sub); sub = NULL; + test("Create consumer: "); + jsSubOptions_Init(&so); + so.Config.Durable = "delcons2sync"; + s = js_SubscribeSync(&sub, js, "foo", NULL, &so, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Drain deletes consumer: "); + s = natsSubscription_Drain(sub); + for (i=0; i<3; i++) + { + natsMsg *msg = NULL; + IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000)); + IFOK(s, natsMsg_Ack(msg, NULL)); + natsMsg_Destroy(msg); + msg = NULL; + } + IFOK(s, natsSubscription_WaitForDrainCompletion(sub, 1000)); + IFOK(s, js_GetConsumerInfo(&ci, js, "TEST", "delcons2sync", NULL, &jerr)); + testCond((s == NATS_NOT_FOUND) && (ci == NULL) && (jerr == JSConsumerNotFoundErr) + && (nats_GetLastError(NULL) == NULL)); + natsSubscription_Destroy(sub); + sub = NULL; + test("Create consumer: "); jsSubOptions_Init(&so); so.Config.Durable = "delcons3"; @@ -24468,9 +24491,9 @@ test_JetStreamSubscribeIdleHearbeat(void) test("Check HB received: "); nats_Sleep(300); - natsMutex_Lock(sub->mu); + natsSubAndLdw_Lock(sub); s = (sub->jsi->dseq == 1 ? 
NATS_OK : NATS_ERR); - natsMutex_Unlock(sub->mu); + natsSubAndLdw_Unlock(sub); testCond(s == NATS_OK); test("Check HB is not given to app: "); @@ -24543,9 +24566,9 @@ test_JetStreamSubscribeIdleHearbeat(void) // Send real message so that all clears up s = js_Publish(NULL, js, "foo", "msg3", 4, NULL, &jerr); nats_Sleep(300); - natsMutex_Lock(sub->mu); + natsSubAndLdw_Lock(sub); s = (sub->jsi->ssmn == false ? NATS_OK : NATS_ERR); - natsMutex_Unlock(sub->mu); + natsSubAndLdw_Unlock(sub); testCond(s == NATS_OK); test("Skip again: "); @@ -24647,9 +24670,9 @@ test_JetStreamSubscribeIdleHearbeat(void) // Send real message so that all clears up s = js_Publish(NULL, js, "foo", "msg4", 4, NULL, &jerr); nats_Sleep(300); - natsMutex_Lock(sub->mu); + natsSubAndLdw_Lock(sub); s = (sub->jsi->ssmn == false && sub->jsi->sm == false ? NATS_OK : NATS_ERR); - natsMutex_Unlock(sub->mu); + natsSubAndLdw_Unlock(sub); testCond(s == NATS_OK); test("Skip again: ");