Skip to content

Commit d48fd8a

Browse files
committed
oplog: Use local _liveBuckets instead of unnecessary pipeline fetch
Issue: BB-491
1 parent d9a5d4c commit d48fd8a

File tree

2 files changed

+60
-24
lines changed

2 files changed

+60
-24
lines changed

extensions/oplogPopulator/modules/Connector.js

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class Connector extends EventEmitter {
4545
this._name = params.name;
4646
this._config = params.config;
4747
this._buckets = new Set(params.buckets);
48+
this._liveBuckets = new Set(params.buckets);
4849
this._isRunning = params.isRunning;
4950
this._getPipeline = params.getPipeline;
5051
this._state = {
@@ -79,6 +80,12 @@ class Connector extends EventEmitter {
7980
*/
8081
get buckets() { return [...this._buckets]; }
8182

83+
/**
84+
* Getter for connector live buckets (currently configured on Kafka Connect)
85+
* @returns {string[]} live buckets
86+
*/
87+
get liveBuckets() { return [...this._liveBuckets]; }
88+
8289
/**
8390
* Get number of buckets assigned to this
8491
* connector
@@ -166,6 +173,7 @@ class Connector extends EventEmitter {
166173
config: this._config,
167174
});
168175
this._isRunning = true;
176+
this._liveBuckets = new Set(this._buckets);
169177
} catch (err) {
170178
this._logger.error('Error while spawning connector', {
171179
method: 'Connector.spawn',
@@ -193,6 +201,7 @@ class Connector extends EventEmitter {
193201
this.emit(constants.connectorUpdatedEvent, this);
194202
await this._kafkaConnect.deleteConnector(this._name);
195203
this._isRunning = false;
204+
this._liveBuckets = new Set();
196205
// resetting the resume point to set a new one on creation of the connector
197206
delete this._config['startup.mode.timestamp.start.at.operation.time'];
198207
} catch (err) {
@@ -330,13 +339,10 @@ class Connector extends EventEmitter {
330339
try {
331340
if (doUpdate && this._isRunning) {
332341
if (!updateSupported) {
333-
const currentConfig = await this._kafkaConnect.getConnectorConfig(this._name);
334-
const currentPipeline = currentConfig.pipeline
335-
? JSON.parse(currentConfig.pipeline) : [];
336-
const newPipeline = JSON.parse(this._config.pipeline);
337-
const currentMatch = JSON.stringify(currentPipeline[0]?.$match);
338-
const newMatch = JSON.stringify(newPipeline[0]?.$match);
339-
if (currentMatch !== newMatch) {
342+
const sameSize = this._liveBuckets.size === this._buckets.size;
343+
const sameContent = sameSize &&
344+
[...this._liveBuckets].every(b => this._buckets.has(b));
345+
if (!sameContent) {
340346
this._logger.info('Skipping pipeline update: match stage differs', {
341347
method: 'Connector.updatePipeline',
342348
connector: this._name,
@@ -348,6 +354,7 @@ class Connector extends EventEmitter {
348354
this._state.isUpdating = true;
349355
this.emit(constants.connectorUpdatedEvent, this);
350356
await this._kafkaConnect.updateConnectorConfig(this._name, this._config);
357+
this._liveBuckets = new Set(this._buckets);
351358
this._updateConnectorState(false, timeBeforeUpdate);
352359
this._state.isUpdating = false;
353360
return true;

tests/unit/oplogPopulator/Connector.js

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ describe('Connector', () => {
6262
await connector.spawn();
6363
assert.notStrictEqual(partitionName, connector.config['offset.partition.name']);
6464
});
65+
it('should set liveBuckets to current buckets on spawn', async () => {
66+
sinon.stub(connector._kafkaConnect, 'createConnector').resolves();
67+
connector._buckets = new Set(['bucket1', 'bucket2']);
68+
await connector.spawn();
69+
assert.deepStrictEqual(connector.liveBuckets.sort(), ['bucket1', 'bucket2']);
70+
});
6571
it('should not try spawning a new connector when on is already existent', async () => {
6672
const createStub = sinon.stub(connector._kafkaConnect, 'createConnector')
6773
.resolves();
@@ -81,6 +87,13 @@ describe('Connector', () => {
8187
assert(deleteStub.calledOnceWith('example-connector'));
8288
assert.strictEqual(connector.isRunning, false);
8389
});
90+
it('should clear liveBuckets on destroy', async () => {
91+
sinon.stub(connector._kafkaConnect, 'deleteConnector').resolves();
92+
connector._isRunning = true;
93+
connector._liveBuckets = new Set(['bucket1']);
94+
await connector.destroy();
95+
assert.deepStrictEqual(connector.liveBuckets, []);
96+
});
8497
it('should not try destroying a new connector when connector is already destroyed', async () => {
8598
const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector')
8699
.resolves();
@@ -258,42 +271,58 @@ describe('Connector', () => {
258271
assert(updateStub.notCalled);
259272
});
260273

261-
it('should update when updateSupported is false and match stages are the same', async () => {
262-
const matchStage = { $match: { 'ns.coll': { $in: ['bucket1'] } } };
263-
const newPipeline = JSON.stringify([matchStage, { $set: { 'fullDocument.value.location': {} } }]);
274+
it('should update when updateSupported is false and live buckets match', async () => {
275+
connector._buckets = new Set(['bucket1']);
276+
connector._liveBuckets = new Set(['bucket1']);
264277
connector._state.bucketsGotModified = true;
265278
connector._state.isUpdating = false;
266279
connector._isRunning = true;
267-
sinon.stub(connector, '_getPipeline').returns(newPipeline);
268-
sinon.stub(connector._kafkaConnect, 'getConnectorConfig').resolves({
269-
pipeline: JSON.stringify([matchStage]),
270-
});
280+
sinon.stub(connector, '_getPipeline').returns('example-pipeline');
271281
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
272282
.resolves();
273283
const didUpdate = await connector.updatePipeline(true, false);
274284
assert.strictEqual(didUpdate, true);
275285
assert(updateStub.calledOnce);
276286
});
277287

278-
it('should skip update when updateSupported is false and match stages differ', async () => {
279-
const newPipeline = JSON.stringify([
280-
{ $match: { 'ns.coll': { $in: ['bucket1'] } } },
281-
]);
288+
it('should skip update when updateSupported is false and live buckets differ', async () => {
289+
connector._buckets = new Set(['bucket1']);
290+
connector._liveBuckets = new Set(['different-bucket']);
282291
connector._state.bucketsGotModified = true;
283292
connector._state.isUpdating = false;
284293
connector._isRunning = true;
285-
sinon.stub(connector, '_getPipeline').returns(newPipeline);
286-
sinon.stub(connector._kafkaConnect, 'getConnectorConfig').resolves({
287-
pipeline: JSON.stringify([
288-
{ $match: { 'ns.coll': { $in: ['different-bucket'] } } },
289-
]),
290-
});
294+
sinon.stub(connector, '_getPipeline').returns('example-pipeline');
291295
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
292296
.resolves();
293297
const didUpdate = await connector.updatePipeline(true, false);
294298
assert.strictEqual(didUpdate, false);
295299
assert(updateStub.notCalled);
296300
});
301+
302+
it('should sync liveBuckets after successful update', async () => {
303+
connector._buckets = new Set(['bucket1', 'bucket2']);
304+
connector._liveBuckets = new Set();
305+
connector._state.bucketsGotModified = true;
306+
connector._state.isUpdating = false;
307+
connector._isRunning = true;
308+
sinon.stub(connector, '_getPipeline').returns('example-pipeline');
309+
sinon.stub(connector._kafkaConnect, 'updateConnectorConfig').resolves();
310+
await connector.updatePipeline(true);
311+
assert.deepStrictEqual(connector.liveBuckets.sort(), ['bucket1', 'bucket2']);
312+
});
313+
314+
it('should not sync liveBuckets after failed update', async () => {
315+
connector._buckets = new Set(['bucket1']);
316+
connector._liveBuckets = new Set();
317+
connector._state.bucketsGotModified = true;
318+
connector._state.isUpdating = false;
319+
connector._isRunning = true;
320+
sinon.stub(connector, '_getPipeline').returns('example-pipeline');
321+
sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
322+
.rejects(errors.InternalError);
323+
await assert.rejects(() => connector.updatePipeline(true));
324+
assert.deepStrictEqual(connector.liveBuckets, []);
325+
});
297326
});
298327

299328
describe('getConfigSizeInBytes', () => {

0 commit comments

Comments
 (0)