Skip to content

Commit

Permalink
Merge pull request nats-io#604 from nats-io/js_ordered_cons
Browse files Browse the repository at this point in the history
Clone consumer config prior to call AddConsumer on ordred cons recreate
  • Loading branch information
kozlovic authored Oct 12, 2022
2 parents b27cf1e + 81254f1 commit 0869715
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2485,6 +2485,8 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha

if (opts->Ordered)
{
const char *tmp = NULL;

cfg->FlowControl = true;
cfg->AckPolicy = js_AckNone;
cfg->MaxDeliver = 1;
Expand All @@ -2494,7 +2496,12 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
cfg->MemoryStorage = true;
cfg->Replicas = 1;

// Let's clone without the delivery subject because it will need
// to be set to something new when recreating it anyway.
tmp = cfg->DeliverSubject;
cfg->DeliverSubject = NULL;
s = js_cloneConsumerConfig(cfg, &ocCfg);
cfg->DeliverSubject = tmp;
}
else
{
Expand Down Expand Up @@ -3019,21 +3026,16 @@ _recreateOrderedCons(void *closure)
natsSub_Lock(sub);
t = oci->thread;
jsi = sub->jsi;
s = js_cloneConsumerConfig(jsi->ocCfg, &cc);
natsSub_Unlock(sub);

NATS_FREE((char*) jsi->ocCfg->DeliverSubject);
jsi->ocCfg->DeliverSubject = NULL;
DUP_STRING(s, jsi->ocCfg->DeliverSubject, sub->subject);
if (s == NATS_OK)
{
// Create consumer request for starting policy.
cc = jsi->ocCfg;
cc->DeliverPolicy = js_DeliverByStartSequence;
cc->OptStartSeq = oci->sseq;
}
natsSub_Unlock(sub);
cc->DeliverSubject = oci->ndlv;
cc->DeliverPolicy = js_DeliverByStartSequence;
cc->OptStartSeq = oci->sseq;

if (s == NATS_OK)
{
s = js_AddConsumer(&ci, jsi->js, jsi->stream, cc, NULL, NULL);
if (s == NATS_OK)
{
Expand All @@ -3050,6 +3052,11 @@ _recreateOrderedCons(void *closure)

jsConsumerInfo_Destroy(ci);
}

// Clear cc->DeliverSubject now before destroying it (since
// cc->DeliverSubject points to oci->ndlv (not a copy)
cc->DeliverSubject = NULL;
js_destroyConsumerConfig(cc);
}
}
if (s != NATS_OK)
Expand Down

0 comments on commit 0869715

Please sign in to comment.