Skip to content

Commit 27c1da0

Browse files
authored
Fix memory leaks in cloudwatch-integration (#214)
* Stop memory leaking every stream name seen in cloudwatch-integration The _postingEvents variable saved every single seen streamName as a key and these entries were never getting deleted. Since cloudwatch-integration stores _postingEvents as a top level variable, it gets shared between all instances of WinstonCloudWatch and leaks memory. To fix this, delete entries for streams that are not posting events. This is safe to do because the only place where _postingEvents entries get read checks whether the value is falsy. If a key doesn't exist in an object, accessing it returns `undefined` which is falsy and the code works the same as before. * Stop memory leaking CloudWatch sequence tokens in cloudwatch-integration The _nextToken variable saved every sequence token for every stream seen. Since cloudwatch-integration stores _nextToken as a top level variable, it gets shared between all instances of WinstonCloudWatch and leaks memory. To fix this, add a clearSequenceToken method and call it from kthxbye to delete the entry for the WinstonCloudWatch instance that is done sending logs.
1 parent af59811 commit 27c1da0

File tree

4 files changed

+49
-11
lines changed

4 files changed

+49
-11
lines changed

index.js

+5
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ WinstonCloudWatch.prototype.kthxbye = function(callback) {
135135

136136
this.submit((function(error) {
137137
debug('submit done', error);
138+
var groupName = typeof this.logGroupName === 'function' ?
139+
this.logGroupName() : this.logGroupName;
140+
var streamName = typeof this.logStreamName === 'function' ?
141+
this.logStreamName() : this.logStreamName;
142+
cloudWatchIntegration.clearSequenceToken(groupName, streamName);
138143
if (error) return callback(error);
139144
if (isEmpty(this.logEvents)) return callback();
140145
if (Date.now() > this.flushTimeout) return callback(new Error('Timeout reached while waiting for logs to submit'));

lib/cloudwatch-integration.js

+10-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ var find = require('lodash.find'),
1212
debug = require('./utils').debug;
1313

1414
var lib = {
15-
_postingEvents: {}
15+
_postingEvents: {},
16+
_nextToken: {}
1617
};
1718

1819
lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, options, cb) {
@@ -27,7 +28,7 @@ lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, op
2728

2829
lib._postingEvents[streamName] = true;
2930
safeUpload(function(err) {
30-
lib._postingEvents[streamName] = false;
31+
delete lib._postingEvents[streamName];
3132
return cb(err);
3233
});
3334

@@ -93,7 +94,7 @@ lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, op
9394
lib._nextToken[previousKeyMapKey(groupName, streamName)] = data.nextSequenceToken;
9495
}
9596

96-
lib._postingEvents[streamName] = false;
97+
delete lib._postingEvents[streamName];
9798
cb()
9899
}
99100
});
@@ -106,7 +107,7 @@ lib.submitWithAnotherToken = function(aws, groupName, streamName, payload, reten
106107
lib.getToken(aws, groupName, streamName, retentionInDays, options, function(err, token) {
107108
payload.sequenceToken = token;
108109
aws.putLogEvents(payload, function(err) {
109-
lib._postingEvents[streamName] = false;
110+
delete lib._postingEvents[streamName];
110111
cb(err)
111112
});
112113
})
@@ -118,7 +119,7 @@ function retrySubmit(aws, payload, times, cb) {
118119
if (err && times > 0) {
119120
retrySubmit(aws, payload, times - 1, cb)
120121
} else {
121-
lib._postingEvents[payload.logStreamName] = false;
122+
delete lib._postingEvents[payload.logStreamName];
122123
cb(err)
123124
}
124125
})
@@ -151,7 +152,6 @@ lib.getToken = function(aws, groupName, streamName, retentionInDays, options, cb
151152
}
152153
});
153154
};
154-
lib._nextToken = {};
155155

156156
function previousKeyMapKey(group, stream) {
157157
return group + ':' + stream;
@@ -229,4 +229,8 @@ lib.ignoreInProgress = function ignoreInProgress(cb) {
229229
};
230230
};
231231

232+
lib.clearSequenceToken = function clearSequenceToken(group, stream) {
233+
delete lib._nextToken[previousKeyMapKey(group, stream)];
234+
}
235+
232236
module.exports = lib;

test/cloudwatch-integration.js

+32-4
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ describe('cloudwatch-integration', function() {
3737
}, function() {
3838
// The second upload call should get ignored
3939
aws.putLogEvents.calledOnce.should.equal(true);
40-
lib._postingEvents['stream'] = false; // reset
40+
delete lib._postingEvents['stream']; // reset
4141
done()
4242
});
4343
});
@@ -54,7 +54,7 @@ describe('cloudwatch-integration', function() {
5454
}, function() {
5555
// The second upload call should get ignored
5656
lib.getToken.calledOnce.should.equal(true);
57-
lib._postingEvents['stream'] = false; // reset
57+
delete lib._postingEvents['stream']; // reset
5858
done()
5959
});
6060
});
@@ -72,8 +72,8 @@ describe('cloudwatch-integration', function() {
7272

7373
lib.getToken.calledTwice.should.equal(true);
7474

75-
lib._postingEvents['stream1'] = false; // reset
76-
lib._postingEvents['stream2'] = false; // reset
75+
delete lib._postingEvents['stream1']; // reset
76+
delete lib._postingEvents['stream2']; // reset
7777
});
7878

7979
it('truncates very large messages and alerts the error handler', function(done) {
@@ -524,4 +524,32 @@ describe('cloudwatch-integration', function() {
524524

525525
});
526526

527+
describe('clearSequenceToken', function() {
528+
var aws = {};
529+
530+
beforeEach(function() {
531+
sinon.stub(lib, 'getToken').yields(null, 'token');
532+
});
533+
534+
it('clears sequence token set by upload', function(done) {
535+
var nextSequenceToken = 'abc123';
536+
var group = 'group';
537+
var stream = 'stream';
538+
aws.putLogEvents = sinon.stub().yields(null, { nextSequenceToken: nextSequenceToken });
539+
540+
lib.upload(aws, group, stream, Array(20), 0, {}, function() {
541+
lib._nextToken.should.deepEqual({ 'group:stream': nextSequenceToken });
542+
lib.clearSequenceToken(group, stream);
543+
lib._nextToken.should.deepEqual({});
544+
done();
545+
});
546+
});
547+
548+
afterEach(function() {
549+
lib.getToken.restore();
550+
});
551+
})
552+
553+
554+
527555
});

test/index.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ var stubbedWinston = {
2020
upload: sinon.spy(function(aws, groupName, streamName, logEvents, retention, options, cb) {
2121
this.lastLoggedEvents = logEvents.splice(0, 20);
2222
cb();
23-
})
23+
}),
24+
clearSequenceToken: sinon.stub()
2425
};
2526
var clock = sinon.useFakeTimers();
2627

0 commit comments

Comments
 (0)