Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ test:
$(REPOSITORY):$(TAG) \
run test

integration:
@docker run \
-p 3000:3000 \
-p 8095:8095 \
-v $(PWD):/rtcstats-server \
--env RTCSTATS_LOG_LEVEL=debug \
--entrypoint npm \
--cpus=2 \
$(REPOSITORY):$(TAG) \
run integration

debug-restricted:
@docker run \
-p 3000:3000 \
Expand Down
4 changes: 4 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ server:

features:
disableFeatExtraction: false
reconnectTimeout: 60000
sequenceNumberSendingInterval: 60000
orphanFileCleanupTimeoutMinutes: 12
cleanupCronHour: 14

amplitude:
key:
Expand Down
10 changes: 8 additions & 2 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ server:
skipLoadBalancerIp: false
logLevel: info
jsonConsoleLog: false
useHTTPS: true
useHTTPS: false

amplitude:
key: ''

features:
sequenceNumberSendingInterval: 60000
orphanFileCleanupTimeoutMinutes: 12
cleanupCronIntervalMinutes: 720
reconnectTimeout: 10000

s3:
accessKeyId:
secretAccessKey:
region: us-west-2
region:
bucket:
useIAMAuth: false

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"scripts": {
"lint:fix": "eslint --fix ./src/",
"lint": "eslint ./src/",
"integration": "node ./src/test/client.js",
"integration": "NODE_ENV=test node ./src/test/client.js",
"test": "jest",
"test:fix": "jest ./src/test/jest/extract.test.js -- --fix",
"start": "NODE_ENV=production node ./src/app.js",
Expand Down
112 changes: 112 additions & 0 deletions src/ClientMessageHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@

const logger = require('./logging');
const storeFile = require('./store/file');
const utils = require('./utils/utils');

const messageTypes = {
SequenceNumber: 'sn'
};

/**
* This handles sending the messages to the frontend
*/
class ClientMessageHandler {
/**
* @param tempPath {string}
* @param sequenceNumberSendingInterval {number}
*/
constructor({ statsSessionId, tempPath, sequenceNumberSendingInterval, demuxSink, client }) {
logger.debug('[ClientMessageHandler] Constructor statsSessionId', statsSessionId);
this.statsSessionId = statsSessionId;
this.tempPath = tempPath;
this.sequenceNumberSendingInterval = sequenceNumberSendingInterval;
this.demuxSink = demuxSink;
this.client = client;
this.sendLastSequenceNumber = this.sendLastSequenceNumber.bind(this);
}

/**
* Sends the last sequence number from demuxSink or reads from the dump file
*/
async sendLastSequenceNumber(isInitial) {
logger.debug('[ClientMessageHandler] Sending last sequence number for: ', this.statsSessionId);
let sequenceNumber = 0;

if (this.demuxSink.lastSequenceNumber > 0) {
logger.debug('[ClientMessageHandler] Last sequence number from demux ');
sequenceNumber = this.demuxSink.lastSequenceNumber;
} else {
logger.debug('[ClientMessageHandler] Last sequence number from dump ');
sequenceNumber = await this._getLastSequenceNumberFromDump();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to read the entire file in case of a client reconnect (not server restart case)? Technically we have that information in the previous sink

}

this.client.send(this._createMessage(
messageTypes.SequenceNumber,
this._createSequenceNumberBody(sequenceNumber, isInitial)
));

if (this.client.readyState === 1) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this mean? are there cases where readystate is not 1, if such a case occurs what happens? add a comment for why this is needed.

setTimeout(
this.sendLastSequenceNumber,
this.sequenceNumberSendingInterval,
this.client, this.statsSessionId
);
}
logger.debug('[ClientMessageHandler] Last sequence number: ', sequenceNumber);
}

/**
* Reads the last sequnce number from the dump file.
*/
async _getLastSequenceNumberFromDump() {
const dumpPath = utils.getDumpPath(this.tempPath, this.statsSessionId);

logger.debug('[ClientMessageHandler] Last sequence number from dump: ', dumpPath);

const promis = storeFile.getLastLine(dumpPath, 1)
.then(
lastLine => utils.parseLineForSequenceNumber(lastLine))
.catch(() => {
logger.debug('[ClientMessageHandler] New connection. File doesn\'t exist. file: ', dumpPath);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming the error can be any error not necessarily that the file doesn't exist, maybe we should log the error as well.


return 0;
});

const result = await promis;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be done a bit more elegantly like:

let lastLine = 0;
try {
  const lastLineString = await storeFile.getLastLine(dumpPath, 1);
  lastLine = utils.parseLineForSequenceNumber(lastLineString))
} catch(e) {
  logger.error('[ClientMessageHandler] Error.  ', e);
}

return lastLine;

wdyt


return result;
}

/**
*
* @param type {string}
* @param body {string}
* @returns {string}
*/
_createMessage(type, body) {
return JSON.stringify({
'type': type,
'body': body
});
}

/**
*
* @param {*} sequenceNumber
* @param {*} isInitial
* @returns {object}
*/
_createSequenceNumberBody(sequenceNumber, isInitial) {
const body = {
value: sequenceNumber
};

if (isInitial === true) {
body.state = 'initial';
}

return body;
}
}

module.exports = ClientMessageHandler;
94 changes: 94 additions & 0 deletions src/DumpPersister.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@

const logger = require('./logging');
const PromCollector = require('./metrics/PromCollector');
const { saveEntryAssureUnique } = require('./store/dynamo');
const initS3Store = require('./store/s3.js');
const { asyncDeleteFile, getDumpPath } = require('./utils/utils');

/**
*
*/
class DumpPersister {
/**
*
*/
constructor({ tempPath, s3Config, disableFeatExtraction, webhookSender, config }) {
this.tempPath = tempPath;
this.store = this.createDumpStorage(s3Config);
this.disableFeatExtraction = disableFeatExtraction;
this.webhookSender = webhookSender;
this.config = config;
}

/**
* Initialize the service which will persist the dump files.
*/
createDumpStorage(s3Config) {
if (s3Config?.region) {
return initS3Store(s3Config);
}
logger.warn('[DumpPersister] S3 is not configured!');
}

/**
* Persist the dump file to the configured store and save the associated metadata. At the time of writing the
* only supported store for metadata is dynamo.
*
* @param {Object} sinkMeta - metadata associated with the dump file.
*/
async persistDumpData(sinkMeta) {

// Metadata associated with a dump can get large so just select the necessary fields.
const { clientId } = sinkMeta;
let uniqueClientId = clientId;

// Because of the current reconnect mechanism some files might have the same clientId, in which case the
// underlying call will add an associated uniqueId to the clientId and return it.
uniqueClientId = await saveEntryAssureUnique(sinkMeta);

// Store the dump file associated with the clientId using uniqueClientId as the key value. In the majority of
// cases the input parameter will have the same values.
this.storeDump(sinkMeta, uniqueClientId ?? clientId);
}

/**
* Store the dump to the configured store. The dump file might be stored under a different
* name, this is to account for the reconnect mechanism currently in place.
*
* @param {string} sinkMeta - name that the dump file will actually have on disk.
* @param {string} uniqueClientId - name that the dump will have on the store.
*/
async storeDump(sinkMeta, uniqueClientId) {
const {
clientId,
isJaaSTenant
} = sinkMeta;


const dumpPath = getDumpPath(this.tempPath, clientId);
const { webhooks: { sendRtcstatsUploaded } = { sendRtcstatsUploaded: false } } = this.config;

try {

logger.info(`[S3] Storing dump ${uniqueClientId} with path ${dumpPath}`);

await this.store?.put(uniqueClientId, dumpPath);

if (isJaaSTenant && sendRtcstatsUploaded && this.webhookSender) {
const signedLink = await this.store?.getSignedUrl(uniqueClientId);

logger.info('[App] Signed url:', signedLink);

this.webhookSender.sendRtcstatsUploadedHook(sinkMeta, signedLink);
}
} catch (err) {
PromCollector.storageErrorCount.inc();

logger.error('Error storing: %s uniqueId: %s - %s', dumpPath, uniqueClientId, err);
} finally {
await asyncDeleteFile(dumpPath);
}
}
}

module.exports = DumpPersister;
103 changes: 103 additions & 0 deletions src/OrphanFileHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
const fs = require('fs');


const logger = require('./logging');
const fileStore = require('./store/file');
const utils = require('./utils/utils');

/**
*
*/
class OrphanFileHelper {
/**
*
*/
constructor({ tempPath, orphanFileCleanupTimeoutMinutes, wsHandler, cleanupCronHour }) {
this.tempPath = tempPath;
this.orphanFileCleanupTimeoutMs = orphanFileCleanupTimeoutMinutes * 60 * 1000;
this.wsHandler = wsHandler;
this.cleanupCronHour = cleanupCronHour;
this.processOldFiles = this.processOldFiles.bind(this);
}

/**
* Remove old files from the temp folder.
*/
processOldFiles() {
logger.info('[OrphanFileHelper] Waiting for connections to reconnect.');

if (fs.existsSync(this.tempPath)) {
fs.readdirSync(this.tempPath).forEach(fname => {

const filePath = utils.getDumpPath(this.tempPath, fname);

logger.debug(`[OrphanFileHelper] Trying to process file ${filePath}`);
fs.stat(filePath, (err, stats) => {
if (err) {
logger.error(`[OrphanFileHelper] File does not exist! ${filePath}`);
}

this.processIfExpired(stats, filePath, fname);
});
});
} else {
logger.error('[OrphanFileHelper] Temp path doesn\'t exists. path: ', this.tempPath);
throw new Error(`Temp path doesn't exists. tempPath: ${this.tempPath}`);
}
this.scheduleNext(this.cleanupCronHour);
}

/**
*
*/
processIfExpired(stats, filePath, fname) {
const lastModifiedDurationMs = Math.abs(Date.now() - stats.mtime.getTime());

logger.debug(`[OrphanFileHelper] File last modified ${lastModifiedDurationMs} ms ago:`);
if (lastModifiedDurationMs > this.orphanFileCleanupTimeoutMs) {
logger.debug(`[OrphanFileHelper] Start processing the file ${`${filePath}`}`);
const response = fileStore.getObjectsByKeys(
filePath, [ 'connectionInfo', 'identity' ]);

response.then(
obj => {
const jsonObj = obj;
let meta;
let connectionInfo;

if (jsonObj?.connectionInfo) {
meta = JSON.parse(jsonObj?.connectionInfo);
meta.dumpPath = `${filePath}`;
}

if (jsonObj?.identity) {
connectionInfo = jsonObj?.identity;
}

this.wsHandler.processData(fname, meta, connectionInfo);
})
.catch(e => {
logger.error(`[OrphanFileHelper] ${e}`);
logger.info(`[OrphanFileHelper] New connection. File doesn't exist. ${filePath}`);
});
}
}

/**
*
* @param {*} func
*/
scheduleNext(hour) {
const now = new Date();
const start = new Date(now.getFullYear(), now.getMonth(), now.getDate() + 1, hour, 0, 0, 0);

const wait = start.getTime() - now.getTime();

setTimeout(() => { // Wait until the specified hour
this.processOldFiles();
}, wait);
}
}


module.exports = OrphanFileHelper;
Loading