From e91488d6dff741bd9f667d07560910b233e811d3 Mon Sep 17 00:00:00 2001 From: Eric Hwang Date: Mon, 8 Aug 2022 23:41:39 -0700 Subject: [PATCH 1/6] Refactor action code strings into an enum-like object --- lib/agent.js | 71 ++++++++++++++++++++--------------- lib/client/connection.js | 53 +++++++++++++------------- lib/shared/message-actions.js | 21 +++++++++++ 3 files changed, 88 insertions(+), 57 deletions(-) create mode 100644 lib/shared/message-actions.js diff --git a/lib/agent.js b/lib/agent.js index ebb3bc7d9..eca8057e2 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -1,8 +1,9 @@ var hat = require('hat'); +var ShareDBError = require('./error'); +var logger = require('./logger'); +var Action = require('./shared/message-actions').Action; var types = require('./types'); var util = require('./util'); -var logger = require('./logger'); -var ShareDBError = require('./error'); var ERROR_CODE = ShareDBError.CODES; @@ -58,7 +59,7 @@ function Agent(backend, stream) { this.custom = {}; // Send the legacy message to initialize old clients with the random agent Id - this.send(this._initMessage('init')); + this.send(this._initMessage(Action.InitLegacy)); } module.exports = Agent; @@ -174,7 +175,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query var agent = this; emitter.onExtra = function(extra) { - agent.send({a: 'q', id: queryId, extra: extra}); + agent.send({a: Action.QueryReply, id: queryId, extra: extra}); }; emitter.onDiff = function(diff) { @@ -186,7 +187,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query } // Consider stripping the collection out of the data we send here // if it matches the query's collection. - agent.send({a: 'q', id: queryId, diff: diff}); + agent.send({a: Action.QueryReply, id: queryId, diff: diff}); }; emitter.onError = function(err) { @@ -250,7 +251,7 @@ Agent.prototype.send = function(message) { Agent.prototype._sendOp = function(collection, id, op) { var message = { - a: 'op', + a: Action.Op, c: collection, d: id, v: op.v, @@ -354,22 +355,30 @@ Agent.prototype._open = function() { // Check a request to see if its valid. Returns an error if there's a problem. Agent.prototype._checkRequest = function(request) { - if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') { + if (request.a === Action.QueryFetch || request.a === Action.QuerySubscribe || request.a === Action.QueryUnsubscribe) { // Query messages need an ID property. if (typeof request.id !== 'number') return 'Missing query ID'; - } else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') { + } else if (request.a === Action.Op || + request.a === Action.Fetch || + request.a === Action.Subscribe || + request.a === Action.Unsubscribe || + request.a === Action.Presence) { // Doc-based request. if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (request.d != null && typeof request.d !== 'string') return 'Invalid id'; - if (request.a === 'op' || request.a === 'p') { + if (request.a === Action.Op || request.a === Action.Presence) { if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version'; } - if (request.a === 'p') { + if (request.a === Action.Presence) { if (typeof request.id !== 'string') return 'Missing presence ID'; } - } else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') { + } else if ( + request.a === Action.BulkFetch || + request.a === Action.BulkSubscribe || + request.a === Action.BulkUnsubscribe + ) { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (typeof request.b !== 'object') return 'Invalid bulk subscribe data'; @@ -383,28 +392,28 @@ Agent.prototype._handleMessage = function(request, callback) { if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage)); switch (request.a) { - case 'hs': + case Action.Handshake: if (request.id) this.src = request.id; - return callback(null, this._initMessage('hs')); - case 'qf': + return callback(null, this._initMessage(Action.Handshake)); + case Action.QueryFetch: return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback); - case 'qs': + case Action.QuerySubscribe: return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback); - case 'qu': + case Action.QueryUnsubscribe: return this._queryUnsubscribe(request.id, callback); - case 'bf': + case Action.BulkFetch: return this._fetchBulk(request.c, request.b, callback); - case 'bs': + case Action.BulkSubscribe: return this._subscribeBulk(request.c, request.b, callback); - case 'bu': + case Action.BulkUnsubscribe: return this._unsubscribeBulk(request.c, request.b, callback); - case 'f': + case Action.Fetch: return this._fetch(request.c, request.d, request.v, callback); - case 's': + case Action.Subscribe: return this._subscribe(request.c, request.d, request.v, callback); - case 'u': + case Action.Unsubscribe: return this._unsubscribe(request.c, request.d, callback); - case 'op': + case Action.Op: // Normalize the properties submitted var op = createClientOp(request, this._src()); if (op.seq >= util.MAX_SAFE_INTEGER) { @@ -415,11 +424,11 @@ Agent.prototype._handleMessage = function(request, callback) { } if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); return this._submit(request.c, request.d, op, callback); - case 'nf': + case Action.SnapshotFetch: return this._fetchSnapshot(request.c, request.d, request.v, callback); - case 'nt': + case Action.SnapshotFetchByTimestamp: return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); - case 'p': + case Action.Presence: if (!this.backend.presenceEnabled) return; var presence = this._createPresence(request); if (presence.t && !util.supportsPresence(types.map[presence.t])) { @@ -429,10 +438,10 @@ Agent.prototype._handleMessage = function(request, callback) { }); } return this._broadcastPresence(presence, callback); - case 'ps': + case Action.PresenceSubscribe: if (!this.backend.presenceEnabled) return; return this._subscribePresence(request.ch, request.seq, callback); - case 'pu': + case Action.PresenceUnsubscribe: return this._unsubscribePresence(request.ch, request.seq, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); @@ -761,7 +770,7 @@ Agent.prototype._broadcastPresence = function(presence, callback) { Agent.prototype._createPresence = function(request) { return { - a: 'p', + a: Action.Presence, ch: request.ch, src: this._src(), id: request.id, // Presence ID, not Doc ID (which is 'd') @@ -813,7 +822,7 @@ Agent.prototype._requestPresence = function(channel, callback) { Agent.prototype._handlePresenceData = function(presence) { if (presence.src === this._src()) return; - if (presence.r) return this.send({a: 'pr', ch: presence.ch}); + if (presence.r) return this.send({a: Action.PresenceRequest, ch: presence.ch}); var backend = this.backend; var context = { @@ -824,7 +833,7 @@ Agent.prototype._handlePresenceData = function(presence) { backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) { if (error) { if (backend.doNotForwardSendPresenceErrorsToClient) backend.errorHandler(error, {agent: agent}); - else agent.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); + else agent.send({a: Action.Presence, ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); return; } agent.send(presence); diff --git a/lib/client/connection.js b/lib/client/connection.js index d29570afb..1b551af12 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -6,6 +6,7 @@ var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-reques var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request'); var emitter = require('../emitter'); var ShareDBError = require('../error'); +var Action = require('../shared/message-actions').Action; var types = require('../types'); var util = require('../util'); var logger = require('../logger'); @@ -194,25 +195,25 @@ Connection.prototype.handleMessage = function(message) { // Switch on the message action. Most messages are for documents and are // handled in the doc class. switch (message.a) { - case 'init': + case Action.InitLegacy: // Client initialization packet return this._handleLegacyInit(message); - case 'hs': + case Action.Handshake: return this._handleHandshake(err, message); - case 'qf': + case Action.QueryFetch: var query = this.queries[message.id]; if (query) query._handleFetch(err, message.data, message.extra); return; - case 'qs': + case Action.QuerySubscribe: var query = this.queries[message.id]; if (query) query._handleSubscribe(err, message.data, message.extra); return; - case 'qu': + case Action.QueryUnsubscribe: // Queries are removed immediately on calls to destroy, so we ignore // replies to query unsubscribes. Perhaps there should be a callback for // destroy, but this is currently unimplemented return; - case 'q': + case Action.QueryReply: // Query message. Pass this to the appropriate query object. var query = this.queries[message.id]; if (!query) return; @@ -221,36 +222,36 @@ Connection.prototype.handleMessage = function(message) { if (message.hasOwnProperty('extra')) query._handleExtra(message.extra); return; - case 'bf': + case Action.BulkFetch: return this._handleBulkMessage(err, message, '_handleFetch'); - case 'bs': - case 'bu': + case Action.BulkSubscribe: + case Action.BulkUnsubscribe: return this._handleBulkMessage(err, message, '_handleSubscribe'); - case 'nf': - case 'nt': + case Action.SnapshotFetch: + case Action.SnapshotFetchByTimestamp: return this._handleSnapshotFetch(err, message); - case 'f': + case Action.Fetch: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleFetch(err, message.data); return; - case 's': - case 'u': + case Action.Subscribe: + case Action.Unsubscribe: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleSubscribe(err, message.data); return; - case 'op': + case Action.Op: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleOp(err, message); return; - case 'p': + case Action.Presence: return this._handlePresence(err, message); - case 'ps': + case Action.PresenceSubscribe: return this._handlePresenceSubscribe(err, message); - case 'pu': + case Action.PresenceUnsubscribe: return this._handlePresenceUnsubscribe(err, message); - case 'pr': + case Action.PresenceRequest: return this._handlePresenceRequest(err, message); default: @@ -434,22 +435,22 @@ Connection.prototype._sendAction = function(action, doc, version) { }; Connection.prototype.sendFetch = function(doc) { - return this._sendAction('f', doc, doc.version); + return this._sendAction(Action.Fetch, doc, doc.version); }; Connection.prototype.sendSubscribe = function(doc) { - return this._sendAction('s', doc, doc.version); + return this._sendAction(Action.Subscribe, doc, doc.version); }; Connection.prototype.sendUnsubscribe = function(doc) { - return this._sendAction('u', doc); + return this._sendAction(Action.Unsubscribe, doc); }; Connection.prototype.sendOp = function(doc, op) { // Ensure the doc is registered so that it receives the reply message this._addDoc(doc); var message = { - a: 'op', + a: Action.Op, c: doc.collection, d: doc.id, v: doc.version, @@ -553,7 +554,7 @@ Connection.prototype._destroyQuery = function(query) { // The callback should have the signature function(error, results, extra) // where results is a list of Doc objects. Connection.prototype.createFetchQuery = function(collection, q, options, callback) { - return this._createQuery('qf', collection, q, options, callback); + return this._createQuery(Action.QueryFetch, collection, q, options, callback); }; // Create a subscribe query. Subscribe queries return with the initial data @@ -563,7 +564,7 @@ Connection.prototype.createFetchQuery = function(collection, q, options, callbac // If present, the callback should have the signature function(error, results, extra) // where results is a list of Doc objects. Connection.prototype.createSubscribeQuery = function(collection, q, options, callback) { - return this._createQuery('qs', collection, q, options, callback); + return this._createQuery(Action.QuerySubscribe, collection, q, options, callback); }; Connection.prototype.hasPending = function() { @@ -716,7 +717,7 @@ Connection.prototype._handleLegacyInit = function(message) { }; Connection.prototype._initializeHandshake = function() { - this.send({a: 'hs', id: this.id}); + this.send({a: Action.Handshake, id: this.id}); }; Connection.prototype._handleHandshake = function(error, message) { diff --git a/lib/shared/message-actions.js b/lib/shared/message-actions.js new file mode 100644 index 000000000..33beec7ec --- /dev/null +++ b/lib/shared/message-actions.js @@ -0,0 +1,21 @@ +exports.Action = { + InitLegacy: 'init', + Handshake: 'hs', + QueryFetch: 'qf', + QuerySubscribe: 'qs', + QueryUnsubscribe: 'qu', + QueryReply: 'q', + BulkFetch: 'bf', + BulkSubscribe: 'bs', + BulkUnsubscribe: 'bu', + Fetch: 'f', + Subscribe: 's', + Unsubscribe: 'u', + Op: 'op', + SnapshotFetch: 'nf', + SnapshotFetchByTimestamp: 'nt', + Presence: 'p', + PresenceSubscribe: 'ps', + PresenceUnsubscribe: 'pu', + PresenceRequest: 'pr' +}; From 461662d9df246d5e52023c08e58b6e9db0dcf9f2 Mon Sep 17 00:00:00 2001 From: Eric Hwang Date: Tue, 9 Aug 2022 09:17:59 -0700 Subject: [PATCH 2/6] Updates to message-actios from code review feedback --- lib/agent.js | 72 +++++++++++++++++------------------ lib/message-actions.js | 21 ++++++++++ lib/shared/message-actions.js | 21 ---------- 3 files changed, 57 insertions(+), 57 deletions(-) create mode 100644 lib/message-actions.js delete mode 100644 lib/shared/message-actions.js diff --git a/lib/agent.js b/lib/agent.js index eca8057e2..4ebd846f2 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -1,7 +1,7 @@ var hat = require('hat'); var ShareDBError = require('./error'); var logger = require('./logger'); -var Action = require('./shared/message-actions').Action; +var Action = require('./message-actions').ACTIONS; var types = require('./types'); var util = require('./util'); @@ -59,7 +59,7 @@ function Agent(backend, stream) { this.custom = {}; // Send the legacy message to initialize old clients with the random agent Id - this.send(this._initMessage(Action.InitLegacy)); + this.send(this._initMessage(Action.initLegacy)); } module.exports = Agent; @@ -175,7 +175,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query var agent = this; emitter.onExtra = function(extra) { - agent.send({a: Action.QueryReply, id: queryId, extra: extra}); + agent.send({a: Action.queryReply, id: queryId, extra: extra}); }; emitter.onDiff = function(diff) { @@ -187,7 +187,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query } // Consider stripping the collection out of the data we send here // if it matches the query's collection. - agent.send({a: Action.QueryReply, id: queryId, diff: diff}); + agent.send({a: Action.queryReply, id: queryId, diff: diff}); }; emitter.onError = function(err) { @@ -251,7 +251,7 @@ Agent.prototype.send = function(message) { Agent.prototype._sendOp = function(collection, id, op) { var message = { - a: Action.Op, + a: Action.op, c: collection, d: id, v: op.v, @@ -355,29 +355,29 @@ Agent.prototype._open = function() { // Check a request to see if its valid. Returns an error if there's a problem. Agent.prototype._checkRequest = function(request) { - if (request.a === Action.QueryFetch || request.a === Action.QuerySubscribe || request.a === Action.QueryUnsubscribe) { + if (request.a === Action.queryFetch || request.a === Action.querySubscribe || request.a === Action.queryUnsubscribe) { // Query messages need an ID property. if (typeof request.id !== 'number') return 'Missing query ID'; - } else if (request.a === Action.Op || - request.a === Action.Fetch || - request.a === Action.Subscribe || - request.a === Action.Unsubscribe || - request.a === Action.Presence) { + } else if (request.a === Action.op || + request.a === Action.fetch || + request.a === Action.subscribe || + request.a === Action.unsubscribe || + request.a === Action.presence) { // Doc-based request. if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (request.d != null && typeof request.d !== 'string') return 'Invalid id'; - if (request.a === Action.Op || request.a === Action.Presence) { + if (request.a === Action.op || request.a === Action.presence) { if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version'; } - if (request.a === Action.Presence) { + if (request.a === Action.presence) { if (typeof request.id !== 'string') return 'Missing presence ID'; } } else if ( - request.a === Action.BulkFetch || - request.a === Action.BulkSubscribe || - request.a === Action.BulkUnsubscribe + request.a === Action.bulkFetch || + request.a === Action.bulkSubscribe || + request.a === Action.bulkUnsubscribe ) { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; @@ -392,28 +392,28 @@ Agent.prototype._handleMessage = function(request, callback) { if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage)); switch (request.a) { - case Action.Handshake: + case Action.handshake: if (request.id) this.src = request.id; - return callback(null, this._initMessage(Action.Handshake)); - case Action.QueryFetch: + return callback(null, this._initMessage(Action.handshake)); + case Action.queryFetch: return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback); - case Action.QuerySubscribe: + case Action.querySubscribe: return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback); - case Action.QueryUnsubscribe: + case Action.queryUnsubscribe: return this._queryUnsubscribe(request.id, callback); - case Action.BulkFetch: + case Action.bulkFetch: return this._fetchBulk(request.c, request.b, callback); - case Action.BulkSubscribe: + case Action.bulkSubscribe: return this._subscribeBulk(request.c, request.b, callback); - case Action.BulkUnsubscribe: + case Action.bulkUnsubscribe: return this._unsubscribeBulk(request.c, request.b, callback); - case Action.Fetch: + case Action.fetch: return this._fetch(request.c, request.d, request.v, callback); - case Action.Subscribe: + case Action.subscribe: return this._subscribe(request.c, request.d, request.v, callback); - case Action.Unsubscribe: + case Action.unsubscribe: return this._unsubscribe(request.c, request.d, callback); - case Action.Op: + case Action.op: // Normalize the properties submitted var op = createClientOp(request, this._src()); if (op.seq >= util.MAX_SAFE_INTEGER) { @@ -424,11 +424,11 @@ Agent.prototype._handleMessage = function(request, callback) { } if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); return this._submit(request.c, request.d, op, callback); - case Action.SnapshotFetch: + case Action.snapshotFetch: return this._fetchSnapshot(request.c, request.d, request.v, callback); - case Action.SnapshotFetchByTimestamp: + case Action.snapshotFetchByTimestamp: return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); - case Action.Presence: + case Action.presence: if (!this.backend.presenceEnabled) return; var presence = this._createPresence(request); if (presence.t && !util.supportsPresence(types.map[presence.t])) { @@ -438,10 +438,10 @@ Agent.prototype._handleMessage = function(request, callback) { }); } return this._broadcastPresence(presence, callback); - case Action.PresenceSubscribe: + case Action.presenceSubscribe: if (!this.backend.presenceEnabled) return; return this._subscribePresence(request.ch, request.seq, callback); - case Action.PresenceUnsubscribe: + case Action.presenceUnsubscribe: return this._unsubscribePresence(request.ch, request.seq, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); @@ -770,7 +770,7 @@ Agent.prototype._broadcastPresence = function(presence, callback) { Agent.prototype._createPresence = function(request) { return { - a: Action.Presence, + a: Action.presence, ch: request.ch, src: this._src(), id: request.id, // Presence ID, not Doc ID (which is 'd') @@ -822,7 +822,7 @@ Agent.prototype._requestPresence = function(channel, callback) { Agent.prototype._handlePresenceData = function(presence) { if (presence.src === this._src()) return; - if (presence.r) return this.send({a: Action.PresenceRequest, ch: presence.ch}); + if (presence.r) return this.send({a: Action.presenceRequest, ch: presence.ch}); var backend = this.backend; var context = { @@ -833,7 +833,7 @@ Agent.prototype._handlePresenceData = function(presence) { backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) { if (error) { if (backend.doNotForwardSendPresenceErrorsToClient) backend.errorHandler(error, {agent: agent}); - else agent.send({a: Action.Presence, ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); + else agent.send({a: Action.presence, ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); return; } agent.send(presence); diff --git a/lib/message-actions.js b/lib/message-actions.js new file mode 100644 index 000000000..5e2b59517 --- /dev/null +++ b/lib/message-actions.js @@ -0,0 +1,21 @@ +exports.ACTIONS = { + initLegacy: 'init', + handshake: 'hs', + queryFetch: 'qf', + querySubscribe: 'qs', + queryUnsubscribe: 'qu', + queryReply: 'q', + bulkFetch: 'bf', + bulkSubscribe: 'bs', + bulkUnsubscribe: 'bu', + fetch: 'f', + subscribe: 's', + unsubscribe: 'u', + op: 'op', + snapshotFetch: 'nf', + snapshotFetchByTimestamp: 'nt', + presence: 'p', + presenceSubscribe: 'ps', + presenceUnsubscribe: 'pu', + presenceRequest: 'pr' +}; diff --git a/lib/shared/message-actions.js b/lib/shared/message-actions.js deleted file mode 100644 index 33beec7ec..000000000 --- a/lib/shared/message-actions.js +++ /dev/null @@ -1,21 +0,0 @@ -exports.Action = { - InitLegacy: 'init', - Handshake: 'hs', - QueryFetch: 'qf', - QuerySubscribe: 'qs', - QueryUnsubscribe: 'qu', - QueryReply: 'q', - BulkFetch: 'bf', - BulkSubscribe: 'bs', - BulkUnsubscribe: 'bu', - Fetch: 'f', - Subscribe: 's', - Unsubscribe: 'u', - Op: 'op', - SnapshotFetch: 'nf', - SnapshotFetchByTimestamp: 'nt', - Presence: 'p', - PresenceSubscribe: 'ps', - PresenceUnsubscribe: 'pu', - PresenceRequest: 'pr' -}; From c4e82d6d5eaad77fac26942af81dd169c012449d Mon Sep 17 00:00:00 2001 From: Eric Hwang Date: Tue, 9 Aug 2022 09:23:39 -0700 Subject: [PATCH 3/6] Update action refactoring missed in automated renames --- lib/agent.js | 72 ++++++++++++++++++++-------------------- lib/client/connection.js | 56 +++++++++++++++---------------- 2 files changed, 64 insertions(+), 64 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 4ebd846f2..d72c84aba 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -1,7 +1,7 @@ var hat = require('hat'); var ShareDBError = require('./error'); var logger = require('./logger'); -var Action = require('./message-actions').ACTIONS; +var ACTIONS = require('./message-actions').ACTIONS; var types = require('./types'); var util = require('./util'); @@ -59,7 +59,7 @@ function Agent(backend, stream) { this.custom = {}; // Send the legacy message to initialize old clients with the random agent Id - this.send(this._initMessage(Action.initLegacy)); + this.send(this._initMessage(ACTIONS.initLegacy)); } module.exports = Agent; @@ -175,7 +175,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query var agent = this; emitter.onExtra = function(extra) { - agent.send({a: Action.queryReply, id: queryId, extra: extra}); + agent.send({a: ACTIONS.queryReply, id: queryId, extra: extra}); }; emitter.onDiff = function(diff) { @@ -187,7 +187,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query } // Consider stripping the collection out of the data we send here // if it matches the query's collection. - agent.send({a: Action.queryReply, id: queryId, diff: diff}); + agent.send({a: ACTIONS.queryReply, id: queryId, diff: diff}); }; emitter.onError = function(err) { @@ -251,7 +251,7 @@ Agent.prototype.send = function(message) { Agent.prototype._sendOp = function(collection, id, op) { var message = { - a: Action.op, + a: ACTIONS.op, c: collection, d: id, v: op.v, @@ -355,29 +355,29 @@ Agent.prototype._open = function() { // Check a request to see if its valid. Returns an error if there's a problem. Agent.prototype._checkRequest = function(request) { - if (request.a === Action.queryFetch || request.a === Action.querySubscribe || request.a === Action.queryUnsubscribe) { + if (request.a === ACTIONS.queryFetch || request.a === ACTIONS.querySubscribe || request.a === ACTIONS.queryUnsubscribe) { // Query messages need an ID property. if (typeof request.id !== 'number') return 'Missing query ID'; - } else if (request.a === Action.op || - request.a === Action.fetch || - request.a === Action.subscribe || - request.a === Action.unsubscribe || - request.a === Action.presence) { + } else if (request.a === ACTIONS.op || + request.a === ACTIONS.fetch || + request.a === ACTIONS.subscribe || + request.a === ACTIONS.unsubscribe || + request.a === ACTIONS.presence) { // Doc-based request. if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (request.d != null && typeof request.d !== 'string') return 'Invalid id'; - if (request.a === Action.op || request.a === Action.presence) { + if (request.a === ACTIONS.op || request.a === ACTIONS.presence) { if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version'; } - if (request.a === Action.presence) { + if (request.a === ACTIONS.presence) { if (typeof request.id !== 'string') return 'Missing presence ID'; } } else if ( - request.a === Action.bulkFetch || - request.a === Action.bulkSubscribe || - request.a === Action.bulkUnsubscribe + request.a === ACTIONS.bulkFetch || + request.a === ACTIONS.bulkSubscribe || + request.a === ACTIONS.bulkUnsubscribe ) { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; @@ -392,28 +392,28 @@ Agent.prototype._handleMessage = function(request, callback) { if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage)); switch (request.a) { - case Action.handshake: + case ACTIONS.handshake: if (request.id) this.src = request.id; - return callback(null, this._initMessage(Action.handshake)); - case Action.queryFetch: + return callback(null, this._initMessage(ACTIONS.handshake)); + case ACTIONS.queryFetch: return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback); - case Action.querySubscribe: + case ACTIONS.querySubscribe: return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback); - case Action.queryUnsubscribe: + case ACTIONS.queryUnsubscribe: return this._queryUnsubscribe(request.id, callback); - case Action.bulkFetch: + case ACTIONS.bulkFetch: return this._fetchBulk(request.c, request.b, callback); - case Action.bulkSubscribe: + case ACTIONS.bulkSubscribe: return this._subscribeBulk(request.c, request.b, callback); - case Action.bulkUnsubscribe: + case ACTIONS.bulkUnsubscribe: return this._unsubscribeBulk(request.c, request.b, callback); - case Action.fetch: + case ACTIONS.fetch: return this._fetch(request.c, request.d, request.v, callback); - case Action.subscribe: + case ACTIONS.subscribe: return this._subscribe(request.c, request.d, request.v, callback); - case Action.unsubscribe: + case ACTIONS.unsubscribe: return this._unsubscribe(request.c, request.d, callback); - case Action.op: + case ACTIONS.op: // Normalize the properties submitted var op = createClientOp(request, this._src()); if (op.seq >= util.MAX_SAFE_INTEGER) { @@ -424,11 +424,11 @@ Agent.prototype._handleMessage = function(request, callback) { } if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); return this._submit(request.c, request.d, op, callback); - case Action.snapshotFetch: + case ACTIONS.snapshotFetch: return this._fetchSnapshot(request.c, request.d, request.v, callback); - case Action.snapshotFetchByTimestamp: + case ACTIONS.snapshotFetchByTimestamp: return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); - case Action.presence: + case ACTIONS.presence: if (!this.backend.presenceEnabled) return; var presence = this._createPresence(request); if (presence.t && !util.supportsPresence(types.map[presence.t])) { @@ -438,10 +438,10 @@ Agent.prototype._handleMessage = function(request, callback) { }); } return this._broadcastPresence(presence, callback); - case Action.presenceSubscribe: + case ACTIONS.presenceSubscribe: if (!this.backend.presenceEnabled) return; return this._subscribePresence(request.ch, request.seq, callback); - case Action.presenceUnsubscribe: + case ACTIONS.presenceUnsubscribe: return this._unsubscribePresence(request.ch, request.seq, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); @@ -770,7 +770,7 @@ Agent.prototype._broadcastPresence = function(presence, callback) { Agent.prototype._createPresence = function(request) { return { - a: Action.presence, + a: ACTIONS.presence, ch: request.ch, src: this._src(), id: request.id, // Presence ID, not Doc ID (which is 'd') @@ -822,7 +822,7 @@ Agent.prototype._requestPresence = function(channel, callback) { Agent.prototype._handlePresenceData = function(presence) { if (presence.src === this._src()) return; - if (presence.r) return this.send({a: Action.presenceRequest, ch: presence.ch}); + if (presence.r) return this.send({a: ACTIONS.presenceRequest, ch: presence.ch}); var backend = this.backend; var context = { @@ -833,7 +833,7 @@ Agent.prototype._handlePresenceData = function(presence) { backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) { if (error) { if (backend.doNotForwardSendPresenceErrorsToClient) backend.errorHandler(error, {agent: agent}); - else agent.send({a: Action.presence, ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); + else agent.send({a: ACTIONS.presence, ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); return; } agent.send(presence); diff --git a/lib/client/connection.js b/lib/client/connection.js index 1b551af12..592124bec 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -6,7 +6,7 @@ var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-reques var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request'); var emitter = require('../emitter'); var ShareDBError = require('../error'); -var Action = require('../shared/message-actions').Action; +var ACTIONS = require('../message-actions').ACTIONS; var types = require('../types'); var util = require('../util'); var logger = require('../logger'); @@ -195,25 +195,25 @@ Connection.prototype.handleMessage = function(message) { // Switch on the message action. Most messages are for documents and are // handled in the doc class. switch (message.a) { - case Action.InitLegacy: + case ACTIONS.initLegacy: // Client initialization packet return this._handleLegacyInit(message); - case Action.Handshake: + case ACTIONS.handshake: return this._handleHandshake(err, message); - case Action.QueryFetch: + case ACTIONS.queryFetch: var query = this.queries[message.id]; if (query) query._handleFetch(err, message.data, message.extra); return; - case Action.QuerySubscribe: + case ACTIONS.querySubscribe: var query = this.queries[message.id]; if (query) query._handleSubscribe(err, message.data, message.extra); return; - case Action.QueryUnsubscribe: + case ACTIONS.queryUnsubscribe: // Queries are removed immediately on calls to destroy, so we ignore // replies to query unsubscribes. Perhaps there should be a callback for // destroy, but this is currently unimplemented return; - case Action.QueryReply: + case ACTIONS.queryReply: // Query message. Pass this to the appropriate query object. var query = this.queries[message.id]; if (!query) return; @@ -222,36 +222,36 @@ Connection.prototype.handleMessage = function(message) { if (message.hasOwnProperty('extra')) query._handleExtra(message.extra); return; - case Action.BulkFetch: + case ACTIONS.bulkFetch: return this._handleBulkMessage(err, message, '_handleFetch'); - case Action.BulkSubscribe: - case Action.BulkUnsubscribe: + case ACTIONS.bulkSubscribe: + case ACTIONS.bulkUnsubscribe: return this._handleBulkMessage(err, message, '_handleSubscribe'); - case Action.SnapshotFetch: - case Action.SnapshotFetchByTimestamp: + case ACTIONS.snapshotFetch: + case ACTIONS.snapshotFetchByTimestamp: return this._handleSnapshotFetch(err, message); - case Action.Fetch: + case ACTIONS.fetch: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleFetch(err, message.data); return; - case Action.Subscribe: - case Action.Unsubscribe: + case ACTIONS.subscribe: + case ACTIONS.unsubscribe: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleSubscribe(err, message.data); return; - case Action.Op: + case ACTIONS.op: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleOp(err, message); return; - case Action.Presence: + case ACTIONS.presence: return this._handlePresence(err, message); - case Action.PresenceSubscribe: + case ACTIONS.presenceSubscribe: return this._handlePresenceSubscribe(err, message); - case Action.PresenceUnsubscribe: + case ACTIONS.presenceUnsubscribe: return this._handlePresenceUnsubscribe(err, message); - case Action.PresenceRequest: + case ACTIONS.presenceRequest: return this._handlePresenceRequest(err, message); default: @@ -417,7 +417,7 @@ Connection.prototype._sendBulk = function(action, collection, values) { } }; -Connection.prototype._sendAction = function(action, doc, version) { +Connection.prototype._sendActions = function(action, doc, version) { // Ensure the doc is registered so that it receives the reply message this._addDoc(doc); if (this.bulk) { @@ -435,22 +435,22 @@ Connection.prototype._sendAction = function(action, doc, version) { }; Connection.prototype.sendFetch = function(doc) { - return this._sendAction(Action.Fetch, doc, doc.version); + return this._sendActions(ACTIONS.fetch, doc, doc.version); }; Connection.prototype.sendSubscribe = function(doc) { - return this._sendAction(Action.Subscribe, doc, doc.version); + return this._sendActions(ACTIONS.subscribe, doc, doc.version); }; Connection.prototype.sendUnsubscribe = function(doc) { - return this._sendAction(Action.Unsubscribe, doc); + return this._sendActions(ACTIONS.unsubscribe, doc); }; Connection.prototype.sendOp = function(doc, op) { // Ensure the doc is registered so that it receives the reply message this._addDoc(doc); var message = { - a: Action.Op, + a: ACTIONS.op, c: doc.collection, d: doc.id, v: doc.version, @@ -554,7 +554,7 @@ Connection.prototype._destroyQuery = function(query) { // The callback should have the signature function(error, results, extra) // where results is a list of Doc objects. Connection.prototype.createFetchQuery = function(collection, q, options, callback) { - return this._createQuery(Action.QueryFetch, collection, q, options, callback); + return this._createQuery(ACTIONS.queryFetch, collection, q, options, callback); }; // Create a subscribe query. Subscribe queries return with the initial data @@ -564,7 +564,7 @@ Connection.prototype.createFetchQuery = function(collection, q, options, callbac // If present, the callback should have the signature function(error, results, extra) // where results is a list of Doc objects. Connection.prototype.createSubscribeQuery = function(collection, q, options, callback) { - return this._createQuery(Action.QuerySubscribe, collection, q, options, callback); + return this._createQuery(ACTIONS.querySubscribe, collection, q, options, callback); }; Connection.prototype.hasPending = function() { @@ -717,7 +717,7 @@ Connection.prototype._handleLegacyInit = function(message) { }; Connection.prototype._initializeHandshake = function() { - this.send({a: Action.Handshake, id: this.id}); + this.send({a: ACTIONS.handshake, id: this.id}); }; Connection.prototype._handleHandshake = function(error, message) { From 8879b28e1a6a8f314e1fd375b1fc587b31baaa4a Mon Sep 17 00:00:00 2001 From: Eric Hwang Date: Tue, 9 Aug 2022 09:28:07 -0700 Subject: [PATCH 4/6] Rename queryReply to queryUpdate, use shared action object from query.js --- lib/agent.js | 4 ++-- lib/client/connection.js | 2 +- lib/client/query.js | 5 +++-- lib/message-actions.js | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index d72c84aba..339b969ab 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -175,7 +175,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query var agent = this; emitter.onExtra = function(extra) { - agent.send({a: ACTIONS.queryReply, id: queryId, extra: extra}); + agent.send({a: ACTIONS.queryUpdate, id: queryId, extra: extra}); }; emitter.onDiff = function(diff) { @@ -187,7 +187,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query } // Consider stripping the collection out of the data we send here // if it matches the query's collection. - agent.send({a: ACTIONS.queryReply, id: queryId, diff: diff}); + agent.send({a: ACTIONS.queryUpdate, id: queryId, diff: diff}); }; emitter.onError = function(err) { diff --git a/lib/client/connection.js b/lib/client/connection.js index 592124bec..8e3209d2e 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -213,7 +213,7 @@ Connection.prototype.handleMessage = function(message) { // replies to query unsubscribes. Perhaps there should be a callback for // destroy, but this is currently unimplemented return; - case ACTIONS.queryReply: + case ACTIONS.queryUpdate: // Query message. Pass this to the appropriate query object. var query = this.queries[message.id]; if (!query) return; diff --git a/lib/client/query.js b/lib/client/query.js index 5815e977f..a8665dc8a 100644 --- a/lib/client/query.js +++ b/lib/client/query.js @@ -1,4 +1,5 @@ var emitter = require('../emitter'); +var ACTIONS = require('../message-actions').ACTIONS; var util = require('../util'); // Queries are live requests to the database for particular sets of fields. @@ -75,8 +76,8 @@ Query.prototype.send = function() { // Destroy the query object. Any subsequent messages for the query will be // ignored by the connection. Query.prototype.destroy = function(callback) { - if (this.connection.canSend && this.action === 'qs') { - this.connection.send({a: 'qu', id: this.id}); + if (this.connection.canSend && this.action === ACTIONS.querySubscribe) { + this.connection.send({a: ACTIONS.queryUnsubscribe, id: this.id}); } this.connection._destroyQuery(this); // There is a callback for consistency, but we don't actually wait for the diff --git a/lib/message-actions.js b/lib/message-actions.js index 5e2b59517..ad2b7536b 100644 --- a/lib/message-actions.js +++ b/lib/message-actions.js @@ -4,7 +4,7 @@ exports.ACTIONS = { queryFetch: 'qf', querySubscribe: 'qs', queryUnsubscribe: 'qu', - queryReply: 'q', + queryUpdate: 'q', bulkFetch: 'bf', bulkSubscribe: 'bs', bulkUnsubscribe: 'bu', From 55fa1f3e63e415e83484ccfcaa68a05400232f58 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Tue, 9 Aug 2022 17:31:37 +0100 Subject: [PATCH 5/6] Catch missed actions --- lib/agent.js | 6 +++++- lib/client/presence/local-presence.js | 3 ++- lib/client/presence/presence.js | 3 ++- lib/client/snapshot-request/snapshot-timestamp-request.js | 3 ++- lib/client/snapshot-request/snapshot-version-request.js | 3 ++- 5 files changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 339b969ab..ce2e43a46 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -355,7 +355,11 @@ Agent.prototype._open = function() { // Check a request to see if its valid. Returns an error if there's a problem. Agent.prototype._checkRequest = function(request) { - if (request.a === ACTIONS.queryFetch || request.a === ACTIONS.querySubscribe || request.a === ACTIONS.queryUnsubscribe) { + if ( + request.a === ACTIONS.queryFetch || + request.a === ACTIONS.querySubscribe || + request.a === ACTIONS.queryUnsubscribe + ) { // Query messages need an ID property. if (typeof request.id !== 'number') return 'Missing query ID'; } else if (request.a === ACTIONS.op || diff --git a/lib/client/presence/local-presence.js b/lib/client/presence/local-presence.js index b9468d045..204007617 100644 --- a/lib/client/presence/local-presence.js +++ b/lib/client/presence/local-presence.js @@ -1,4 +1,5 @@ var emitter = require('../../emitter'); +var ACTIONS = require('../../message-actions').ACTIONS; var util = require('../../util'); module.exports = LocalPresence; @@ -59,7 +60,7 @@ LocalPresence.prototype._ack = function(error, presenceVersion) { LocalPresence.prototype._message = function() { return { - a: 'p', + a: ACTIONS.presence, ch: this.presence.channel, id: this.presenceId, p: this.value, diff --git a/lib/client/presence/presence.js b/lib/client/presence/presence.js index c0227f93e..107355e34 100644 --- a/lib/client/presence/presence.js +++ b/lib/client/presence/presence.js @@ -4,6 +4,7 @@ var RemotePresence = require('./remote-presence'); var util = require('../../util'); var async = require('async'); var hat = require('hat'); +var ACTIONS = require('../../message-actions').ACTIONS; module.exports = Presence; function Presence(connection, channel) { @@ -70,7 +71,7 @@ Presence.prototype.destroy = function(callback) { Presence.prototype._sendSubscriptionAction = function(wantSubscribe, callback) { this.wantSubscribe = !!wantSubscribe; - var action = this.wantSubscribe ? 'ps' : 'pu'; + var action = this.wantSubscribe ? ACTIONS.presenceSubscribe : ACTIONS.presenceUnsubscribe; var seq = this.connection._presenceSeq++; this._subscriptionCallbacksBySeq[seq] = callback; if (this.connection.canSend) { diff --git a/lib/client/snapshot-request/snapshot-timestamp-request.js b/lib/client/snapshot-request/snapshot-timestamp-request.js index 15789137b..8d4cec70d 100644 --- a/lib/client/snapshot-request/snapshot-timestamp-request.js +++ b/lib/client/snapshot-request/snapshot-timestamp-request.js @@ -1,5 +1,6 @@ var SnapshotRequest = require('./snapshot-request'); var util = require('../../util'); +var ACTIONS = require('../../message-actions').ACTIONS; module.exports = SnapshotTimestampRequest; @@ -17,7 +18,7 @@ SnapshotTimestampRequest.prototype = Object.create(SnapshotRequest.prototype); SnapshotTimestampRequest.prototype._message = function() { return { - a: 'nt', + a: ACTIONS.snapshotFetchByTimestamp, id: this.requestId, c: this.collection, d: this.id, diff --git a/lib/client/snapshot-request/snapshot-version-request.js b/lib/client/snapshot-request/snapshot-version-request.js index d352a676a..369bf82ee 100644 --- a/lib/client/snapshot-request/snapshot-version-request.js +++ b/lib/client/snapshot-request/snapshot-version-request.js @@ -1,5 +1,6 @@ var SnapshotRequest = require('./snapshot-request'); var util = require('../../util'); +var ACTIONS = require('../../message-actions').ACTIONS; module.exports = SnapshotVersionRequest; @@ -17,7 +18,7 @@ SnapshotVersionRequest.prototype = Object.create(SnapshotRequest.prototype); SnapshotVersionRequest.prototype._message = function() { return { - a: 'nf', + a: ACTIONS.snapshotFetch, id: this.requestId, c: this.collection, d: this.id, From bbeaeb9239e34d9185df05e561f62e2464dddc41 Mon Sep 17 00:00:00 2001 From: Eric Hwang Date: Tue, 9 Aug 2022 09:39:31 -0700 Subject: [PATCH 6/6] Add top-level export for MESSAGE_ACTIONS --- lib/index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/index.js b/lib/index.js index df4b3f0a6..46fd974c7 100644 --- a/lib/index.js +++ b/lib/index.js @@ -9,6 +9,7 @@ Backend.logger = require('./logger'); Backend.MemoryDB = require('./db/memory'); Backend.MemoryMilestoneDB = require('./milestone-db/memory'); Backend.MemoryPubSub = require('./pubsub/memory'); +Backend.MESSAGE_ACTIONS = require('./message-actions').ACTIONS; Backend.MilestoneDB = require('./milestone-db'); Backend.ot = require('./ot'); Backend.projections = require('./projections');