Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support annotations (rest and realtime)
Browse files Browse the repository at this point in the history
SimonWoolf committed Jan 27, 2025
1 parent 29eb528 commit 71439a3
Showing 19 changed files with 647 additions and 19 deletions.
193 changes: 193 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
@@ -953,6 +953,18 @@ export interface RestHistoryParams {
limit?: number;
}

/**
* Describes the parameters accepted by {@link RestAnnotations.get}.
*/
export interface GetAnnotationsParams {
/**
* An upper limit on the number of messages returned. The default is 100, and the maximum is 1000.
*
* @defaultValue 100
*/
limit?: number;
}

/**
* The `RestPresenceParams` interface describes the parameters accepted by {@link Presence.get}.
*/
@@ -1989,6 +2001,68 @@ export declare interface RealtimePresence {
leaveClient(clientId: string, data?: any): Promise<void>;
}

/**
* Functionality for annotating messages with small pieces of data, such as emoji
* reactions, that the server will roll up into the message as a summary.
*/
export declare interface RealtimeAnnotations {
/**
* Registers a listener that is called each time an {@link Annotation} matching a given refType.
* Note that if you want to receive individual realtime annotations (instead of just the rolled-up summaries), you will need to request the annotation_subscribe ChannelMode in ChannelOptions, since they are not delivered by default. In general, most clients will not bother with subscribing to individual annotations, and will instead just look at the summary updates.
*
* @param refType - A specific refType string or an array of them to register the listener for.
* @param listener - An event listener function.
* @returns A promise which resolves upon success of the channel {@link RealtimeChannel.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure.
*/
subscribe(refType: string | Array<string>, listener?: messageCallback<PresenceMessage>): Promise<void>;
/**
* Registers a listener that is called each time an {@link Annotation} is received on the channel.
* Note that if you want to receive individual realtime annotations (instead of just the rolled-up summaries), you will need to request the annotation_subscribe ChannelMode in ChannelOptions, since they are not delivered by default. In general, most clients will not bother with subscribing to individual annotations, and will instead just look at the summary updates.
*
* @param listener - An event listener function.
* @returns A promise which resolves upon success of the channel {@link RealtimeChannel.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure.
*/
subscribe(listener?: messageCallback<Annotation>): Promise<void>;
/**
* Deregisters a specific listener that is registered to receive {@link Annotation} on the channel for a given refType.
*
* @param refType - A specific refType (or array of refTypes) to deregister the listener for.
* @param listener - An event listener function.
*/
unsubscribe(refType: string | Array<string>, listener: messageCallback<Annotation>): void;
/**
* Deregisters any listener that is registered to receive {@link Annotation} on the channel for a specific refType
*
* @param refType - A specific refType (or array of refTypes) to deregister the listeners for.
*/
unsubscribe(refType: string | Array<string>): void;
/**
* Deregisters a specific listener that is registered to receive {@link Annotation} on the channel.
*
* @param listener - An event listener function.
*/
unsubscribe(listener: messageCallback<Annotation>): void;
/**
* Deregisters all listeners currently receiving {@link Annotation} for the channel.
*/
unsubscribe(): void;
/**
* Publish a new annotation for a message.
*
* @param refSerial - The `serial` of the message to annotate.
* @param refType - What type of annotation you want.
* @param data - The contents of the annotation.
*/
publish(refSerial: string, refType: string, data: string | ArrayBuffer | Uint8Array): Promise<void>;
/**
* Get all annotations for a given message (as a paginated result)
*
* @param serial - The `serial` of the message to get annotations for.
* @param params - Restrictions on which annotations to get (in particular a limit)
*/
get(serial: string, params: GetAnnotationsParams | null): Promise<void>;
}

/**
* Enables devices to subscribe to push notifications for a channel.
*/
@@ -2035,6 +2109,10 @@ export declare interface Channel {
* A {@link Presence} object.
*/
presence: Presence;
/**
* {@link RestAnnotations}
*/
annotations: RestAnnotations;
/**
* A {@link PushChannel} object.
*/
@@ -2079,6 +2157,28 @@ export declare interface Channel {
status(): Promise<ChannelDetails>;
}

/**
* Functionality for annotating messages with small pieces of data, such as emoji
* reactions, that the server will roll up into the message as a summary.
*/
export declare interface RestAnnotations {
/**
* Publish a new annotation for a message.
*
* @param refSerial - The `serial` of the message to annotate.
* @param refType - What type of annotation you want.
* @param data - The contents of the annotation.
*/
publish(refSerial: string, refType: string, data: string | ArrayBuffer | Uint8Array): Promise<void>;
/**
* Get all annotations for a given message (as a paginated result)
*
* @param serial - The `serial` of the message to get annotations for.
* @param params - Restrictions on which annotations to get (in particular a limit)
*/
get(serial: string, params: GetAnnotationsParams | null): Promise<void>;
}

/**
* Enables messages to be published and subscribed to. Also enables historic messages to be retrieved and provides access to the {@link RealtimePresence} object of a channel.
*/
@@ -2151,6 +2251,10 @@ export declare interface RealtimeChannel extends EventEmitter<channelEventCallba
* A {@link RealtimePresence} object.
*/
presence: RealtimePresence;
/**
* {@link RealtimeAnnotations}
*/
annotations: RealtimeAnnotations;
/**
* Attach to this channel ensuring the channel is created in the Ably system and all messages published on the channel are received by any channel listeners registered using {@link RealtimeChannel.subscribe | `subscribe()`}. Any resulting channel state change will be emitted to any listeners registered using the {@link EventEmitter.on | `on()`} or {@link EventEmitter.once | `once()`} methods. As a convenience, `attach()` is called implicitly if {@link RealtimeChannel.subscribe | `subscribe()`} for the channel is called, or {@link RealtimePresence.enter | `enter()`} or {@link RealtimePresence.subscribe | `subscribe()`} are called on the {@link RealtimePresence} object for this channel.
*
@@ -2382,6 +2486,48 @@ export interface Message {
operation?: Operation;
}

/**
* An annotation to a message, received from Ably
*/
export interface Annotation {
/**
* Unique ID assigned by Ably to this annotation.
*/
id: string;
/**
* The client ID of the publisher of this annotation (if any).
*/
clientId?: string;
/**
* The annotation payload, if provided.
*/
data?: any;
/**
* This is typically empty, as all annotations received from Ably are automatically decoded client-side using this value. However, if the annotation encoding cannot be processed, this attribute contains the remaining transformations not applied to the `data` payload.
*/
encoding?: string;
/**
* Timestamp of when the annotation was received by Ably, as milliseconds since the Unix epoch.
*/
timestamp: number;
/**
* The action, whether this is an annotation being added or removed, one of the {@link AnnotationAction} enum values.
*/
action: AnnotationAction;
/**
* This message's unique serial (lexicographically totally ordered).
*/
serial: string;
/**
* The serial of the message (of type message.create) that this annotation is annotating.
*/
refSerial: string;
/**
* The kind of annotation it is (for example, an emoji reaction)
*/
refType: string;
}

/**
* Contains the details of an operation, such as update or deletion, supplied by the actioning client.
*/
@@ -2437,6 +2583,25 @@ export type MessageAction =
| MessageActions.META_OCCUPANCY
| MessageActions.MESSAGE_SUMMARY;

/**
* The namespace containing the different types of annotation actions.
*/
declare namespace AnnotationActions {
/**
* Annotation action for a created annotation.
*/
type ANNOTATION_CREATE = 'annotation.create';
/**
* Annotation action for a deleted annotation.
*/
type ANNOTATION_DELETE = 'annotation.delete';
}

/**
* The possible values of the 'action' field of an {@link Annotation}.
*/
export type AnnotationAction = AnnotationActions.ANNOTATION_CREATE | AnnotationActions.ANNOTATION_DELETE;

/**
* A message received from Ably.
*/
@@ -2529,6 +2694,26 @@ export interface PresenceMessageStatic {
fromValues(values: Partial<Pick<PresenceMessage, 'clientId' | 'data' | 'extras'>>): PresenceMessage;
}

/**
* Static utilities related to annotations.
*/
export interface AnnotationStatic {
/**
* Decodes and decrypts a deserialized `Annotation`-like object using the cipher in {@link ChannelOptions}. Any residual transforms that cannot be decoded or decrypted will be in the `encoding` property. Intended for users receiving messages from a source other than a REST or Realtime channel (for example a queue) to avoid having to parse the encoding string and action.
*
* @param JsonObject - The deserialized `Annotation`-like object to decode and decrypt.
* @param channelOptions - A {@link ChannelOptions} object containing the cipher.
*/
fromEncoded: (JsonObject: any, channelOptions?: ChannelOptions) => Promise<Annotation>;
/**
* Decodes and decrypts an array of deserialized `Annotation`-like object using the cipher in {@link ChannelOptions}. Any residual transforms that cannot be decoded or decrypted will be in the `encoding` property. Intended for users receiving messages from a source other than a REST or Realtime channel (for example a queue) to avoid having to parse the encoding string.
*
* @param JsonArray - An array of deserialized `Annotation`-like objects to decode and decrypt.
* @param channelOptions - A {@link ChannelOptions} object containing the cipher.
*/
fromEncodedArray: (JsonArray: any[], channelOptions?: ChannelOptions) => Promise<Annotation[]>;
}

/**
* Cipher Key used in {@link CipherParamOptions}. If set to a `string`, the value must be base64 encoded.
*/
@@ -2889,6 +3074,10 @@ export declare class Rest implements RestClient {
* Static utilities related to presence messages.
*/
static PresenceMessage: PresenceMessageStatic;
/**
* Static utilities related to annotations.
*/
static Annotation: AnnotationStatic;

// Requirements of RestClient

@@ -2940,6 +3129,10 @@ export declare class Realtime implements RealtimeClient {
* Static utilities related to presence messages.
*/
static PresenceMessage: PresenceMessageStatic;
/**
* Static utilities related to annotations.
*/
static Annotation: AnnotationStatic;

// Requirements of RealtimeClient

3 changes: 3 additions & 0 deletions src/common/lib/client/baseclient.ts
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import * as Utils from '../util/utils';
import Platform from '../../platform';
import { Rest } from './rest';
import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic';
import { AnnotationsPlugin } from './modularplugins';
import { throwMissingPluginError } from '../util/utils';
import { MsgPack } from 'common/types/msgpack';
import { HTTPRequestImplementations } from 'platform/web/lib/http/http';
@@ -46,6 +47,7 @@ class BaseClient {
// Extra HTTP request implementations available to this client, in addition to those in web’s Http.bundledRequestImplementations
readonly _additionalHTTPRequestImplementations: HTTPRequestImplementations | null;
private readonly __FilteredSubscriptions: typeof FilteredSubscriptions | null;
readonly _Annotations: AnnotationsPlugin | null;
readonly logger: Logger;
_device?: LocalDevice;

@@ -98,6 +100,7 @@ class BaseClient {
this._rest = options.plugins?.Rest ? new options.plugins.Rest(this) : null;
this._Crypto = options.plugins?.Crypto ?? null;
this.__FilteredSubscriptions = options.plugins?.MessageInteractions ?? null;
this._Annotations = options.plugins?.Annotations ?? null;
}

get rest(): Rest {
11 changes: 11 additions & 0 deletions src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
@@ -9,10 +9,14 @@ import { DefaultMessage } from '../types/defaultmessage';
import { MsgPack } from 'common/types/msgpack';
import RealtimePresence from './realtimepresence';
import { DefaultPresenceMessage } from '../types/defaultpresencemessage';
import { DefaultAnnotation } from '../types/defaultannotation';
import WebSocketTransport from '../transport/websockettransport';
import { FilteredSubscriptions } from './filteredsubscriptions';
import { PresenceMap } from './presencemap';
import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage';
import RealtimeAnnotations from './realtimeannotations';
import RestAnnotations from './restannotations';
import Annotation, { WireAnnotation } from '../types/annotation';
import { Http } from 'common/types/http';
import Defaults from '../util/defaults';
import Logger from '../util/logger';
@@ -38,6 +42,12 @@ export class DefaultRealtime extends BaseRealtime {
PresenceMessage,
WirePresenceMessage,
},
Annotations: {
Annotation,
WireAnnotation,
RealtimeAnnotations,
RestAnnotations,
},
WebSocketTransport,
MessageInteractions: FilteredSubscriptions,
}),
@@ -62,6 +72,7 @@ export class DefaultRealtime extends BaseRealtime {

static Message = DefaultMessage;
static PresenceMessage = DefaultPresenceMessage;
static Annotation = DefaultAnnotation;

static _MsgPack: MsgPack | null = null;

11 changes: 11 additions & 0 deletions src/common/lib/client/defaultrest.ts
Original file line number Diff line number Diff line change
@@ -5,7 +5,11 @@ import Platform from 'common/platform';
import { DefaultMessage } from '../types/defaultmessage';
import { MsgPack } from 'common/types/msgpack';
import { DefaultPresenceMessage } from '../types/defaultpresencemessage';
import { DefaultAnnotation } from '../types/defaultannotation';
import { Http } from 'common/types/http';
import RealtimeAnnotations from './realtimeannotations';
import RestAnnotations from './restannotations';
import Annotation, { WireAnnotation } from '../types/annotation';
import Defaults from '../util/defaults';
import Logger from '../util/logger';

@@ -25,6 +29,12 @@ export class DefaultRest extends BaseRest {
...allCommonModularPlugins,
Crypto: DefaultRest.Crypto ?? undefined,
MsgPack: DefaultRest._MsgPack ?? undefined,
Annotations: {
Annotation,
WireAnnotation,
RealtimeAnnotations,
RestAnnotations,
},
}),
);
}
@@ -43,6 +53,7 @@ export class DefaultRest extends BaseRest {

static Message = DefaultMessage;
static PresenceMessage = DefaultPresenceMessage;
static Annotation = DefaultAnnotation;

static _MsgPack: MsgPack | null = null;

11 changes: 11 additions & 0 deletions src/common/lib/client/modularplugins.ts
Original file line number Diff line number Diff line change
@@ -2,10 +2,13 @@ import { Rest } from './rest';
import { IUntypedCryptoStatic } from '../../types/ICryptoStatic';
import { MsgPack } from 'common/types/msgpack';
import RealtimePresence from './realtimepresence';
import RealtimeAnnotations from './realtimeannotations';
import RestAnnotations from './restannotations';
import XHRRequest from 'platform/web/lib/http/request/xhrrequest';
import fetchRequest from 'platform/web/lib/http/request/fetchrequest';
import { FilteredSubscriptions } from './filteredsubscriptions';
import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage';
import Annotation, { WireAnnotation } from '../types/annotation';
import { TransportCtor } from '../transport/transport';
import * as PushPlugin from 'plugins/push';

@@ -18,11 +21,19 @@ export type RealtimePresencePlugin = PresenceMessagePlugin & {
RealtimePresence: typeof RealtimePresence;
};

export type AnnotationsPlugin = {
Annotation: typeof Annotation;
WireAnnotation: typeof WireAnnotation;
RealtimeAnnotations: typeof RealtimeAnnotations;
RestAnnotations: typeof RestAnnotations;
};

export interface ModularPlugins {
Rest?: typeof Rest;
Crypto?: IUntypedCryptoStatic;
MsgPack?: MsgPack;
RealtimePresence?: RealtimePresencePlugin;
Annotations?: AnnotationsPlugin;
WebSocketTransport?: TransportCtor;
XHRPolling?: TransportCtor;
XHRRequest?: typeof XHRRequest;
93 changes: 93 additions & 0 deletions src/common/lib/client/realtimeannotations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import EventEmitter from '../util/eventemitter';
import Logger from '../util/logger';
import Annotation from '../types/annotation';
import { actions, flags } from '../types/protocolmessagecommon';
import { fromValues as protocolMessageFromValues } from '../types/protocolmessage';
import type { CipherOptions } from '../types/basemessage';
import ErrorInfo from '../types/errorinfo';
import RealtimeChannel from './realtimechannel';
import RestAnnotations, { RestGetAnnotationsParams } from './restannotations';
import type { PaginatedResult } from './paginatedresource';

class RealtimeAnnotations {
private channel: RealtimeChannel;
private logger: Logger;
private subscriptions: EventEmitter;

constructor(channel: RealtimeChannel) {
this.channel = channel;
this.logger = channel.logger;
this.subscriptions = new EventEmitter(this.logger);
}

async publish(refSerial: string, refType: string, data: any): Promise<void> {
const channelName = this.channel.name;
const annotation = Annotation.fromValues({
action: 'annotation.create',
refSerial,
refType,
data,
});

// TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken
const wireAnnotation = await annotation.encode(this.channel.channelOptions as CipherOptions);

this.channel._throwIfUnpublishableState();

Logger.logAction(
this.logger,
Logger.LOG_MICRO,
'RealtimeAnnotations.publish()',
'channelName = ' + channelName + ', sending annotation with refSerial = ' + refSerial + ', refType = ' + refType,
);

const pm = protocolMessageFromValues({
action: actions.ANNOTATION,
channel: channelName,
annotations: [wireAnnotation],
});
return this.channel.sendMessage(pm);
}

async subscribe(..._args: unknown[] /* [refType], listener */): Promise<void> {
const args = RealtimeChannel.processListenerArgs(_args);
const event = args[0];
const listener = args[1];
const channel = this.channel;

if (channel.state === 'failed') {
throw ErrorInfo.fromValues(channel.invalidStateError());
}

await channel.attach();

if ((this.channel._mode & flags.ANNOTATION_SUBSCRIBE) === 0) {
throw new ErrorInfo(
"You're trying to add an annotation listener, but you haven't requested the annotation_subscribe channel mode in ChannelOptions, so this won't do anything (we only deliver annotations to clients who have explicitly requested them)",
93001,
400,
);
}

this.subscriptions.on(event, listener);
}

unsubscribe(..._args: unknown[] /* [event], listener */): void {
const args = RealtimeChannel.processListenerArgs(_args);
const event = args[0];
const listener = args[1];
this.subscriptions.off(event, listener);
}

_processIncoming(annotations: Annotation[]): void {
for (const annotation of annotations) {
this.subscriptions.emit(annotation.refType || '', annotation);
}
}

async get(serial: string, params: RestGetAnnotationsParams | null): Promise<PaginatedResult<Annotation>> {
return RestAnnotations.prototype.get.call(this, serial, params);
}
}

export default RealtimeAnnotations;
26 changes: 26 additions & 0 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ import { PaginatedResult } from './paginatedresource';
import type { PushChannel } from 'plugins/push';
import type { WirePresenceMessage } from '../types/presencemessage';
import type RealtimePresence from './realtimepresence';
import type RealtimeAnnotations from './realtimeannotations';

interface RealtimeHistoryParams {
start?: number;
@@ -57,12 +58,19 @@ class RealtimeChannel extends EventEmitter {
channelOptions: ChannelOptions;
client: BaseRealtime;
private _presence: RealtimePresence | null;
private _annotations: RealtimeAnnotations | null = null;
get presence(): RealtimePresence {
if (!this._presence) {
Utils.throwMissingPluginError('RealtimePresence');
}
return this._presence;
}
get annotations(): RealtimeAnnotations {
if (!this._annotations) {
Utils.throwMissingPluginError('Annotations');
}
return this._annotations;
}
connectionManager: ConnectionManager;
state: API.ChannelState;
subscriptions: EventEmitter;
@@ -96,6 +104,9 @@ class RealtimeChannel extends EventEmitter {
this.channelOptions = normaliseChannelOptions(client._Crypto ?? null, this.logger, options);
this.client = client;
this._presence = client._RealtimePresence ? new client._RealtimePresence.RealtimePresence(this) : null;
if (client._Annotations) {
this._annotations = new client._Annotations.RealtimeAnnotations(this);
}
this.connectionManager = client.connection.connectionManager;
this.state = 'initialized';
this.subscriptions = new EventEmitter(this.logger);
@@ -646,6 +657,21 @@ class RealtimeChannel extends EventEmitter {
break;
}

case actions.ANNOTATION: {
populateFieldsFromParent(message);
const options = this.channelOptions;
if (this._annotations) {
const annotations = await Promise.all(
(message.annotations || []).map((wpm) => {
return wpm.decode(options, this.logger);
}),
);

this._annotations._processIncoming(annotations);
}
break;
}

case actions.ERROR: {
/* there was a channel-specific error */
const err = message.error as ErrorInfo;
74 changes: 74 additions & 0 deletions src/common/lib/client/restannotations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import * as Utils from '../util/utils';
import Annotation, { WireAnnotation, _fromEncodedArray } from '../types/annotation';
import type { CipherOptions } from '../types/basemessage';
import RestChannel from './restchannel';
import Defaults from '../util/defaults';
import PaginatedResource, { PaginatedResult } from './paginatedresource';
import Resource from './resource';

export interface RestGetAnnotationsParams {
limit?: number;
}

class RestAnnotations {
private channel: RestChannel;

constructor(channel: RestChannel) {
this.channel = channel;
}

async publish(refSerial: string, refType: string, data: any): Promise<void> {
const annotation = Annotation.fromValues({
action: 'annotation.create',
refSerial,
refType,
data,
});

// TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken
const wireAnnotation = await annotation.encode(this.channel.channelOptions as CipherOptions);

const client = this.channel.client,
options = client.options,
format = options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json,
headers = Defaults.defaultPostHeaders(client.options, { format }),
params = {};

const requestBody = Utils.encodeBody([wireAnnotation], client._MsgPack, format);

await Resource.post(
client,
client.rest.channelMixin.basePath(this.channel) + '/annotations',
requestBody,
headers,
params,
null,
true,
);
}

async get(serial: string, params: RestGetAnnotationsParams | null): Promise<PaginatedResult<Annotation>> {
const client = this.channel.client,
format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json,
envelope = client.http.supportsLinkHeaders ? undefined : format,
headers = Defaults.defaultGetHeaders(client.options, { format });

Utils.mixin(headers, client.options.headers);

return new PaginatedResource(
client,
client.rest.channelMixin.basePath(this.channel) + '/messages/' + serial + '/annotations',
headers,
envelope,
async (body, _, unpacked) => {
const decoded = (
unpacked ? body : Utils.decodeBody(body, client._MsgPack, format)
) as Utils.Properties<WireAnnotation>[];

return _fromEncodedArray(decoded, this.channel);
},
).get(params as Record<string, unknown>);
}
}

export default RestAnnotations;
11 changes: 11 additions & 0 deletions src/common/lib/client/restchannel.ts
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ import Defaults, { normaliseChannelOptions } from '../util/defaults';
import { RestHistoryParams } from './restchannelmixin';
import { RequestBody } from 'common/types/http';
import type { PushChannel } from 'plugins/push';
import type RestAnnotations from './restannotations';

const MSG_ID_ENTROPY_BYTES = 9;

@@ -32,6 +33,13 @@ class RestChannel {
presence: RestPresence;
channelOptions: ChannelOptions;
_push?: PushChannel;
private _annotations: RestAnnotations | null = null;
get annotations(): RestAnnotations {
if (!this._annotations) {
Utils.throwMissingPluginError('Annotations');
}
return this._annotations;
}

constructor(client: BaseRest, name: string, channelOptions?: ChannelOptions) {
Logger.logAction(client.logger, Logger.LOG_MINOR, 'RestChannel()', 'started; name = ' + name);
@@ -42,6 +50,9 @@ class RestChannel {
if (client.options.plugins?.Push) {
this._push = new client.options.plugins.Push.PushChannel(this);
}
if (client._Annotations) {
this._annotations = new client._Annotations.RestAnnotations(this);
}
}

get push() {
6 changes: 5 additions & 1 deletion src/common/lib/transport/comettransport.ts
Original file line number Diff line number Diff line change
@@ -353,7 +353,11 @@ abstract class CometTransport extends Transport {
if (items && items.length)
for (let i = 0; i < items.length; i++)
this.onProtocolMessage(
protocolMessageFromDeserialized(items[i], this.connectionManager.realtime._RealtimePresence),
protocolMessageFromDeserialized(
items[i],
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._Annotations,
),
);
} catch (e) {
Logger.logAction(
2 changes: 1 addition & 1 deletion src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
@@ -1805,7 +1805,7 @@ class ConnectionManager extends EventEmitter {

Logger.LOG_MICRO,
'ConnectionManager.send()',
'queueing msg; ' + stringifyProtocolMessage(msg, this.realtime._RealtimePresence),
'queueing msg; ' + stringifyProtocolMessage(msg, this.realtime._RealtimePresence, this.realtime._Annotations),
);
}
this.queue(msg, callback);
8 changes: 6 additions & 2 deletions src/common/lib/transport/protocol.ts
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ export class PendingMessage {
this.merged = false;
const action = message.action;
this.sendAttempted = false;
this.ackRequired = action == actions.MESSAGE || action == actions.PRESENCE;
this.ackRequired = action == actions.MESSAGE || action == actions.PRESENCE || action == actions.ANNOTATION;
}
}

@@ -78,7 +78,11 @@ class Protocol extends EventEmitter {
Logger.LOG_MICRO,
'Protocol.send()',
'sending msg; ' +
stringifyProtocolMessage(pendingMessage.message, this.transport.connectionManager.realtime._RealtimePresence),
stringifyProtocolMessage(
pendingMessage.message,
this.transport.connectionManager.realtime._RealtimePresence,
this.transport.connectionManager.realtime._Annotations,
),
);
}
pendingMessage.sendAttempted = true;
6 changes: 5 additions & 1 deletion src/common/lib/transport/transport.ts
Original file line number Diff line number Diff line change
@@ -128,7 +128,11 @@ abstract class Transport extends EventEmitter {
'received on ' +
this.shortName +
': ' +
stringifyProtocolMessage(message, this.connectionManager.realtime._RealtimePresence) +
stringifyProtocolMessage(
message,
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._Annotations,
) +
'; connectionId = ' +
this.connectionManager.connectionId,
);
1 change: 1 addition & 0 deletions src/common/lib/transport/websockettransport.ts
Original file line number Diff line number Diff line change
@@ -140,6 +140,7 @@ class WebSocketTransport extends Transport {
data,
this.connectionManager.realtime._MsgPack,
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._Annotations,
this.format,
),
);
123 changes: 123 additions & 0 deletions src/common/lib/types/annotation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import Logger from '../util/logger';
import { BaseMessage, encode, decode, wireToJSON, normalizeCipherOptions, CipherOptions, strMsg } from './basemessage';
import * as API from '../../../../ably';
import * as Utils from '../util/utils';

import type { IUntypedCryptoStatic } from '../../types/ICryptoStatic';
import type { Properties } from '../util/utils';
import type RestChannel from '../client/restchannel';
import type RealtimeChannel from '../client/realtimechannel';
import type { ChannelOptions } from '../../types/channel';
type Channel = RestChannel | RealtimeChannel;

const actions = ['annotation.create', 'annotation.delete'];

export async function fromEncoded(
logger: Logger,
Crypto: IUntypedCryptoStatic | null,
encoded: WireAnnotation,
inputOptions?: API.ChannelOptions,
): Promise<Annotation> {
const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null);
const wa = WireAnnotation.fromValues(encoded);
return wa.decode(options, logger);
}

export async function fromEncodedArray(
logger: Logger,
Crypto: IUntypedCryptoStatic | null,
encodedArray: WireAnnotation[],
options?: API.ChannelOptions,
): Promise<Annotation[]> {
return Promise.all(
encodedArray.map(function (encoded) {
return fromEncoded(logger, Crypto, encoded, options);
}),
);
}

// these forms of the functions are used internally when we have a channel instance
// already, so don't need to normalise channel options
export async function _fromEncoded(encoded: Properties<WireAnnotation>, channel: Channel): Promise<Annotation> {
return WireAnnotation.fromValues(encoded).decode(channel.channelOptions, channel.logger);
}

export async function _fromEncodedArray(
encodedArray: Properties<WireAnnotation>[],
channel: Channel,
): Promise<Annotation[]> {
return Promise.all(
encodedArray.map(function (encoded) {
return _fromEncoded(encoded, channel);
}),
);
}

// for tree-shakability
export function fromValues(values: Properties<Annotation>) {
return Annotation.fromValues(values);
}

class Annotation extends BaseMessage {
action?: API.AnnotationAction;
serial?: string;
refSerial?: string;
refType?: string;

async encode(options: CipherOptions): Promise<WireAnnotation> {
const res = Object.assign(new WireAnnotation(), this, {
action: actions.indexOf(this.action || 'annotation.create'),
});
return encode(res, options);
}

static fromValues(values: Properties<Annotation>): Annotation {
return Object.assign(new Annotation(), values);
}

static fromValuesArray(values: Properties<Annotation>[]): Annotation[] {
return values.map((v) => Annotation.fromValues(v));
}

toString() {
return strMsg(this, 'Annotation');
}
}

export class WireAnnotation extends BaseMessage {
action?: number;
serial?: string;
refSerial?: string;
refType?: string;

toJSON(...args: any[]) {
return wireToJSON.call(this, ...args);
}

static fromValues(values: Properties<WireAnnotation>): WireAnnotation {
return Object.assign(new WireAnnotation(), values);
}

static fromValuesArray(values: Properties<WireAnnotation>[]): WireAnnotation[] {
return values.map((v) => WireAnnotation.fromValues(v));
}

async decode(channelOptions: ChannelOptions, logger: Logger): Promise<Annotation> {
const res = Object.assign(new Annotation(), {
...this,
action: actions[this.action!],
});
try {
await decode(res, channelOptions);
} catch (e) {
Logger.logAction(logger, Logger.LOG_ERROR, 'WireAnnotation.decode()', Utils.inspectError(e));
}
return res;
}

toString() {
return strMsg(this, 'WireAnnotation');
}
}

export default Annotation;
3 changes: 3 additions & 0 deletions src/common/lib/types/basemessage.ts
Original file line number Diff line number Diff line change
@@ -236,6 +236,9 @@ export function populateFieldsFromParent(parent: ProtocolMessage) {
case actions.SYNC:
msgs = parent.presence!;
break;
case actions.ANNOTATION:
msgs = parent.annotations!;
break;
default:
throw new ErrorInfo('Unexpected action ' + parent.action, 40000, 400);
}
22 changes: 22 additions & 0 deletions src/common/lib/types/defaultannotation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import * as API from '../../../../ably';
import Logger from '../util/logger';
import Annotation, { fromEncoded, fromEncodedArray, WireAnnotation } from './annotation';
import Platform from 'common/platform';
import type { Properties } from '../util/utils';

/**
`DefaultAnnotation` is the class returned by `DefaultRest` and `DefaultRealtime`’s `Annotation` static property. It introduces the static methods described in the `AnnotationStatic` interface of the public API of the non tree-shakable version of the library.
*/
export class DefaultAnnotation extends Annotation {
static async fromEncoded(encoded: unknown, inputOptions?: API.ChannelOptions): Promise<Annotation> {
return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WireAnnotation, inputOptions);
}

static async fromEncodedArray(encodedArray: Array<unknown>, options?: API.ChannelOptions): Promise<Annotation[]> {
return fromEncodedArray(Logger.defaultLogger, Platform.Crypto, encodedArray as WireAnnotation[], options);
}

static fromValues(values: Properties<Annotation>): Annotation {
return Annotation.fromValues(values);
}
}
40 changes: 33 additions & 7 deletions src/common/lib/types/protocolmessage.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { MsgPack } from 'common/types/msgpack';
import * as API from '../../../../ably';
import { PresenceMessagePlugin } from '../client/modularplugins';
import { AnnotationsPlugin } from '../client/modularplugins';
import * as Utils from '../util/utils';
import ErrorInfo from './errorinfo';
import { WireMessage } from './message';
import PresenceMessage, { WirePresenceMessage } from './presencemessage';
import Annotation, { WireAnnotation } from './annotation';
import RealtimeAnnotations from '../client/realtimeannotations';
import RestAnnotations from '../client/restannotations';
import { flags, flagNames, channelModes, ActionName } from './protocolmessagecommon';
import type { Properties } from '../util/utils';

@@ -24,15 +28,17 @@ export function deserialize(
serialized: unknown,
MsgPack: MsgPack | null,
presenceMessagePlugin: PresenceMessagePlugin | null,
annotationsPlugin: AnnotationsPlugin | null,
format?: Utils.Format,
): ProtocolMessage {
const deserialized = Utils.decodeBody<Record<string, unknown>>(serialized, MsgPack, format);
return fromDeserialized(deserialized, presenceMessagePlugin);
return fromDeserialized(deserialized, presenceMessagePlugin, annotationsPlugin);
}

export function fromDeserialized(
deserialized: Record<string, unknown>,
presenceMessagePlugin: PresenceMessagePlugin | null,
annotationsPlugin: AnnotationsPlugin | null,
): ProtocolMessage {
let error: ErrorInfo | undefined;
if (deserialized.error) {
@@ -51,21 +57,36 @@ export function fromDeserialized(
);
}

return Object.assign(new ProtocolMessage(), { ...deserialized, presence, messages, error });
let annotations: WireAnnotation[] | undefined;
if (annotationsPlugin && deserialized.annotations) {
annotations = annotationsPlugin.WireAnnotation.fromValuesArray(
deserialized.annotations as Array<Properties<WireAnnotation>>,
);
}

return Object.assign(new ProtocolMessage(), { ...deserialized, presence, messages, annotations, error });
}

/**
* Used by the tests.
*/
export function fromDeserializedIncludingDependencies(deserialized: Record<string, unknown>): ProtocolMessage {
return fromDeserialized(deserialized, { PresenceMessage, WirePresenceMessage });
return fromDeserialized(
deserialized,
{ PresenceMessage, WirePresenceMessage },
{ Annotation, WireAnnotation, RealtimeAnnotations, RestAnnotations },
);
}

export function fromValues(values: unknown): ProtocolMessage {
export function fromValues(values: Properties<ProtocolMessage>): ProtocolMessage {
return Object.assign(new ProtocolMessage(), values);
}

export function stringify(msg: any, presenceMessagePlugin: PresenceMessagePlugin | null): string {
export function stringify(
msg: any,
presenceMessagePlugin: PresenceMessagePlugin | null,
annotationsPlugin: AnnotationsPlugin | null,
): string {
let result = '[ProtocolMessage';
if (msg.action !== undefined) result += '; action=' + ActionName[msg.action] || msg.action;

@@ -79,6 +100,9 @@ export function stringify(msg: any, presenceMessagePlugin: PresenceMessagePlugin
if (msg.messages) result += '; messages=' + toStringArray(WireMessage.fromValuesArray(msg.messages));
if (msg.presence && presenceMessagePlugin)
result += '; presence=' + toStringArray(presenceMessagePlugin.WirePresenceMessage.fromValuesArray(msg.presence));
if (msg.annotations && annotationsPlugin) {
result += '; annotations=' + toStringArray(annotationsPlugin.WireAnnotation.fromValuesArray(msg.annotations));
}
if (msg.error) result += '; error=' + ErrorInfo.fromValues(msg.error).toString();
if (msg.auth && msg.auth.accessToken) result += '; token=' + msg.auth.accessToken;
if (msg.flags) result += '; flags=' + flagNames.filter(msg.hasFlag).join(',');
@@ -112,8 +136,10 @@ class ProtocolMessage {
messages?: WireMessage[];
// This will be undefined if we skipped decoding this property due to user not requesting presence functionality — see `fromDeserialized`
presence?: WirePresenceMessage[];
annotations?: WireAnnotation[];
auth?: unknown;
connectionDetails?: Record<string, unknown>;
params?: Record<string, string>;

hasFlag = (flag: string): boolean => {
return ((this.flags as number) & flags[flag]) > 0;
@@ -123,8 +149,8 @@ class ProtocolMessage {
return (this.flags = (this.flags as number) | flags[flag]);
}

getMode(): number | undefined {
return this.flags && this.flags & flags.MODE_ALL;
getMode(): number {
return (this.flags || 0) & flags.MODE_ALL;
}

encodeModesToFlags(modes: API.ChannelMode[]): void {
22 changes: 15 additions & 7 deletions src/common/lib/types/protocolmessagecommon.ts
Original file line number Diff line number Diff line change
@@ -49,11 +49,19 @@ export const flags: { [key: string]: number } = {

export const flagNames = Object.keys(flags);

flags.MODE_ALL = flags.PRESENCE
| flags.PUBLISH
| flags.SUBSCRIBE
| flags.PRESENCE_SUBSCRIBE
| flags.ANNOTATION_PUBLISH
| flags.ANNOTATION_SUBSCRIBE;
flags.MODE_ALL =
flags.PRESENCE |
flags.PUBLISH |
flags.SUBSCRIBE |
flags.PRESENCE_SUBSCRIBE |
flags.ANNOTATION_PUBLISH |
flags.ANNOTATION_SUBSCRIBE;

export const channelModes = ['PRESENCE', 'PUBLISH', 'SUBSCRIBE', 'PRESENCE_SUBSCRIBE'];
export const channelModes = [
'PRESENCE',
'PUBLISH',
'SUBSCRIBE',
'PRESENCE_SUBSCRIBE',
'ANNOTATION_PUBLISH',
'ANNOTATION_SUBSCRIBE',
];

0 comments on commit 71439a3

Please sign in to comment.