From 92c5169c0fd9d873083544c37ace1620dcb78c59 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Wed, 22 Jan 2025 17:27:18 +0000 Subject: [PATCH 1/7] wip --- lib/agent.js | 41 ++++-- lib/client/connection.js | 46 ++++++- lib/client/doc.js | 53 +++++--- lib/client/transaction.js | 60 +++++++++ lib/db/memory.js | 46 +++++-- lib/message-actions.js | 3 +- lib/submit-request.js | 56 ++++---- lib/transaction.js | 0 lib/transaction/transaction.js | 66 ++++++++++ lib/util.js | 8 +- test/client/transaction.js | 228 +++++++++++++++++++++++++++++++++ test/db-memory.js | 3 +- test/db.js | 4 + 13 files changed, 542 insertions(+), 72 deletions(-) create mode 100644 lib/client/transaction.js create mode 100644 lib/transaction.js create mode 100644 lib/transaction/transaction.js create mode 100644 test/client/transaction.js diff --git a/lib/agent.js b/lib/agent.js index a07650c8a..699f2e8d7 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -5,6 +5,7 @@ var ACTIONS = require('./message-actions').ACTIONS; var types = require('./types'); var util = require('./util'); var protocol = require('./protocol'); +const Transaction = require('./transaction/transaction'); var ERROR_CODE = ShareDBError.CODES; @@ -70,6 +71,8 @@ function Agent(backend, stream) { this._firstReceivedMessage = null; this._handshakeReceived = false; + this._transactions = Object.create(null); + // Send the legacy message to initialize old clients with the random agent Id this.send(this._initMessage(ACTIONS.initLegacy)); } @@ -461,16 +464,7 @@ Agent.prototype._handleMessage = function(request, callback) { case ACTIONS.unsubscribe: return this._unsubscribe(request.c, request.d, callback); case ACTIONS.op: - // Normalize the properties submitted - var op = createClientOp(request, this._src()); - if (op.seq >= util.MAX_SAFE_INTEGER) { - return callback(new ShareDBError( - ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, - 'Connection seq has exceeded the max safe integer, maybe from being open for too long' - )); - } - if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); - return this._submit(request.c, request.d, op, callback); + return this._submit(request, callback); case ACTIONS.snapshotFetch: return this._fetchSnapshot(request.c, request.d, request.v, callback); case ACTIONS.snapshotFetchByTimestamp: @@ -494,6 +488,8 @@ Agent.prototype._handleMessage = function(request, callback) { return this._requestPresence(request.ch, callback); case ACTIONS.pingPong: return this._pingPong(callback); + case ACTIONS.transactionCommit: + return this._commitTransaction(request, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); } @@ -761,9 +757,24 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { util.nextTick(callback); }; -Agent.prototype._submit = function(collection, id, op, callback) { +Agent.prototype._submit = function(request, callback) { + // Normalize the properties submitted + var op = createClientOp(request, this._src()); + if (op.seq >= util.MAX_SAFE_INTEGER) { + return callback(new ShareDBError( + ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, + 'Connection seq has exceeded the max safe integer, maybe from being open for too long' + )); + } + if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); + + var collection = request.c; + var id = request.d; + var options = {}; + if (request.t) options.transaction = request.t; + var agent = this; - this.backend.submit(this, collection, id, op, null, function(err, ops, request) { + this.backend.submit(this, collection, id, op, options, function(err, ops, request) { // Message to acknowledge the op was successfully submitted var ack = {src: op.src, seq: op.seq, v: op.v}; if (request._fixupOps.length) ack[ACTIONS.fixup] = request._fixupOps; @@ -982,6 +993,12 @@ Agent.prototype._setProtocol = function(request) { this.protocol.minor = request.protocolMinor; }; +Agent.prototype._commitTransaction = function(request, callback) { + var transaction = new Transaction(this, request.id, request.o); + this._transactions[transaction.id] = transaction; + transaction.submit(callback); +}; + function createClientOp(request, clientId) { // src can be provided if it is not the same as the current agent, // such as a resubmission after a reconnect, but it usually isn't needed diff --git a/lib/client/connection.js b/lib/client/connection.js index da89dfef5..b66f58eca 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -12,6 +12,7 @@ var util = require('../util'); var logger = require('../logger'); var DocPresenceEmitter = require('./presence/doc-presence-emitter'); var protocol = require('../protocol'); +var Transaction = require('./transaction'); var ERROR_CODE = ShareDBError.CODES; @@ -64,6 +65,8 @@ function Connection(socket) { // A unique message number for presence this._presenceSeq = 1; + this._transactions = Object.create(null); + // Equals agent.src on the server this.id = null; @@ -258,6 +261,8 @@ Connection.prototype.handleMessage = function(message) { return this._handlePresenceRequest(err, message); case ACTIONS.pingPong: return this._handlePingPong(err); + case ACTIONS.transactionCommit: + return this._handleTransactionCommit(err, message); default: logger.warn('Ignoring unrecognized message', message); @@ -454,6 +459,29 @@ Connection.prototype.sendUnsubscribe = function(doc) { Connection.prototype.sendOp = function(doc, op) { // Ensure the doc is registered so that it receives the reply message this._addDoc(doc); + var message = this._opMessage(doc, op); + this.send(message); +}; + +Connection.prototype._opMessage = function(doc, op) { + // The src + seq number is a unique ID representing this operation. This tuple + // is used on the server to detect when ops have been sent multiple times and + // on the client to match acknowledgement of an op back to the inflightOp. + // Note that the src could be different from this.connection.id after a + // reconnect, since an op may still be pending after the reconnection and + // this.connection.id will change. In case an op is sent multiple times, we + // also need to be careful not to override the original seq value. + if (op.seq == null) { + if (this.seq >= util.MAX_SAFE_INTEGER) { + return doc.emit('error', new ShareDBError( + ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, + 'Connection seq has exceeded the max safe integer, maybe from being open for too long' + )); + } + + op.seq = this.seq++; + } + var message = { a: ACTIONS.op, c: doc.collection, @@ -463,14 +491,16 @@ Connection.prototype.sendOp = function(doc, op) { seq: op.seq, x: {} }; + if ('op' in op) message.op = op.op; if (op.create) message.create = op.create; if (op.del) message.del = op.del; + if (doc.submitSource) message.x.source = op.source; - this.send(message); + if (op.transaction) message.t = op.transaction; + return message; }; - /** * Sends a message down the socket */ @@ -782,6 +812,18 @@ Connection.prototype._initialize = function(message) { this._setState('connected'); }; +Connection.prototype.startTransaction = function() { + var transaction = new Transaction(this); + return this._transactions[transaction.id] = transaction; +}; + +Connection.prototype._handleTransactionCommit = function(error, message) { + var transaction = this._transactions[message.id]; + if (!transaction) return; + transaction._handleCommit(error, message); + delete this._transactions[message.id]; +}; + Connection.prototype.getPresence = function(channel) { var connection = this; var presence = util.digOrCreate(this._presences, channel, function() { diff --git a/lib/client/doc.js b/lib/client/doc.js index acb457374..48ea9697e 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -121,6 +121,8 @@ function Doc(connection, collection, id) { // Internal counter that gets incremented every time doc.data is updated. // Used as a cheap way to check if doc.data has changed. this._dataStateVersion = 0; + + this._transaction = null; } emitter.mixin(Doc); @@ -519,6 +521,10 @@ Doc.prototype.flush = function() { // Ignore if we can't send or we are already sending an op if (!this.connection.canSend || this.inflightOp) return; + // TODO: Transactions should wait for all docs to only have transaction ops left in pendingOps + // Transactions will be flushed as one call separately + if (this._transaction) return; + // Send first pending op unless paused if (!this.paused && this.pendingOps.length) { this._sendOp(); @@ -704,24 +710,6 @@ Doc.prototype._sendOp = function() { op.sentAt = Date.now(); op.retries = (op.retries == null) ? 0 : op.retries + 1; - // The src + seq number is a unique ID representing this operation. This tuple - // is used on the server to detect when ops have been sent multiple times and - // on the client to match acknowledgement of an op back to the inflightOp. - // Note that the src could be different from this.connection.id after a - // reconnect, since an op may still be pending after the reconnection and - // this.connection.id will change. In case an op is sent multiple times, we - // also need to be careful not to override the original seq value. - if (op.seq == null) { - if (this.connection.seq >= util.MAX_SAFE_INTEGER) { - return this.emit('error', new ShareDBError( - ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, - 'Connection seq has exceeded the max safe integer, maybe from being open for too long' - )); - } - - op.seq = this.connection.seq++; - } - this.connection.sendOp(this, op); // src isn't needed on the first try, since the server session will have the @@ -759,10 +747,14 @@ Doc.prototype._submit = function(op, source, callback) { if (this.type.normalize) op.op = this.type.normalize(op.op); } + var transactionError = this._setTransaction(op.transaction); + if (transactionError) return this._callbackOrEmitError(error); + try { this._pushOp(op, source, callback); this._otApply(op, source); } catch (error) { + // TODO: Transactions should abort and rollback all docs in transaction return this._hardRollback(error); } @@ -865,8 +857,10 @@ Doc.prototype.submitOp = function(component, options, callback) { callback = options; options = null; } + options = options || {}; var op = {op: component}; - var source = options && options.source; + if (options.transaction) op.transaction = this._normalizeTransactionId(options.transaction); + var source = options.source; this._submit(op, source, callback); }; @@ -1106,4 +1100,25 @@ Doc.prototype._clearInflightOp = function(err) { if (err && !called) return this.emit('error', err); }; +Doc.prototype._setTransaction = function(transactionId) { + if (transactionId == util.dig(this, '_transaction', 'id')) return; + + if (this._transaction) { + return new Error('Transaction in progress. Must commit transaction before submitting any other ops'); + } + + var transaction = this.connection._transactions[transactionId]; + if (!transaction) throw new Error('Transaction not found'); + transaction._registerDoc(this); +}; + +Doc.prototype._normalizeTransactionId = function(transaction) { + if (!transaction) return null; + if (typeof transaction === 'string') return transaction; + return transaction.id; +}; +Doc.prototype._callbackOrEmitError = function(error, callback) { + if (callback) return util.nextTick(callback, error); + this.emit('error', error); +}; diff --git a/lib/client/transaction.js b/lib/client/transaction.js new file mode 100644 index 000000000..0ba8e63dc --- /dev/null +++ b/lib/client/transaction.js @@ -0,0 +1,60 @@ +var emitter = require('../emitter'); +const {ACTIONS} = require('../message-actions'); + +var idCounter = 1; + +module.exports = Transaction; +function Transaction(connection) { + emitter.EventEmitter.call(this); + + // TODO: UUIDs? + this.id = (idCounter++).toString(); + + this._connection = connection; + this._callback = null; + this._docs = Object.create(null); +} +emitter.mixin(Transaction); + +Transaction.prototype.commit = function(callback) { + // TODO: Catch multiple calls + // TODO: Handle network changes + this._callback = callback; + this._connection.send({ + a: ACTIONS.transactionCommit, + id: this.id, + o: this._getOps() + }); +}; + +Transaction.prototype._handleCommit = function(error) { + // TODO: Trigger this._getOps() callbacks + // TODO: Should unset transaction on this._docs + // TODO: If error, should rollback docs + if (typeof this._callback === 'function') this._callback(error); + else if (error) this.emit('error', error); +}; + +Transaction.prototype._registerDoc = function(doc) { + console.log('register doc', doc.collection, doc.id); + var collection = this._docs[doc.collection] = this._docs[doc.collection] || Object.create(null); + collection[doc.id] = doc; + doc._transaction = this; +}; + +Transaction.prototype._getOps = function() { + var ops = []; + + for (var collection in this._docs) { + for (var id in this._docs[collection]) { + var doc = this._docs[collection][id]; + console.log(collection, id, !!doc); + for (var op of doc.pendingOps) { + if (op.transaction !== this.id) break; + ops.push(this._connection._opMessage(doc, op)); + } + } + } + + return ops; +}; diff --git a/lib/db/memory.js b/lib/db/memory.js index 2db274d6e..92725eefb 100644 --- a/lib/db/memory.js +++ b/lib/db/memory.js @@ -41,19 +41,43 @@ MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, call var db = this; if (typeof callback !== 'function') throw new Error('Callback required'); util.nextTick(function() { - var version = db._getVersionSync(collection, id); - if (snapshot.v !== version + 1) { - var succeeded = false; - return callback(null, succeeded); + var result = db._commitSync(collection, id, op, snapshot); + callback(result.error, result.succeeded); + }); +}; + +MemoryDB.prototype.commitTransaction = function(commits, options, callback) { + // TODO: Replace with rfdc + var docs = JSON.stringify(this.docs); + var ops = JSON.stringify(this.ops); + + var error; + var succeeded = true; + for (var commit of commits) { + var result = this._commitSync(commit.collection, commit.id, commit.op, commit.snapshot); + succeeded = succeeded && result.succeeded; + error = result.error; + + if (!succeeded) { + this.docs = JSON.parse(docs); + this.ops = JSON.parse(ops); + break; } - var err = db._writeOpSync(collection, id, op); - if (err) return callback(err); - err = db._writeSnapshotSync(collection, id, snapshot); - if (err) return callback(err); + } - var succeeded = true; - callback(null, succeeded); - }); + callback(error, succeeded); +}; + +MemoryDB.prototype._commitSync = function(collection, id, op, snapshot) { + var version = this._getVersionSync(collection, id); + if (snapshot.v !== version + 1) { + return {error: null, succeeded: false}; + } + var error = this._writeOpSync(collection, id, op); + if (error) return {error: error, succeeded: false}; + error = this._writeSnapshotSync(collection, id, snapshot); + if (error) return {error: error, succeeded: false}; + return {error: null, succeeded: true}; }; // Get the named document from the database. The callback is called with (err, diff --git a/lib/message-actions.js b/lib/message-actions.js index e1a8e9942..aa2f51204 100644 --- a/lib/message-actions.js +++ b/lib/message-actions.js @@ -19,5 +19,6 @@ exports.ACTIONS = { presence: 'p', presenceSubscribe: 'ps', presenceUnsubscribe: 'pu', - presenceRequest: 'pr' + presenceRequest: 'pr', + transactionCommit: 'tc' }; diff --git a/lib/submit-request.js b/lib/submit-request.js index 8e6715123..cd91b1b5f 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -16,7 +16,7 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.collection = (projection) ? projection.target : index; this.id = id; this.op = op; - this.options = options; + this.options = options || {}; this.extra = op.x; delete op.x; @@ -43,6 +43,7 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.ops = []; this.channels = null; this._fixupOps = []; + this._transaction = agent._transactions[this.options.transaction]; } module.exports = SubmitRequest; @@ -217,6 +218,8 @@ SubmitRequest.prototype.commit = function(callback) { }; } + if (request._transaction) return request._transaction.ready(request, callback); + // Try committing the operation and snapshot to the database atomically backend.db.commit( request.collection, @@ -224,31 +227,38 @@ SubmitRequest.prototype.commit = function(callback) { request.op, request.snapshot, request.options, - function(err, succeeded) { - if (err) return callback(err); - if (!succeeded) { - // Between our fetch and our call to commit, another client committed an - // operation. We expect this to be relatively infrequent but normal. - return request.retry(callback); - } - if (!request.suppressPublish) { - var op = request.op; - op.c = request.collection; - op.d = request.id; - op.m = undefined; - // Needed for agent to detect if it can ignore sending the op back to - // the client that submitted it in subscriptions - if (request.collection !== request.index) op.i = request.index; - backend.pubsub.publish(request.channels, op); - } - if (request._shouldSaveMilestoneSnapshot(request.snapshot)) { - request.backend.milestoneDb.saveMilestoneSnapshot(request.collection, request.snapshot); - } - callback(); - }); + request._handleCommitted(callback) + ); }); }; +SubmitRequest.prototype._handleCommitted = function(callback) { + var request = this; + var backend = this.backend; + return function(err, succeeded) { + if (err) return callback(err); + if (!succeeded) { + // Between our fetch and our call to commit, another client committed an + // operation. We expect this to be relatively infrequent but normal. + return request.retry(callback); + } + if (!request.suppressPublish) { + var op = request.op; + op.c = request.collection; + op.d = request.id; + op.m = undefined; + // Needed for agent to detect if it can ignore sending the op back to + // the client that submitted it in subscriptions + if (request.collection !== request.index) op.i = request.index; + backend.pubsub.publish(request.channels, op); + } + if (request._shouldSaveMilestoneSnapshot(request.snapshot)) { + request.backend.milestoneDb.saveMilestoneSnapshot(request.collection, request.snapshot); + } + callback(); + } +}; + SubmitRequest.prototype.retry = function(callback) { this.retries++; if (this.maxRetries != null && this.retries > this.maxRetries) { diff --git a/lib/transaction.js b/lib/transaction.js new file mode 100644 index 000000000..e69de29bb diff --git a/lib/transaction/transaction.js b/lib/transaction/transaction.js new file mode 100644 index 000000000..6c9044215 --- /dev/null +++ b/lib/transaction/transaction.js @@ -0,0 +1,66 @@ +var util = require('../util'); + +function Transaction(agent, id, ops) { + this.id = id; + + this._agent = agent; + this._backend = agent.backend; + + this._ops = ops; + this._readyRequests = []; + this._callbacks = []; +} +module.exports = Transaction; + +Transaction.prototype.submit = function(callback) { + var finished = false; + var finish = function(error) { + if (finished) return; + finished = true; + callback(error); + } + + var ops = this._ops; + var finishedCount = 0; + var opHandler = function(error) { + finishedCount++; + if (error) return finish(error); + console.log('op callback', finishedCount, error); + if (finishedCount !== ops.length) return; + finish(); + }; + + for (var op of ops) this._agent._submit(op, opHandler); +}; + +Transaction.prototype.ready = function(request, callback) { + // TODO: Clear these on retry + this._readyRequests.push(request); + this._callbacks.push(callback); + + if (!this._isReady()) return; + + var commits = Object.values(this._readyRequests).map(function(req) { + return { + collection: req.collection, + id: req.id, + op: req.op, + snapshot: req.snapshot, + options: req.options, + }; + }); + + var transaction = this; + var options = null; + this._backend.db.commitTransaction(commits, options, function(error, succeeded) { + if (error) return util.callEach(transaction._callbacks, error, succeeded); + if (!succeeded) { + // TODO: Retry + } + util.callEach(transaction._callbacks, null, succeeded); + }); +}; + +Transaction.prototype._isReady = function() { + return Object.keys(this._readyRequests).length === this._ops.length; +} diff --git a/lib/util.js b/lib/util.js index 0d613ed58..9f090eb8a 100644 --- a/lib/util.js +++ b/lib/util.js @@ -33,7 +33,7 @@ exports.dig = function() { var obj = arguments[0]; for (var i = 1; i < arguments.length; i++) { var key = arguments[i]; - obj = hasOwn(obj, key) ? obj[key] : (i === arguments.length - 1 ? undefined : Object.create(null)); + obj = obj && hasOwn(obj, key) ? obj[key] : (i === arguments.length - 1 ? undefined : Object.create(null)); } return obj; }; @@ -71,11 +71,13 @@ exports.supportsPresence = function(type) { return type && typeof type.transformPresence === 'function'; }; -exports.callEach = function(callbacks, error) { +exports.callEach = function() { + var args = Array.from(arguments); + var callbacks = args.shift(); var called = false; callbacks.forEach(function(callback) { if (callback) { - callback(error); + callback.apply(args); called = true; } }); diff --git a/test/client/transaction.js b/test/client/transaction.js new file mode 100644 index 000000000..81d428993 --- /dev/null +++ b/test/client/transaction.js @@ -0,0 +1,228 @@ +var async = require('async'); +var expect = require('chai').expect; +var errorHandler = require('../util').errorHandler; + +var idCounter = 0; + +module.exports = function() { + describe.only('transaction', function() { + var backend; + var connection; + + beforeEach(function() { + backend = this.backend; + connection = backend.connect(); + }); + + describe('single Doc', function() { + var id; + var doc; + var remoteDoc; + var transaction; + + beforeEach(function() { + id = (idCounter++).toString(); + doc = connection.get('dogs', id); + remoteDoc = backend.connect().get('dogs', id); + transaction = connection.startTransaction(); + + // TODO: Discuss if this is an acceptable API? Doc will always emit error on + // a failed transaction, since the ops may have been successfully acked for this Doc, and + // we force a hard rollback with no callback, which causes an 'error' event + doc.on('error', function() {}); + }); + + it.only('commits two ops as a transaction', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function (next) { + doc.submitOp([{p: ['age'], oi: 3}], {transaction: transaction}, errorHandler(next)); + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, errorHandler(next)); + expect(doc.data).to.eql({ + name: 'Gaspode', + age: 3, + tricks: ['fetch'] + }); + next(); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({ + name: 'Gaspode', + age: 3, + tricks: ['fetch'] + }); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('does not commit the first op if the second op fails', function(done) { + backend.use('commit', function(request, next) { + if (!request.snapshot.data.tricks) return next(); + next(new Error('fail')); + }); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); + }); + doc.once('load', next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('deletes and creates as part of a transaction', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.del.bind(doc, {transaction: transaction}), + doc.create.bind(doc, {name: 'Recreated'}, 'json0', {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function (next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Recreated'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('does not delete if the following create fails', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.op.create ? new Error('Create not allowed') : null; + next(error); + }); + next(); + }, + doc.del.bind(doc, {transaction: transaction}), + function(next) { + doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, function(error) { + expect(error.message).to.equal('Create not allowed'); + }); + doc.once('load', next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('transaction is behind remote', function(done) { + async.series([ + doc.create.bind(doc, {tricks: ['fetch']}), + remoteDoc.fetch.bind(remoteDoc), + remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), + doc.submitOp.bind(doc, [{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}), + doc.submitOp.bind(doc, [{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: []}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('remote submits after but commits first', function(done) { + async.series([ + doc.create.bind(doc, {tricks: ['fetch']}), + remoteDoc.fetch.bind(remoteDoc), + doc.submitOp.bind(doc, [{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}), + remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), + doc.submitOp.bind(doc, [{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: []}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + }); + + describe('multiple Docs', function() { + it('rolls back multiple Docs if one commit fails', function(done) { + var id1 = (idCounter++).toString(); + var id2 = (idCounter++).toString(); + var doc1 = connection.get('dogs', id1); + var doc2 = connection.get('dogs', id2); + var remoteDoc1 = backend.connect().get('dogs', id1); + var remoteDoc2 = backend.connect().get('dogs', id2); + + var transaction = connection.startTransaction(); + + // Doc1 will throw even though its op is accepted, since the + // whole transaction is rejected + doc1.on('error', function() {}); + doc2.on('error', function() {}); + + async.series([ + doc1.create.bind(doc1, {name: 'Gaspode'}), + doc2.create.bind(doc2, {name: 'Snoopy'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.id === id2 ? new Error('fail') : null; + next(error); + }); + next(); + }, + doc1.submitOp.bind(doc1, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc2.submitOp([{p: ['age'], oi: 4}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); + }); + doc2.once('load', next); + }, + remoteDoc1.fetch.bind(remoteDoc1), + remoteDoc2.fetch.bind(remoteDoc2), + function(next) { + expect(remoteDoc1.data).to.eql({name: 'Gaspode'}); + expect(remoteDoc2.data).to.eql({name: 'Snoopy'}); + expect(doc1.data).to.eql(remoteDoc1.data); + expect(doc2.data).to.eql(remoteDoc2.data); + next(); + } + ], done); + }); + }); + }); +} diff --git a/test/db-memory.js b/test/db-memory.js index 8c4774418..2a6febc58 100644 --- a/test/db-memory.js +++ b/test/db-memory.js @@ -69,7 +69,8 @@ describe('MemoryDB', function() { }, getQuery: function(options) { return {filter: options.query, sort: options.sort}; - } + }, + transactions: true }); describe('deleteOps', function() { diff --git a/test/db.js b/test/db.js index 5fc8a293d..16135a0e6 100644 --- a/test/db.js +++ b/test/db.js @@ -64,6 +64,10 @@ module.exports = function(options) { require('./client/projections')({getQuery: options.getQuery}); } + if (options.transactions) { + require('./client/transaction')(); + } + require('./client/submit')(); require('./client/submit-json1')(); require('./client/subscribe')(); From cad15f1b55e05d66c475c2313985bc91153ef10e Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Thu, 23 Jan 2025 16:25:56 +0000 Subject: [PATCH 2/7] hook up op acks --- lib/client/doc.js | 55 ++++++++++++---- lib/client/transaction.js | 22 +++++-- lib/submit-request.js | 22 ++++++- lib/transaction/transaction.js | 113 +++++++++++++++++++++++---------- test/client/transaction.js | 17 +++++ 5 files changed, 177 insertions(+), 52 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 48ea9697e..c96db260e 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -337,17 +337,20 @@ Doc.prototype._handleOp = function(err, message) { // committing the op to the database, and we should just clear the in-flight // op and call the callbacks. However, let's first catch ourselves up to // the remote, so that we're in a nice consistent state - return this.fetch(this._clearInflightOp.bind(this)); + return this.fetch(this._clearInflight.bind(this)); } - if (this.inflightOp) { + if (this._hasInflight()) { return this._rollback(err); } return this.emit('error', err); } - if (this.inflightOp && - message.src === this.inflightOp.src && - message.seq === this.inflightOp.seq) { + var shouldAck = !!this._transaction || ( + this.inflightOp && + message.src === this.inflightOp.src && + message.seq === this.inflightOp.seq + ); + if (shouldAck) { // The op has already been applied locally. Just update the version // and pending state appropriately this._opAcknowledged(message); @@ -942,7 +945,7 @@ Doc.prototype.toSnapshot = function() { // This is called when the server acknowledges an operation from the client. Doc.prototype._opAcknowledged = function(message) { - if (this.inflightOp.create) { + if (this.inflightOp && this.inflightOp.create) { this.version = message.v; } else if (message.v !== this.version) { // We should already be at the same version, because the server should @@ -973,7 +976,7 @@ Doc.prototype._opAcknowledged = function(message) { // The op was committed successfully. Increment the version number this.version++; - this._clearInflightOp(); + this._clearInflight(); }; Doc.prototype._rollback = function(err) { @@ -1017,10 +1020,10 @@ Doc.prototype._rollback = function(err) { // an "Op submit rejected" error, this was done intentionally // and we should roll back but not return an error to the user. if (err.code === ERROR_CODE.ERR_OP_SUBMIT_REJECTED) { - return this._clearInflightOp(null); + return this._clearInflight(null); } - this._clearInflightOp(err); + this._clearInflight(err); }; Doc.prototype._hardRollback = function(err) { @@ -1087,12 +1090,16 @@ Doc.prototype._hardRollback = function(err) { }); }; -Doc.prototype._clearInflightOp = function(err) { - var inflightOp = this.inflightOp; +Doc.prototype._hasInflight = function() { + return !!(this.inflightOp || this._transaction); +}; - this.inflightOp = null; +Doc.prototype._clearInflight = function(err) { + var callbacks = []; + if (this.inflightOp) callbacks = this._clearInflightOp(); + else if (this._transaction) callbacks = this._clearTransaction(); - var called = util.callEach(inflightOp.callbacks, err); + var called = util.callEach(callbacks, err); this.flush(); this._emitNothingPending(); @@ -1100,6 +1107,28 @@ Doc.prototype._clearInflightOp = function(err) { if (err && !called) return this.emit('error', err); }; +Doc.prototype._clearInflightOp = function() { + var inflightOp = this.inflightOp; + this.inflightOp = null; + return inflightOp.callbacks; +}; + +Doc.prototype._clearTransaction = function() { + var transactionId = this._transaction.id; + var callbacks = []; + var i = 0; + for (; i < this.pendingOps.length; i++) { + var op = this.pendingOps[i]; + if (op.transaction !== transactionId) break; + callbacks = callbacks.concat(op.callbacks); + } + + this.pendingOps.splice(0, i); + this._transaction = null; + + return callbacks; +}; + Doc.prototype._setTransaction = function(transactionId) { if (transactionId == util.dig(this, '_transaction', 'id')) return; diff --git a/lib/client/transaction.js b/lib/client/transaction.js index 0ba8e63dc..8e91ce11c 100644 --- a/lib/client/transaction.js +++ b/lib/client/transaction.js @@ -1,5 +1,6 @@ var emitter = require('../emitter'); -const {ACTIONS} = require('../message-actions'); +var {ACTIONS} = require('../message-actions'); +var util = require('../util'); var idCounter = 1; @@ -27,16 +28,30 @@ Transaction.prototype.commit = function(callback) { }); }; -Transaction.prototype._handleCommit = function(error) { +Transaction.prototype._handleCommit = function(error, message) { // TODO: Trigger this._getOps() callbacks // TODO: Should unset transaction on this._docs // TODO: If error, should rollback docs if (typeof this._callback === 'function') this._callback(error); else if (error) this.emit('error', error); + + var acks = Object.create(null); + if (message.acks) { + for (var ack of message.acks) acks[ack.seq] = ack; + } + + for (var collection in this._docs) { + for (var id in this._docs[collection]) { + var doc = this._docs[collection][id]; + for (var op of doc.pendingOps) { + if (op.transaction !== this.id) break; + doc._handleOp(error, acks[op.seq]); + } + } + } }; Transaction.prototype._registerDoc = function(doc) { - console.log('register doc', doc.collection, doc.id); var collection = this._docs[doc.collection] = this._docs[doc.collection] || Object.create(null); collection[doc.id] = doc; doc._transaction = this; @@ -48,7 +63,6 @@ Transaction.prototype._getOps = function() { for (var collection in this._docs) { for (var id in this._docs[collection]) { var doc = this._docs[collection][id]; - console.log(collection, id, !!doc); for (var op of doc.pendingOps) { if (op.transaction !== this.id) break; ops.push(this._connection._opMessage(doc, op)); diff --git a/lib/submit-request.js b/lib/submit-request.js index cd91b1b5f..a16929bba 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -96,7 +96,23 @@ SubmitRequest.prototype.submit = function(callback) { var snapshotOptions = {}; snapshotOptions.agentCustom = request.agent.custom; - backend.db.getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { + + var getSnapshotFromDb = backend.db.getSnapshot.bind(backend.db); + var getSnapshot = getSnapshotFromDb; + // If we're in a transaction, try getting the snapshot from the transaction first, + // since the DB won't have an up-to-date Snapshot. The Snapshot on the transaction + // will also include any modifications made in middleware, for example, which can't + // be reproduced by just transforming the existing snapshot by ops + if (this._transaction) { + var transaction = this._transaction; + getSnapshot = function(collection, id, fields, snapshotOptions, callback) { + var snapshot = transaction.getSnapshot(collection, id, fields, snapshotOptions); + if (snapshot) return callback(null, snapshot); + getSnapshotFromDb(collection, id, fields, snapshotOptions, callback); + }; + } + + getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { if (err) return callback(err); request.snapshot = snapshot; @@ -218,7 +234,9 @@ SubmitRequest.prototype.commit = function(callback) { }; } - if (request._transaction) return request._transaction.ready(request, callback); + if (request._transaction) { + return request._transaction.ready(request, request._handleCommitted(callback)); + } // Try committing the operation and snapshot to the database atomically backend.db.commit( diff --git a/lib/transaction/transaction.js b/lib/transaction/transaction.js index 6c9044215..440adae81 100644 --- a/lib/transaction/transaction.js +++ b/lib/transaction/transaction.js @@ -7,60 +7,107 @@ function Transaction(agent, id, ops) { this._backend = agent.backend; this._ops = ops; - this._readyRequests = []; - this._callbacks = []; + this._pendingOps = Object.create(null); + + for (var op of ops) { + var docOps = util.digOrCreate(this._pendingOps, op.c, op.d, function() { + return []; + }); + op.v = op.v + docOps.length; + docOps.push(op); + } + + this._finished = false; + this._callback = null; + + this._readyRequests = Object.create(null); + this._requestCallbacks = []; + this._acks = []; } module.exports = Transaction; Transaction.prototype.submit = function(callback) { - var finished = false; - var finish = function(error) { - if (finished) return; - finished = true; - callback(error); - } + // TODO: Handle multiple calls? + this._callback = callback; - var ops = this._ops; - var finishedCount = 0; - var opHandler = function(error) { - finishedCount++; - if (error) return finish(error); - console.log('op callback', finishedCount, error); - if (finishedCount !== ops.length) return; - finish(); - }; - - for (var op of ops) this._agent._submit(op, opHandler); + for (var collection in this._pendingOps) { + for (var id in this._pendingOps[collection]) { + this._submitNextDocOp(collection, id); + } + } }; Transaction.prototype.ready = function(request, callback) { // TODO: Clear these on retry - this._readyRequests.push(request); - this._callbacks.push(callback); + var docRequests = util.digOrCreate(this._readyRequests, request.collection, request.id, function() { + return []; + }); + docRequests.push(request); + this._requestCallbacks.push(callback); + + if (this._isReady()) return this._commitTransaction(); + this._submitNextDocOp(request.collection, request.id); +}; - if (!this._isReady()) return; +Transaction.prototype.getSnapshot = function(collection, id, fields, snapshotOptions) { + // TODO: Support fields? + // TODO: Support options? + var requests = util.dig(this._readyRequests, collection, id); + if (!requests) return; + return util.clone(requests[requests.length - 1].snapshot); +}; - var commits = Object.values(this._readyRequests).map(function(req) { +Transaction.prototype._submitNextDocOp = function(collection, id) { + var transaction = this; + var ops = this._pendingOps[collection][id]; + var op = ops.shift(); + this._agent._submit(op, function(error, ack) { + if (error) transaction._finish(error); + transaction._acks.push(ack); + if (transaction._acks.length === transaction._ops.length) transaction._finish(); + }); +}; + +Transaction.prototype._isReady = function() { + return this._requestCallbacks.length === this._ops.length; +}; + +Transaction.prototype._commitTransaction = function() { + var requests = this._flatReadyRequests(); + var commits = requests.map(function(request) { return { - collection: req.collection, - id: req.id, - op: req.op, - snapshot: req.snapshot, - options: req.options, + collection: request.collection, + id: request.id, + op: request.op, + snapshot: request.snapshot, + options: request.options, }; }); var transaction = this; var options = null; this._backend.db.commitTransaction(commits, options, function(error, succeeded) { - if (error) return util.callEach(transaction._callbacks, error, succeeded); + if (error) return transaction._finish(error); if (!succeeded) { // TODO: Retry } - util.callEach(transaction._callbacks, null, succeeded); + util.callEach(transaction._requestCallbacks, null, true); }); }; -Transaction.prototype._isReady = function() { - return Object.keys(this._readyRequests).length === this._ops.length; -} +Transaction.prototype._flatReadyRequests = function() { + var requests = []; + for (var collection in this._readyRequests) { + for (var id in this._readyRequests[collection]) { + requests = requests.concat(this._readyRequests[collection][id]); + } + } + return requests; +}; + +Transaction.prototype._finish = function(error) { + if (this._finished) return; + this._finished = true; + if (error) return this._callback(error); + this._callback(null, {acks: this._acks}); +}; diff --git a/test/client/transaction.js b/test/client/transaction.js index 81d428993..34b6e55c2 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -33,6 +33,8 @@ module.exports = function() { }); it.only('commits two ops as a transaction', function(done) { + doc.preventCompose = true; + async.series([ doc.create.bind(doc, {name: 'Gaspode'}), function (next) { @@ -64,6 +66,21 @@ module.exports = function() { ], done); }); + it.only('fires the submitOp callback after a transaction commits', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + doc.submitOp([{p: ['age'], oi: 3}], {transaction: transaction}, next); + transaction.commit(errorHandler(next)); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql(doc.data); + next(); + } + ], done); + }); + it('does not commit the first op if the second op fails', function(done) { backend.use('commit', function(request, next) { if (!request.snapshot.data.tricks) return next(); From 43ceb932f22c30824f7a6157ebbfe457c29b453a Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Thu, 23 Jan 2025 17:15:43 +0000 Subject: [PATCH 3/7] fix rollback --- lib/client/doc.js | 63 ++++++++++++++++++++++---------------- lib/util.js | 2 +- test/client/transaction.js | 20 +++++++----- 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index c96db260e..69b2199a5 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -984,36 +984,47 @@ Doc.prototype._rollback = function(err) { // just the inflight op if possible. If not possible to invert, cancel all // pending ops and fetch the latest from the server to get us back into a // working state, then call back - var op = this.inflightOp; - - if (!('op' in op && op.type.invert)) { - return this._hardRollback(err); + var ops = []; + if (this.inflightOp) ops.push(this.inflightOp); + if (this._transaction) { + for (var op of this.pendingOps) { + if (op.transaction !== this._transaction.id) break; + ops.push(op); + } } - try { - op.op = op.type.invert(op.op); - } catch (error) { - // If the op doesn't support `.invert()`, we just reload the doc - // instead of trying to locally revert it. - return this._hardRollback(err); - } + for (var op of ops) { + if (!('op' in op && op.type.invert)) { + return this._hardRollback(err); + } - // Transform the undo operation by any pending ops. - for (var i = 0; i < this.pendingOps.length; i++) { - var transformErr = transformX(this.pendingOps[i], op); - if (transformErr) return this._hardRollback(transformErr); - } + try { + op.op = op.type.invert(op.op); + } catch (error) { + // If the op doesn't support `.invert()`, we just reload the doc + // instead of trying to locally revert it. + return this._hardRollback(err); + } - // ... and apply it locally, reverting the changes. - // - // This operation is applied to look like it comes from a remote source. - // I'm still not 100% sure about this functionality, because its really a - // local op. Basically, the problem is that if the client's op is rejected - // by the server, the editor window should update to reflect the undo. - try { - this._otApply(op, false); - } catch (error) { - return this._hardRollback(error); + if (!this._transaction) { + // Transform the undo operation by any pending ops. + for (var i = 0; i < this.pendingOps.length; i++) { + var transformErr = transformX(this.pendingOps[i], op); + if (transformErr) return this._hardRollback(transformErr); + } + } + + // ... and apply it locally, reverting the changes. + // + // This operation is applied to look like it comes from a remote source. + // I'm still not 100% sure about this functionality, because its really a + // local op. Basically, the problem is that if the client's op is rejected + // by the server, the editor window should update to reflect the undo. + try { + this._otApply(op, false); + } catch (error) { + return this._hardRollback(error); + } } // The server has rejected submission of the current operation. If we get diff --git a/lib/util.js b/lib/util.js index 9f090eb8a..a66609e0d 100644 --- a/lib/util.js +++ b/lib/util.js @@ -77,7 +77,7 @@ exports.callEach = function() { var called = false; callbacks.forEach(function(callback) { if (callback) { - callback.apply(args); + callback.apply(null, args); called = true; } }); diff --git a/test/client/transaction.js b/test/client/transaction.js index 34b6e55c2..4449ed44d 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -32,7 +32,7 @@ module.exports = function() { doc.on('error', function() {}); }); - it.only('commits two ops as a transaction', function(done) { + it('commits two ops as a transaction', function(done) { doc.preventCompose = true; async.series([ @@ -66,7 +66,7 @@ module.exports = function() { ], done); }); - it.only('fires the submitOp callback after a transaction commits', function(done) { + it('fires the submitOp callback after a transaction commits', function(done) { async.series([ doc.create.bind(doc, {name: 'Gaspode'}), function(next) { @@ -81,7 +81,7 @@ module.exports = function() { ], done); }); - it('does not commit the first op if the second op fails', function(done) { + it.only('does not commit the first op if the second op fails', function(done) { backend.use('commit', function(request, next) { if (!request.snapshot.data.tricks) return next(); next(new Error('fail')); @@ -89,17 +89,23 @@ module.exports = function() { async.series([ doc.create.bind(doc, {name: 'Gaspode'}), - doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), function(next) { - doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { + var count = 0; + function handler(error) { + count++; expect(error.message).to.equal('fail'); - }); - doc.once('load', next); + if (count === 3) next(); + }; + + doc.submitOp([{p: ['age'], oi: 3}], {transaction: transaction}, handler); + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, handler); + transaction.commit(handler); }, remoteDoc.fetch.bind(remoteDoc), function(next) { expect(remoteDoc.data).to.eql({name: 'Gaspode'}); expect(doc.data).to.eql(remoteDoc.data); + expect(remoteDoc.version).to.equal(doc.version); next(); } ], done); From 9c9111a8f030c7b1492ffb4c7ddd24ae560070b9 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Thu, 23 Jan 2025 17:22:31 +0000 Subject: [PATCH 4/7] hook up create + del --- lib/client/doc.js | 8 +++++--- lib/client/transaction.js | 4 ---- test/client/transaction.js | 20 ++++++++++++++------ 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 69b2199a5..984dcf124 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -885,6 +885,7 @@ Doc.prototype.create = function(data, type, options, callback) { callback = options; options = null; } + options = options || {}; if (!type) { type = types.defaultType.uri; } @@ -894,7 +895,8 @@ Doc.prototype.create = function(data, type, options, callback) { return this.emit('error', err); } var op = {create: {type: type, data: data}}; - var source = options && options.source; + if (options.transaction) op.transaction = this._normalizeTransactionId(options.transaction); + var source = options.source; this._submit(op, source, callback); }; @@ -916,11 +918,11 @@ Doc.prototype.del = function(options, callback) { return this.emit('error', err); } var op = {del: true}; - var source = options && options.source; + if (options.transaction) op.transaction = this._normalizeTransactionId(options.transaction); + var source = options.source; this._submit(op, source, callback); }; - // Stops the document from sending any operations to the server. Doc.prototype.pause = function() { this.paused = true; diff --git a/lib/client/transaction.js b/lib/client/transaction.js index 8e91ce11c..2e0524dbb 100644 --- a/lib/client/transaction.js +++ b/lib/client/transaction.js @@ -1,6 +1,5 @@ var emitter = require('../emitter'); var {ACTIONS} = require('../message-actions'); -var util = require('../util'); var idCounter = 1; @@ -29,9 +28,6 @@ Transaction.prototype.commit = function(callback) { }; Transaction.prototype._handleCommit = function(error, message) { - // TODO: Trigger this._getOps() callbacks - // TODO: Should unset transaction on this._docs - // TODO: If error, should rollback docs if (typeof this._callback === 'function') this._callback(error); else if (error) this.emit('error', error); diff --git a/test/client/transaction.js b/test/client/transaction.js index 4449ed44d..466ae0d03 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -81,7 +81,7 @@ module.exports = function() { ], done); }); - it.only('does not commit the first op if the second op fails', function(done) { + it('does not commit the first op if the second op fails', function(done) { backend.use('commit', function(request, next) { if (!request.snapshot.data.tricks) return next(); next(new Error('fail')); @@ -114,8 +114,11 @@ module.exports = function() { it('deletes and creates as part of a transaction', function(done) { async.series([ doc.create.bind(doc, {name: 'Gaspode'}), - doc.del.bind(doc, {transaction: transaction}), - doc.create.bind(doc, {name: 'Recreated'}, 'json0', {transaction: transaction}), + function(next) { + doc.del({transaction: transaction}, errorHandler(next)); + doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, errorHandler(next)); + next(); + }, remoteDoc.fetch.bind(remoteDoc), function (next) { expect(remoteDoc.data).to.eql({name: 'Gaspode'}); @@ -141,11 +144,16 @@ module.exports = function() { }); next(); }, - doc.del.bind(doc, {transaction: transaction}), function(next) { - doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, function(error) { + function handler(error) { expect(error.message).to.equal('Create not allowed'); - }); + } + + doc.del({transaction: transaction}, handler); + doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, handler); + transaction.commit(handler); + + // Should trigger hard rollback doc.once('load', next); }, remoteDoc.fetch.bind(remoteDoc), From 3119864fb6a66398e3446f811eb501e952f64c8a Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Tue, 28 Jan 2025 12:38:28 +0000 Subject: [PATCH 5/7] fix ack --- lib/client/doc.js | 8 ++++---- test/client/transaction.js | 10 ++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 984dcf124..51953d4d1 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -345,11 +345,11 @@ Doc.prototype._handleOp = function(err, message) { return this.emit('error', err); } - var shouldAck = !!this._transaction || ( - this.inflightOp && + var isInflightAck = this.inflightOp && message.src === this.inflightOp.src && - message.seq === this.inflightOp.seq - ); + message.seq === this.inflightOp.seq; + var isTransactionAck = !!this._transaction && message.src === this.connection.id; + var shouldAck = isInflightAck || isTransactionAck; if (shouldAck) { // The op has already been applied locally. Just update the version // and pending state appropriately diff --git a/test/client/transaction.js b/test/client/transaction.js index 466ae0d03..98d48d393 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -170,14 +170,12 @@ module.exports = function() { doc.create.bind(doc, {tricks: ['fetch']}), remoteDoc.fetch.bind(remoteDoc), remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), - doc.submitOp.bind(doc, [{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}), - doc.submitOp.bind(doc, [{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}), - remoteDoc.fetch.bind(remoteDoc), function(next) { - expect(remoteDoc.data).to.eql({tricks: []}); - next(); + doc.submitOp([{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}, errorHandler(next)); + doc.submitOp([{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}, errorHandler(next)); + transaction.commit(next); }, - transaction.commit.bind(transaction), + doc.fetch.bind(doc), remoteDoc.fetch.bind(remoteDoc), function(next) { expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); From 3881cb4bc9b92128762078e899dafe49feabec1b Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Tue, 28 Jan 2025 17:02:58 +0000 Subject: [PATCH 6/7] prevent compose wip --- lib/submit-request.js | 18 +++++-- lib/transaction.js | 0 lib/transaction/transaction.js | 91 ++++++++++++++++++++++++++-------- test/client/transaction.js | 22 ++++---- 4 files changed, 97 insertions(+), 34 deletions(-) delete mode 100644 lib/transaction.js diff --git a/lib/submit-request.js b/lib/submit-request.js index a16929bba..683934542 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -90,6 +90,7 @@ SubmitRequest.prototype.submit = function(callback) { var collection = this.collection; var id = this.id; var op = this.op; + console.log('submit', op); // Send a special projection so that getSnapshot knows to return all fields. // With a null projection, it strips document metadata var fields = {$submit: true}; @@ -106,13 +107,22 @@ SubmitRequest.prototype.submit = function(callback) { if (this._transaction) { var transaction = this._transaction; getSnapshot = function(collection, id, fields, snapshotOptions, callback) { - var snapshot = transaction.getSnapshot(collection, id, fields, snapshotOptions); - if (snapshot) return callback(null, snapshot); - getSnapshotFromDb(collection, id, fields, snapshotOptions, callback); + transaction.getSnapshotAndOps(request, function(error, snapshot, ops) { + if (!snapshot) return getSnapshotFromDb(collection, id, fields, snapshotOptions, callback); + console.log('>> got ops', request.op, ops); + var type = snapshot.type; + // TODO: Use this._transformOp()? Get a version mismatch, and don't want ops on this.ops though + for (var op of ops) { + var error = ot.transform(type, request.op, op); + if (error) return callback(error); + } + callback(null, snapshot); + }); }; } getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { + console.log('got snapshot', request.op, snapshot); if (err) return callback(err); request.snapshot = snapshot; @@ -160,6 +170,7 @@ SubmitRequest.prototype.submit = function(callback) { // Transform the op up to the current snapshot version, then apply var from = op.v; + console.log('get ops to snapshot', from, snapshot.v); backend.db.getOpsToSnapshot(collection, id, from, snapshot, {metadata: true}, function(err, ops) { if (err) return callback(err); @@ -287,6 +298,7 @@ SubmitRequest.prototype.retry = function(callback) { }; SubmitRequest.prototype._transformOp = function(ops) { + console.log('transform op', this.op, ops); var type = this.snapshot.type; for (var i = 0; i < ops.length; i++) { var op = ops[i]; diff --git a/lib/transaction.js b/lib/transaction.js deleted file mode 100644 index e69de29bb..000000000 diff --git a/lib/transaction/transaction.js b/lib/transaction/transaction.js index 440adae81..84cc23f74 100644 --- a/lib/transaction/transaction.js +++ b/lib/transaction/transaction.js @@ -1,16 +1,19 @@ var util = require('../util'); +var emitter = require('../emitter'); function Transaction(agent, id, ops) { + emitter.EventEmitter.call(this); + this.id = id; this._agent = agent; this._backend = agent.backend; this._ops = ops; - this._pendingOps = Object.create(null); + this._docOps = Object.create(null); for (var op of ops) { - var docOps = util.digOrCreate(this._pendingOps, op.c, op.d, function() { + var docOps = util.digOrCreate(this._docOps, op.c, op.d, function() { return []; }); op.v = op.v + docOps.length; @@ -24,15 +27,18 @@ function Transaction(agent, id, ops) { this._requestCallbacks = []; this._acks = []; } +emitter.mixin(Transaction); module.exports = Transaction; Transaction.prototype.submit = function(callback) { // TODO: Handle multiple calls? this._callback = callback; - for (var collection in this._pendingOps) { - for (var id in this._pendingOps[collection]) { - this._submitNextDocOp(collection, id); + for (var collection in this._docOps) { + for (var id in this._docOps[collection]) { + for (var op of this._docOps[collection][id]) { + this._submitOp(op); + } } } }; @@ -45,26 +51,69 @@ Transaction.prototype.ready = function(request, callback) { docRequests.push(request); this._requestCallbacks.push(callback); + console.log('> REQ READY', request.op, request.snapshot); + this.emit('requestReady', request); + if (this._isReady()) return this._commitTransaction(); - this._submitNextDocOp(request.collection, request.id); }; -Transaction.prototype.getSnapshot = function(collection, id, fields, snapshotOptions) { +Transaction.prototype.getSnapshotAndOps = function(request, callback) { // TODO: Support fields? // TODO: Support options? - var requests = util.dig(this._readyRequests, collection, id); - if (!requests) return; - return util.clone(requests[requests.length - 1].snapshot); + var op = {c: request.collection, d: request.id, seq: request.op.seq}; + this._waitForPreviousOpRequest(op, function(req) { + if (!req) return callback(); + var versionDiff = req.snapshot.v - request.op.v; + var ops = req.ops.slice(-versionDiff); + if (ops.length) { + var offset = request.op.v - ops[0].v; + for (var op of ops) { + op.v = op.v + offset; + } + } + callback(null, util.clone(req.snapshot), util.clone(ops)); + }); +}; + +Transaction.prototype._waitForPreviousOpRequest = function(op, callback) { + console.log('wait for previous', op); + var collection = op.c; + var id = op.d; + + var previousOp; + var docOps = this._docOps[collection][id]; + for (var docOp of docOps) { + if (op.seq === docOp.seq) break; + previousOp = docOp; + } + + console.log('previous op', previousOp); + if (!previousOp) return callback(); + + var requests = util.dig(this._readyRequests, collection, id) || []; + for (var request of requests) { + if (request.op.seq === previousOp.seq) return callback(request); + } + + var transaction = this; + var handler = function(request) { + if (request.collection !== collection || request.id !== id || request.op.seq !== previousOp.seq) return; + transaction.off('requestReady', handler); + callback(request); + }; + + this.on('requestReady', handler); }; -Transaction.prototype._submitNextDocOp = function(collection, id) { +Transaction.prototype._submitOp = function(op) { var transaction = this; - var ops = this._pendingOps[collection][id]; - var op = ops.shift(); - this._agent._submit(op, function(error, ack) { - if (error) transaction._finish(error); - transaction._acks.push(ack); - if (transaction._acks.length === transaction._ops.length) transaction._finish(); + var agent = this._agent; + this._waitForPreviousOpRequest(op, function() { + agent._submit(op, function(error, ack) { + if (error) transaction._finish(error); + transaction._acks.push(ack); + if (transaction._acks.length === transaction._ops.length) transaction._finish(); + }); }); }; @@ -74,6 +123,7 @@ Transaction.prototype._isReady = function() { Transaction.prototype._commitTransaction = function() { var requests = this._flatReadyRequests(); + this._readyRequests = Object.create(null); var commits = requests.map(function(request) { return { collection: request.collection, @@ -83,15 +133,14 @@ Transaction.prototype._commitTransaction = function() { options: request.options, }; }); + var callbacks = this._requestCallbacks; + this._requestCallbacks = []; var transaction = this; var options = null; this._backend.db.commitTransaction(commits, options, function(error, succeeded) { if (error) return transaction._finish(error); - if (!succeeded) { - // TODO: Retry - } - util.callEach(transaction._requestCallbacks, null, true); + util.callEach(callbacks, null, succeeded); }); }; diff --git a/test/client/transaction.js b/test/client/transaction.js index 98d48d393..fbe32a741 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -23,6 +23,7 @@ module.exports = function() { beforeEach(function() { id = (idCounter++).toString(); doc = connection.get('dogs', id); + doc.preventCompose = true; remoteDoc = backend.connect().get('dogs', id); transaction = connection.startTransaction(); @@ -33,8 +34,6 @@ module.exports = function() { }); it('commits two ops as a transaction', function(done) { - doc.preventCompose = true; - async.series([ doc.create.bind(doc, {name: 'Gaspode'}), function (next) { @@ -165,7 +164,7 @@ module.exports = function() { ], done); }); - it('transaction is behind remote', function(done) { + it.only('transaction is behind remote', function(done) { async.series([ doc.create.bind(doc, {tricks: ['fetch']}), remoteDoc.fetch.bind(remoteDoc), @@ -186,18 +185,21 @@ module.exports = function() { }); it('remote submits after but commits first', function(done) { + var hasSubmittedRemote = false; + backend.use('commit', function(request, next) { + if (!request._transaction || hasSubmittedRemote) return next(); + hasSubmittedRemote = true; + remoteDoc.submitOp([{p: ['tricks', 0], ld: 'fetch'}], next); + }); + async.series([ doc.create.bind(doc, {tricks: ['fetch']}), remoteDoc.fetch.bind(remoteDoc), - doc.submitOp.bind(doc, [{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}), - remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), - doc.submitOp.bind(doc, [{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}), - remoteDoc.fetch.bind(remoteDoc), function(next) { - expect(remoteDoc.data).to.eql({tricks: []}); - next(); + doc.submitOp([{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}, errorHandler(next)); + doc.submitOp([{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}, errorHandler(next)); + transaction.commit(next); }, - transaction.commit.bind(transaction), remoteDoc.fetch.bind(remoteDoc), function(next) { expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); From 241863e8c06194bc90b72c23a53f742d9fc6576d Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Wed, 29 Jan 2025 12:10:47 +0000 Subject: [PATCH 7/7] fix double counting acks --- lib/client/doc.js | 25 ++++++++++++++++++++----- lib/submit-request.js | 5 ----- lib/transaction/transaction.js | 15 ++++----------- test/client/transaction.js | 2 +- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 51953d4d1..12f8d58cb 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -978,7 +978,7 @@ Doc.prototype._opAcknowledged = function(message) { // The op was committed successfully. Increment the version number this.version++; - this._clearInflight(); + this._clearInflight(null, message.seq); }; Doc.prototype._rollback = function(err) { @@ -1051,6 +1051,7 @@ Doc.prototype._hardRollback = function(err) { // Cancel all pending ops and reset if we can't invert this._setType(null); this.version = null; + // TODO: Clear transaction this.inflightOp = null; this.pendingOps = []; @@ -1107,15 +1108,29 @@ Doc.prototype._hasInflight = function() { return !!(this.inflightOp || this._transaction); }; -Doc.prototype._clearInflight = function(err) { +Doc.prototype._clearInflight = function(err, seq) { var callbacks = []; if (this.inflightOp) callbacks = this._clearInflightOp(); - else if (this._transaction) callbacks = this._clearTransaction(); + else if (this._transaction) { + if (!seq) { + callbacks = this._clearTransaction(); + } else { + var i = this.pendingOps.findIndex(function(pendingOp) { + return pendingOp.seq === seq; + }); + if (i >= 0) { + const op = this.pendingOps.splice(i, 1)[0]; + callbacks = callbacks.concat(op.callbacks); + } + } + } var called = util.callEach(callbacks, err); - this.flush(); - this._emitNothingPending(); + if (!this._transaction) { + this.flush(); + this._emitNothingPending(); + } if (err && !called) return this.emit('error', err); }; diff --git a/lib/submit-request.js b/lib/submit-request.js index 683934542..61deab891 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -90,7 +90,6 @@ SubmitRequest.prototype.submit = function(callback) { var collection = this.collection; var id = this.id; var op = this.op; - console.log('submit', op); // Send a special projection so that getSnapshot knows to return all fields. // With a null projection, it strips document metadata var fields = {$submit: true}; @@ -109,7 +108,6 @@ SubmitRequest.prototype.submit = function(callback) { getSnapshot = function(collection, id, fields, snapshotOptions, callback) { transaction.getSnapshotAndOps(request, function(error, snapshot, ops) { if (!snapshot) return getSnapshotFromDb(collection, id, fields, snapshotOptions, callback); - console.log('>> got ops', request.op, ops); var type = snapshot.type; // TODO: Use this._transformOp()? Get a version mismatch, and don't want ops on this.ops though for (var op of ops) { @@ -122,7 +120,6 @@ SubmitRequest.prototype.submit = function(callback) { } getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { - console.log('got snapshot', request.op, snapshot); if (err) return callback(err); request.snapshot = snapshot; @@ -170,7 +167,6 @@ SubmitRequest.prototype.submit = function(callback) { // Transform the op up to the current snapshot version, then apply var from = op.v; - console.log('get ops to snapshot', from, snapshot.v); backend.db.getOpsToSnapshot(collection, id, from, snapshot, {metadata: true}, function(err, ops) { if (err) return callback(err); @@ -298,7 +294,6 @@ SubmitRequest.prototype.retry = function(callback) { }; SubmitRequest.prototype._transformOp = function(ops) { - console.log('transform op', this.op, ops); var type = this.snapshot.type; for (var i = 0; i < ops.length; i++) { var op = ops[i]; diff --git a/lib/transaction/transaction.js b/lib/transaction/transaction.js index 84cc23f74..596a0a9ba 100644 --- a/lib/transaction/transaction.js +++ b/lib/transaction/transaction.js @@ -34,12 +34,8 @@ Transaction.prototype.submit = function(callback) { // TODO: Handle multiple calls? this._callback = callback; - for (var collection in this._docOps) { - for (var id in this._docOps[collection]) { - for (var op of this._docOps[collection][id]) { - this._submitOp(op); - } - } + for (var op of this._ops) { + this._submitOp(op); } }; @@ -51,7 +47,6 @@ Transaction.prototype.ready = function(request, callback) { docRequests.push(request); this._requestCallbacks.push(callback); - console.log('> REQ READY', request.op, request.snapshot); this.emit('requestReady', request); if (this._isReady()) return this._commitTransaction(); @@ -64,19 +59,18 @@ Transaction.prototype.getSnapshotAndOps = function(request, callback) { this._waitForPreviousOpRequest(op, function(req) { if (!req) return callback(); var versionDiff = req.snapshot.v - request.op.v; - var ops = req.ops.slice(-versionDiff); + var ops = util.clone(req.ops.slice(-versionDiff)); if (ops.length) { var offset = request.op.v - ops[0].v; for (var op of ops) { op.v = op.v + offset; } } - callback(null, util.clone(req.snapshot), util.clone(ops)); + callback(null, util.clone(req.snapshot), ops); }); }; Transaction.prototype._waitForPreviousOpRequest = function(op, callback) { - console.log('wait for previous', op); var collection = op.c; var id = op.d; @@ -87,7 +81,6 @@ Transaction.prototype._waitForPreviousOpRequest = function(op, callback) { previousOp = docOp; } - console.log('previous op', previousOp); if (!previousOp) return callback(); var requests = util.dig(this._readyRequests, collection, id) || []; diff --git a/test/client/transaction.js b/test/client/transaction.js index fbe32a741..37490ff34 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -164,7 +164,7 @@ module.exports = function() { ], done); }); - it.only('transaction is behind remote', function(done) { + it('transaction is behind remote', function(done) { async.series([ doc.create.bind(doc, {tricks: ['fetch']}), remoteDoc.fetch.bind(remoteDoc),