Skip to content

Commit e247940

Browse files
committed
⚡️ Cache latest op version when broadcasting presence
At the moment, when sending a presence update to other subscribers, we [call `transformPresenceToLatestVersion()`][1] for every presence update which internally [calls `getOps()`][2] for every presence update. Calls to `getOps()` can be expensive, and rapid presence updates may cause undue load on the server, even when the `Doc` has not been updated. This change tries to mitigate this by subscribing to a pubsub stream for any `Doc`s that an `Agent` tries to broadcast presence on. We keep an in-memory cache of the latest snapshot version sent over this stream, which lets us quickly check if a presence broadcast is already current without needing to query the database at all. To avoid leaking streams, the `Agent` will internally handle its stream subscription state by: - subscribing whenever a non-`null` presence update is broadcast - unsubscribing whenever a `null` presence update is broadcast This means that rapid changes in presence being `null` or not can still result in database calls, but even in this case they should be less bad than before, because we only perform a snapshot fetch instead of ops. [1]: https://github.com/share/sharedb/blob/297ce5dc66563a5955311793a475768d73ac8b87/lib/agent.js#L804 [2]: https://github.com/share/sharedb/blob/297ce5dc66563a5955311793a475768d73ac8b87/lib/backend.js#L919
1 parent 297ce5d commit e247940

File tree

2 files changed

+97
-23
lines changed

2 files changed

+97
-23
lines changed

lib/agent.js

+79-23
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ function Agent(backend, stream) {
4848
// request if the client disconnects ungracefully. This is a
4949
// map of channel -> id -> request
5050
this.presenceRequests = Object.create(null);
51+
// Keep track of the latest known Doc version, so that we can avoid fetching
52+
// ops to transform presence if not needed
53+
this.latestDocVersionStreams = Object.create(null);
54+
this.latestDocVersions = Object.create(null);
5155

5256
// We need to track this manually to make sure we don't reply to messages
5357
// after the stream was closed.
@@ -115,17 +119,8 @@ Agent.prototype._cleanup = function() {
115119
* _sendOp()
116120
*/
117121
Agent.prototype._subscribeToStream = function(collection, id, stream) {
118-
if (this.closed) return stream.destroy();
119-
120-
var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = Object.create(null));
121-
122-
// If already subscribed to this document, destroy the previously subscribed stream
123-
var previous = streams[id];
124-
if (previous) previous.destroy();
125-
streams[id] = stream;
126-
127122
var agent = this;
128-
stream.on('data', function(data) {
123+
this._subscribeMapToStream(this.subscribedDocs, collection, id, stream, function(data) {
129124
if (data.error) {
130125
// Log then silently ignore errors in a subscription stream, since these
131126
// may not be the client's fault, and they were not the result of a
@@ -135,13 +130,26 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
135130
}
136131
agent._onOp(collection, id, data);
137132
});
133+
};
134+
135+
Agent.prototype._subscribeMapToStream = function(map, collection, id, stream, dataHandler) {
136+
if (this.closed) return stream.destroy();
137+
138+
var streams = map[collection] || (map[collection] = Object.create(null));
139+
140+
// If already subscribed to this document, destroy the previously subscribed stream
141+
var previous = streams[id];
142+
if (previous) previous.destroy();
143+
streams[id] = stream;
144+
145+
stream.on('data', dataHandler);
138146
stream.on('end', function() {
139147
// The op stream is done sending, so release its reference
140-
var streams = agent.subscribedDocs[collection];
148+
var streams = map[collection];
141149
if (!streams || streams[id] !== stream) return;
142150
delete streams[id];
143151
if (util.hasKeys(streams)) return;
144-
delete agent.subscribedDocs[collection];
152+
delete map[collection];
145153
});
146154
};
147155

@@ -794,25 +802,73 @@ Agent.prototype._broadcastPresence = function(presence, callback) {
794802
collection: presence.c
795803
};
796804
var start = Date.now();
797-
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, this, context, function(error) {
805+
806+
var subscriptionUpdater = presence.p === null ?
807+
this._unsubscribeDocVersion.bind(this) :
808+
this._subscribeDocVersion.bind(this);
809+
810+
subscriptionUpdater(presence.c, presence.d, function(error) {
798811
if (error) return callback(error);
799-
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null));
800-
var previousRequest = requests[presence.id];
801-
if (!previousRequest || previousRequest.pv < presence.pv) {
802-
presenceRequests[presence.ch][presence.id] = presence;
803-
}
804-
backend.transformPresenceToLatestVersion(agent, presence, function(error, presence) {
812+
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, agent, context, function(error) {
805813
if (error) return callback(error);
806-
var channel = agent._getPresenceChannel(presence.ch);
807-
agent.backend.pubsub.publish([channel], presence, function(error) {
808-
if (error) return callback(error);
809-
backend.emit('timing', 'presence.broadcast', Date.now() - start, context);
814+
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null));
815+
var previousRequest = requests[presence.id];
816+
if (!previousRequest || previousRequest.pv < presence.pv) {
817+
presenceRequests[presence.ch][presence.id] = presence;
818+
}
819+
820+
var transformer = function(agent, presence, callback) {
810821
callback(null, presence);
822+
};
823+
824+
var presenceIsUpToDate = presence.v === util.dig(agent.latestDocVersions, presence.c, presence.d);
825+
if (!presenceIsUpToDate) {
826+
transformer = backend.transformPresenceToLatestVersion.bind(backend);
827+
}
828+
829+
transformer(agent, presence, function(error, presence) {
830+
if (error) return callback(error);
831+
var channel = agent._getPresenceChannel(presence.ch);
832+
agent.backend.pubsub.publish([channel], presence, function(error) {
833+
if (error) return callback(error);
834+
backend.emit('timing', 'presence.broadcast', Date.now() - start, context);
835+
callback(null, presence);
836+
});
811837
});
812838
});
813839
});
814840
};
815841

842+
Agent.prototype._subscribeDocVersion = function(collection, id, callback) {
843+
if (!collection || !id) return callback();
844+
845+
var latestDocVersions = this.latestDocVersions;
846+
var isSubscribed = util.dig(latestDocVersions, collection, id) !== undefined;
847+
if (isSubscribed) return callback();
848+
849+
var agent = this;
850+
this.backend.subscribe(this, collection, id, null, function(error, stream, snapshot) {
851+
if (error) return callback(error);
852+
853+
util.digOrCreate(latestDocVersions, collection, id, function() {
854+
return snapshot.v;
855+
});
856+
857+
agent._subscribeMapToStream(agent.latestDocVersionStreams, collection, id, stream, function(op) {
858+
// op.v behind snapshot.v by 1
859+
latestDocVersions[collection][id] = op.v + 1;
860+
});
861+
862+
callback();
863+
});
864+
};
865+
866+
Agent.prototype._unsubscribeDocVersion = function(collection, id, callback) {
867+
var stream = util.dig(this.latestDocVersionStreams, collection, id);
868+
if (stream) stream.destroy();
869+
util.nextTick(callback);
870+
};
871+
816872
Agent.prototype._createPresence = function(request) {
817873
return {
818874
a: ACTIONS.presence,

test/client/presence/doc-presence.js

+18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ var types = require('../../../lib/types');
55
var presenceTestType = require('./presence-test-type');
66
var errorHandler = require('../../util').errorHandler;
77
var PresencePauser = require('./presence-pauser');
8+
var sinon = require('sinon');
89
types.register(presenceTestType.type);
910

1011
describe('DocPresence', function() {
@@ -297,6 +298,23 @@ describe('DocPresence', function() {
297298
], done);
298299
});
299300

301+
it('does not call getOps() when presence is already up-to-date', function(done) {
302+
var localPresence1 = presence1.create('presence-1');
303+
304+
async.series([
305+
doc1.fetch.bind(doc1), // Ensure up-to-date
306+
function(next) {
307+
sinon.spy(Backend.prototype, 'getOps');
308+
next();
309+
},
310+
localPresence1.submit.bind(localPresence1, {index: 1}),
311+
function(next) {
312+
expect(Backend.prototype.getOps).not.to.have.been.called;
313+
next();
314+
}
315+
], done);
316+
});
317+
300318
// This test case attempts to force us into a tight race condition corner case:
301319
// 1. doc1 sends presence, as well as submits an op
302320
// 2. doc2 receives the op first, followed by the presence, which is now out-of-date

0 commit comments

Comments
 (0)