From 609338c965ea18317d69788af3def8d4086d70e5 Mon Sep 17 00:00:00 2001 From: Eric Hwang Date: Tue, 17 Mar 2020 23:10:15 -0700 Subject: [PATCH] Define server plugin interface, move Agent queries code into a built-in plugin --- lib/agent.js | 197 ++++++++----------------------- lib/backend.js | 63 ++++++++++ lib/query-server-plugin.js | 229 +++++++++++++++++++++++++++++++++++++ 3 files changed, 338 insertions(+), 151 deletions(-) create mode 100644 lib/query-server-plugin.js diff --git a/lib/agent.js b/lib/agent.js index a8ab4a3bb..2ebdeaca8 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -6,6 +6,9 @@ var ShareDBError = require('./error'); var ERROR_CODE = ShareDBError.CODES; +/** @typedef {import('./backend')} Backend */ +/** @typedef {import('./backend').ServerPlugin} ServerPlugin */ + /** * Agent deserializes the wire protocol messages received from the stream and * calls the corresponding functions on its Agent. It uses the return values @@ -16,6 +19,7 @@ var ERROR_CODE = ShareDBError.CODES; * @param {Duplex} stream connection to a client */ function Agent(backend, stream) { + /** @type {Backend} */ this.backend = backend; this.stream = stream; @@ -32,8 +36,24 @@ function Agent(backend, stream) { // map of collection -> id -> stream this.subscribedDocs = {}; - // Map from queryId -> emitter - this.subscribedQueries = {}; + /** + * Map of action name (`a` field in requests) to plugin + * @type {{ [action: string]: ServerPlugin }} + */ + var actionToPlugin = this.actionToPlugin = {}; + // Map of plugin name -> plugin's agent state + this.pluginStates = {}; + for (var i = 0; i < backend.plugins.length; i++) { + var plugin = backend.plugins[i]; + for (var action in plugin.requestHandlers) { + if (actionToPlugin[action]) { + throw new Error('Action ' + action + ' in plugin ' + plugin.name + + ' conflicts with plugin ' + actionToPlugin[action].name); + } + actionToPlugin[action] = plugin; + } + this.pluginStates[plugin.name] = plugin.createAgentState(); + } // Track which documents are subscribed to presence by the client. This is a // map of channel -> stream @@ -167,43 +187,6 @@ Agent.prototype._subscribeToPresenceStream = function(channel, stream) { }); }; -Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) { - var previous = this.subscribedQueries[queryId]; - if (previous) previous.destroy(); - this.subscribedQueries[queryId] = emitter; - - var agent = this; - emitter.onExtra = function(extra) { - agent.send({a: 'q', id: queryId, extra: extra}); - }; - - emitter.onDiff = function(diff) { - for (var i = 0; i < diff.length; i++) { - var item = diff[i]; - if (item.type === 'insert') { - item.values = getResultsData(item.values); - } - } - // Consider stripping the collection out of the data we send here - // if it matches the query's collection. - agent.send({a: 'q', id: queryId, diff: diff}); - }; - - emitter.onError = function(err) { - // Log then silently ignore errors in a subscription stream, since these - // may not be the client's fault, and they were not the result of a - // direct request by the client - logger.error('Query subscription stream error', collection, query, err); - }; - - emitter.onOp = function(op) { - var id = op.d; - agent._onOp(collection, id, op); - }; - - emitter._open(); -}; - Agent.prototype._onOp = function(collection, id, op) { if (this._isOwnOp(collection, op)) return; @@ -379,19 +362,22 @@ Agent.prototype._checkRequest = function(request) { // Handle an incoming message from the client Agent.prototype._handleMessage = function(request, callback) { try { + var plugin = this.actionToPlugin[request.a]; + var errMessage = this._checkRequest(request); if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage)); + if (plugin) { + try { + plugin.checkRequest(request); + } catch (err) { + return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, err.message)); + } + } switch (request.a) { case 'hs': if (request.id) this.src = request.id; return callback(null, this._initMessage('hs')); - case 'qf': - return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback); - case 'qs': - return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback); - case 'qu': - return this._queryUnsubscribe(request.id, callback); case 'bf': return this._fetchBulk(request.c, request.b, callback); case 'bs': @@ -435,109 +421,23 @@ Agent.prototype._handleMessage = function(request, callback) { case 'pu': return this._unsubscribePresence(request.ch, request.seq, callback); default: - callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); - } - } catch (err) { - callback(err); - } -}; -function getQueryOptions(request) { - var results = request.r; - var ids; - var fetch; - var fetchOps; - if (results) { - ids = []; - for (var i = 0; i < results.length; i++) { - var result = results[i]; - var id = result[0]; - var version = result[1]; - ids.push(id); - if (version == null) { - if (fetch) { - fetch.push(id); + if (plugin) { + return plugin.requestHandlers[request.a](request, { + agent: this, + backend: this.backend, + agentState: this.pluginStates[plugin.name] + }, callback); } else { - fetch = [id]; + callback( + new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message') + ); } - } else { - if (!fetchOps) fetchOps = {}; - fetchOps[id] = version; - } } + } catch (err) { + callback(err); } - var options = request.o || {}; - options.ids = ids; - options.fetch = fetch; - options.fetchOps = fetchOps; - return options; -} - -Agent.prototype._queryFetch = function(queryId, collection, query, options, callback) { - // Fetch the results of a query once - this.backend.queryFetch(this, collection, query, options, function(err, results, extra) { - if (err) return callback(err); - var message = { - data: getResultsData(results), - extra: extra - }; - callback(null, message); - }); -}; - -Agent.prototype._querySubscribe = function(queryId, collection, query, options, callback) { - // Subscribe to a query. The client is sent the query results and its - // notified whenever there's a change - var agent = this; - var wait = 1; - var message; - function finish(err) { - if (err) return callback(err); - if (--wait) return; - callback(null, message); - } - if (options.fetch) { - wait++; - this.backend.fetchBulk(this, collection, options.fetch, function(err, snapshotMap) { - if (err) return finish(err); - message = getMapResult(snapshotMap); - finish(); - }); - } - if (options.fetchOps) { - wait++; - this._fetchBulkOps(collection, options.fetchOps, finish); - } - this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) { - if (err) return finish(err); - if (this.closed) return emitter.destroy(); - - agent._subscribeToQuery(emitter, queryId, collection, query); - // No results are returned when ids are passed in as an option. Instead, - // want to re-poll the entire query once we've established listeners to - // emit any diff in results - if (!results) { - emitter.queryPoll(finish); - return; - } - message = { - data: getResultsData(results), - extra: extra - }; - finish(); - }); }; -function getResultsData(results) { - var items = []; - for (var i = 0; i < results.length; i++) { - var result = results[i]; - var item = getSnapshotData(result); - item.d = result.id; - items.push(item); - } - return items; -} - function getMapResult(snapshotMap) { var data = {}; for (var id in snapshotMap) { @@ -553,6 +453,8 @@ function getMapResult(snapshotMap) { } return {data: data}; } +// Exported for use in core plugins +Agent._getMapResult = getMapResult; function getSnapshotData(snapshot) { var data = { @@ -564,15 +466,8 @@ function getSnapshotData(snapshot) { } return data; } - -Agent.prototype._queryUnsubscribe = function(queryId, callback) { - var emitter = this.subscribedQueries[queryId]; - if (emitter) { - emitter.destroy(); - delete this.subscribedQueries[queryId]; - } - process.nextTick(callback); -}; +// Exported for use in core plugins +Agent._getSnapshotData = getSnapshotData; Agent.prototype._fetch = function(collection, id, version, callback) { if (version == null) { diff --git a/lib/backend.js b/lib/backend.js index 923dc33b3..9d523a349 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -8,6 +8,7 @@ var MemoryPubSub = require('./pubsub/memory'); var ot = require('./ot'); var projections = require('./projections'); var QueryEmitter = require('./query-emitter'); +var QueryServerPlugin = require('./query-server-plugin').Plugin; var ShareDBError = require('./error'); var Snapshot = require('./snapshot'); var StreamSocket = require('./stream-socket'); @@ -37,6 +38,10 @@ function Backend(options) { // Map from event name to a list of middleware this.middleware = {}; + /** @type {Array>} */ + this.plugins = []; + this._registerServerPlugin(new QueryServerPlugin()); + // The number of open agents for monitoring and testing memory leaks this.agentsCount = 0; this.remoteAgentsCount = 0; @@ -82,6 +87,64 @@ Backend.prototype.SNAPSHOT_TYPES = { byTimestamp: 'byTimestamp' }; +/** + * @param {ServerPlugin} plugin + */ +Backend.prototype._registerServerPlugin = function(plugin) { + this.plugins.push(plugin); +}; + +/** + * @typedef {object} ServerPlugin + * + * @property {string} name Unique plugin name, usually based on the plugin's package name + * @property {{[action: string]: RequestHandler}} requestHandlers + * @property {(callback: (error?: Error) => void) => void} close Function called when + * `Backend#close` is called. The close function should shut down database connections and then + * call the callback. + * @property {() => S} createAgentState Function to create an object that contains custom + * per-agent state that the plugin needs + * @property {(agentState: S) => void} destroyAgentState Function to tear down the plugin's state + * for an agent + * @property {(request: unknown) => request is TReq} checkRequest Function to check the format of + * an incoming message from a client, throwing an error if the message is invalid + * + * @template TReq - Type for request data from a client + * @template TResp - Type for response data sent back to a client + * @template S - Type for custom per-agent state that the plugin needs to keep + */ + +/** + * @callback RequestHandler + * + * Function that handles an incoming message from a client + * + * @param {TReq} request Request message from a client + * @param {RequestHandlerContext} context + * @param {(err?: Error | null, reply?: TResp) => void} callback Callback to be called with the + * reply message + * @returns {void} + * + * @template TReq - Type for request data from a client + * @template TResp - Type for response data sent back to a client + * @template S - Type for custom per-agent state that the plugin needs to keep + */ + +/** + * @typedef {object} RequestHandlerContext + * + * @property {import('./agent')} agent + * @property {import('./backend')} backend + * @property {S} agentState + * + * @template S + */ + +/** + * Closes the backend and all its database connections. + * + * @param {(error?: Error) => void} callback + */ Backend.prototype.close = function(callback) { var wait = 4; var backend = this; diff --git a/lib/query-server-plugin.js b/lib/query-server-plugin.js new file mode 100644 index 000000000..304e5bbfc --- /dev/null +++ b/lib/query-server-plugin.js @@ -0,0 +1,229 @@ +var Agent = require('./agent'); + +exports.Plugin = QueryServerPlugin; + +/** + * @typedef { import('./backend').RequestHandlerContext } RequestHandlerContext + * @template S + */ + +/** + * Core plugin to handle queries + */ +function QueryServerPlugin() { + this.name = 'sharedb.query'; + this.requestHandlers = { + qf: queryFetch, + qs: querySubscribe, + qu: queryUnsubscribe + }; +} + +QueryServerPlugin.prototype.close = function(callback) { + callback(); +}; + +QueryServerPlugin.prototype.createAgentState = function() { + return new QueriesAgentState(); +}; + +/** + * @param {QueriesAgentState} agentState + */ +QueryServerPlugin.prototype.destroyAgentState = function(agentState) { + // Clean up query subscription streams + for (var id in agentState.subscribedQueries) { + var emitter = agentState.subscribedQueries[id]; + emitter.destroy(); + } + agentState.subscribedQueries = {}; +}; + +QueryServerPlugin.prototype.checkRequest = function(request) { + if (typeof request.id !== 'number') { + throw new Error('Missing query ID'); + } +}; + +/** + * Fetch the results of a query once + * + * @param {TReq} request Request message from a client + * @param {RequestHandlerContext} context + * @param {(err?: Error | null, reply?: TResp) => void} callback Callback to be called with the + * reply message + */ +function queryFetch(request, context, callback) { + var collection = request.c; + var query = request.q; + var options = getQueryOptions(request); + var agent = context.agent; + var backend = context.backend; + + backend.queryFetch(agent, collection, query, options, function(err, results, extra) { + if (err) return callback(err); + var message = { + data: getResultsData(results), + extra: extra + }; + callback(null, message); + }); +} + +/** + * Subscribe to a query. The client is sent the query results, and it's notified whenever there's a + * change + * + * @param {TReq} request Request message from a client + * @param {RequestHandlerContext} context + * @param {(err?: Error | null, reply?: TResp) => void} callback Callback to be called with the + * reply message + */ +function querySubscribe(request, context, callback) { + var queryId = request.id; + var collection = request.c; + var query = request.q; + var options = getQueryOptions(request); + var agent = context.agent; + var backend = context.backend; + + var wait = 1; + var message; + function finish(err) { + if (err) return callback(err); + if (--wait) return; + callback(null, message); + } + if (options.fetch) { + wait++; + backend.fetchBulk(agent, collection, options.fetch, function(err, snapshotMap) { + if (err) return finish(err); + message = Agent._getMapResult(snapshotMap); + finish(); + }); + } + if (options.fetchOps) { + wait++; + agent._fetchBulkOps(collection, options.fetchOps, finish); + } + backend.querySubscribe(agent, collection, query, options, function(err, emitter, results, extra) { + if (err) return finish(err); + if (agent.closed) return emitter.destroy(); + + _subscribeToQuery(context, emitter, queryId, collection, query); + // No results are returned when ids are passed in as an option. Instead, + // want to re-poll the entire query once we've established listeners to + // emit any diff in results + if (!results) { + emitter.queryPoll(finish); + return; + } + message = { + data: getResultsData(results), + extra: extra + }; + finish(); + }); +} + +function _subscribeToQuery(context, emitter, queryId, collection, query) { + var previous = context.agentState.subscribedQueries[queryId]; + if (previous) previous.destroy(); + context.agentState.subscribedQueries[queryId] = emitter; + + var agent = context.agent; + emitter.onExtra = function(extra) { + agent.send({a: 'q', id: queryId, extra: extra}); + }; + + emitter.onDiff = function(diff) { + for (var i = 0; i < diff.length; i++) { + var item = diff[i]; + if (item.type === 'insert') { + item.values = getResultsData(item.values); + } + } + // Consider stripping the collection out of the data we send here + // if it matches the query's collection. + agent.send({a: 'q', id: queryId, diff: diff}); + }; + + emitter.onError = function(err) { + // Log then silently ignore errors in a subscription stream, since these + // may not be the client's fault, and they were not the result of a + // direct request by the client + logger.error('Query subscription stream error', collection, query, err); + }; + + emitter.onOp = function(op) { + var id = op.d; + agent._onOp(collection, id, op); + }; + + emitter._open(); +}; + +/** + * @param {TReq} request Request message from a client + * @param {RequestHandlerContext} context + * @param {(err?: Error | null, reply?: TResp) => void} callback Callback to be called with the + * reply message + */ +function queryUnsubscribe(request, context, callback) { + var queryId = request.id; + + var emitter = this.subscribedQueries[queryId]; + if (emitter) { + emitter.destroy(); + delete this.subscribedQueries[queryId]; + } + process.nextTick(callback); +} + +function getQueryOptions(request) { + var results = request.r; + var ids; + var fetch; + var fetchOps; + if (results) { + ids = []; + for (var i = 0; i < results.length; i++) { + var result = results[i]; + var id = result[0]; + var version = result[1]; + ids.push(id); + if (version == null) { + if (fetch) { + fetch.push(id); + } else { + fetch = [id]; + } + } else { + if (!fetchOps) fetchOps = {}; + fetchOps[id] = version; + } + } + } + var options = request.o || {}; + options.ids = ids; + options.fetch = fetch; + options.fetchOps = fetchOps; + return options; +} + +function getResultsData(results) { + var items = []; + for (var i = 0; i < results.length; i++) { + var result = results[i]; + var item = Agent._getSnapshotData(result); + item.d = result.id; + items.push(item); + } + return items; +} + +function QueriesAgentState() { + // Map from queryId -> QueryEmitter + /** @type {{[queryId: number]: import('./query-emitter')}} */ + this.subscribedQueries = {}; +}