Skip to content

Commit

Permalink
Extract PresenceMap to its own file for unit-testability
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonWoolf committed Nov 13, 2024
1 parent 18a2559 commit e94ddbd
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 195 deletions.
2 changes: 2 additions & 0 deletions src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import RealtimePresence from './realtimepresence';
import { DefaultPresenceMessage } from '../types/defaultpresencemessage';
import WebSocketTransport from '../transport/websockettransport';
import { FilteredSubscriptions } from './filteredsubscriptions';
import { PresenceMap } from './presencemap';
import {
fromValues as presenceMessageFromValues,
fromValuesArray as presenceMessagesFromValuesArray,
Expand Down Expand Up @@ -69,4 +70,5 @@ export class DefaultRealtime extends BaseRealtime {

// Used by tests
static _Http = Http;
static _PresenceMap = PresenceMap;
}
205 changes: 205 additions & 0 deletions src/common/lib/client/presencemap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import * as Utils from '../util/utils';
import EventEmitter from '../util/eventemitter';
import Logger from '../util/logger';
import PresenceMessage, { fromValues as presenceMessageFromValues } from '../types/presencemessage';

import type RealtimePresence from './realtimepresence';

type compFn = (item: PresenceMessage, existing: PresenceMessage) => boolean;

export interface RealtimePresenceParams {
waitForSync?: boolean;
clientId?: string;
connectionId?: string;
}

function newerThan(item: PresenceMessage, existing: PresenceMessage): boolean {
/* RTP2b1: if either is synthesised, compare by timestamp */
if (item.isSynthesized() || existing.isSynthesized()) {
// RTP2b1a: if equal, prefer the newly-arrived one
return (item.timestamp as number) >= (existing.timestamp as number);
}

/* RTP2b2 */
const itemOrderings = item.parseId(),
existingOrderings = existing.parseId();
if (itemOrderings.msgSerial === existingOrderings.msgSerial) {
return itemOrderings.index > existingOrderings.index;
} else {
return itemOrderings.msgSerial > existingOrderings.msgSerial;
}
}

export class PresenceMap extends EventEmitter {
map: Record<string, PresenceMessage>;
residualMembers: Record<string, PresenceMessage> | null;
syncInProgress: boolean;
presence: RealtimePresence;
memberKey: (item: PresenceMessage) => string;
newerThan: compFn;

constructor(presence: RealtimePresence, memberKey: (item: PresenceMessage) => string, newer: compFn = newerThan) {
super(presence.logger);
this.presence = presence;
this.map = Object.create(null);
this.syncInProgress = false;
this.residualMembers = null;
this.memberKey = memberKey;
this.newerThan = newer;
}

get(key: string) {
return this.map[key];
}

getClient(clientId: string) {
const map = this.map,
result = [];
for (const key in map) {
const item = map[key];
if (item.clientId == clientId && item.action != 'absent') result.push(item);
}
return result;
}

list(params: RealtimePresenceParams) {
const map = this.map,
clientId = params && params.clientId,
connectionId = params && params.connectionId,
result = [];

for (const key in map) {
const item = map[key];
if (item.action === 'absent') continue;
if (clientId && clientId != item.clientId) continue;
if (connectionId && connectionId != item.connectionId) continue;
result.push(item);
}
return result;
}

put(item: PresenceMessage) {
if (item.action === 'enter' || item.action === 'update') {
item = presenceMessageFromValues(item);
item.action = 'present';
}
const map = this.map,
key = this.memberKey(item);
/* we've seen this member, so do not remove it at the end of sync */
if (this.residualMembers) delete this.residualMembers[key];

/* compare the timestamp of the new item with any existing member (or ABSENT witness) */
const existingItem = map[key];
if (existingItem && !this.newerThan(item, existingItem)) {
return false;
}
map[key] = item;
return true;
}

values() {
const map = this.map,
result = [];
for (const key in map) {
const item = map[key];
if (item.action != 'absent') result.push(item);
}
return result;
}

remove(item: PresenceMessage) {
const map = this.map,
key = this.memberKey(item);
const existingItem = map[key];

if (existingItem && !this.newerThan(item, existingItem)) {
return false;
}

/* RTP2f */
if (this.syncInProgress) {
item = presenceMessageFromValues(item);
item.action = 'absent';
map[key] = item;
} else {
delete map[key];
}

return true;
}

startSync() {
const map = this.map,
syncInProgress = this.syncInProgress;
Logger.logAction(
this.logger,
Logger.LOG_MINOR,
'PresenceMap.startSync()',
'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress,
);
/* we might be called multiple times while a sync is in progress */
if (!this.syncInProgress) {
this.residualMembers = Utils.copy(map);
this.setInProgress(true);
}
}

endSync() {
const map = this.map,
syncInProgress = this.syncInProgress;
Logger.logAction(
this.logger,
Logger.LOG_MINOR,
'PresenceMap.endSync()',
'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress,
);
if (syncInProgress) {
/* we can now strip out the ABSENT members, as we have
* received all of the out-of-order sync messages */
for (const memberKey in map) {
const entry = map[memberKey];
if (entry.action === 'absent') {
delete map[memberKey];
}
}
/* any members that were present at the start of the sync,
* and have not been seen in sync, can be removed, and leave events emitted */
this.presence._synthesizeLeaves(Utils.valuesArray(this.residualMembers as Record<string, PresenceMessage>));
for (const memberKey in this.residualMembers) {
delete map[memberKey];
}
this.residualMembers = null;

/* finish, notifying any waiters */
this.setInProgress(false);
}
this.emit('sync');
}

waitSync(callback: () => void) {
const syncInProgress = this.syncInProgress;
Logger.logAction(
this.logger,
Logger.LOG_MINOR,
'PresenceMap.waitSync()',
'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress,
);
if (!syncInProgress) {
callback();
return;
}
this.once('sync', callback);
}

clear() {
this.map = {};
this.setInProgress(false);
this.residualMembers = null;
}

setInProgress(inProgress: boolean) {
Logger.logAction(this.logger, Logger.LOG_MICRO, 'PresenceMap.setInProgress()', 'inProgress = ' + inProgress);
this.syncInProgress = inProgress;
this.presence.syncComplete = !inProgress;
}
}
Loading

0 comments on commit e94ddbd

Please sign in to comment.