Skip to content

Commit ccb0502

Browse files
author
Perki
committed
Writing tests to check streaming (result chunks) of stream deletions
1 parent 853b8e6 commit ccb0502

File tree

4 files changed

+74
-6
lines changed

4 files changed

+74
-6
lines changed

components/api-server/src/Result.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ class Result {
196196
const streams = [];
197197
for (let i = 0; i < streamsArray.length; i++) {
198198
const s = streamsArray[i];
199-
streams.push(s.stream.pipe(s.isArray ? new ArraySerializationStream(s.name) : new SingleObjectSerializationStream(s.name)));
199+
const serializedStream = s.stream.pipe(s.isArray ? new ArraySerializationStream(s.name) : new SingleObjectSerializationStream(s.name));
200+
streams.push(serializedStream);
200201
}
201202

202203
return new MultiStream(streams)

components/api-server/src/methods/streams/ArraySerializationStream.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
*/
77
const Transform = require('stream').Transform;
88

9-
const SERIALIZATION_STACK_SIZE = 2048;
9+
// serialize every n objects
10+
const OBJECT_BUFFER_SIZE = 100;
11+
// even if OBJECT_BUFFER_SIZE is not reached, serialize if MAX_WAIT_MS is reached
12+
const MAX_WAIT_MS = 100;
1013

1114
/**
1215
* Stream that encapsulates the items it receives in a stringified array.
@@ -19,20 +22,22 @@ module.exports = class ArraySerializationStream extends Transform {
1922
super({ writableObjectMode: true });
2023
this.isStart = true;
2124
this.prefix = '"' + arrayName + '":';
22-
this.size = SERIALIZATION_STACK_SIZE;
25+
this.size = OBJECT_BUFFER_SIZE;
2326
this.stack = [];
27+
this.lastSerialization = Date.now();
2428
}
2529

2630
_transform (item, encoding, callback) {
2731
this.stack.push(item);
2832

29-
if (this.stack.length >= this.size) {
33+
if (this.stack.length >= this.size || (Date.now() - this.lastSerialization) > MAX_WAIT_MS) {
3034
if (this.isStart) {
3135
this.isStart = false;
3236
this.push((this.prefix + JSON.stringify(this.stack)).slice(0, -1));
3337
} else {
3438
this.push(',' + (JSON.stringify(this.stack)).slice(1, -1));
3539
}
40+
this.lastSerialization = Date.now();
3641
this.stack = [];
3742
}
3843
callback();

components/api-server/test/events-streaming.test.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const { produceMongoConnection, context } = require('./test-helpers');
1212
const { databaseFixture } = require('test-helpers');
1313

1414
const http = require('http');
15+
const superagent = require('superagent');
16+
const { promisify } = require('util');
1517

1618
const N_ITEMS = 2000;
1719
describe('events streaming with ' + N_ITEMS + ' entries', function () {
@@ -66,13 +68,16 @@ describe('events streaming with ' + N_ITEMS + ' entries', function () {
6668
method: 'GET'
6769
};
6870

71+
let lastChunkRecievedAt = Date.now();
6972
http.request(options, function (res) {
7073
assert.equal(res.headers['content-type'], 'application/json');
7174
assert.equal(res.headers['transfer-encoding'], 'chunked');
7275
res.setEncoding('utf8');
7376
let jsonString = '';
7477
let chunkCount = 0;
7578
res.on('data', function (chunk) {
79+
if (Date.now() - lastChunkRecievedAt > 500) throw new Error('It took more than 500ms between chunks');
80+
lastChunkRecievedAt = Date.now();
7681
chunkCount++;
7782
jsonString += chunk;
7883
});
@@ -86,4 +91,43 @@ describe('events streaming with ' + N_ITEMS + ' entries', function () {
8691
});
8792
}).end();
8893
});
94+
95+
it('[XZGB] Streams deleted is sent as chunked', async function () {
96+
const options = {
97+
host: apiServer.host,
98+
port: apiServer.port,
99+
path: '/' + username + '/streams/' + streamId + '?mergeEventsWithParent=false&auth=' + appAccessToken,
100+
method: 'DELETE'
101+
};
102+
103+
const resultTrash = await superagent.delete(`http://${options.host}:${options.port}${options.path}`);
104+
assert.isTrue(resultTrash.body?.stream?.trashed);
105+
106+
let lastChunkRecievedAt = Date.now();
107+
108+
await promisify(function (callback) {
109+
http.request(options, function (res) {
110+
assert.equal(res.headers['content-type'], 'application/json');
111+
assert.equal(res.headers['transfer-encoding'], 'chunked');
112+
res.setEncoding('utf8');
113+
let jsonString = '';
114+
let chunkCount = 0;
115+
res.on('data', function (chunk) {
116+
if (Date.now() - lastChunkRecievedAt > 2000) throw new Error('It took more than 2000ms between chunks');
117+
lastChunkRecievedAt = Date.now();
118+
chunkCount++;
119+
jsonString += chunk;
120+
});
121+
res.on('end', () => {
122+
lastChunkRecievedAt = -1;
123+
assert.equal(JSON.parse(jsonString).updatedEvents.length, N_ITEMS);
124+
assert.isAtLeast(chunkCount, 3, 'Should receive at least 3 chunks');
125+
callback();
126+
});
127+
res.on('error', function (error) {
128+
callback(error);
129+
});
130+
}).end();
131+
})();
132+
});
89133
});

components/api-server/test/streams.test.js

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ describe('[STRE] streams', function () {
664664
});
665665
});
666666

667-
describe('DELETE /<id>', function () {
667+
describe('[STRD] DELETE /<id>', function () {
668668
this.timeout(5000);
669669

670670
beforeEach(resetData);
@@ -833,6 +833,8 @@ describe('[STRE] streams', function () {
833833
const deletedEventWithAtt = deletedEvents[0];
834834
let deletionTime;
835835

836+
const ADD_N_EVENTS = 100;
837+
836838
async.series([
837839
function addEventAttachment (stepDone) {
838840
request.post('/' + user.username + '/events/' + deletedEventWithAtt.id)
@@ -844,6 +846,22 @@ describe('[STRE] streams', function () {
844846
stepDone();
845847
});
846848
},
849+
async function fillStreamWithALotOfEvent () {
850+
const mall = await getMall();
851+
for (let i = 0; i < ADD_N_EVENTS; i++) {
852+
await mall.events.create(user.id, {
853+
id: 'cxxxxxxx' + i,
854+
type: 'note/txt',
855+
streamIds: [testData.streams[8].id],
856+
content: '' + i,
857+
time: timestamp.now(),
858+
created: timestamp.now(),
859+
createdBy: 'test',
860+
modified: timestamp.now(),
861+
modifiedBy: 'test'
862+
});
863+
}
864+
},
847865
async function trashStream () {
848866
await mall.streams.update(user.id, { id, trashed: true });
849867
},
@@ -870,7 +888,7 @@ describe('[STRE] streams', function () {
870888
const separatedEvents = validation.separateAccountStreamsAndOtherEvents(events);
871889
events = separatedEvents.events;
872890
const eventsWithoutHistory = testData.events.filter(e => e.headId == null);
873-
(events.length + foundDeletedEvents.length).should.eql(eventsWithoutHistory.length, 'events');
891+
(events.length + foundDeletedEvents.length).should.eql(eventsWithoutHistory.length + ADD_N_EVENTS, 'events');
874892

875893
// validate account streams events
876894
const actualAccountStreamsEvents = separatedEvents.accountStreamsEvents;

0 commit comments

Comments
 (0)