Skip to content

Commit 4608644

Browse files
authored
Merge pull request #486 from pryv/feature/stream-eventdeletions
Streaming eventDeletion on streams.delete
2 parents e174e7f + fad7023 commit 4608644

File tree

16 files changed

+310
-121
lines changed

16 files changed

+310
-121
lines changed

components/api-server/src/Result.js

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
const commonMeta = require('./methods/helpers/setCommonMeta');
88
const MultiStream = require('multistream');
99
const DrainStream = require('./methods/streams/DrainStream');
10-
const ArrayStream = require('./methods/streams/ArrayStream');
10+
const ArraySerializationStream = require('./methods/streams/ArraySerializationStream');
11+
const SingleObjectSerializationStream = require('./methods/streams/SingleObjectSerializationStream');
1112
const async = require('async');
1213

1314
const { Transform } = require('stream');
@@ -131,9 +132,9 @@ class Result {
131132
* @param {Readable} stream
132133
* @returns {void}
133134
*/
134-
addStream (arrayName, stream) {
135+
addStream (arrayName, stream, isArray = true) {
135136
this._private.isStreamResult = true;
136-
this._private.streamsArray.push({ name: arrayName, stream });
137+
this._private.streamsArray.push({ name: arrayName, stream, isArray });
137138
}
138139

139140
// Returns true if the Result holds any streams, false otherwise.
@@ -191,20 +192,14 @@ class Result {
191192
const streamsArray = this._private.streamsArray;
192193
if (!this._private.isStreamResult) { throw new Error('AF: not a stream result.'); }
193194
if (streamsArray.length < 1) { throw new Error('streams array empty'); }
194-
// Are we handling a single stream?
195-
if (streamsArray.length === 1) {
196-
const first = streamsArray[0];
197-
return first.stream
198-
.pipe(new ArrayStream(first.name, true))
199-
.pipe(new ResultStream(this._private.tracing, this._private.tracingId))
200-
.pipe(res);
201-
}
202-
// assert: streamsArray.length > 1
195+
203196
const streams = [];
204197
for (let i = 0; i < streamsArray.length; i++) {
205198
const s = streamsArray[i];
206-
streams.push(s.stream.pipe(new ArrayStream(s.name, i === 0)));
199+
const serializedStream = s.stream.pipe(s.isArray ? new ArraySerializationStream(s.name) : new SingleObjectSerializationStream(s.name));
200+
streams.push(serializedStream);
207201
}
202+
208203
return new MultiStream(streams)
209204
.pipe(new ResultStream(this._private.tracing, this._private.tracingId))
210205
.pipe(res);
@@ -245,7 +240,7 @@ class Result {
245240
const streamsArray = _private.streamsArray;
246241
const resultObj = {};
247242
async.forEachOfSeries(streamsArray, (elementDef, i, done) => {
248-
const drain = new DrainStream({ limit: _private.arrayLimit }, (err, list) => {
243+
const drain = new DrainStream({ limit: _private.arrayLimit, isArray: elementDef.isArray }, (err, list) => {
249244
if (err) {
250245
return done(err);
251246
}
@@ -275,15 +270,15 @@ class Result {
275270
/** @extends Transform */
276271
class ResultStream extends Transform {
277272
isStart;
278-
279273
tracing;
280-
281274
tracingId;
275+
debugString;
282276
constructor (tracing, parentTracingId) {
283-
super({ objectMode: true });
277+
super({ writableObjectMode: true });
284278
this.isStart = true;
285279
this.tracing = tracing;
286280
this.tracingId = this.tracing.startSpan('resultStream', {}, parentTracingId);
281+
this.debugString = '';
287282
}
288283

289284
/**
@@ -300,13 +295,18 @@ class ResultStream extends Transform {
300295
callback();
301296
}
302297

298+
// uncomment to debug
299+
// push (data) { this.debugString += data; super.push(data); }
300+
303301
/**
304302
* @returns {void}
305303
*/
306304
_flush (callback) {
307-
const thing = ', "meta": ' + JSON.stringify(commonMeta.setCommonMeta({}).meta);
305+
const thing = ' "meta": ' + JSON.stringify(commonMeta.setCommonMeta({}).meta);
308306
this.push(thing + '}');
309307
this.tracing.finishSpan('resultStream');
308+
309+
if (this.debugString !== '') { console.log('***** RESULT DATA **********\n' + this.debugString + '\n*********************'); }
310310
callback();
311311
}
312312
}

components/api-server/src/methods/streams.js

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const logger = getLogger('methods:streams');
2222
const { getMall, storeDataUtils } = require('mall');
2323
const { changePrefixIdForStreams, replaceWithNewPrefix } = require('./helpers/backwardCompatibility');
2424
const { pubsub } = require('messages');
25+
const Readable = require('stream').Readable;
2526
/**
2627
* Event streams API methods implementation.
2728
*
@@ -345,6 +346,7 @@ module.exports = async function (api) {
345346
childrenDepth: -1,
346347
storeId
347348
});
349+
348350
const streamToDelete = streamToDeleteSingleArray[0]; // no need to check existence: done before in verifyStreamExistenceAndPermissions
349351
const streamAndDescendantIds = treeUtils.collectPluckFromRootItem(streamToDelete, 'id');
350352
// keep stream and children to delete in next step
@@ -359,13 +361,25 @@ module.exports = async function (api) {
359361
const events = await mall.events.getWithParamsByStore(context.user.id, {
360362
[storeId]: { streams: [{ any: cleanDescendantIds }], limit: 1 }
361363
});
364+
362365
const hasLinkedEvents = !!events.length;
363366
if (hasLinkedEvents) {
364367
// has linked events -----------------
365368
if (params.mergeEventsWithParent === null) {
366369
return next(errors.invalidParametersFormat('There are events referring to the deleted items ' +
367370
'and the `mergeEventsWithParent` parameter is missing.'));
368371
}
372+
}
373+
374+
// --- all tests are passed
375+
// --- create result streams and start send result as they come
376+
const updatedEventsStream = new ItemsStream();
377+
result.addStream('updatedEvents', updatedEventsStream);
378+
const singleItemDeletedStream = new ItemsStream();
379+
result.addStream('streamDeletion', singleItemDeletedStream, false);
380+
next(); // <== call next here to avoid await blocking
381+
382+
if (hasLinkedEvents) {
369383
if (params.mergeEventsWithParent) {
370384
// -- Case 1 -- Merge events with parent
371385
// add parent stream Id if needed and remove deleted stream ids
@@ -374,6 +388,9 @@ module.exports = async function (api) {
374388
await mall.events.updateMany(context.user.id, query, {
375389
addStreams: [parentId],
376390
removeStreams: streamAndDescendantIds
391+
}, function (event) {
392+
if (event == null) return;
393+
updatedEventsStream.add({ action: 'mergedToParent', id: event.id });
377394
});
378395
} else {
379396
// case mergeEventsWithParent = false
@@ -382,14 +399,17 @@ module.exports = async function (api) {
382399
const remaningStreamsIds = _.difference(event.streamIds, streamAndDescendantIds);
383400
if (remaningStreamsIds.length === 0) { // no more streams deleted event
384401
await mall.events.delete(context.user.id, event);
402+
updatedEventsStream.add({ action: 'deleted', id: event.id });
385403
} else { // update event without these streams
386404
event.streamIds = remaningStreamsIds;
387405
await mall.events.update(context.user.id, event);
406+
updatedEventsStream.add({ action: 'updatedStreamIds', id: event.id });
388407
}
389408
}
390409
}
391410
pubsub.notifications.emit(context.user.username, pubsub.USERNAME_BASED_EVENTS_CHANGED);
392411
}
412+
updatedEventsStream.add(null); // close stream
393413
// finally delete stream
394414
for (const streamIdToDelete of context.streamToDeleteAndDescendantIds) {
395415
try {
@@ -398,8 +418,25 @@ module.exports = async function (api) {
398418
logger.error('Failed deleted some streams', err);
399419
}
400420
}
401-
result.streamDeletion = { id: params.id };
421+
singleItemDeletedStream.push({ id: params.id });
422+
singleItemDeletedStream.push(null); // close stream
402423
pubsub.notifications.emit(context.user.username, pubsub.USERNAME_BASED_STREAMS_CHANGED);
403-
next();
404424
}
405425
};
426+
427+
class ItemsStream extends Readable {
428+
buffer;
429+
constructor () {
430+
super({ objectMode: true });
431+
this.buffer = [];
432+
}
433+
434+
add (item) { this.push(item); }
435+
436+
_read () {
437+
let push = true;
438+
while (this.buffer.length > 0 && push) {
439+
push = this.push(this.buffer.shift());
440+
}
441+
}
442+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* @license
3+
* Copyright (C) 2012–2023 Pryv S.A. https://pryv.com - All Rights Reserved
4+
* Unauthorized copying of this file, via any medium is strictly prohibited
5+
* Proprietary and confidential
6+
*/
7+
const Transform = require('stream').Transform;
8+
9+
// serialize every n objects
10+
const OBJECT_BUFFER_SIZE = 100;
11+
// event if OBJECT_BUFFER_SIZE is not reach, serialize if MAX_WAIT_MS is reached
12+
const MAX_WAIT_MS = 100;
13+
14+
/**
15+
* Stream that encapsulates the items it receives in a stringified array.
16+
*
17+
* @param arrayName {String} array name that will prefix the array
18+
* @constructor
19+
*/
20+
module.exports = class ArraySerializationStream extends Transform {
21+
constructor (arrayName) {
22+
super({ writableObjectMode: true });
23+
this.isStart = true;
24+
this.prefix = '"' + arrayName + '":';
25+
this.size = OBJECT_BUFFER_SIZE;
26+
this.stack = [];
27+
this.lastSerialization = Date.now();
28+
}
29+
30+
_transform (item, encoding, callback) {
31+
this.stack.push(item);
32+
33+
if (this.stack.length >= this.size || (Date.now() - this.lastSerialization) > MAX_WAIT_MS) {
34+
if (this.isStart) {
35+
this.isStart = false;
36+
this.push((this.prefix + JSON.stringify(this.stack)).slice(0, -1));
37+
} else {
38+
this.push(',' + (JSON.stringify(this.stack)).slice(1, -1));
39+
}
40+
this.lastSerialization = Date.now();
41+
this.stack = [];
42+
}
43+
callback();
44+
}
45+
46+
_flush = function (callback) {
47+
if (this.isStart) {
48+
this.push(this.prefix + JSON.stringify(this.stack));
49+
} else {
50+
const joiningComma = this.stack.length > 0 ? ',' : '';
51+
this.push(joiningComma + (JSON.stringify(this.stack)).slice(1));
52+
}
53+
this.push(',');
54+
callback();
55+
};
56+
};

components/api-server/src/methods/streams/ArrayStream.js

Lines changed: 0 additions & 69 deletions
This file was deleted.

components/api-server/src/methods/streams/DrainStream.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,13 @@ function DrainStream (params, callback) {
3434

3535
if (callback) {
3636
this.on('finish', function () {
37-
callback(null, this.array);
37+
if (params.isArray) {
38+
return callback(null, this.array);
39+
}
40+
if (this.array.length !== 1) {
41+
return callback(new Error('Expected to find 1 item in array got: ' + JSON.stringify(this.array)));
42+
}
43+
callback(null, this.array[0]);
3844
});
3945
}
4046

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* @license
3+
* Copyright (C) 2012–2023 Pryv S.A. https://pryv.com - All Rights Reserved
4+
* Unauthorized copying of this file, via any medium is strictly prohibited
5+
* Proprietary and confidential
6+
*/
7+
const Transform = require('stream').Transform;
8+
9+
/**
10+
* Stream that serialize the first object it receives.
11+
*
12+
* @param objectName {String} array name that will prefix the array
13+
* @constructor
14+
*/
15+
module.exports = class SingleObjectSerializationStream extends Transform {
16+
name;
17+
constructor (objectName) {
18+
super({ writableObjectMode: true });
19+
this.name = objectName;
20+
}
21+
22+
_transform = function (item, encoding, callback) {
23+
this.push('"' + this.name + '": ' + JSON.stringify(item) + ', ');
24+
callback();
25+
};
26+
27+
_flush = function (callback) {
28+
callback();
29+
};
30+
};

components/api-server/src/schema/streamsMethods.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ const string = helpers.string;
1818
const number = helpers.number;
1919
const boolean = helpers.boolean;
2020

21+
const updatedEvent = helpers.object({
22+
id: helpers.string(),
23+
action: helpers.string()
24+
}, {
25+
id: 'updatedEvent',
26+
required: ['id', 'action'],
27+
additionalProperties: false
28+
});
29+
2130
module.exports = {
2231
get: {
2332
params: object({
@@ -83,7 +92,7 @@ module.exports = {
8392
required: ['stream'],
8493
additionalProperties: false
8594
}),
86-
object({ streamDeletion: itemDeletion }, {
95+
object({ streamDeletion: itemDeletion, updatedEvents: array(updatedEvent) }, {
8796
required: ['streamDeletion'],
8897
additionalProperties: false
8998
})

0 commit comments

Comments
 (0)