Skip to content

Commit

Permalink
Merge pull request nats-io#486 from nats-io/js_update_examples
Browse files Browse the repository at this point in the history
[UPDATED] JetStream examples
  • Loading branch information
kozlovic authored Nov 11, 2021
2 parents a8b3850 + d368a3b commit 9c54a4d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 3 deletions.
4 changes: 3 additions & 1 deletion examples/js-pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ int main(int argc, char **argv)

// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
// Since we don't provide subjects, the subjects it will default to the stream name.
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream,
Expand Down
65 changes: 63 additions & 2 deletions examples/js-sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ int main(int argc, char **argv)
jsOptions jsOpts;
jsSubOptions so;
natsStatus s;
bool delStream = false;

opts = parseArgs(argc, argv, usage);

Expand All @@ -82,12 +83,50 @@ int main(int argc, char **argv)
{
so.Stream = stream;
so.Consumer = durable;
so.Config.FlowControl = flowctrl;
if (flowctrl)
{
so.Config.FlowControl = true;
so.Config.Heartbeat = (int64_t)1E9;
}
}

if (s == NATS_OK)
s = natsConnection_JetStream(&js, conn, &jsOpts);

if (s == NATS_OK)
{
jsStreamInfo *si = NULL;

// First check if the stream already exists.
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_NOT_FOUND)
{
jsStreamConfig cfg;

// Since we are the one creating this stream, we can delete at the end.
delStream = true;

// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream,
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
}
if (s == NATS_OK)
{
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);

// Need to destroy the returned stream object.
jsStreamInfo_Destroy(si);
}
}

if (s == NATS_OK)
{
if (pull)
Expand Down Expand Up @@ -152,9 +191,31 @@ int main(int argc, char **argv)

if (s == NATS_OK)
{
printStats(STATS_IN|STATS_COUNT,conn, sub, stats);
printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
printPerf("Received");
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;

// Let's report some stats after the run
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_OK)
{
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);

jsStreamInfo_Destroy(si);
}
if (delStream)
{
printf("\nDeleting stream %s: ", stream);
s = js_DeleteStream(js, stream, NULL, &jerr);
if (s == NATS_OK)
printf("OK!");
printf("\n");
}
}
else
{
printf("Error: %d - %s - jerr=%d\n", s, natsStatus_GetText(s), jerr);
Expand Down

0 comments on commit 9c54a4d

Please sign in to comment.