Skip to content

Commit e51fcab

Browse files
committed
oplog: Strip locations when the array is large
This saves space on the Kafka side while preventing a refetch on most (small) objects. Issue: BB-491 Signed-off-by: Thomas Flament <thomas.flament@scality.com>
1 parent 158bb09 commit e51fcab

File tree

9 files changed

+139
-35
lines changed

9 files changed

+139
-35
lines changed

conf/config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@
304304
"kafkaConnectHost": "127.0.0.1",
305305
"kafkaConnectPort": 8083,
306306
"numberOfConnectors": 1,
307+
"locationStrippingThreshold": 5,
307308
"probeServer": {
308309
"bindAddress": "0.0.0.0",
309310
"port": 8556

extensions/oplogPopulator/OplogPopulator.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ class OplogPopulator {
335335
if (this._config.numberOfConnectors === 0) {
336336
// If the number of connector is set to 0, then we
337337
// use a single connector to listen to the whole DB.
338-
pipelineFactory = new WildcardPipelineFactory();
338+
pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingThreshold);
339339
strategy = new UniqueConnector({
340340
logger: this._logger,
341341
});
@@ -345,7 +345,7 @@ class OplogPopulator {
345345
// words, we cannot alter an existing pipeline. In this
346346
// case, the strategy is to allow a maximum of one
347347
// bucket per kafka connector.
348-
pipelineFactory = new MultipleBucketsPipelineFactory();
348+
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold);
349349
strategy = new ImmutableConnector({
350350
logger: this._logger,
351351
});
@@ -354,7 +354,7 @@ class OplogPopulator {
354354
// kafka connector. However, we want to proactively
355355
// ensure that the pipeline will be accepted by
356356
// mongodb.
357-
pipelineFactory = new MultipleBucketsPipelineFactory();
357+
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold);
358358
strategy = new LeastFullConnector({
359359
logger: this._logger,
360360
});

extensions/oplogPopulator/OplogPopulatorConfigValidator.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const joiSchema = joi.object({
66
kafkaConnectHost: joi.string().required(),
77
kafkaConnectPort: joi.number().required(),
88
numberOfConnectors: joi.number().required().min(0),
9+
locationStrippingThreshold: joi.number().default(5),
910
prefix: joi.string().optional(),
1011
probeServer: probeServerJoi.default(),
1112
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),

extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@ const PipelineFactory = require('./PipelineFactory');
88
* given a list of buckets.
99
*/
1010
class MultipleBucketsPipelineFactory extends PipelineFactory {
11+
/**
12+
* @constructor
13+
* @param {number} locationStrippingThreshold threshold for stripping location data
14+
*/
15+
constructor(locationStrippingThreshold) {
16+
super(locationStrippingThreshold);
17+
// getPipeline is used standalone later, make sure its `this` reference binds to us.
18+
this.getPipeline = this.getPipeline.bind(this);
19+
}
20+
1121
/**
1222
* Checks if an existing pipeline is valid against the current
1323
* factory.
@@ -22,24 +32,21 @@ class MultipleBucketsPipelineFactory extends PipelineFactory {
2232
}
2333

2434
/**
25-
* Makes new connector pipeline that includes
26-
* buckets assigned to this connector.
35+
* Makes connector pipeline stage, that includes buckets assigned to this connector.
2736
* @param {string[] | undefined} buckets buckets assigned to this connector
28-
* @returns {string} new connector pipeline
37+
* @returns {object} connector pipeline stage
2938
*/
30-
getPipeline(buckets) {
39+
getPipelineStage(buckets) {
3140
if (!buckets || !buckets.length) {
32-
return JSON.stringify([]);
41+
return null;
3342
}
34-
return JSON.stringify([
35-
{
36-
$match: {
37-
'ns.coll': {
38-
$in: buckets,
39-
}
43+
return {
44+
$match: {
45+
'ns.coll': {
46+
$in: buckets,
4047
}
4148
}
42-
]);
49+
};
4350
}
4451
}
4552

extensions/oplogPopulator/pipeline/PipelineFactory.js

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ const { wildCardForAllBuckets } = require('../constants');
77
* @classdesc base class for the pipeline factories
88
*/
99
class PipelineFactory {
10+
/**
11+
* @constructor
12+
* @param {number} locationStrippingThreshold threshold for stripping location data
13+
*/
14+
constructor(locationStrippingThreshold) {
15+
this._locationStrippingThreshold = locationStrippingThreshold;
16+
}
17+
1018
/**
1119
* Checks if an existing pipeline is valid against the current
1220
* factory.
@@ -55,7 +63,40 @@ class PipelineFactory {
5563
* @param {string[] | undefined} buckets buckets assigned to this connector
5664
* @returns {string} new connector pipeline
5765
*/
58-
getPipeline(buckets) { // eslint-disable-line no-unused-vars
66+
getPipeline(buckets) {
67+
const stage = this.getPipelineStage(buckets);
68+
if (!stage) {
69+
return JSON.stringify([]);
70+
}
71+
const pipeline = [
72+
stage,
73+
];
74+
if (this._locationStrippingThreshold >= 0) {
75+
// If a threshold is provided, strip the location array if the number of parts is larger than the threshold.
76+
pipeline.push({
77+
$set: {
78+
'fullDocument.value.location': {
79+
$cond: {
80+
if: {
81+
$gte: [
82+
{ $size: { $ifNull: ['$fullDocument.value.location', []] } },
83+
this._locationStrippingThreshold
84+
]
85+
},
86+
}
87+
}
88+
}
89+
});
90+
}
91+
return JSON.stringify(pipeline);
92+
}
93+
94+
/**
95+
* Makes connector pipeline stage, to then be used by getPipeline.
96+
* @param {string[] | undefined} buckets buckets assigned to this connector
97+
* @returns {object} connector pipeline stage
98+
*/
99+
getPipelineStage(buckets) { // eslint-disable-line no-unused-vars
59100
throw errors.NotImplemented;
60101
}
61102
}

extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@ const PipelineFactory = require('./PipelineFactory');
99
* that listens to all buckets.
1010
*/
1111
class WildcardPipelineFactory extends PipelineFactory {
12+
/**
13+
* @constructor
14+
* @param {number} locationStrippingThreshold threshold for stripping location data
15+
*/
16+
constructor(locationStrippingThreshold) {
17+
super(locationStrippingThreshold);
18+
// getPipeline is used standalone later, make sure its `this` reference binds to us.
19+
this.getPipeline = this.getPipeline.bind(this);
20+
}
21+
1222
/**
1323
* Checks if an existing pipeline is valid against the current
1424
* factory.
@@ -23,23 +33,20 @@ class WildcardPipelineFactory extends PipelineFactory {
2333
}
2434

2535
/**
26-
* Create a pipeline for the connector, to listen to all
27-
* non-special collections.
36+
* Makes connector pipeline stage, to listen to all non-special collections.
2837
* @param {string[] | undefined} buckets buckets assigned to this connector
29-
* @returns {string} new connector pipeline
38+
* @returns {object} connector pipeline stage
3039
*/
31-
getPipeline(buckets) { // eslint-disable-line no-unused-vars
32-
return JSON.stringify([
33-
{
34-
$match: {
35-
'ns.coll': {
36-
$not: {
37-
$regex: `^(${constants.mpuBucketPrefix}|__).*`,
38-
},
39-
}
40+
getPipelineStage(buckets) { // eslint-disable-line no-unused-vars
41+
return {
42+
$match: {
43+
'ns.coll': {
44+
$not: {
45+
$regex: `^(${constants.mpuBucketPrefix}|__).*`,
46+
},
4047
}
4148
}
42-
]);
49+
};
4350
}
4451
}
4552

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
const assert = require('assert');
2+
const { OplogPopulatorConfigJoiSchema} = require('../../../extensions/oplogPopulator/OplogPopulatorConfigValidator');
3+
4+
const defaultConfig = {
5+
topic: 'backbeat-oplog',
6+
kafkaConnectHost: '127.0.0.1',
7+
kafkaConnectPort: 8083,
8+
numberOfConnectors: 1,
9+
probeServer: {
10+
bindAddress: '0.0.0.0',
11+
port: 8556,
12+
},
13+
};
14+
15+
describe('OplogPopulatorConfigValidator', () => {
16+
describe('locationStrippingThreshold validation', () => {
17+
it('should accept valid threshold', () => {
18+
const config = {
19+
...defaultConfig,
20+
locationStrippingThreshold: 50,
21+
};
22+
const result = OplogPopulatorConfigJoiSchema.validate(config);
23+
assert.ifError(result.error);
24+
assert.strictEqual(result.value.locationStrippingThreshold, 50);
25+
});
26+
});
27+
});

tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const MultipleBucketsPipelineFactory =
44
const { constants } = require('arsenal');
55

66
describe('MultipleBucketsPipelineFactory', () => {
7-
const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory();
7+
const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(200);
88

99
describe('isValid', () => {
1010
it('should detect a valid list of buckets', () => {
@@ -45,10 +45,15 @@ describe('MultipleBucketsPipelineFactory', () => {
4545
assert.strictEqual(result, '[]');
4646
});
4747

48-
it('should return the pipeline with buckets', () => {
48+
it('should return the pipeline with buckets and location stripping', () => {
4949
const buckets = ['bucket1', 'bucket2'];
5050
const result = multipleBucketsPipelineFactory.getPipeline(buckets);
51-
assert.strictEqual(result, '[{"$match":{"ns.coll":{"$in":["bucket1","bucket2"]}}}]');
51+
const pipeline = JSON.parse(result);
52+
53+
assert.strictEqual(pipeline.length, 2);
54+
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}});
55+
assert(pipeline[1].$set['fullDocument.value.location']);
56+
assert(result.includes('200'));
5257
});
5358
});
5459

tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/p
33
const { constants } = require('arsenal');
44

55
describe('WildcardPipelineFactory', () => {
6-
const wildcardPipelineFactory = new WildcardPipelineFactory();
6+
const wildcardPipelineFactory = new WildcardPipelineFactory(200);
77

88
describe('isValid', () => {
99
it('should detect a wildcard', () => {
@@ -32,10 +32,25 @@ describe('WildcardPipelineFactory', () => {
3232
});
3333

3434
describe('getPipeline', () => {
35-
it('should return the pipeline with wildcard', () => {
35+
it('should return the pipeline with buckets and location stripping', () => {
3636
const buckets = ['bucket1', 'bucket2'];
3737
const result = wildcardPipelineFactory.getPipeline(buckets);
38-
assert.strictEqual(result, '[{"$match":{"ns.coll":{"$not":{"$regex":"^(mpuShadowBucket|__).*"}}}}]');
38+
const pipeline = JSON.parse(result);
39+
40+
assert.strictEqual(pipeline.length, 2);
41+
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}});
42+
assert(pipeline[1].$set['fullDocument.value.location']);
43+
assert(result.includes('200'));
44+
});
45+
46+
it('should return the pipeline with buckets and no location stripping if disabled', () => {
47+
const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(-1);
48+
49+
const buckets = ['bucket1', 'bucket2'];
50+
const result = wildcardPipelineFactoryNoStripping.getPipeline(buckets);
51+
const pipeline = JSON.parse(result);
52+
53+
assert.strictEqual(pipeline.length, 1);
3954
});
4055
});
4156

0 commit comments

Comments
 (0)