Skip to content

Commit c810e84

Browse files
committed
feat: add reconnect and process old files
1 parent 4561853 commit c810e84

File tree

4 files changed

+206
-50
lines changed

4 files changed

+206
-50
lines changed

config/default.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ server:
1515

1616
features:
1717
disableFeatExtraction: false
18+
reconnectTimout: 60000
19+
requenceNumberSendingInterval: 60000
1820

1921
amplitude:
2022
key:

src/app.js

Lines changed: 147 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ const JSONStream = require('JSONStream');
22
const assert = require('assert').strict;
33
const config = require('config');
44
const fs = require('fs');
5+
const { stat } = require('fs/promises');
56
const http = require('http');
67
const https = require('https');
78
const path = require('path');
89
const { pipeline } = require('stream');
10+
const url = require('url');
911
const WebSocket = require('ws');
1012

1113
const { name: appName, version: appVersion } = require('../package');
@@ -17,6 +19,7 @@ const DemuxSink = require('./demux');
1719
const logger = require('./logging');
1820
const PromCollector = require('./metrics/PromCollector');
1921
const { saveEntryAssureUnique } = require('./store/dynamo');
22+
const fileStore = require('./store/file');
2023
const initS3Store = require('./store/s3.js');
2124
const { getStatsFormat } = require('./utils/stats-detection');
2225
const { asyncDeleteFile, getEnvName, getIdealWorkerCount, RequestType, ResponseType } = require('./utils/utils');
@@ -26,6 +29,7 @@ let amplitude;
2629
let store;
2730
let featPublisher;
2831
let tempPath;
32+
const sessionIdTimeouts = {};
2933

3034
/**
3135
* Store the dump to the configured store. The dump file might be stored under a different
@@ -180,6 +184,28 @@ function setupFeaturesPublisher() {
180184
* Initialize the directory where temporary dump files will be stored.
181185
*/
182186
function setupWorkDirectory() {
187+
try {
188+
// Temporary path for stats dumps must be configured.
189+
tempPath = config.server.tempPath;
190+
assert(tempPath);
191+
192+
if (!fs.existsSync(tempPath)) {
193+
logger.debug(`[App] Creating working dir ${tempPath}`);
194+
fs.mkdirSync(tempPath);
195+
}
196+
} catch (e) {
197+
logger.error(`[App] Error while accessing working dir ${tempPath} - ${e}`);
198+
199+
// The app is probably in an inconsistent state at this point, throw and stop process.
200+
throw e;
201+
}
202+
}
203+
204+
/**
205+
* Remove old files from the temp folder.
206+
*/
207+
function processOldFiles() {
208+
logger.info('[App] Waiting for connections to reconnect.');
183209
try {
184210
// Temporary path for stats dumps must be configured.
185211
tempPath = config.server.tempPath;
@@ -188,15 +214,37 @@ function setupWorkDirectory() {
188214
if (fs.existsSync(tempPath)) {
189215
fs.readdirSync(tempPath).forEach(fname => {
190216
try {
191-
logger.debug(`[App] Removing file ${`${tempPath}/${fname}`}`);
192-
fs.unlinkSync(`${tempPath}/${fname}`);
217+
const filePath = `${tempPath}/${fname}`;
218+
219+
logger.debug(`[App] Trying to process file ${filePath}`);
220+
fs.stat(filePath, (err, stats) => {
221+
if (err) {
222+
throw err;
223+
}
224+
225+
if (Math.abs(Date.now() - stats.mtime.getTime()) > config.features.reconnectTimout) {
226+
logger.debug(`[App] Start processing the file ${`${filePath}`}`);
227+
const response = fileStore?.getObjectsByKeys(
228+
filePath, [ 'connectionInfo', 'identity' ]);
229+
230+
response.then(
231+
obj => {
232+
logger.info(`[App] Meta and connectionInfo response: ${response}`);
233+
const meta = obj?.connectionInfo;
234+
const connectionInfo = obj?.identity;
235+
236+
setTimeout(processData,
237+
config.features.reconnectTimout, fname, meta, connectionInfo);
238+
})
239+
.catch(() => {
240+
logger.info('[App] New connection. File doesn\'t exist.');
241+
});
242+
}
243+
});
193244
} catch (e) {
194245
logger.error(`[App] Error while unlinking file ${fname} - ${e}`);
195246
}
196247
});
197-
} else {
198-
logger.debug(`[App] Creating working dir ${tempPath}`);
199-
fs.mkdirSync(tempPath);
200248
}
201249
} catch (e) {
202250
logger.error(`[App] Error while accessing working dir ${tempPath} - ${e}`);
@@ -248,10 +296,16 @@ function setupMetricsServer() {
248296
*/
249297
function wsConnectionHandler(client, upgradeReq) {
250298
PromCollector.connected.inc();
299+
logger.info('[App] Websocket connection handler');
251300

252301
// the url the client is coming from
253302
const referer = upgradeReq.headers.origin + upgradeReq.url;
254303
const ua = upgradeReq.headers['user-agent'];
304+
const queryObject = url.parse(referer, true).query;
305+
const statsSessionId = queryObject?.statsSessionId;
306+
307+
clearConnectionTimeout(statsSessionId);
308+
sendLastSequenceNumber(client, statsSessionId);
255309

256310
// During feature extraction we need information about the browser in order to decide which algorithms use.
257311
const connectionInfo = {
@@ -273,39 +327,11 @@ function wsConnectionHandler(client, upgradeReq) {
273327
const demuxSink = new DemuxSink(demuxSinkOptions);
274328

275329
demuxSink.on('close-sink', ({ id, meta }) => {
276-
logger.info('[App] Queue for processing id %s', id);
277-
278-
// Metadata associated with a dump can get large so just select the necessary fields.
279-
const dumpData = {
280-
app: meta.applicationName || 'Undefined',
281-
clientId: id,
282-
conferenceId: meta.confName,
283-
conferenceUrl: meta.confID,
284-
dumpPath: meta.dumpPath,
285-
endDate: Date.now(),
286-
endpointId: meta.endpointId,
287-
startDate: meta.startDate,
288-
sessionId: meta.meetingUniqueId,
289-
userId: meta.displayName,
290-
ampSessionId: meta.sessionId,
291-
ampUserId: meta.userId,
292-
ampDeviceId: meta.deviceId,
293-
statsFormat: connectionInfo.statsFormat,
294-
isBreakoutRoom: meta.isBreakoutRoom,
295-
breakoutRoomId: meta.roomId,
296-
parentStatsSessionId: meta.parentStatsSessionId
297-
};
298-
299-
// Don't process dumps generated by JVB, there should be a more formal process to
300-
if (config.features.disableFeatExtraction || connectionInfo.clientProtocol?.includes('JVB')) {
301-
persistDumpData(dumpData);
302-
} else {
303-
// Add the clientId in the worker pool so it can process the associated dump file.
304-
workerPool.addTask({
305-
type: RequestType.PROCESS,
306-
body: dumpData
307-
});
308-
}
330+
logger.info('[App] Websocket disconnected waiting for processing the data %s', id);
331+
332+
const timemoutId = setTimeout(processData, config.features.reconnectTimout, id, meta, connectionInfo);
333+
334+
sessionIdTimeouts[id] = timemoutId;
309335
});
310336

311337
const connectionPipeline = pipeline(
@@ -348,6 +374,61 @@ function wsConnectionHandler(client, upgradeReq) {
348374
});
349375
}
350376

377+
/**
378+
* Clear the connection timeout if the user is reconnected/
379+
*
380+
* @param {*} id
381+
*/
382+
function clearConnectionTimeout(id) {
383+
const timeoutId = sessionIdTimeouts[id];
384+
385+
if (timeoutId) {
386+
logger.info('[App] Clear timeout for connectionId: %s', id);
387+
clearTimeout(timeoutId);
388+
}
389+
}
390+
391+
/**
392+
*
393+
* @param {*} meta
394+
* @param {*} connectionInfo
395+
*/
396+
function processData(id, meta, connectionInfo) {
397+
logger.info('[App] Queue for processing id %s', id);
398+
399+
// Metadata associated with a dump can get large so just select the necessary fields.
400+
const dumpData = {
401+
app: meta.applicationName || 'Undefined',
402+
clientId: id,
403+
conferenceId: meta.confName,
404+
conferenceUrl: meta.confID,
405+
dumpPath: meta.dumpPath,
406+
endDate: Date.now(),
407+
endpointId: meta.endpointId,
408+
startDate: meta.startDate,
409+
sessionId: meta.meetingUniqueId,
410+
userId: meta.displayName,
411+
ampSessionId: meta.sessionId,
412+
ampUserId: meta.userId,
413+
ampDeviceId: meta.deviceId,
414+
statsFormat: connectionInfo.statsFormat,
415+
isBreakoutRoom: meta.isBreakoutRoom,
416+
breakoutRoomId: meta.roomId,
417+
parentStatsSessionId: meta.parentStatsSessionId
418+
};
419+
420+
// Don't process dumps generated by JVB, there should be a more formal process to
421+
if (config.features.disableFeatExtraction || connectionInfo.clientProtocol?.includes('JVB')) {
422+
persistDumpData(dumpData);
423+
} else {
424+
// Add the clientId in the worker pool so it can process the associated dump file.
425+
workerPool.addTask({
426+
type: RequestType.PROCESS,
427+
body: dumpData
428+
});
429+
}
430+
}
431+
351432
/**
352433
*
353434
* @param {*} wsServer
@@ -358,6 +439,33 @@ function setupWebSocketsServer(wsServer) {
358439
wss.on('connection', wsConnectionHandler);
359440
}
360441

442+
/**
443+
* @param {*} client
444+
* @param {*} id
445+
*/
446+
function sendLastSequenceNumber(client, id) {
447+
const dumpPath = `${tempPath}/${id}`;
448+
let sequenceNumber = 0;
449+
450+
fileStore?.getLastLine(dumpPath, 1)
451+
.then(
452+
lastLine => {
453+
const jsonData = JSON.parse(lastLine);
454+
455+
if (Array.isArray(jsonData) && jsonData[4] !== undefined) {
456+
sequenceNumber = jsonData[4];
457+
}
458+
})
459+
.catch(() => {
460+
logger.info('[App] New connection. File doesn\'t exist.');
461+
})
462+
.finally(() => {
463+
client.send(`{"sn":${sequenceNumber}}`);
464+
465+
setTimeout(sendLastSequenceNumber, config.features.requenceNumberSendingInterval, client, id);
466+
});
467+
}
468+
361469
/**
362470
* Handler used for basic availability checks.
363471
*
@@ -439,6 +547,7 @@ function startRtcstatsServer() {
439547
logger.info('[App] Initializing: %s; version: %s; env: %s ...', appName, appVersion, getEnvName());
440548

441549
setupWorkDirectory();
550+
processOldFiles();
442551
setupDumpStorage();
443552
setupFeaturesPublisher();
444553
setupAmplitudeConnector();

src/demux.js

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,11 @@ class DemuxSink extends Writable {
151151
async _sinkCreate(id) {
152152
PromCollector.sessionCount.inc();
153153

154-
let resolvedId = id;
155-
let i = 0;
154+
const resolvedId = id;
156155
let fd;
157156

158157
const idealPath = path.resolve(cwd, this.dumpFolder, id);
159-
let filePath = idealPath;
158+
const filePath = idealPath;
160159

161160
// If a client reconnects the same client id will be provided thus cases can occur where the previous dump
162161
// with the same id is still present on the disk, in order to avoid conflicts and states where multiple
@@ -168,15 +167,7 @@ class DemuxSink extends Writable {
168167
// if the entry already exists because some other instance uploaded first, the same incremental approach needs
169168
// to be taken.
170169
while (!fd) {
171-
try {
172-
fd = await fsOpen(filePath, 'wx');
173-
} catch (err) {
174-
if (err.code !== 'EEXIST') {
175-
throw err;
176-
}
177-
resolvedId = `${id}_${++i}`;
178-
filePath = path.resolve(cwd, this.dumpFolder, resolvedId);
179-
}
170+
fd = await fsOpen(filePath, 'w');
180171
}
181172

182173
this.log.info('[Demux] open-sink id: %s; path %s; connection: %o', id, filePath, this.connectionInfo);

src/store/file.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
const fs = require('fs');
2+
const readline = require('readline');
3+
const Stream = require('stream');
4+
5+
const logger = require('../logging');
6+
7+
exports.getLastLine = (fileName, minLength) => {
8+
const inStream = fs.createReadStream(fileName);
9+
const outStream = new Stream();
10+
11+
return new Promise((resolve, reject) => {
12+
const rl = readline.createInterface(inStream, outStream);
13+
14+
let lastLine = '';
15+
16+
rl.on('line', line => {
17+
if (line.length >= minLength) {
18+
lastLine = line;
19+
}
20+
});
21+
22+
rl.on('error', reject);
23+
24+
rl.on('close', () => {
25+
resolve(lastLine);
26+
});
27+
});
28+
};
29+
30+
exports.getObjectsByKeys = (fileName, keys) => {
31+
const inStream = fs.createReadStream(fileName);
32+
const outStream = new Stream();
33+
const response = {};
34+
35+
return new Promise((resolve, reject) => {
36+
const rl = readline.createInterface(inStream, outStream);
37+
38+
rl.on('line', line => {
39+
if (line !== '') {
40+
const jsonLine = JSON.parse(line);
41+
42+
if (keys.indexOf(jsonLine[0]) >= 0 && jsonLine[2] !== null) {
43+
response[jsonLine[0]] = jsonLine[2];
44+
}
45+
}
46+
});
47+
48+
rl.on('close', () => {
49+
resolve(response);
50+
});
51+
52+
rl.on('error', reject);
53+
});
54+
};

0 commit comments

Comments
 (0)