Skip to content

Commit d9a5d4c

Browse files
committed
fixup! oplog: Update pipeline on Mongo >= 6.0 if match stage is unchanged
fixup! oplog: Update pipeline on Mongo >= 6.0 if match stage is unchanged
1 parent dbdfc4d commit d9a5d4c

File tree

2 files changed

+8
-7
lines changed

2 files changed

+8
-7
lines changed

extensions/oplogPopulator/modules/Connector.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -314,21 +314,22 @@ class Connector extends EventEmitter {
314314
* That is why we use updateConnectorConfig() instead of
315315
* updateConnectorPipeline()
316316
* @param {boolean} [doUpdate=false] updates connector if true
317-
* @param {boolean} [updateMatchStage=true] when false, only update
318-
* if the $match stage has not changed (to protect resume tokens
319-
* on MongoDB >= 6.0)
317+
* @param {boolean} [updateSupported=true] whether full pipeline updates
318+
* (including match stage changes) are supported. When false, the update
319+
* is only applied if the match stage has not changed, to protect resume
320+
* tokens on MongoDB >= 6.0.
320321
* @returns {Promise|boolean} connector did update
321322
* @throws {InternalError}
322323
*/
323-
async updatePipeline(doUpdate = false, updateMatchStage = true) {
324+
async updatePipeline(doUpdate = false, updateSupported = true) {
324325
// Only update when buckets changed and when not already updating
325326
if (!this._state.bucketsGotModified || this._state.isUpdating) {
326327
return false;
327328
}
328329
this._config.pipeline = this._getPipeline([...this._buckets]);
329330
try {
330331
if (doUpdate && this._isRunning) {
331-
if (!updateMatchStage) {
332+
if (!updateSupported) {
332333
const currentConfig = await this._kafkaConnect.getConnectorConfig(this._name);
333334
const currentPipeline = currentConfig.pipeline
334335
? JSON.parse(currentConfig.pipeline) : [];

tests/unit/oplogPopulator/Connector.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ describe('Connector', () => {
258258
assert(updateStub.notCalled);
259259
});
260260

261-
it('should update when updateMatchStage is false and match stages are the same', async () => {
261+
it('should update when updateSupported is false and match stages are the same', async () => {
262262
const matchStage = { $match: { 'ns.coll': { $in: ['bucket1'] } } };
263263
const newPipeline = JSON.stringify([matchStage, { $set: { 'fullDocument.value.location': {} } }]);
264264
connector._state.bucketsGotModified = true;
@@ -275,7 +275,7 @@ describe('Connector', () => {
275275
assert(updateStub.calledOnce);
276276
});
277277

278-
it('should skip update when updateMatchStage is false and match stages differ', async () => {
278+
it('should skip update when updateSupported is false and match stages differ', async () => {
279279
const newPipeline = JSON.stringify([
280280
{ $match: { 'ns.coll': { $in: ['bucket1'] } } },
281281
]);

0 commit comments

Comments
 (0)