From 6e8b1f1aa273c1a7f5dae5d93c230e7d157a8a7b Mon Sep 17 00:00:00 2001 From: Jason Sherman Date: Thu, 25 Jul 2024 15:08:40 -0700 Subject: [PATCH] Adjust subjects and message metadata. Add notes about consumers and provide an example. Signed-off-by: Jason Sherman --- .../Integrations/Event-Stream-Service.md | 135 +++++++++++++++--- 1 file changed, 119 insertions(+), 16 deletions(-) diff --git a/docs/Capabilities/Integrations/Event-Stream-Service.md b/docs/Capabilities/Integrations/Event-Stream-Service.md index 3a89167..538b20b 100644 --- a/docs/Capabilities/Integrations/Event-Stream-Service.md +++ b/docs/Capabilities/Integrations/Event-Stream-Service.md @@ -2,7 +2,7 @@ *** # CHEFS Event Stream Service -**NOTE (July 09, 2024) - this is in active development and not yet released. This documentation will be updated as development progresses.** +**NOTE (July 25, 2024) - this is in active development and not yet released. This documentation will be updated as development progresses.** CHEFS is adding an Event Streaming Service which allows Form Owners to consume and process real-time data about the forms (ex. a new version of the form has been published) and submissions. @@ -41,7 +41,7 @@ Under this stream will be two major subject prefixes: 2. `PRIVATE.forms` -Consumers specify which subjects they wish to process and often use wildcards. For example: a consumer can specify `PUBLIC.forms.>` and `PRIVATE.forms.*.submissions.>`. This would consume ALL public forms events and only private submission events. +Consumers specify which subjects they wish to process and often use wildcards. For example: a consumer can specify `PUBLIC.forms.>` and `PRIVATE.forms.submission.>`. This would consume ALL public forms events and only private submission events. `PUBLIC.forms` can be consumed by any client. These events will contain only metadata and no personal/private information. 
@@ -56,19 +56,18 @@ As stated, the two major subjects are `PUBLIC.forms` and `PRIVATE.forms`; these `PUBLIC|PRIVATE..
..` - domain = `forms` -- form id = the id of the form firing these events - class = `schema` or `submission`. - type = `created`, `deleted`, `modified` and more - +- form id = the id of the form firing these events +- Using wildcards a consumer could listen for events on a specific form id or a specific type (i.e. only listen to submission created events across any form). -- `PUBLIC.forms..schema.created` -- `PUBLIC.forms..schema.modified` -- `PUBLIC.forms..schema.deleted` +- `PUBLIC.forms.schema.published.` +- `PUBLIC.forms.schema.unpublished.` -- `PUBLIC.forms..submission.created` -- `PUBLIC.forms..submission.modified` -- `PUBLIC.forms..submission.deleted` +- `PUBLIC.forms.submission.created.` +- `PUBLIC.forms.submission.modified.` +- `PUBLIC.forms.submission.deleted.` More events to come. @@ -76,15 +75,12 @@ More events to come. | Attribute | Notes | | --- | --- | -| seqNo | A CHEFS system generated and tracked sequence number for ordering messages | -| timestamp | UTC Timestamp when event was added to stream. This is **not** the timestamp of the CHEFS form or submission record. | | `source` | `chefs` - where did this event originate? | | `domain` | `forms` - top level classification for event | | `class` | `submission` or `schema` - secondary classification for event | | `type` | `created`, `deleted`, `modified` - tertiary classification for event | | `formId` | uuid - CHEFS form id . Form that originates the event | | `formVersionId` | uuid - CHEFS form version id. Only if value exists at time of event. | -| `published` | boolean - For `schema` class events, the formVersion.published value. | `submissionId` | uuid - CHEFS submission id. Only applies for `submission` class events. 
| | `draft` | boolean - For `submission` class events, the submission.draft value | @@ -93,8 +89,6 @@ An example to show the overall structure of an event message is: ```json { meta: { - seqNo: , - timestamp: , source: 'chefs', domain: 'forms', class: 'submission' @@ -105,8 +99,117 @@ An example to show the overall structure of an event message is: draft: false, }, payload: { - submission: + data: + } + +``` + +### NATS Message Metadata + +NATS messages contain very valuable metadata that consumers should leverage for optimal processing. Each message on the stream will have a [sequence number](https://github.com/nats-io/nats.docs/blob/803d660c33496c9b7ba42360945be58621bbba0b/nats-concepts/seq_num.md) and a timestamp. Consumers can schedule batch consumption based on the sequence or timestamp of their last processed event. + +### Example Consumer +The following is a trivialized example of a [pull consumer](https://docs.nats.io/nats-concepts/jetstream/consumers). It is up to the external application that consumes/listens to the events to decide how to set up their consumer. This is one way in one language (JavaScript). Please review the documentation about [consumers](https://docs.nats.io/nats-concepts/jetstream/consumers) and review the approved [examples](https://natsbyexample.com) for more information. + +The example will ask for a batch of messages every 5 seconds - illustrating some basic pull behaviour. We can see as it processes the messages that there is a sequence number (`m.seq`) and a timestamp (`m.info.timestampNanos`) which we could leverage for different [delivery policies](https://docs.nats.io/nats-concepts/jetstream/consumers#deliverpolicy), such as getting all messages since a given sequence number. + +**Do not use this code for your client! 
Simplified example only!** + +```js +const { AckPolicy, connect } = require("nats"); + +// connection info +const servers = ["localhost:4222", "localhost:4223", "localhost:4224"]; + +let nc = undefined; // nats connection +let js = undefined; // jet stream +let jsm = undefined; // jet stream manager +let consumer = undefined; // pull consumer (ordered, ephemeral) + +// stream info +const STREAM_NAME = "CHEFS"; +const FILTER_SUBJECTS = ["PUBLIC.forms.>", "PRIVATE.forms.>"]; +const MAX_MESSAGES = 2; +const DURABLE_NAME = "pullConsumer"; + +const printMsg = (m) => { + // illustrate grabbing the sequence and timestamp from the nats message... + try { + const ts = new Date(m.info.timestampNanos / 1000000).toISOString(); + console.log( + `msg seq: ${m.seq}, subject: ${m.subject}, timestamp: ${ts}, streamSequence: ${m.info.streamSequence}, deliverySequence: ${m.info.deliverySequence}` + ); + // illustrate (one way of) grabbing message content as json + console.log(JSON.stringify(m.json(), null, 2)); + } catch (e) { + console.error(`Error printing message: ${e.message}`); + } +}; + +const init = async () => { + if (nc && nc.info != undefined) { + // already connected. + return; + } else { + // open a connection... + try { + // no credentials provided. 
+ // anonymous connections have read access to the stream + console.log(`connect to nats server(s) ${servers} as 'anonymous'...`); + nc = await connect({ + servers: servers, + }); + + console.log("access jetstream..."); + js = nc.jetstream(); + console.log("get jetstream manager..."); + jsm = await js.jetstreamManager(); + await jsm.consumers.add(STREAM_NAME, { + ack_policy: AckPolicy.Explicit, + durable_name: DURABLE_NAME, + }); + consumer = await js.consumers.get(STREAM_NAME, DURABLE_NAME); + } catch (e) { + console.error(e); + process.exit(0); + } + } +}; + +const pull = async () => { + console.log("fetch..."); + let iter = await consumer.fetch({ + filterSubjects: FILTER_SUBJECTS, + max_messages: MAX_MESSAGES, + }); + for await (const m of iter) { + printMsg(m); + m.ack(); } +}; + +const main = async () => { + await init(); + await pull(); + setTimeout(main, 5000); // process a batch every 5 seconds +}; + +main(); + +const shutdown = async () => { + console.log("\nshutdown..."); + console.log("drain connection..."); + await nc.drain(); + process.exit(0); +}; + +process.on("SIGTERM", shutdown); +process.on("SIGINT", shutdown); +process.on("SIGUSR1", shutdown); +process.on("SIGUSR2", shutdown); +process.on("exit", () => { + console.log("exit."); +}); ```