feat: add reconnect and process old files #99

tamasdomokos wants to merge 1 commit into jitsi:feature-reconnect
Conversation
Force-pushed from 81b8aad to c810e84
andrei-gavrilescu left a comment:
I know this is meant to be a draft; I've just added a few comments to consider when refactoring.
There are a lot of awesome changes here, and we probably need to add at least some basic integration tests for the reconnect scenarios. Please check out
Force-pushed from c14d391 to b97fbae

Force-pushed from b8d3cf6 to 07667e9
src/test/client.js (outdated)
    assert.deepStrictEqual(parsedBody, resultTemplate);
} else {
    // this is a reconnect, dumpInfo is not relevant
    logger.info('[TEST] Handling DONE event after reconnect with statsSessionId %j, body %j %j',
Why is this not relevant? The reconnect test should have the same result as the dump without the disconnect.
After the disconnect it waits for the reconnect before getting processed.
// Subsequent operations will be taken by services in the upper level, like upload to store and persist
// metadata to a db.
case 'close':
    this.log.info('[Demux] sink closed');
In case a client reconnect happens, the client won't send the identity data again, which means the meta object will be missing the identity information used in the app.
The startDate that's set in _sinkCreate will also not correspond to the actual start date of the session; won't this affect the rest of the application?
// we need to wait a little bit before reconnecting.
setTimeout(() => {
    connection.connect();
There is one problem with how reconnect is currently handled: if the client reconnects after the server decided it has waited long enough, the FeatureExtractor will process the resulting dump file as if it were a new session (even though identity information is missing), resulting in an overwritten S3 dump with partial information and a duplicate entry in Redshift. This can also happen if the server restarts and the client lands on another server, because of how HAProxy is currently set up. Ideally the server or the client would identify these cases and not process them; simply logging an error would do.

Is this case handled by the client/server protocol somehow?
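A rough sketch of the "identify and log instead of processing" idea, just to make it concrete (every name here is hypothetical, not code from this PR; it assumes the extractor can detect the missing identity in the meta object):

const logger = require('./logging'); // assumed logger module

// Hypothetical guard in front of feature extraction: an identity-less dump
// is likely an orphaned reconnect whose original session was (or will be)
// processed elsewhere, so log and skip instead of overwriting the S3 entry
// and duplicating the Redshift row.
async function maybeExtractFeatures(dumpPath, meta) {
    if (!meta.identity) {
        logger.error(
            '[FeatureExtractor] Skipping %s: no identity data, likely an orphaned reconnect',
            dumpPath
        );

        return;
    }

    return extractFeatures(dumpPath, meta); // extractFeatures: assumed existing step
}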
@@ -338,6 +399,13 @@ function simulateConnection(dumpPath, resultPath, ua, protocolV) {

function runTest() {
A test that would somehow simulate a server restart would be cool, but I assume that would be a bit convoluted. Maybe an isolated test for the OrphanFileHelper? wdyt?
this._validateSequenceNumber(statsSessionId, requestData.sequenceNumber, this.lastSequenceNumber);

this.lastTimestamp = requestData.timestamp;
this.lastSequenceNumber = requestData.sequenceNumber;
In the case of the connectionInfo stats entry there is no sequenceNumber, so it might be undefined; please check that the sequenceNumber from the request data is valid.
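A minimal sketch of such a check against the lines quoted above (the Number.isInteger guard is an assumption about what counts as a valid sequence number here):

// connectionInfo entries carry no sequenceNumber, so only validate and
// advance the counter when a usable one is actually present.
if (Number.isInteger(requestData.sequenceNumber)) {
    this._validateSequenceNumber(statsSessionId, requestData.sequenceNumber, this.lastSequenceNumber);
    this.lastSequenceNumber = requestData.sequenceNumber;
}

this.lastTimestamp = requestData.timestamp;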
Force-pushed from eae6ed3 to a718b4f
    sequenceNumber = this.demuxSink.lastSequenceNumber;
} else {
    logger.debug('[ClientMessageHandler] Last sequence number from dump ');
    sequenceNumber = await this._getLastSequenceNumberFromDump();
Do we need to read the entire file in the case of a client reconnect (not the server restart case)? Technically we have that information in the previous sink.
@@ -168,15 +189,7 @@ class DemuxSink extends Writable {
// if the entry already exists because some other instance uploaded first, the same incremental approach needs
// to be taken.
while (!fd) {
If the incremental approach was removed, I assume the while loop and the comments need to be removed as well.
let identity;

if (isReconnect) {
    identity = await this._getIdentityFromFile(sinkData.id);
If this is a client reconnect (not the server restart case), the file will be read when we try to get the last sequence number and then again here. We can probably make this more efficient and read the file only once; not insisting for this PR.
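One possible shape for the single-read version (a sketch only; the helper name and the exact entry layout are assumptions, not code from this PR):

const fs = require('fs');
const readline = require('readline');

// Hypothetical single-pass helper: collect both the identity entry and the
// last sequence number in one scan of the dump, so a client reconnect does
// not read the file twice. Assumes newline-delimited JSON entries with the
// sequence number at index 4, matching the jsonData[4] access quoted later
// in this thread.
async function getReconnectStateFromDump(dumpPath) {
    const rl = readline.createInterface({
        input: fs.createReadStream(dumpPath),
        crlfDelay: Infinity
    });

    let identity;
    let lastSequenceNumber = -1;

    for await (const line of rl) {
        const entry = JSON.parse(line);

        if (entry[0] === 'identity') {
            identity = entry;
        }

        if (Number.isInteger(entry[4])) {
            lastSequenceNumber = entry[4];
        }
    }

    return { identity, lastSequenceNumber };
}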
    this._createSequenceNumberBody(sequenceNumber, isInitial)
));

if (this.client.readyState === 1) {
What does this mean? Are there cases where readyState is not 1, and if such a case occurs, what happens? Add a comment for why this is needed.
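For context, readyState 1 is the standard WebSocket OPEN state. A sketch of the guard with the requested comment (the else branch and log message are illustrative, not from this PR):

// readyState 1 === OPEN. During a reconnect the socket can still be
// CONNECTING (0), CLOSING (2) or CLOSED (3), and calling send() in those
// states would throw, so only send when the connection is actually open.
if (this.client.readyState === 1) {
    this.client.send(JSON.stringify(
        this._createSequenceNumberBody(sequenceNumber, isInitial)
    ));
} else {
    logger.warn('[App] Socket not open (readyState %d), skipping send', this.client.readyState);
}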
    return 0;
});

const result = await promis;
This can be done a bit more elegantly, like:

let lastLine = 0;

try {
    const lastLineString = await storeFile.getLastLine(dumpPath, 1);

    lastLine = utils.parseLineForSequenceNumber(lastLineString);
} catch (e) {
    logger.error('[ClientMessageHandler] Error. ', e);
}

return lastLine;

wdyt
.then(
    lastLine => utils.parseLineForSequenceNumber(lastLine))
.catch(() => {
    logger.debug('[ClientMessageHandler] New connection. File doesn\'t exist. file: ', dumpPath);
I'm assuming the error can be any error, not necessarily that the file doesn't exist; maybe we should log the error as well.
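A small sketch of that change, keeping the same catch as the diff above but receiving and logging the error (the return value mirrors the return 0 quoted earlier):

.catch(err => {
    // Any failure lands here (missing file, permission error, bad JSON, ...),
    // so include the error itself instead of assuming the file is new.
    logger.debug('[ClientMessageHandler] Could not read last line of %s:', dumpPath, err);

    return 0;
});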
    return jsonData[4];
}

return -1;
How will the client react on receiving -1? We should add some comments here.
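For example, something along these lines (the described client behaviour is an assumption based on this thread and would need to be confirmed):

// No sequence number could be recovered from the dump; -1 is the sentinel
// for "start fresh". The client is assumed to treat it as a new session and
// resend from its first buffered entry.
return -1;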
const readline = require('readline');
const Stream = require('stream');

exports.getLastLine = (fileName, minLength) => {
I feel a bit insecure about this function. If a server restarts, which means we have about 2000 files, then when clients start reconnecting we're going to read each dump (which can be up to 1 GB per file) line by line; I'm not sure how that's going to affect the server. I wonder if there are any more efficient ways to read the end of a file.
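One option, sketched under the assumption that dump entries are newline-delimited and the last line fits in a fixed-size tail (the 64 KB constant and the exact shape are illustrative, not from this PR):

const fs = require('fs');

const TAIL_BYTES = 64 * 1024;

// Read only the last TAIL_BYTES of the file instead of streaming the whole
// dump, then take the final non-empty newline-delimited line from that tail.
exports.getLastLine = async fileName => {
    const { size } = await fs.promises.stat(fileName);
    const start = Math.max(0, size - TAIL_BYTES);
    const fileHandle = await fs.promises.open(fileName, 'r');

    try {
        const buffer = Buffer.alloc(size - start);

        await fileHandle.read(buffer, 0, buffer.length, start);

        const lines = buffer.toString('utf8').split('\n').filter(Boolean);

        return lines[lines.length - 1];
    } finally {
        await fileHandle.close();
    }
};

This keeps the read cost constant per reconnect regardless of dump size, at the price of assuming a maximum line length.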