From 7c379cf633b83195120483ca3f5896f5a11fa311 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Fri, 27 Oct 2023 15:24:48 -0300 Subject: [PATCH] Make subscription filtering tree-shakable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We expose a MessageIteractions module which allows users to pass a MessageFilter object to RealtimeChannel’s `subscribe` and `unsubscribe`. Resolves #1397. --- scripts/moduleReport.js | 1 + src/common/lib/client/baseclient.ts | 10 +++ src/common/lib/client/defaultrealtime.ts | 2 + src/common/lib/client/modulesmap.ts | 2 + src/common/lib/client/realtimechannel.ts | 9 ++- src/platform/web/modules.ts | 1 + test/browser/modules.test.js | 85 ++++++++++++++++++++++++ 7 files changed, 105 insertions(+), 5 deletions(-) diff --git a/scripts/moduleReport.js b/scripts/moduleReport.js index 609a3d1e93..eba2ed8c4e 100644 --- a/scripts/moduleReport.js +++ b/scripts/moduleReport.js @@ -11,6 +11,7 @@ const moduleNames = [ 'WebSocketTransport', 'XHRRequest', 'FetchRequest', + 'MessageInteractions', ]; // List of all free-standing functions exported by the library along with the diff --git a/src/common/lib/client/baseclient.ts b/src/common/lib/client/baseclient.ts index 2e72d57232..a01db6819a 100644 --- a/src/common/lib/client/baseclient.ts +++ b/src/common/lib/client/baseclient.ts @@ -15,6 +15,7 @@ import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; import { throwMissingModuleError } from '../util/utils'; import { MsgPack } from 'common/types/msgpack'; import { HTTPRequestImplementations } from 'platform/web/lib/http/http'; +import { FilteredSubscriptions } from './filteredsubscriptions'; /** `BaseClient` acts as the base class for all of the client classes exported by the SDK. It is an implementation detail and this class is not advertised publicly. @@ -39,6 +40,7 @@ class BaseClient { readonly _MsgPack: MsgPack | null; // Extra HTTP request implementations available to this client, in addition to those in web’s Http.bundledRequestImplementations readonly _additionalHTTPRequestImplementations: HTTPRequestImplementations; + private readonly __FilteredSubscriptions: typeof FilteredSubscriptions | null; constructor(options: ClientOptions | string, modules: ModulesMap) { this._additionalHTTPRequestImplementations = modules; @@ -93,6 +95,7 @@ class BaseClient { this._rest = modules.Rest ? new modules.Rest(this) : null; this._Crypto = modules.Crypto ?? null; + this.__FilteredSubscriptions = modules.MessageInteractions ?? null; } private get rest(): Rest { @@ -102,6 +105,13 @@ class BaseClient { return this._rest; } + get _FilteredSubscriptions(): typeof FilteredSubscriptions { + if (!this.__FilteredSubscriptions) { + throwMissingModuleError('MessageInteractions'); + } + return this.__FilteredSubscriptions; + } + get channels() { return this.rest.channels; } diff --git a/src/common/lib/client/defaultrealtime.ts b/src/common/lib/client/defaultrealtime.ts index 2204ab71ac..1dd3d402dd 100644 --- a/src/common/lib/client/defaultrealtime.ts +++ b/src/common/lib/client/defaultrealtime.ts @@ -10,6 +10,7 @@ import { MsgPack } from 'common/types/msgpack'; import RealtimePresence from './realtimepresence'; import { DefaultPresenceMessage } from '../types/defaultpresencemessage'; import initialiseWebSocketTransport from '../transport/websockettransport'; +import { FilteredSubscriptions } from './filteredsubscriptions'; /** `DefaultRealtime` is the class that the non tree-shakable version of the SDK exports as `Realtime`. It ensures that this version of the SDK includes all of the functionality which is optionally available in the tree-shakable version. @@ -27,6 +28,7 @@ export class DefaultRealtime extends BaseRealtime { MsgPack, RealtimePresence, WebSocketTransport: initialiseWebSocketTransport, + MessageInteractions: FilteredSubscriptions, }); } diff --git a/src/common/lib/client/modulesmap.ts b/src/common/lib/client/modulesmap.ts index dff32a69e4..dab5c8b718 100644 --- a/src/common/lib/client/modulesmap.ts +++ b/src/common/lib/client/modulesmap.ts @@ -5,6 +5,7 @@ import RealtimePresence from './realtimepresence'; import { TransportInitialiser } from '../transport/connectionmanager'; import XHRRequest from 'platform/web/lib/http/request/xhrrequest'; import fetchRequest from 'platform/web/lib/http/request/fetchrequest'; +import { FilteredSubscriptions } from './filteredsubscriptions'; export interface ModulesMap { Rest?: typeof Rest; @@ -16,6 +17,7 @@ export interface ModulesMap { XHRStreaming?: TransportInitialiser; XHRRequest?: typeof XHRRequest; FetchRequest?: typeof fetchRequest; + MessageInteractions?: typeof FilteredSubscriptions; } export const allCommonModules: ModulesMap = { Rest }; diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 0986f426ab..0c78fc0c2c 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -14,7 +14,6 @@ import ConnectionManager from '../transport/connectionmanager'; import ConnectionStateChange from './connectionstatechange'; import { ErrCallback, PaginatedResultCallback } from '../../types/utils'; import BaseRealtime from './baserealtime'; -import { FilteredSubscriptions } from './filteredsubscriptions'; interface RealtimeHistoryParams { start?: number; @@ -415,7 +414,7 @@ class RealtimeChannel extends Channel { // Filtered if (event && typeof event === 'object' && !Array.isArray(event)) { - FilteredSubscriptions.subscribeFilter(this, event, listener); + this.client._FilteredSubscriptions.subscribeFilter(this, event, listener); } else { this.subscriptions.on(event, listener); } @@ -428,9 +427,9 @@ class RealtimeChannel extends Channel { // If we either have a filtered listener, a filter or both we need to do additional processing to find the original function(s) if ((typeof event === 'object' && !listener) || this.filteredSubscriptions?.has(listener)) { - FilteredSubscriptions.getAndDeleteFilteredSubscriptions(this, event, listener).forEach((l) => - this.subscriptions.off(l) - ); + this.client._FilteredSubscriptions + .getAndDeleteFilteredSubscriptions(this, event, listener) + .forEach((l) => this.subscriptions.off(l)); return; } diff --git a/src/platform/web/modules.ts b/src/platform/web/modules.ts index e0907b322b..df51319679 100644 --- a/src/platform/web/modules.ts +++ b/src/platform/web/modules.ts @@ -49,4 +49,5 @@ export * from './modules/realtimepresence'; export * from './modules/transports'; export * from './modules/http'; export { Rest } from '../../common/lib/client/rest'; +export { FilteredSubscriptions as MessageInteractions } from '../../common/lib/client/filteredsubscriptions'; export { BaseRest, BaseRealtime }; diff --git a/test/browser/modules.test.js b/test/browser/modules.test.js index ab04ac42b4..b462b91a17 100644 --- a/test/browser/modules.test.js +++ b/test/browser/modules.test.js @@ -18,6 +18,7 @@ import { WebSocketTransport, FetchRequest, XHRRequest, + MessageInteractions, } from '../../build/modules/index.js'; describe('browser/modules', function () { @@ -508,4 +509,88 @@ describe('browser/modules', function () { }); }); }); + + describe('MessageInteractions', () => { + describe('BaseRealtime', () => { + describe('without MessageInteractions', () => { + it('is able to subscribe to and unsubscribe from channel events, as long as a MessageFilter isn’t passed', async () => { + const realtime = new BaseRealtime(ablyClientOptions(), { WebSocketTransport }); + const channel = realtime.channels.get('channel'); + await channel.attach(); + + const subscribeReceivedMessagePromise = new Promise((resolve) => channel.subscribe(resolve)); + + await channel.publish('message', 'body'); + + const subscribeReceivedMessage = await subscribeReceivedMessagePromise; + expect(subscribeReceivedMessage.data).to.equal('body'); + }); + + it('throws an error when attempting to subscribe to channel events using a MessageFilter', async () => { + const realtime = new BaseRealtime(ablyClientOptions(), { WebSocketTransport }); + const channel = realtime.channels.get('channel'); + + let thrownError = null; + try { + await channel.subscribe({ clientId: 'someClientId' }, () => {}); + } catch (error) { + thrownError = error; + } + + expect(thrownError).not.to.be.null; + expect(thrownError.message).to.equal('MessageInteractions module not provided'); + }); + }); + + describe('with MessageInteractions', () => { + it('can take a MessageFilter argument when subscribing to and unsubscribing from channel events', async () => { + const realtime = new BaseRealtime(ablyClientOptions(), { WebSocketTransport, MessageInteractions }); + const channel = realtime.channels.get('channel'); + + await channel.attach(); + + // Test `subscribe` with a filter: send two messages with different clientIds, and check that unfiltered subscription receives both messages but clientId-filtered subscription only receives the matching one. + const messageFilter = { clientId: 'someClientId' }; // note that `unsubscribe` compares filter by reference, I found that a bit surprising + + const filteredSubscriptionReceivedMessages = []; + channel.subscribe(messageFilter, (message) => { + filteredSubscriptionReceivedMessages.push(message); + }); + + const unfilteredSubscriptionReceivedFirstTwoMessagesPromise = new Promise((resolve) => { + const receivedMessages = []; + channel.subscribe(function listener(message) { + receivedMessages.push(message); + if (receivedMessages.length === 2) { + channel.unsubscribe(listener); + resolve(); + } + }); + }); + + await channel.publish(await decodeMessage({ clientId: 'someClientId' })); + await channel.publish(await decodeMessage({ clientId: 'someOtherClientId' })); + await unfilteredSubscriptionReceivedFirstTwoMessagesPromise; + + expect(filteredSubscriptionReceivedMessages.length).to.equal(1); + expect(filteredSubscriptionReceivedMessages[0].clientId).to.equal('someClientId'); + + // Test `unsubscribe` with a filter: call `unsubscribe` with the clientId filter, publish a message matching the filter, check that only the unfiltered listener recieves it + channel.unsubscribe(messageFilter); + + const unfilteredSubscriptionReceivedNextMessagePromise = new Promise((resolve) => { + channel.subscribe(function listener() { + channel.unsubscribe(listener); + resolve(); + }); + }); + + await channel.publish(await decodeMessage({ clientId: 'someClientId' })); + await unfilteredSubscriptionReceivedNextMessagePromise; + + expect(filteredSubscriptionReceivedMessages.length).to./* (still) */ equal(1); + }); + }); + }); + }); });