Skip to content

Commit d4ad94a

Browse files
committed
Use sourceCheckIfSizeGreaterThanMB for stripping locations
Issue: BB-491
1 parent bf4ac4c commit d4ad94a

File tree

10 files changed

+76
-43
lines changed

10 files changed

+76
-43
lines changed

conf/config.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,6 @@
304304
"kafkaConnectHost": "127.0.0.1",
305305
"kafkaConnectPort": 8083,
306306
"numberOfConnectors": 1,
307-
"locationStrippingThreshold": 500,
308307
"probeServer": {
309308
"bindAddress": "0.0.0.0",
310309
"port": 8556

extensions/oplogPopulator/OplogPopulator.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ class OplogPopulator {
335335
if (this._config.numberOfConnectors === 0) {
336336
// If the number of connector is set to 0, then we
337337
// use a single connector to listen to the whole DB.
338-
pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingThreshold);
338+
pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingBytesThreshold);
339339
strategy = new UniqueConnector({
340340
logger: this._logger,
341341
});
@@ -345,7 +345,7 @@ class OplogPopulator {
345345
// words, we cannot alter an existing pipeline. In this
346346
// case, the strategy is to allow a maximum of one
347347
// bucket per kafka connector.
348-
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold);
348+
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold);
349349
strategy = new ImmutableConnector({
350350
logger: this._logger,
351351
});
@@ -354,7 +354,7 @@ class OplogPopulator {
354354
// kafka connector. However, we want to proactively
355355
// ensure that the pipeline will be accepted by
356356
// mongodb.
357-
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold);
357+
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold);
358358
strategy = new LeastFullConnector({
359359
logger: this._logger,
360360
});

extensions/oplogPopulator/OplogPopulatorConfigValidator.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const joiSchema = joi.object({
66
kafkaConnectHost: joi.string().required(),
77
kafkaConnectPort: joi.number().required(),
88
numberOfConnectors: joi.number().required().min(0),
9-
locationStrippingThreshold: joi.number().default(5),
9+
locationStrippingBytesThreshold: joi.number().min(0).default(0),
1010
prefix: joi.string().optional(),
1111
probeServer: probeServerJoi.default(),
1212
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),

extensions/oplogPopulator/OplogPopulatorTask.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ werelogs.configure({ level: config.log.logLevel,
2020

2121
const mongoConfig = config.queuePopulator.mongo;
2222
const oplogPopulatorConfig = config.extensions.oplogPopulator;
23+
const replicationQPConfig = config.extensions.replication &&
24+
config.extensions.replication.queueProcessor;
25+
if (replicationQPConfig && replicationQPConfig.sourceCheckIfSizeGreaterThanMB) {
26+
oplogPopulatorConfig.locationStrippingBytesThreshold =
27+
replicationQPConfig.sourceCheckIfSizeGreaterThanMB * 1024 * 1024;
28+
}
2329

2430
const activeExtensions = [
2531
'notification',

extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ const PipelineFactory = require('./PipelineFactory');
1010
class MultipleBucketsPipelineFactory extends PipelineFactory {
1111
/**
1212
* @constructor
13-
* @param {number} locationStrippingThreshold threshold for stripping location data
13+
* @param {number} locationStrippingBytesThreshold threshold for stripping location data
1414
*/
15-
constructor(locationStrippingThreshold) {
16-
super(locationStrippingThreshold);
15+
constructor(locationStrippingBytesThreshold) {
16+
super(locationStrippingBytesThreshold);
1717
// getPipeline is used standalone later, make sure its `this` reference binds to us.
1818
this.getPipeline = this.getPipeline.bind(this);
1919
}

extensions/oplogPopulator/pipeline/PipelineFactory.js

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ const { wildCardForAllBuckets } = require('../constants');
99
class PipelineFactory {
1010
/**
1111
* @constructor
12-
* @param {number} locationStrippingThreshold threshold for stripping location data
12+
* @param {number} locationStrippingBytesThreshold threshold for stripping location data
1313
*/
14-
constructor(locationStrippingThreshold) {
15-
this._locationStrippingThreshold = locationStrippingThreshold;
14+
constructor(locationStrippingBytesThreshold) {
15+
this._locationStrippingBytesThreshold = locationStrippingBytesThreshold;
1616
}
1717

1818
/**
@@ -71,31 +71,35 @@ class PipelineFactory {
7171
const pipeline = [
7272
stage,
7373
];
74-
if (this._locationStrippingThreshold >= 0) {
74+
if (this._locationStrippingBytesThreshold > 0) {
7575
pipeline.push({
7676
$set: {
77-
'fullDocument.value.location':
78-
this._locationStrippingExpression('fullDocument.value.location'),
79-
'updateDescription.updatedFields.value.location':
80-
this._locationStrippingExpression('updateDescription.updatedFields.value.location'),
77+
'fullDocument.value.location': {
78+
$cond: {
79+
if: { $gte: [
80+
'$fullDocument.value.content-length',
81+
this._locationStrippingBytesThreshold,
82+
] },
83+
then: '$$REMOVE',
84+
else: '$fullDocument.value.location',
85+
},
86+
},
87+
'updateDescription.updatedFields.value.location': {
88+
$cond: {
89+
if: { $gte: [
90+
'$updateDescription.updatedFields.value.content-length',
91+
this._locationStrippingBytesThreshold,
92+
] },
93+
then: '$$REMOVE',
94+
else: '$updateDescription.updatedFields.value.location',
95+
},
96+
},
8197
}
8298
});
8399
}
84100
return JSON.stringify(pipeline);
85101
}
86102

87-
_locationStrippingExpression(field) {
88-
return {
89-
$switch: {
90-
branches: [
91-
{ case: { $not: [{ $isArray: `$${field}` }] }, then: '$$REMOVE' },
92-
{ case: { $gte: [{ $size: `$${field}` }, this._locationStrippingThreshold] }, then: '$$REMOVE' },
93-
],
94-
default: `$${field}`,
95-
}
96-
};
97-
}
98-
99103
/**
100104
* Makes connector pipeline stage, to then be used by getPipeline.
101105
* @param {string[] | undefined} buckets buckets assigned to this connector

extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ const PipelineFactory = require('./PipelineFactory');
1111
class WildcardPipelineFactory extends PipelineFactory {
1212
/**
1313
* @constructor
14-
* @param {number} locationStrippingThreshold threshold for stripping location data
14+
* @param {number} locationStrippingBytesThreshold threshold for stripping location data
1515
*/
16-
constructor(locationStrippingThreshold) {
17-
super(locationStrippingThreshold);
16+
constructor(locationStrippingBytesThreshold) {
17+
super(locationStrippingBytesThreshold);
1818
// getPipeline is used standalone later, make sure its `this` reference binds to us.
1919
this.getPipeline = this.getPipeline.bind(this);
2020
}

tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ const defaultConfig = {
1313
};
1414

1515
describe('OplogPopulatorConfigValidator', () => {
16-
describe('locationStrippingThreshold validation', () => {
16+
describe('locationStrippingBytesThreshold validation', () => {
1717
it('should accept valid threshold', () => {
1818
const config = {
1919
...defaultConfig,
20-
locationStrippingThreshold: 50,
20+
locationStrippingBytesThreshold: 100 * 1024 * 1024,
2121
};
2222
const result = OplogPopulatorConfigJoiSchema.validate(config);
2323
assert.ifError(result.error);
24-
assert.strictEqual(result.value.locationStrippingThreshold, 50);
24+
assert.strictEqual(result.value.locationStrippingBytesThreshold, 100 * 1024 * 1024);
2525
});
2626
});
2727
});

tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ const MultipleBucketsPipelineFactory =
44
const { constants } = require('arsenal');
55

66
describe('MultipleBucketsPipelineFactory', () => {
7-
const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(200);
7+
const thresholdBytes = 100 * 1024 * 1024;
8+
const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(thresholdBytes);
89

910
describe('isValid', () => {
1011
it('should detect a valid list of buckets', () => {
@@ -52,9 +53,20 @@ describe('MultipleBucketsPipelineFactory', () => {
5253

5354
assert.strictEqual(pipeline.length, 2);
5455
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}});
55-
assert(pipeline[1].$set['fullDocument.value.location']);
56-
assert(pipeline[1].$set['updateDescription.updatedFields.value.location']);
57-
assert(result.includes('200'));
56+
assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], {
57+
$cond: {
58+
if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] },
59+
then: '$$REMOVE',
60+
else: '$fullDocument.value.location',
61+
},
62+
});
63+
assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], {
64+
$cond: {
65+
if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] },
66+
then: '$$REMOVE',
67+
else: '$updateDescription.updatedFields.value.location',
68+
},
69+
});
5870
});
5971
});
6072

tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/p
33
const { constants } = require('arsenal');
44

55
describe('WildcardPipelineFactory', () => {
6-
const wildcardPipelineFactory = new WildcardPipelineFactory(200);
6+
const thresholdBytes = 100 * 1024 * 1024;
7+
const wildcardPipelineFactory = new WildcardPipelineFactory(thresholdBytes);
78

89
describe('isValid', () => {
910
it('should detect a wildcard', () => {
@@ -39,13 +40,24 @@ describe('WildcardPipelineFactory', () => {
3940

4041
assert.strictEqual(pipeline.length, 2);
4142
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}});
42-
assert(pipeline[1].$set['fullDocument.value.location']);
43-
assert(pipeline[1].$set['updateDescription.updatedFields.value.location']);
44-
assert(result.includes('200'));
43+
assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], {
44+
$cond: {
45+
if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] },
46+
then: '$$REMOVE',
47+
else: '$fullDocument.value.location',
48+
},
49+
});
50+
assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], {
51+
$cond: {
52+
if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] },
53+
then: '$$REMOVE',
54+
else: '$updateDescription.updatedFields.value.location',
55+
},
56+
});
4557
});
4658

4759
it('should return the pipeline with buckets and no location stripping if disabled', () => {
48-
const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(-1);
60+
const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(0);
4961

5062
const buckets = ['bucket1', 'bucket2'];
5163
const result = wildcardPipelineFactoryNoStripping.getPipeline(buckets);

0 commit comments

Comments
 (0)