Skip to content

Commit 97e7f43

Browse files
authored
Merge pull request #11666 from Automattic/6.3
6.3
2 parents 20aa5e1 + f7da45e commit 97e7f43

34 files changed

+904
-641
lines changed

.github/workflows/test.yml

+12
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,15 @@ jobs:
102102
with:
103103
name: coverage
104104
path: coverage
105+
test-replica-sets:
106+
runs-on: ubuntu-latest
107+
name: Replica Set tests
108+
steps:
109+
- uses: actions/checkout@a12a3943b4bdde767164f792f33f40b04645d846 # v3
110+
- name: Setup node
111+
uses: actions/setup-node@5b52f097d36d4b0b2f94ed6de710023fbb8b2236 # v3.1.0
112+
with:
113+
node-version: 16
114+
- run: npm install
115+
- name: Test
116+
run: env START_REPLICA_SET=1 npm test

lib/aggregate.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
const AggregationCursor = require('./cursor/AggregationCursor');
88
const Query = require('./query');
9-
const applyGlobalMaxTimeMS = require('./helpers/query/applyGlobalMaxTimeMS');
9+
const { applyGlobalMaxTimeMS, applyGlobalDiskUse } = require('./helpers/query/applyGlobalOption');
1010
const getConstructorName = require('./helpers/getConstructorName');
1111
const prepareDiscriminatorPipeline = require('./helpers/aggregate/prepareDiscriminatorPipeline');
1212
const promiseOrCallback = require('./helpers/promiseOrCallback');
@@ -973,6 +973,7 @@ Aggregate.prototype.exec = function(callback) {
973973
const collection = this._model.collection;
974974

975975
applyGlobalMaxTimeMS(this.options, model);
976+
applyGlobalDiskUse(this.options, model);
976977

977978
if (this.options && this.options.cursor) {
978979
return new AggregationCursor(this);

lib/cursor/ChangeStream.js

+42-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class ChangeStream extends EventEmitter {
1616

1717
this.driverChangeStream = null;
1818
this.closed = false;
19+
this.bindedEvents = false;
1920
this.pipeline = pipeline;
2021
this.options = options;
2122

@@ -27,21 +28,60 @@ class ChangeStream extends EventEmitter {
2728
}
2829

2930
this.driverChangeStream = driverChangeStream;
30-
this._bindEvents();
3131
this.emit('ready');
3232
});
3333
}
3434

3535
_bindEvents() {
36+
if (this.bindedEvents) {
37+
return;
38+
}
39+
40+
this.bindedEvents = true;
41+
42+
if (this.driverChangeStream == null) {
43+
this.once('ready', () => {
44+
this.driverChangeStream.on('close', () => {
45+
this.closed = true;
46+
});
47+
48+
['close', 'change', 'end', 'error'].forEach(ev => {
49+
this.driverChangeStream.on(ev, data => this.emit(ev, data));
50+
});
51+
});
52+
53+
return;
54+
}
55+
3656
this.driverChangeStream.on('close', () => {
3757
this.closed = true;
3858
});
3959

4060
['close', 'change', 'end', 'error'].forEach(ev => {
41-
this.driverChangeStream.on(ev, data => this.emit(ev, data));
61+
this.driverChangeStream.on(ev, data => {
62+
this.emit(ev, data);
63+
});
4264
});
4365
}
4466

67+
hasNext(cb) {
68+
return this.driverChangeStream.hasNext(cb);
69+
}
70+
71+
next(cb) {
72+
return this.driverChangeStream.next(cb);
73+
}
74+
75+
on(event, handler) {
76+
this._bindEvents();
77+
return super.on(event, handler);
78+
}
79+
80+
once(event, handler) {
81+
this._bindEvents();
82+
return super.once(event, handler);
83+
}
84+
4585
_queue(cb) {
4686
this.once('ready', () => cb());
4787
}

lib/cursor/QueryCursor.js

+2
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ QueryCursor.prototype.next = function(callback) {
225225
* @param {Function} fn
226226
* @param {Object} [options]
227227
* @param {Number} [options.parallel] the number of promises to execute in parallel. Defaults to 1.
228+
* @param {Number} [options.batchSize] if set, will call `fn()` with arrays of documents with length at most `batchSize`
229+
* @param {Boolean} [options.continueOnError=false] if true, `eachAsync()` iterates through all docs even if `fn` throws an error. If false, `eachAsync()` throws an error immediately if the given function `fn()` throws an error.
228230
* @param {Function} [callback] executed when all docs have been processed
229231
* @return {Promise}
230232
* @api public

lib/error/eachAsyncMultiError.js

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*!
2+
* Module dependencies.
3+
*/
4+
5+
'use strict';
6+
7+
const MongooseError = require('./');
8+
9+
10+
/**
11+
* If `eachAsync()` is called with `continueOnError: true`, there can be
12+
* multiple errors. This error class contains an `errors` property, which
13+
* contains an array of all errors that occurred in `eachAsync()`.
14+
*
15+
* @api private
16+
*/
17+
18+
class EachAsyncMultiError extends MongooseError {
19+
/**
20+
* @param {String} connectionString
21+
*/
22+
constructor(errors) {
23+
let preview = errors.map(e => e.message).join(', ');
24+
if (preview.length > 50) {
25+
preview = preview.slice(0, 50) + '...';
26+
}
27+
super(`eachAsync() finished with ${errors.length} errors: ${preview}`);
28+
29+
this.errors = errors;
30+
}
31+
}
32+
33+
Object.defineProperty(EachAsyncMultiError.prototype, 'name', {
34+
value: 'EachAsyncMultiError'
35+
});
36+
37+
/*!
38+
* exports
39+
*/
40+
41+
module.exports = EachAsyncMultiError;

lib/helpers/cursor/eachAsync.js

+44-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* Module dependencies.
55
*/
66

7+
const EachAsyncMultiError = require('../../error/eachAsyncMultiError');
78
const immediate = require('../immediate');
89
const promiseOrCallback = require('../promiseOrCallback');
910

@@ -24,10 +25,11 @@ const promiseOrCallback = require('../promiseOrCallback');
2425
module.exports = function eachAsync(next, fn, options, callback) {
2526
const parallel = options.parallel || 1;
2627
const batchSize = options.batchSize;
28+
const continueOnError = options.continueOnError;
29+
const aggregatedErrors = [];
2730
const enqueue = asyncQueue();
2831

2932
return promiseOrCallback(callback, cb => {
30-
3133
if (batchSize != null) {
3234
if (typeof batchSize !== 'number') {
3335
throw new TypeError('batchSize must be a number');
@@ -62,14 +64,22 @@ module.exports = function eachAsync(next, fn, options, callback) {
6264
return done();
6365
}
6466
if (err != null) {
65-
error = err;
66-
finalCallback(err);
67-
return done();
67+
if (continueOnError) {
68+
aggregatedErrors.push(err);
69+
} else {
70+
error = err;
71+
finalCallback(err);
72+
return done();
73+
}
6874
}
6975
if (doc == null) {
7076
drained = true;
7177
if (handleResultsInProgress <= 0) {
72-
finalCallback(null);
78+
const finalErr = continueOnError ?
79+
createEachAsyncMultiError(aggregatedErrors) :
80+
error;
81+
82+
finalCallback(finalErr);
7383
} else if (batchSize && documentsBatch.length) {
7484
handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
7585
}
@@ -102,11 +112,18 @@ module.exports = function eachAsync(next, fn, options, callback) {
102112
--handleResultsInProgress;
103113
}
104114
if (err != null) {
105-
error = err;
106-
return finalCallback(err);
115+
if (continueOnError) {
116+
aggregatedErrors.push(err);
117+
} else {
118+
error = err;
119+
return finalCallback(err);
120+
}
107121
}
108122
if (drained && handleResultsInProgress <= 0) {
109-
return finalCallback(null);
123+
const finalErr = continueOnError ?
124+
createEachAsyncMultiError(aggregatedErrors) :
125+
error;
126+
return finalCallback(finalErr);
110127
}
111128

112129
immediate(() => enqueue(fetch));
@@ -118,11 +135,18 @@ module.exports = function eachAsync(next, fn, options, callback) {
118135
}
119136

120137
function handleNextResult(doc, i, callback) {
121-
const promise = fn(doc, i);
122-
if (promise && typeof promise.then === 'function') {
123-
promise.then(
138+
let maybePromise;
139+
try {
140+
maybePromise = fn(doc, i);
141+
} catch (err) {
142+
return callback(err);
143+
}
144+
if (maybePromise && typeof maybePromise.then === 'function') {
145+
maybePromise.then(
124146
function() { callback(null); },
125-
function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
147+
function(error) {
148+
callback(error || new Error('`eachAsync()` promise rejected without error'));
149+
});
126150
} else {
127151
callback(null);
128152
}
@@ -158,3 +182,11 @@ function asyncQueue() {
158182
}
159183
}
160184
}
185+
186+
function createEachAsyncMultiError(aggregatedErrors) {
187+
if (aggregatedErrors.length === 0) {
188+
return null;
189+
}
190+
191+
return new EachAsyncMultiError(aggregatedErrors);
192+
}

lib/helpers/model/discriminator.js

+1-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ const CUSTOMIZABLE_DISCRIMINATOR_OPTIONS = {
1717
*/
1818

1919
module.exports = function discriminator(model, name, schema, tiedValue, applyPlugins) {
20-
2120
if (!(schema && schema.instanceOfSchema)) {
2221
throw new Error('You must pass a valid discriminator Schema');
2322
}
@@ -109,7 +108,7 @@ module.exports = function discriminator(model, name, schema, tiedValue, applyPlu
109108

110109
utils.merge(schema, baseSchema, {
111110
isDiscriminatorSchemaMerge: true,
112-
omit: { discriminators: true, base: true },
111+
omit: { discriminators: true, base: true, _applyDiscriminators: true },
113112
omitNested: conflictingPaths.reduce((cur, path) => {
114113
cur['tree.' + path] = true;
115114
return cur;
@@ -141,7 +140,6 @@ module.exports = function discriminator(model, name, schema, tiedValue, applyPlu
141140
obj[key][schema.options.typeKey] = existingPath ? existingPath.options[schema.options.typeKey] : String;
142141
schema.add(obj);
143142

144-
145143
schema.discriminatorMapping = { key: key, value: value, isRoot: false };
146144

147145
if (baseSchema.options.collection) {

lib/helpers/query/applyGlobalMaxTimeMS.js

-15
This file was deleted.
+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
3+
const utils = require('../../utils');
4+
5+
function applyGlobalMaxTimeMS(options, model) {
6+
applyGlobalOption(options, model, 'maxTimeMS');
7+
}
8+
9+
function applyGlobalDiskUse(options, model) {
10+
applyGlobalOption(options, model, 'allowDiskUse');
11+
}
12+
13+
module.exports = {
14+
applyGlobalMaxTimeMS,
15+
applyGlobalDiskUse
16+
};
17+
18+
19+
function applyGlobalOption(options, model, optionName) {
20+
if (utils.hasUserDefinedProperty(options, optionName)) {
21+
return;
22+
}
23+
24+
if (utils.hasUserDefinedProperty(model.db.options, optionName)) {
25+
options[optionName] = model.db.options[optionName];
26+
} else if (utils.hasUserDefinedProperty(model.base.options, optionName)) {
27+
options[optionName] = model.base.options[optionName];
28+
}
29+
}

lib/index.js

+10-3
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ Mongoose.prototype.driver = driver;
179179
Mongoose.prototype.set = function(key, value) {
180180
const _mongoose = this instanceof Mongoose ? this : mongoose;
181181

182-
if (VALID_OPTIONS.indexOf(key) === -1) throw new Error(`\`${key}\` is an invalid option.`);
182+
if (VALID_OPTIONS.indexOf(key) === -1) {
183+
throw new Error(`\`${key}\` is an invalid option.`);
184+
}
183185

184186
if (arguments.length === 1) {
185187
return _mongoose.options[key];
@@ -467,6 +469,7 @@ Mongoose.prototype.pluralize = function(fn) {
467469
*/
468470

469471
Mongoose.prototype.model = function(name, schema, collection, options) {
472+
470473
const _mongoose = this instanceof Mongoose ? this : mongoose;
471474

472475
if (typeof schema === 'string') {
@@ -519,7 +522,6 @@ Mongoose.prototype.model = function(name, schema, collection, options) {
519522
}
520523

521524
const model = _mongoose._model(name, schema, collection, options);
522-
523525
_mongoose.connection.models[name] = model;
524526
_mongoose.models[name] = model;
525527

@@ -561,12 +563,17 @@ Mongoose.prototype._model = function(name, schema, collection, options) {
561563

562564
const connection = options.connection || _mongoose.connection;
563565
model = _mongoose.Model.compile(model || name, schema, collection, connection, _mongoose);
564-
565566
// Errors handled internally, so safe to ignore error
566567
model.init(function $modelInitNoop() {});
567568

568569
connection.emit('model', model);
569570

571+
if (schema._applyDiscriminators != null) {
572+
for (const disc of Object.keys(schema._applyDiscriminators)) {
573+
model.discriminator(disc, schema._applyDiscriminators[disc]);
574+
}
575+
}
576+
570577
return model;
571578
};
572579

0 commit comments

Comments
 (0)