Skip to content

Commit a01d9dc

Browse files
committed
add streaming api implementation for getEventsByRevision
1 parent 803f933 commit a01d9dc

File tree

2 files changed

+113
-2
lines changed

2 files changed

+113
-2
lines changed

lib/databases/mongodb.js

+89
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,74 @@ var util = require('util'),
22
Store = require('../base'),
33
_ = require('lodash'),
44
async = require('async'),
5+
stream = require('stream'),
56
mongo = Store.use('mongodb'),
67
mongoVersion = Store.use('mongodb/package.json').version,
78
isNew = mongoVersion.indexOf('1.') !== 0,
89
ObjectID = isNew ? mongo.ObjectID : mongo.BSONPure.ObjectID,
910
debug = require('debug')('eventstore:store:mongodb');
1011

12+
function streamEventsByRevision(self, findStatement, revMin, revMax, resultStream, lastEvent) {
13+
14+
findStatement.streamRevision = (revMax === -1) ? { '$gte': revMin } : { '$gte': revMin, '$lt': revMax };
15+
16+
var mongoStream = self.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] });
17+
18+
async.during(function(clb) {
19+
mongoStream.hasNext(clb);
20+
},
21+
function(clb) {
22+
mongoStream.next(function(error, e) {
23+
if (error)
24+
return clb(error);
25+
26+
if (!lastEvent) {
27+
lastEvent = e;
28+
return clb();
29+
}
30+
31+
// if for some reason we have written this event alredy
32+
if ((e.streamRevision === lastEvent.streamRevision && e.restInCommitStream <= lastEvent.restInCommitStream) ||
33+
(e.streamRevision <= lastEvent.streamRevision)) {
34+
return clb();
35+
}
36+
37+
lastEvent = e;
38+
resultStream.write(lastEvent, clb);
39+
});
40+
},
41+
function (error) {
42+
if (error) {
43+
return resultStream.destroy(error);
44+
}
45+
46+
if (!lastEvent) {
47+
return resultStream.end();
48+
}
49+
50+
var txOk = (revMax === -1 && !lastEvent.restInCommitStream) ||
51+
(revMax !== -1 && (lastEvent.streamRevision === revMax - 1 || !lastEvent.restInCommitStream));
52+
53+
if (txOk) {
54+
// the following is usually unnecessary
55+
self.removeTransactions(lastEvent);
56+
resultStream.end(lastEvent);
57+
}
58+
59+
self.repairFailedTransaction(lastEvent, function (err) {
60+
if (err) {
61+
if (err.message.indexOf('missing tx entry') >= 0) {
62+
return resultStream.end(lastEvent);;
63+
}
64+
debug(err);
65+
return resultStream.destroy(error);
66+
}
67+
68+
streamEventsByRevision(self, findStatement, lastEvent.revMin, revMax, resultStream, lastEvent);
69+
});
70+
});
71+
};
72+
1173
function Mongo(options) {
1274
options = options || {};
1375

@@ -330,6 +392,33 @@ _.extend(Mongo.prototype, {
330392
return query;
331393
},
332394

395+
streamEventsByRevision: function (query, revMin, revMax) {
396+
if (!query.aggregateId) {
397+
var errMsg = 'aggregateId not defined!';
398+
debug(errMsg);
399+
if (callback) callback(new Error(errMsg));
400+
return;
401+
}
402+
403+
var findStatement = {
404+
aggregateId: query.aggregateId,
405+
};
406+
407+
if (query.aggregate) {
408+
findStatement.aggregate = query.aggregate;
409+
}
410+
411+
if (query.context) {
412+
findStatement.context = query.context;
413+
}
414+
415+
var self = this;
416+
417+
var resultStream = new stream.PassThrough({ objectMode: true, highWaterMark: 1 });
418+
streamEventsByRevision(self, findStatement, revMin, revMax, resultStream);
419+
return resultStream;
420+
},
421+
333422
getEvents: function (query, skip, limit, callback) {
334423
this.streamEvents(query, skip, limit).toArray(callback);
335424
},

lib/eventstore.js

+24-2
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ _.extend(Eventstore.prototype, {
143143
* @param {Date} commitStamp the date object
144144
* @param {Number} skip how many events should be skipped? [optional]
145145
* @param {Number} limit how many events do you want in the result? [optional]
146-
* @returns {Stream} a stream with the events
147-
*/
146+
* @returns {Stream} a stream with the events
147+
*/
148148
streamEventsSince: function (commitStamp, skip, limit) {
149149
if (!this.store.streamEvents) {
150150
throw new Error('Streaming API is not suppoted by '+(this.options.type || 'inmemory') +' db implementation.');
@@ -163,6 +163,28 @@ _.extend(Eventstore.prototype, {
163163
},
164164

165165

166+
/**
167+
* stream events by revision
168+
* @param {Object || String} query the query object
169+
* @param {Number} revMin revision start point [optional]
170+
* @param {Number} revMax revision end point (hint: -1 = to end) [optional]
171+
* @returns {Stream} a stream with the events
172+
*/
173+
streamEventsByRevision: function (query, revMin, revMax) {
174+
if (typeof query === 'string') {
175+
query = { aggregateId: query };
176+
}
177+
178+
if (!query.aggregateId) {
179+
var err = new Error('An aggregateId should be passed!');
180+
debug(err);
181+
if (callback) callback(err);
182+
return;
183+
}
184+
185+
return this.store.streamEventsByRevision(query, revMin, revMax);
186+
},
187+
166188
/**
167189
* loads the events
168190
* @param {Object || String} query the query object [optional]

0 commit comments

Comments
 (0)