-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.js
More file actions
97 lines (83 loc) · 2.33 KB
/
Copy pathtest.js
File metadata and controls
97 lines (83 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
const mongoose = require('mongoose');
const { Leader } = require('mongo-leader');
const CollectionManager = require('./lib/collection-manager');
/**
 * Creates a promise whose settlement is controlled from the outside.
 *
 * @returns {{resolve: Function, reject: Function, promise: Promise}}
 *   The pending promise together with the functions that fulfil or reject it.
 */
function deferred () {
  const handle = { resolve: undefined, reject: undefined, promise: undefined };
  // The executor runs synchronously, so both settle functions are captured
  // before the handle is returned.
  handle.promise = new Promise((fulfil, fail) => {
    handle.resolve = fulfil;
    handle.reject = fail;
  });
  return handle;
}
/**
 * Returns a promise that is fulfilled (with no value) after a timer fires.
 *
 * @param {number} ms - Timeout, in milliseconds, handed to `setTimeout`.
 * @returns {Promise<undefined>} Promise fulfilled once the timer elapses.
 */
function delay (ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * `onChange` handler handed to each CollectionManager: waits one second and
 * then logs the received event, prefixed with its `operationType`.
 *
 * @param {object} data - Event object; only `operationType` is read directly.
 *   Presumably a MongoDB change-stream event — confirm against CollectionManager.
 * @returns {Promise<void>}
 */
async function onChange (data) {
  const pauseMs = 1000;
  await delay(pauseMs);

  const prefix = `${data.operationType}: `;
  console.log(prefix, data);
}
/**
 * `onError` handler handed to each CollectionManager: writes the error to
 * stderr via `console.error`.
 *
 * @param {*} error - The value reported by the stream.
 * @returns {Promise<void>}
 */
async function onError (error) {
  const label = 'Error from stream';
  console.error(label, error);
}
/**
 * Demo entry point.
 *
 * Connects mongoose to the local replica set, creates a `Leader` on the `pud`
 * database and one `CollectionManager` per collection (`b`, `c`). When this
 * process is elected, `watch()` is called on every manager; when leadership is
 * revoked, or on SIGINT, `close()` is called on every manager.
 *
 * @returns {Promise<void>} Resolves once the connection is up and all
 *   listeners are registered.
 * @throws Rejects with the connection error when the initial connect fails.
 */
async function main () {
  const connection = mongoose.connection;

  // Register lifecycle listeners before connecting so no event is missed.
  connection
    .on('connected', () => {
      console.log('DB is connected');
    })
    .on('disconnected', () => {
      console.log('DB is disconnected');
    })
    .on('error', (error) => {
      console.log('DB error: ', error);
    })
    .on('close', () => {
      console.log('DB is closed');
    });

  // Await the connect() promise directly: a failed initial connection then
  // rejects main() instead of surfacing as an unhandled rejection.
  await mongoose.connect('mongodb://localhost:27017/?replicaSet=rs1');

  // With `useCache: true`, repeated useDb() calls return the same connection
  // object, so it is resolved once and shared.
  const pud = connection.useDb('pud', { useCache: true });

  // NOTE(review): depending on the installed mongo-leader version, the
  // election loop may need an explicit `leader.start()` — confirm against the
  // package README.
  const leader = new Leader(pud.db, { ttl: 5000, wait: 1000 });

  // Built synchronously with map(); no async callback is needed here.
  const collections = ['b', 'c'].map((col) => new CollectionManager(pud, {
    watch: {
      fullDocument: 'updateLookup',
      readPreference: 'secondaryPreferred',
    },
    collection: col,
    handlers: {
      onChange,
      onError,
      onClose: () => { console.log('stream is closed'); },
      onEnd: () => { console.log('stream is ended'); },
    },
    logger: console,
  }));

  const watchAll = () => Promise.all(collections.map((c) => c.watch()));
  const closeAll = () => Promise.all(collections.map((c) => c.close()));

  // Event emitters ignore the promise returned by an async listener, so each
  // listener handles its own failures.
  leader
    .on('elected', async () => {
      console.log('I am a leader');
      try {
        await watchAll();
      } catch (error) {
        console.error('Failed to start watching collections', error);
      }
    })
    .on('revoked', async () => {
      console.log('I am stand by');
      try {
        await closeAll();
      } catch (error) {
        console.error('Failed to close collection streams', error);
      }
    });

  process.on('SIGINT', async () => {
    let exitCode = 0;
    try {
      await closeAll();
      await connection.close();
      console.log('everything is closed. Bye');
    } catch (error) {
      // Still exit, but signal that the shutdown was not clean.
      console.error('Error during shutdown', error);
      exitCode = 1;
    }
    process.exit(exitCode);
  });
}
// Start the demo. A startup failure is logged and reflected in the exit status
// instead of leaving the process to finish with code 0.
main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});