Skip to content

Commit fdb4e94

Browse files
committed
Use sourceCheckIfSizeGreaterThanMB for stripping locations
Issue: BB-491
1 parent 78c2d1b commit fdb4e94

File tree

11 files changed

+80
-51
lines changed

11 files changed

+80
-51
lines changed

conf/config.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,6 @@
304304
"kafkaConnectHost": "127.0.0.1",
305305
"kafkaConnectPort": 8083,
306306
"numberOfConnectors": 1,
307-
"locationStrippingThreshold": 500,
308307
"probeServer": {
309308
"bindAddress": "0.0.0.0",
310309
"port": 8556

extensions/oplogPopulator/OplogPopulator.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ class OplogPopulator {
335335
if (this._config.numberOfConnectors === 0) {
336336
// If the number of connector is set to 0, then we
337337
// use a single connector to listen to the whole DB.
338-
pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingThreshold);
338+
pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingBytesThreshold);
339339
strategy = new UniqueConnector({
340340
logger: this._logger,
341341
});
@@ -345,7 +345,7 @@ class OplogPopulator {
345345
// words, we cannot alter an existing pipeline. In this
346346
// case, the strategy is to allow a maximum of one
347347
// bucket per kafka connector.
348-
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold);
348+
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold);
349349
strategy = new ImmutableConnector({
350350
logger: this._logger,
351351
});
@@ -354,7 +354,7 @@ class OplogPopulator {
354354
// kafka connector. However, we want to proactively
355355
// ensure that the pipeline will be accepted by
356356
// mongodb.
357-
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold);
357+
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold);
358358
strategy = new LeastFullConnector({
359359
logger: this._logger,
360360
});

extensions/oplogPopulator/OplogPopulatorConfigValidator.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const joiSchema = joi.object({
66
kafkaConnectHost: joi.string().required(),
77
kafkaConnectPort: joi.number().required(),
88
numberOfConnectors: joi.number().required().min(0),
9-
locationStrippingThreshold: joi.number().default(5),
9+
locationStrippingBytesThreshold: joi.number().min(0).default(0),
1010
prefix: joi.string().optional(),
1111
probeServer: probeServerJoi.default(),
1212
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),

extensions/oplogPopulator/OplogPopulatorTask.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ werelogs.configure({ level: config.log.logLevel,
2020

2121
const mongoConfig = config.queuePopulator.mongo;
2222
const oplogPopulatorConfig = config.extensions.oplogPopulator;
23+
const replicationQPConfig = config.extensions.replication &&
24+
config.extensions.replication.queueProcessor;
25+
if (replicationQPConfig && replicationQPConfig.sourceCheckIfSizeGreaterThanMB) {
26+
oplogPopulatorConfig.locationStrippingBytesThreshold =
27+
replicationQPConfig.sourceCheckIfSizeGreaterThanMB * 1024 * 1024;
28+
}
2329

2430
const activeExtensions = [
2531
'notification',

extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ const PipelineFactory = require('./PipelineFactory');
1010
class MultipleBucketsPipelineFactory extends PipelineFactory {
1111
/**
1212
* @constructor
13-
* @param {number} locationStrippingThreshold threshold for stripping location data
13+
* @param {number} locationStrippingBytesThreshold threshold for stripping location data
1414
*/
15-
constructor(locationStrippingThreshold) {
16-
super(locationStrippingThreshold);
15+
constructor(locationStrippingBytesThreshold) {
16+
super(locationStrippingBytesThreshold);
1717
// getPipeline is used standalone later, make sure its `this` reference binds to us.
1818
this.getPipeline = this.getPipeline.bind(this);
1919
}

extensions/oplogPopulator/pipeline/PipelineFactory.js

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ const { wildCardForAllBuckets } = require('../constants');
99
class PipelineFactory {
1010
/**
1111
* @constructor
12-
* @param {number} locationStrippingThreshold threshold for stripping location data
12+
* @param {number} locationStrippingBytesThreshold threshold for stripping location data
1313
*/
14-
constructor(locationStrippingThreshold) {
15-
this._locationStrippingThreshold = locationStrippingThreshold;
14+
constructor(locationStrippingBytesThreshold) {
15+
this._locationStrippingBytesThreshold = locationStrippingBytesThreshold;
1616
}
1717

1818
/**
@@ -71,31 +71,35 @@ class PipelineFactory {
7171
const pipeline = [
7272
stage,
7373
];
74-
if (this._locationStrippingThreshold >= 0) {
74+
if (this._locationStrippingBytesThreshold > 0) {
7575
pipeline.push({
7676
$set: {
77-
'fullDocument.value.location':
78-
this._locationStrippingExpression('fullDocument.value.location'),
79-
'updateDescription.updatedFields.value.location':
80-
this._locationStrippingExpression('updateDescription.updatedFields.value.location'),
77+
'fullDocument.value.location': {
78+
$cond: {
79+
if: { $gte: [
80+
'$fullDocument.value.content-length',
81+
this._locationStrippingBytesThreshold,
82+
] },
83+
then: '$$REMOVE',
84+
else: '$fullDocument.value.location',
85+
},
86+
},
87+
'updateDescription.updatedFields.value.location': {
88+
$cond: {
89+
if: { $gte: [
90+
'$updateDescription.updatedFields.value.content-length',
91+
this._locationStrippingBytesThreshold,
92+
] },
93+
then: '$$REMOVE',
94+
else: '$updateDescription.updatedFields.value.location',
95+
},
96+
},
8197
}
8298
});
8399
}
84100
return JSON.stringify(pipeline);
85101
}
86102

87-
_locationStrippingExpression(field) {
88-
return {
89-
$switch: {
90-
branches: [
91-
{ case: { $not: [{ $isArray: `$${field}` }] }, then: '$$REMOVE' },
92-
{ case: { $gte: [{ $size: `$${field}` }, this._locationStrippingThreshold] }, then: '$$REMOVE' },
93-
],
94-
default: `$${field}`,
95-
}
96-
};
97-
}
98-
99103
/**
100104
* Makes connector pipeline stage, to then be used by getPipeline.
101105
* @param {string[] | undefined} buckets buckets assigned to this connector

extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ const PipelineFactory = require('./PipelineFactory');
1111
class WildcardPipelineFactory extends PipelineFactory {
1212
/**
1313
* @constructor
14-
* @param {number} locationStrippingThreshold threshold for stripping location data
14+
* @param {number} locationStrippingBytesThreshold threshold for stripping location data
1515
*/
16-
constructor(locationStrippingThreshold) {
17-
super(locationStrippingThreshold);
16+
constructor(locationStrippingBytesThreshold) {
17+
super(locationStrippingBytesThreshold);
1818
// getPipeline is used standalone later, make sure its `this` reference binds to us.
1919
this.getPipeline = this.getPipeline.bind(this);
2020
}

tests/unit/oplogPopulator/ConnectorsManager.js

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -285,31 +285,27 @@ describe('ConnectorsManager', () => {
285285
assert(connectorDeleteStub.notCalled);
286286
});
287287

288-
it('should update non-match stages when canUpdate is false and match stages are the same', async () => {
288+
it('should update non-match stages when canUpdate is false and live buckets match', async () => {
289289
connector1._isRunning = true;
290290
connector1._state.bucketsGotModified = true;
291291
connector1._buckets = new Set(['bucket1']);
292+
connector1._liveBuckets = new Set(['bucket1']);
292293
sinon.stub(connectorsManager._allocationStrategy, 'canUpdate')
293294
.returns(false);
294-
sinon.stub(connector1._kafkaConnect, 'getConnectorConfig').resolves({
295-
pipeline: JSON.stringify([{ $match: { 'ns.coll': { $in: ['bucket1'] } } }]),
296-
});
297295
const updated = await connectorsManager._spawnOrDestroyConnector(connector1);
298296
assert.strictEqual(updated, true);
299297
assert(connectorUpdateStub.calledOnce);
300298
assert(connectorCreateStub.notCalled);
301299
assert(connectorDeleteStub.notCalled);
302300
});
303301

304-
it('should skip update when canUpdate is false and match stages differ', async () => {
302+
it('should skip update when canUpdate is false and live buckets differ', async () => {
305303
connector1._isRunning = true;
306304
connector1._state.bucketsGotModified = true;
307305
connector1._buckets = new Set(['bucket1']);
306+
connector1._liveBuckets = new Set(['different-bucket']);
308307
sinon.stub(connectorsManager._allocationStrategy, 'canUpdate')
309308
.returns(false);
310-
sinon.stub(connector1._kafkaConnect, 'getConnectorConfig').resolves({
311-
pipeline: JSON.stringify([{ $match: { 'ns.coll': { $in: ['different-bucket'] } } }]),
312-
});
313309
const updated = await connectorsManager._spawnOrDestroyConnector(connector1);
314310
assert.strictEqual(updated, false);
315311
assert(connectorUpdateStub.notCalled);

tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ const defaultConfig = {
1313
};
1414

1515
describe('OplogPopulatorConfigValidator', () => {
16-
describe('locationStrippingThreshold validation', () => {
16+
describe('locationStrippingBytesThreshold validation', () => {
1717
it('should accept valid threshold', () => {
1818
const config = {
1919
...defaultConfig,
20-
locationStrippingThreshold: 50,
20+
locationStrippingBytesThreshold: 100 * 1024 * 1024,
2121
};
2222
const result = OplogPopulatorConfigJoiSchema.validate(config);
2323
assert.ifError(result.error);
24-
assert.strictEqual(result.value.locationStrippingThreshold, 50);
24+
assert.strictEqual(result.value.locationStrippingBytesThreshold, 100 * 1024 * 1024);
2525
});
2626
});
2727
});

tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ const MultipleBucketsPipelineFactory =
44
const { constants } = require('arsenal');
55

66
describe('MultipleBucketsPipelineFactory', () => {
7-
const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(200);
7+
const thresholdBytes = 100 * 1024 * 1024;
8+
const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(thresholdBytes);
89

910
describe('isValid', () => {
1011
it('should detect a valid list of buckets', () => {
@@ -52,9 +53,20 @@ describe('MultipleBucketsPipelineFactory', () => {
5253

5354
assert.strictEqual(pipeline.length, 2);
5455
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}});
55-
assert(pipeline[1].$set['fullDocument.value.location']);
56-
assert(pipeline[1].$set['updateDescription.updatedFields.value.location']);
57-
assert(result.includes('200'));
56+
assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], {
57+
$cond: {
58+
if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] },
59+
then: '$$REMOVE',
60+
else: '$fullDocument.value.location',
61+
},
62+
});
63+
assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], {
64+
$cond: {
65+
if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] },
66+
then: '$$REMOVE',
67+
else: '$updateDescription.updatedFields.value.location',
68+
},
69+
});
5870
});
5971
});
6072

0 commit comments

Comments
 (0)