Skip to content

Commit

Permalink
Merge pull request #54 from pozil/pozil/publishBatch
Browse files Browse the repository at this point in the history
feat: publish event batch
  • Loading branch information
pozil authored Dec 6, 2024
2 parents 95cee29 + 7031b88 commit 154199f
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 140 deletions.
181 changes: 131 additions & 50 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ See the [official Pub/Sub API repo](https://github.com/developerforce/pub-sub-ap
- [Logging](#logging)
- [Quick Start Example](#quick-start-example)
- [Other Examples](#other-examples)
- [Publish a platform event](#publish-a-platform-event)
- [Publish a single platform event](#publish-a-single-platform-event)
- [Publish a batch of platform events](#publish-a-batch-of-platform-events)
- [Subscribe with a replay ID](#subscribe-with-a-replay-id)
- [Subscribe to past events in retention window](#subscribe-to-past-events-in-retention-window)
- [Subscribe using a managed subscription](#subscribe-using-a-managed-subscription)
Expand All @@ -24,6 +25,7 @@ See the [official Pub/Sub API repo](https://github.com/developerforce/pub-sub-ap
- [Common Issues](#common-issues)
- [Reference](#reference)
- [PubSubApiClient](#pubsubapiclient)
- [PublishCallback](#publishcallback)
- [SubscribeCallback](#subscribecallback)
- [SubscriptionInfo](#subscriptioninfo)
- [EventParseError](#eventparseerror)
Expand All @@ -44,7 +46,7 @@ In v4 and earlier versions of this client:
In v5:

- you pass your configuration with an object in the client constructor. The `.env` file is no longer a requirement, you are free to store your configuration where you want.
- you connect with a unique `connect()` method.
- you connect with a unique [`connect()`](#async-connect--promisevoid) method.

### Event handling

Expand Down Expand Up @@ -206,34 +208,38 @@ Here's an example that will get you started quickly. It listens to up to 3 accou
// Prepare event callback
const subscribeCallback = (subscription, callbackType, data) => {
if (callbackType === 'event') {
// Event received
console.log(
`${subscription.topicName} - ``Handling ${event.payload.ChangeEventHeader.entityName} change event ` +
`with ID ${event.replayId} ` +
`(${subscription.receivedEventCount}/${subscription.requestedEventCount} ` +
`events received so far)`
);
// Safely log event payload as a JSON string
console.log(
JSON.stringify(
event,
(key, value) =>
/* Convert BigInt values into strings and keep other types unchanged */
typeof value === 'bigint'
? value.toString()
: value,
2
)
);
} else if (callbackType === 'lastEvent') {
// Last event received
console.log(
`${subscription.topicName} - Reached last of ${subscription.requestedEventCount} requested event on channel. Closing connection.`
);
} else if (callbackType === 'end') {
// Client closed the connection
console.log('Client shut down gracefully.');
switch (callbackType) {
case 'event':
// Event received
console.log(
`${subscription.topicName} - Handling ${event.payload.ChangeEventHeader.entityName} change event ` +
`with ID ${data.replayId} ` +
`(${subscription.receivedEventCount}/${subscription.requestedEventCount} ` +
`events received so far)`
);
// Safely log event payload as a JSON string
console.log(
JSON.stringify(
data,
(key, value) =>
/* Convert BigInt values into strings and keep other types unchanged */
typeof value === 'bigint'
? value.toString()
: value,
2
)
);
break;
case 'lastEvent':
// Last event received
console.log(
`${subscription.topicName} - Reached last of ${subscription.requestedEventCount} requested event on channel. Closing connection.`
);
break;
case 'end':
// Client closed the connection
console.log('Client shut down gracefully.');
break;
}
};
Expand Down Expand Up @@ -345,9 +351,12 @@ Here's an example that will get you started quickly. It listens to up to 3 accou
## Other Examples
### Publish a platform event
### Publish a single platform event
Publish a `Sample__e` Platform Event with a `Message__c` field:
> [!NOTE]
> For best performances, use `publishBatch` when publishing event batches.
Publish a single `Sample__e` platform events with a `Message__c` field using [publish](#async-publishtopicname-payload-correlationkey--promisepublishresult):
```js
const payload = {
Expand All @@ -359,6 +368,35 @@ const publishResult = await client.publish('/event/Sample__e', payload);
console.log('Published event: ', JSON.stringify(publishResult));
```
### Publish a batch of platform events
Publish a batch of `Sample__e` platform events using [publishBatch](#async-publishbatchtopicname-events-publishcallback):
```js
// Prepare publish callback
const publishCallback = (info, callbackType, data) => {
switch (callbackType) {
case 'publishResponse':
console.log(JSON.stringify(data));
break;
}
};
// Prepare events
const events = [
{
payload: {
CreatedDate: new Date().getTime(), // Non-null value required but there's no validity check performed on this field
CreatedById: '005_________', // Valid user ID
Message__c: { string: 'Hello world' } // Field is nullable so we need to specify the 'string' type
}
}
];
// Publish event batch
client.publishBatch('/event/Sample__e', events, publishCallback);
```
### Subscribe with a replay ID
Subscribe to 5 account change events starting from a replay ID:
Expand Down Expand Up @@ -416,7 +454,7 @@ When working with high volumes of events you can control the incoming flow of ev
This is the overall process:
1. Pass a number of requested events in your subscribe call.
1. Handle the `lastevent` callback type from subscribe callback to detect the end of the event batch.
1. Handle the `lastevent` [callback type](#subscribecallback) from subscribe callback to detect the end of the event batch.
1. Subscribe to an additional batch of events with `client.requestAdditionalEvents(...)`. If you don't request additional events at this point, the gRPC subscription will close automatically (default Pub/Sub API behavior).
The code below illustrate how you can achieve event flow control:
Expand All @@ -429,20 +467,24 @@ try {
// Prepare event callback
const subscribeCallback = (subscription, callbackType, data) => {
if (callbackType === 'event') {
// Logic for handling a single event.
// Unless you request additional events later, this should get called up to 10 times
// given the initial subscription boundary.
} else if (callbackType === 'lastEvent') {
// Last event received
console.log(
`${eventEmitter.getTopicName()} - Reached last requested event on channel.`
);
// Request 10 additional events
client.requestAdditionalEvents(eventEmitter, 10);
} else if (callbackType === 'end') {
// Client closed the connection
console.log('Client shut down gracefully.');
switch (callbackType) {
case 'event':
// Logic for handling a single event.
// Unless you request additional events later, this should get called up to 10 times
// given the initial subscription boundary.
break;
case 'lastEvent':
// Last event received
console.log(
`${eventEmitter.getTopicName()} - Reached last requested event on channel.`
);
// Request 10 additional events
client.requestAdditionalEvents(eventEmitter, 10);
break;
case 'end':
// Client closed the connection
console.log('Client shut down gracefully.');
break;
}
};
Expand Down Expand Up @@ -541,16 +583,26 @@ Returns: Promise that holds the channel's [connectivity state](https://grpc.gith
#### `async publish(topicName, payload, [correlationKey]) → {Promise.<PublishResult>}`
Publishes a payload to a topic using the gRPC client.
Publishes an payload to a topic using the gRPC client. This is a synchronous operation, use `publishBatch` when publishing event batches.
Returns: Promise holding a `PublishResult` object with `replayId` and `correlationKey`.
| Name | Type | Description |
| ---------------- | ------ | ----------------------------------------------------------------------------------------- |
| `topicName` | string | name of the topic that we're subscribing to |
| `payload` | Object | |
| `topicName` | string | name of the topic that we're publishing on |
| `payload` | Object | payload of the event that is being published |
| `correlationKey` | string | optional correlation key. If you don't provide one, we'll generate a random UUID for you. |
#### `async publishBatch(topicName, events, publishCallback)`
Publishes a batch of events using the gRPC client's publish stream.
| Name | Type | Description |
| ----------------- | ----------------------------------- | ------------------------------------------------- |
| `topicName` | string | name of the topic that we're publishing on |
| `events` | [PublisherEvent](#publisherEvent)[] | events to be published |
| `publishCallback` | [PublishCallback](#publishCallback) | callback function for handling publish responses. |
#### `async subscribe(topicName, subscribeCallback, [numRequested])`
Subscribes to a topic.
Expand Down Expand Up @@ -612,6 +664,35 @@ Request additional events on an existing managed subscription.
| `subscriptionId` | string | managed subscription ID. |
| `numRequested` | number | number of events requested. |
### PublishCallback
Callback function that lets you process batch publish responses.
The function takes three parameters:
| Name | Type | Description |
| -------------- | ----------------------- | --------------------------------------------------------------------- |
| `info` | `{ topicName: string }` | callback information |
| `callbackType` | string | name of the callback type (see table below). |
| `data` | [Object] | data that is passed with the callback (depends on the callback type). |
Callback types:
| Name | Callback Data | Description |
| ----------------- | ------------------------------------- | -------------------------------------------------------------------------------------------------------- |
| `publishResponse` | [PublishResponse](#publishresponse) | Client received a publish response. The attached data is the publish confirmation for a batch of events. |
| `error` | Object | Signals an event publishing error or a gRPC stream error. |
| `grpcKeepalive` | `{ schemaId: string, rpcId: string }` | Server publishes this gRPC keep alive message every 270 seconds (or less) if there are no events. |
| `grpcStatus` | Object | Misc gRPC stream status information. |
#### PublishResponse
| Name | Type | Description |
| ---------- | ------------------------------------------------ | -------------------------------------------------------------------------------------------- |
| `schemaId` | string | topic schema ID |
| `rpcId` | string | RPC ID |
| `results` | `{ replayId: string, correlationKey: string }[]` | Event publish confirmations. Each confirmation contains the replay ID and a correlation key. |
### SubscribeCallback
Callback function that lets you process incoming Pub/Sub API events while keeping track of the topic name and the volume of events requested/received.
Expand Down
Loading

0 comments on commit 154199f

Please sign in to comment.