Skip to content

Commit

Permalink
Merge pull request nats-io#535 from nats-io/kv_update_discard_policy
Browse files Browse the repository at this point in the history
[UPDATED] js_CreateKeyValue updates discard policy to new
  • Loading branch information
kozlovic authored Mar 15, 2022
2 parents b8724de + 77f49e5 commit d8c3445
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 3 deletions.
75 changes: 72 additions & 3 deletions src/kv.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,54 @@ _createKV(kvStore **new_kv, jsCtx *js, const char *bucket)
return NATS_UPDATE_ERR_STACK(s);
}

static bool
_sameStrings(const char *s1, const char *s2)
{
bool s1Empty = nats_IsStringEmpty(s1);
bool s2Empty = nats_IsStringEmpty(s2);

// Same if both empty.
if (s1Empty && s2Empty)
return true;

// Not same if one is empty while other is not.
if ((s1Empty && !s2Empty) || (!s1Empty && s2Empty))
return false;

// Return result of comparison of s1 and s2
return (strcmp(s1, s2) == 0 ? true : false);
}

static bool
_sameStreamCfg(jsStreamConfig *oc, jsStreamConfig *nc)
{
// Check some of the stream's configuration properties only,
// the ones that we set when creating a KV stream.
if (!_sameStrings(oc->Description, nc->Description))
return false;
if (oc->SubjectsLen != nc->SubjectsLen)
return false;
if (!_sameStrings(oc->Subjects[0], nc->Subjects[0]))
return false;
if (oc->MaxMsgsPerSubject != nc->MaxMsgsPerSubject)
return false;
if (oc->MaxBytes != nc->MaxBytes)
return false;
if (oc->MaxAge != nc->MaxAge)
return false;
if (oc->MaxMsgSize != nc->MaxMsgSize)
return false;
if (oc->Storage != nc->Storage)
return false;
if (oc->Replicas != nc->Replicas)
return false;
if (oc->AllowRollup != nc->AllowRollup)
return false;
if (oc->DenyDelete != nc->DenyDelete)
return false;
return true;
}

natsStatus
js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
{
Expand Down Expand Up @@ -222,15 +270,19 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
}
if (s == NATS_OK)
{
int64_t maxBytes = (cfg->MaxBytes == 0 ? -1 : cfg->MaxBytes);
int32_t maxMsgSize = (cfg->MaxValueSize == 0 ? -1 : cfg->MaxValueSize);
jsErrCode jerr = 0;

jsStreamConfig_Init(&sc);
sc.Name = kv->stream;
sc.Description = cfg->Description;
sc.Subjects = (const char*[1]){subject};
sc.SubjectsLen = 1;
sc.MaxMsgsPerSubject = history;
sc.MaxBytes = cfg->MaxBytes;
sc.MaxBytes = maxBytes;
sc.MaxAge = cfg->TTL;
sc.MaxMsgSize = cfg->MaxValueSize;
sc.MaxMsgSize = maxMsgSize;
sc.Storage = cfg->StorageType;
sc.Replicas = replicas;
sc.AllowRollup = true;
Expand All @@ -240,7 +292,24 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
if (natsConn_srvVersionAtLeast(kv->js->nc, 2, 7, 2))
sc.Discard = js_DiscardNew;

s = js_AddStream(NULL, js, &sc, NULL, NULL);
s = js_AddStream(NULL, js, &sc, NULL, &jerr);
if ((s != NATS_OK) && (jerr == JSStreamNameExistErr))
{
jsStreamInfo *si = NULL;

nats_clearLastError();
s = js_GetStreamInfo(&si, js, sc.Name, NULL, NULL);
if (s == NATS_OK)
{
si->Config->Discard = sc.Discard;
if (_sameStreamCfg(si->Config, &sc))
s = js_UpdateStream(NULL, js, &sc, NULL, NULL);
else
s = nats_setError(NATS_ERR, "%s",
"Existing configuration is different");
}
jsStreamInfo_Destroy(si);
}
}
if (s == NATS_OK)
*new_kv = kv;
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ KeyValueDeleteVsPurge
KeyValueDeleteTombstones
KeyValueDeleteMarkerThreshold
KeyValueCrossAccount
KeyValueDiscardOldToNew
StanPBufAllocator
StanConnOptions
StanSubOptions
Expand Down
83 changes: 83 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -28790,6 +28790,88 @@ test_KeyValueCrossAccount(void)
remove(confFile);
}

static natsStatus
_checkDiscard(jsCtx *js, jsDiscardPolicy expected, kvStore **newKV)
{
kvStore *kv = NULL;
jsStreamInfo *si = NULL;
kvConfig kvc;
natsStatus s;

kvConfig_Init(&kvc);
kvc.Bucket = "TEST";
s = js_CreateKeyValue(&kv, js, &kvc);
IFOK(s, js_GetStreamInfo(&si, js, "KV_TEST", NULL, NULL));
IFOK(s, (si->Config->Discard == expected ? NATS_OK : NATS_ERR));

jsStreamInfo_Destroy(si);

*newKV = kv;

return s;
}

static void
test_KeyValueDiscardOldToNew(void)
{
kvStore *kv = NULL;
kvEntry *e = NULL;
kvConfig kvc;
natsStatus s;
int i;

JS_SETUP(2, 7, 2);

// We are going to go from 2.7.1->2.7.2->2.7.1 and 2.7.2 again.
for (i=0; i<2; i++)
{
// Change the server version in the connection to
// create as-if we were connecting to a v2.7.1 server.
natsConn_Lock(nc);
nc->srvVersion.ma = 2;
nc->srvVersion.mi = 7;
nc->srvVersion.up = 1;
natsConn_Unlock(nc);

test("Check discard (old): ");
s = _checkDiscard(js, js_DiscardOld, &kv);
if ((s == NATS_OK) && (i == 0))
s = kvStore_PutString(NULL, kv, "foo", "value");
testCond(s == NATS_OK);
kvStore_Destroy(kv);
kv = NULL;

// Now change version to 2.7.2
natsConn_Lock(nc);
nc->srvVersion.ma = 2;
nc->srvVersion.mi = 7;
nc->srvVersion.up = 2;
natsConn_Unlock(nc);

test("Check discard (new): ");
s = _checkDiscard(js, js_DiscardNew, &kv);
IFOK(s, kvStore_Get(&e, kv, "foo"));
if ((s == NATS_OK) && (strcmp(kvEntry_ValueString(e), "value") != 0))
s = NATS_ERR;
testCond(s == NATS_OK);
kvEntry_Destroy(e);
e = NULL;
kvStore_Destroy(kv);
kv = NULL;
}

test("Check that other changes are rejected: ");
kvConfig_Init(&kvc);
kvc.Bucket = "TEST";
kvc.MaxBytes = 1024*1024;
s = js_CreateKeyValue(&kv, js, &kvc);
testCond((s == NATS_ERR)
&& (strstr(nats_GetLastError(NULL), "configuration is different") != NULL));
kvStore_Destroy(kv);

JS_TEARDOWN;
}

#if defined(NATS_HAS_STREAMING)

static int
Expand Down Expand Up @@ -31245,6 +31327,7 @@ static testInfo allTests[] =
{"KeyValueDeleteTombstones", test_KeyValueDeleteTombstones},
{"KeyValueDeleteMarkerThreshold", test_KeyValuePurgeDeletesMarkerThreshold},
{"KeyValueCrossAccount", test_KeyValueCrossAccount},
{"KeyValueDiscardOldToNew", test_KeyValueDiscardOldToNew},

#if defined(NATS_HAS_STREAMING)
{"StanPBufAllocator", test_StanPBufAllocator},
Expand Down

0 comments on commit d8c3445

Please sign in to comment.