Skip to content

Commit

Permalink
Make subscription filtering tree-shakable
Browse files Browse the repository at this point in the history
We expose a MessageIteractions module which allows users to pass a
MessageFilter object to RealtimeChannel’s `subscribe` and `unsubscribe`.

Resolves #1397.
  • Loading branch information
lawrence-forooghian committed Oct 30, 2023
1 parent 948f255 commit 7c379cf
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 5 deletions.
1 change: 1 addition & 0 deletions scripts/moduleReport.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const moduleNames = [
'WebSocketTransport',
'XHRRequest',
'FetchRequest',
'MessageInteractions',
];

// List of all free-standing functions exported by the library along with the
Expand Down
10 changes: 10 additions & 0 deletions src/common/lib/client/baseclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic';
import { throwMissingModuleError } from '../util/utils';
import { MsgPack } from 'common/types/msgpack';
import { HTTPRequestImplementations } from 'platform/web/lib/http/http';
import { FilteredSubscriptions } from './filteredsubscriptions';

/**
`BaseClient` acts as the base class for all of the client classes exported by the SDK. It is an implementation detail and this class is not advertised publicly.
Expand All @@ -39,6 +40,7 @@ class BaseClient {
readonly _MsgPack: MsgPack | null;
// Extra HTTP request implementations available to this client, in addition to those in web’s Http.bundledRequestImplementations
readonly _additionalHTTPRequestImplementations: HTTPRequestImplementations;
private readonly __FilteredSubscriptions: typeof FilteredSubscriptions | null;

constructor(options: ClientOptions | string, modules: ModulesMap) {
this._additionalHTTPRequestImplementations = modules;
Expand Down Expand Up @@ -93,6 +95,7 @@ class BaseClient {

this._rest = modules.Rest ? new modules.Rest(this) : null;
this._Crypto = modules.Crypto ?? null;
this.__FilteredSubscriptions = modules.MessageInteractions ?? null;
}

private get rest(): Rest {
Expand All @@ -102,6 +105,13 @@ class BaseClient {
return this._rest;
}

get _FilteredSubscriptions(): typeof FilteredSubscriptions {
if (!this.__FilteredSubscriptions) {
throwMissingModuleError('MessageInteractions');
}
return this.__FilteredSubscriptions;
}

get channels() {
return this.rest.channels;
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { MsgPack } from 'common/types/msgpack';
import RealtimePresence from './realtimepresence';
import { DefaultPresenceMessage } from '../types/defaultpresencemessage';
import initialiseWebSocketTransport from '../transport/websockettransport';
import { FilteredSubscriptions } from './filteredsubscriptions';

/**
`DefaultRealtime` is the class that the non tree-shakable version of the SDK exports as `Realtime`. It ensures that this version of the SDK includes all of the functionality which is optionally available in the tree-shakable version.
Expand All @@ -27,6 +28,7 @@ export class DefaultRealtime extends BaseRealtime {
MsgPack,
RealtimePresence,
WebSocketTransport: initialiseWebSocketTransport,
MessageInteractions: FilteredSubscriptions,
});
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/lib/client/modulesmap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import RealtimePresence from './realtimepresence';
import { TransportInitialiser } from '../transport/connectionmanager';
import XHRRequest from 'platform/web/lib/http/request/xhrrequest';
import fetchRequest from 'platform/web/lib/http/request/fetchrequest';
import { FilteredSubscriptions } from './filteredsubscriptions';

export interface ModulesMap {
Rest?: typeof Rest;
Expand All @@ -16,6 +17,7 @@ export interface ModulesMap {
XHRStreaming?: TransportInitialiser;
XHRRequest?: typeof XHRRequest;
FetchRequest?: typeof fetchRequest;
MessageInteractions?: typeof FilteredSubscriptions;
}

export const allCommonModules: ModulesMap = { Rest };
9 changes: 4 additions & 5 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import ConnectionManager from '../transport/connectionmanager';
import ConnectionStateChange from './connectionstatechange';
import { ErrCallback, PaginatedResultCallback } from '../../types/utils';
import BaseRealtime from './baserealtime';
import { FilteredSubscriptions } from './filteredsubscriptions';

interface RealtimeHistoryParams {
start?: number;
Expand Down Expand Up @@ -415,7 +414,7 @@ class RealtimeChannel extends Channel {

// Filtered
if (event && typeof event === 'object' && !Array.isArray(event)) {
FilteredSubscriptions.subscribeFilter(this, event, listener);
this.client._FilteredSubscriptions.subscribeFilter(this, event, listener);
} else {
this.subscriptions.on(event, listener);
}
Expand All @@ -428,9 +427,9 @@ class RealtimeChannel extends Channel {

// If we either have a filtered listener, a filter or both we need to do additional processing to find the original function(s)
if ((typeof event === 'object' && !listener) || this.filteredSubscriptions?.has(listener)) {
FilteredSubscriptions.getAndDeleteFilteredSubscriptions(this, event, listener).forEach((l) =>
this.subscriptions.off(l)
);
this.client._FilteredSubscriptions
.getAndDeleteFilteredSubscriptions(this, event, listener)
.forEach((l) => this.subscriptions.off(l));
return;
}

Expand Down
1 change: 1 addition & 0 deletions src/platform/web/modules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ export * from './modules/realtimepresence';
export * from './modules/transports';
export * from './modules/http';
export { Rest } from '../../common/lib/client/rest';
export { FilteredSubscriptions as MessageInteractions } from '../../common/lib/client/filteredsubscriptions';
export { BaseRest, BaseRealtime };
85 changes: 85 additions & 0 deletions test/browser/modules.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
WebSocketTransport,
FetchRequest,
XHRRequest,
MessageInteractions,
} from '../../build/modules/index.js';

describe('browser/modules', function () {
Expand Down Expand Up @@ -508,4 +509,88 @@ describe('browser/modules', function () {
});
});
});

describe('MessageInteractions', () => {
describe('BaseRealtime', () => {
describe('without MessageInteractions', () => {
it('is able to subscribe to and unsubscribe from channel events, as long as a MessageFilter isn’t passed', async () => {
const realtime = new BaseRealtime(ablyClientOptions(), { WebSocketTransport });
const channel = realtime.channels.get('channel');
await channel.attach();

const subscribeReceivedMessagePromise = new Promise((resolve) => channel.subscribe(resolve));

await channel.publish('message', 'body');

const subscribeReceivedMessage = await subscribeReceivedMessagePromise;
expect(subscribeReceivedMessage.data).to.equal('body');
});

it('throws an error when attempting to subscribe to channel events using a MessageFilter', async () => {
const realtime = new BaseRealtime(ablyClientOptions(), { WebSocketTransport });
const channel = realtime.channels.get('channel');

let thrownError = null;
try {
await channel.subscribe({ clientId: 'someClientId' }, () => {});
} catch (error) {
thrownError = error;
}

expect(thrownError).not.to.be.null;
expect(thrownError.message).to.equal('MessageInteractions module not provided');
});
});

describe('with MessageInteractions', () => {
it('can take a MessageFilter argument when subscribing to and unsubscribing from channel events', async () => {
const realtime = new BaseRealtime(ablyClientOptions(), { WebSocketTransport, MessageInteractions });
const channel = realtime.channels.get('channel');

await channel.attach();

// Test `subscribe` with a filter: send two messages with different clientIds, and check that unfiltered subscription receives both messages but clientId-filtered subscription only receives the matching one.
const messageFilter = { clientId: 'someClientId' }; // note that `unsubscribe` compares filter by reference, I found that a bit surprising

const filteredSubscriptionReceivedMessages = [];
channel.subscribe(messageFilter, (message) => {
filteredSubscriptionReceivedMessages.push(message);
});

const unfilteredSubscriptionReceivedFirstTwoMessagesPromise = new Promise((resolve) => {
const receivedMessages = [];
channel.subscribe(function listener(message) {
receivedMessages.push(message);
if (receivedMessages.length === 2) {
channel.unsubscribe(listener);
resolve();
}
});
});

await channel.publish(await decodeMessage({ clientId: 'someClientId' }));
await channel.publish(await decodeMessage({ clientId: 'someOtherClientId' }));
await unfilteredSubscriptionReceivedFirstTwoMessagesPromise;

expect(filteredSubscriptionReceivedMessages.length).to.equal(1);
expect(filteredSubscriptionReceivedMessages[0].clientId).to.equal('someClientId');

// Test `unsubscribe` with a filter: call `unsubscribe` with the clientId filter, publish a message matching the filter, check that only the unfiltered listener recieves it
channel.unsubscribe(messageFilter);

const unfilteredSubscriptionReceivedNextMessagePromise = new Promise((resolve) => {
channel.subscribe(function listener() {
channel.unsubscribe(listener);
resolve();
});
});

await channel.publish(await decodeMessage({ clientId: 'someClientId' }));
await unfilteredSubscriptionReceivedNextMessagePromise;

expect(filteredSubscriptionReceivedMessages.length).to./* (still) */ equal(1);
});
});
});
});
});

0 comments on commit 7c379cf

Please sign in to comment.