Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk transactions #690

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var ACTIONS = require('./message-actions').ACTIONS;
var types = require('./types');
var util = require('./util');
var protocol = require('./protocol');
const Transaction = require('./transaction/transaction');

var ERROR_CODE = ShareDBError.CODES;

Expand Down Expand Up @@ -70,6 +71,8 @@ function Agent(backend, stream) {
this._firstReceivedMessage = null;
this._handshakeReceived = false;

this._transactions = Object.create(null);

// Send the legacy message to initialize old clients with the random agent Id
this.send(this._initMessage(ACTIONS.initLegacy));
}
Expand Down Expand Up @@ -461,16 +464,7 @@ Agent.prototype._handleMessage = function(request, callback) {
case ACTIONS.unsubscribe:
return this._unsubscribe(request.c, request.d, callback);
case ACTIONS.op:
// Normalize the properties submitted
var op = createClientOp(request, this._src());
if (op.seq >= util.MAX_SAFE_INTEGER) {
return callback(new ShareDBError(
ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW,
'Connection seq has exceeded the max safe integer, maybe from being open for too long'
));
}
if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message'));
return this._submit(request.c, request.d, op, callback);
return this._submit(request, callback);
case ACTIONS.snapshotFetch:
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case ACTIONS.snapshotFetchByTimestamp:
Expand All @@ -494,6 +488,8 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._requestPresence(request.ch, callback);
case ACTIONS.pingPong:
return this._pingPong(callback);
case ACTIONS.transactionCommit:
return this._commitTransaction(request, callback);
default:
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
}
Expand Down Expand Up @@ -761,9 +757,24 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) {
util.nextTick(callback);
};

Agent.prototype._submit = function(collection, id, op, callback) {
Agent.prototype._submit = function(request, callback) {
// Normalize the properties submitted
var op = createClientOp(request, this._src());
if (op.seq >= util.MAX_SAFE_INTEGER) {
return callback(new ShareDBError(
ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW,
'Connection seq has exceeded the max safe integer, maybe from being open for too long'
));
}
if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message'));

var collection = request.c;
var id = request.d;
var options = {};
if (request.t) options.transaction = request.t;

var agent = this;
this.backend.submit(this, collection, id, op, null, function(err, ops, request) {
this.backend.submit(this, collection, id, op, options, function(err, ops, request) {
// Message to acknowledge the op was successfully submitted
var ack = {src: op.src, seq: op.seq, v: op.v};
if (request._fixupOps.length) ack[ACTIONS.fixup] = request._fixupOps;
Expand Down Expand Up @@ -982,6 +993,12 @@ Agent.prototype._setProtocol = function(request) {
this.protocol.minor = request.protocolMinor;
};

Agent.prototype._commitTransaction = function(request, callback) {
var transaction = new Transaction(this, request.id, request.o);
this._transactions[transaction.id] = transaction;
transaction.submit(callback);
};

function createClientOp(request, clientId) {
// src can be provided if it is not the same as the current agent,
// such as a resubmission after a reconnect, but it usually isn't needed
Expand Down
46 changes: 44 additions & 2 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var util = require('../util');
var logger = require('../logger');
var DocPresenceEmitter = require('./presence/doc-presence-emitter');
var protocol = require('../protocol');
var Transaction = require('./transaction');

var ERROR_CODE = ShareDBError.CODES;

Expand Down Expand Up @@ -64,6 +65,8 @@ function Connection(socket) {
// A unique message number for presence
this._presenceSeq = 1;

this._transactions = Object.create(null);

// Equals agent.src on the server
this.id = null;

Expand Down Expand Up @@ -258,6 +261,8 @@ Connection.prototype.handleMessage = function(message) {
return this._handlePresenceRequest(err, message);
case ACTIONS.pingPong:
return this._handlePingPong(err);
case ACTIONS.transactionCommit:
return this._handleTransactionCommit(err, message);

default:
logger.warn('Ignoring unrecognized message', message);
Expand Down Expand Up @@ -454,6 +459,29 @@ Connection.prototype.sendUnsubscribe = function(doc) {
Connection.prototype.sendOp = function(doc, op) {
// Ensure the doc is registered so that it receives the reply message
this._addDoc(doc);
var message = this._opMessage(doc, op);
this.send(message);
};

Connection.prototype._opMessage = function(doc, op) {
// The src + seq number is a unique ID representing this operation. This tuple
// is used on the server to detect when ops have been sent multiple times and
// on the client to match acknowledgement of an op back to the inflightOp.
// Note that the src could be different from this.connection.id after a
// reconnect, since an op may still be pending after the reconnection and
// this.connection.id will change. In case an op is sent multiple times, we
// also need to be careful not to override the original seq value.
if (op.seq == null) {
if (this.seq >= util.MAX_SAFE_INTEGER) {
return doc.emit('error', new ShareDBError(
ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW,
'Connection seq has exceeded the max safe integer, maybe from being open for too long'
));
}

op.seq = this.seq++;
}

var message = {
a: ACTIONS.op,
c: doc.collection,
Expand All @@ -463,14 +491,16 @@ Connection.prototype.sendOp = function(doc, op) {
seq: op.seq,
x: {}
};

if ('op' in op) message.op = op.op;
if (op.create) message.create = op.create;
if (op.del) message.del = op.del;

if (doc.submitSource) message.x.source = op.source;
this.send(message);
if (op.transaction) message.t = op.transaction;
return message;
};


/**
* Sends a message down the socket
*/
Expand Down Expand Up @@ -782,6 +812,18 @@ Connection.prototype._initialize = function(message) {
this._setState('connected');
};

Connection.prototype.startTransaction = function() {
var transaction = new Transaction(this);
return this._transactions[transaction.id] = transaction;
};

Connection.prototype._handleTransactionCommit = function(error, message) {
var transaction = this._transactions[message.id];
if (!transaction) return;
transaction._handleCommit(error, message);
delete this._transactions[message.id];
};

Connection.prototype.getPresence = function(channel) {
var connection = this;
var presence = util.digOrCreate(this._presences, channel, function() {
Expand Down
Loading
Loading