Skip to content

Commit bf4ac4c

Browse files
committed
oplog: Use local _liveBuckets instead of unnecessary pipeline fetch
Issue: BB-491
1 parent 636f27a commit bf4ac4c

File tree

2 files changed

+53
-24
lines changed

2 files changed

+53
-24
lines changed

extensions/oplogPopulator/modules/Connector.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class Connector extends EventEmitter {
4545
this._name = params.name;
4646
this._config = params.config;
4747
this._buckets = new Set(params.buckets);
48+
this._liveBuckets = new Set(params.buckets);
4849
this._isRunning = params.isRunning;
4950
this._getPipeline = params.getPipeline;
5051
this._state = {
@@ -166,6 +167,7 @@ class Connector extends EventEmitter {
166167
config: this._config,
167168
});
168169
this._isRunning = true;
170+
this._liveBuckets = new Set(this._buckets);
169171
} catch (err) {
170172
this._logger.error('Error while spawning connector', {
171173
method: 'Connector.spawn',
@@ -193,6 +195,7 @@ class Connector extends EventEmitter {
193195
this.emit(constants.connectorUpdatedEvent, this);
194196
await this._kafkaConnect.deleteConnector(this._name);
195197
this._isRunning = false;
198+
this._liveBuckets = new Set();
196199
// resetting the resume point to set a new one on creation of the connector
197200
delete this._config['startup.mode.timestamp.start.at.operation.time'];
198201
} catch (err) {
@@ -330,13 +333,9 @@ class Connector extends EventEmitter {
330333
try {
331334
if (doUpdate && this._isRunning) {
332335
if (!updateSupported) {
333-
const currentConfig = await this._kafkaConnect.getConnectorConfig(this._name);
334-
const currentPipeline = currentConfig.pipeline
335-
? JSON.parse(currentConfig.pipeline) : [];
336-
const newPipeline = JSON.parse(this._config.pipeline);
337-
const currentMatch = JSON.stringify(currentPipeline[0]?.$match);
338-
const newMatch = JSON.stringify(newPipeline[0]?.$match);
339-
if (currentMatch !== newMatch) {
336+
const sameBuckets = this._liveBuckets.size === this._buckets.size &&
337+
this._liveBuckets.isSubsetOf(this._buckets);
338+
if (!sameBuckets) {
340339
this._logger.info('Skipping pipeline update: match stage differs', {
341340
method: 'Connector.updatePipeline',
342341
connector: this._name,
@@ -348,6 +347,7 @@ class Connector extends EventEmitter {
348347
this._state.isUpdating = true;
349348
this.emit(constants.connectorUpdatedEvent, this);
350349
await this._kafkaConnect.updateConnectorConfig(this._name, this._config);
350+
this._liveBuckets = new Set(this._buckets);
351351
this._updateConnectorState(false, timeBeforeUpdate);
352352
this._state.isUpdating = false;
353353
return true;

tests/unit/oplogPopulator/Connector.js

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ describe('Connector', () => {
6262
await connector.spawn();
6363
assert.notStrictEqual(partitionName, connector.config['offset.partition.name']);
6464
});
65+
it('should set liveBuckets to current buckets on spawn', async () => {
66+
sinon.stub(connector._kafkaConnect, 'createConnector').resolves();
67+
connector._buckets = new Set(['bucket1', 'bucket2']);
68+
await connector.spawn();
69+
assert.deepStrictEqual(connector._liveBuckets, new Set(['bucket1', 'bucket2']));
70+
});
6571
it('should not try spawning a new connector when on is already existent', async () => {
6672
const createStub = sinon.stub(connector._kafkaConnect, 'createConnector')
6773
.resolves();
@@ -81,6 +87,13 @@ describe('Connector', () => {
8187
assert(deleteStub.calledOnceWith('example-connector'));
8288
assert.strictEqual(connector.isRunning, false);
8389
});
90+
it('should clear liveBuckets on destroy', async () => {
91+
sinon.stub(connector._kafkaConnect, 'deleteConnector').resolves();
92+
connector._isRunning = true;
93+
connector._liveBuckets = new Set(['bucket1']);
94+
await connector.destroy();
95+
assert.deepStrictEqual(connector._liveBuckets, new Set());
96+
});
8497
it('should not try destroying a new connector when connector is already destroyed', async () => {
8598
const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector')
8699
.resolves();
@@ -258,42 +271,58 @@ describe('Connector', () => {
258271
assert(updateStub.notCalled);
259272
});
260273

261-
it('should update when updateSupported is false and match stages are the same', async () => {
262-
const matchStage = { $match: { 'ns.coll': { $in: ['bucket1'] } } };
263-
const newPipeline = JSON.stringify([matchStage, { $set: { 'fullDocument.value.location': {} } }]);
274+
it('should update when updateSupported is false and live buckets match', async () => {
275+
connector._buckets = new Set(['bucket1']);
276+
connector._liveBuckets = new Set(['bucket1']);
264277
connector._state.bucketsGotModified = true;
265278
connector._state.isUpdating = false;
266279
connector._isRunning = true;
267-
sinon.stub(connector, '_getPipeline').returns(newPipeline);
268-
sinon.stub(connector._kafkaConnect, 'getConnectorConfig').resolves({
269-
pipeline: JSON.stringify([matchStage]),
270-
});
280+
sinon.stub(connector, '_getPipeline').returns('example-pipeline');
271281
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
272282
.resolves();
273283
const didUpdate = await connector.updatePipeline(true, false);
274284
assert.strictEqual(didUpdate, true);
275285
assert(updateStub.calledOnce);
276286
});
277287

278-
it('should skip update when updateSupported is false and match stages differ', async () => {
279-
const newPipeline = JSON.stringify([
280-
{ $match: { 'ns.coll': { $in: ['bucket1'] } } },
281-
]);
288+
it('should skip update when updateSupported is false and live buckets differ', async () => {
289+
connector._buckets = new Set(['bucket1']);
290+
connector._liveBuckets = new Set(['different-bucket']);
282291
connector._state.bucketsGotModified = true;
283292
connector._state.isUpdating = false;
284293
connector._isRunning = true;
285-
sinon.stub(connector, '_getPipeline').returns(newPipeline);
286-
sinon.stub(connector._kafkaConnect, 'getConnectorConfig').resolves({
287-
pipeline: JSON.stringify([
288-
{ $match: { 'ns.coll': { $in: ['different-bucket'] } } },
289-
]),
290-
});
294+
sinon.stub(connector, '_getPipeline').returns('example-pipeline');
291295
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
292296
.resolves();
293297
const didUpdate = await connector.updatePipeline(true, false);
294298
assert.strictEqual(didUpdate, false);
295299
assert(updateStub.notCalled);
296300
});
301+
302+
it('should sync liveBuckets after successful update', async () => {
303+
connector._buckets = new Set(['bucket1', 'bucket2']);
304+
connector._liveBuckets = new Set();
305+
connector._state.bucketsGotModified = true;
306+
connector._state.isUpdating = false;
307+
connector._isRunning = true;
308+
sinon.stub(connector, '_getPipeline').returns('example-pipeline');
309+
sinon.stub(connector._kafkaConnect, 'updateConnectorConfig').resolves();
310+
await connector.updatePipeline(true);
311+
assert.deepStrictEqual(connector._liveBuckets, new Set(['bucket1', 'bucket2']));
312+
});
313+
314+
it('should not sync liveBuckets after failed update', async () => {
315+
connector._buckets = new Set(['bucket1']);
316+
connector._liveBuckets = new Set();
317+
connector._state.bucketsGotModified = true;
318+
connector._state.isUpdating = false;
319+
connector._isRunning = true;
320+
sinon.stub(connector, '_getPipeline').returns('example-pipeline');
321+
sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
322+
.rejects(errors.InternalError);
323+
await assert.rejects(() => connector.updatePipeline(true));
324+
assert.deepStrictEqual(connector._liveBuckets, new Set());
325+
});
297326
});
298327

299328
describe('getConfigSizeInBytes', () => {

0 commit comments

Comments
 (0)