Skip to content

Commit 5edb748

Browse files
committed
ft: ARSN-65 oplog pattern library
Snapshot-scan-oplog pattern with state persistence for applications requiring reading the oplog
1 parent c95f84e commit 5edb748

File tree

11 files changed

+1481
-1
lines changed

11 files changed

+1481
-1
lines changed
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
/*
2+
* Main interface for bucketd oplog management
3+
*
4+
* persist is an interface with the following methods:
5+
* - constructor(params)
6+
* - load(bucketName, persistData, cb(err, offset))
7+
* - save(bucketName, persistData, offset, cb(err))
8+
9+
* persistData is an interface with the following methods:
10+
* - constuctor(params)
11+
* - initState(cb(err)): initialize the structure, e.g. initial bucket scan
12+
* - loadState(stream, cb(err)): load the state
13+
* - saveState(stream, cb(err)): save the state
14+
* - updateState(addQueue, delQueue, cb(err)): update the state
15+
* item: { bucketName, key, value }
16+
*/
17+
const async = require('async');
18+
const BucketClient = require('bucketclient').RESTClient;
19+
const { jsutil } = require('arsenal');
20+
const LogConsumer = require('arsenal').storage.metadata.bucketclient.LogConsumer;
21+
const { isMasterKey } = require('arsenal/lib/versioning/Version');
22+
const werelogs = require('werelogs');
23+
24+
werelogs.configure({
25+
level: 'info',
26+
dump: 'error',
27+
});
28+
29+
class BucketdOplogInterface {
30+
31+
constructor(params) {
32+
this.stopAt = -1;
33+
this.backendRetryTimes = 3;
34+
this.backendRetryInterval = 300;
35+
this.bucketdOplogQuerySize = 20;
36+
let bkBootstrap = ['localhost:9000'];
37+
if (params && params.bootstrap !== undefined) {
38+
bkBootstrap = params.bootstrap;
39+
}
40+
if (params && params.persist !== undefined) {
41+
this.persist = params.persist;
42+
}
43+
if (params && params.persistData !== undefined) {
44+
this.persistData = params.persistData;
45+
}
46+
if (params && params.stopAt !== undefined) {
47+
this.stopAt = params.stopAt;
48+
}
49+
this.bkClient = new BucketClient(bkBootstrap);
50+
this.logger = new werelogs.Logger('BucketdOplogInterface');
51+
}
52+
53+
start(bucketName, cb) {
54+
async.waterfall([
55+
/*
56+
* In this step we get the raftId for bucketName
57+
*/
58+
next => {
59+
this.logger.info('obtaining raftId',
60+
{ bucketName });
61+
async.retry(
62+
{
63+
times: this.backendRetryTimes,
64+
interval: this.backendRetryInterval,
65+
},
66+
done => {
67+
this.bkClient.getBucketInformation(
68+
bucketName,
69+
null,
70+
(err, info) => {
71+
if (err) {
72+
this.logger.info('retrying getBucketInformation', { err, bucketName });
73+
return done(err);
74+
}
75+
return done(null, JSON.parse(info));
76+
});
77+
},
78+
(err, res) => {
79+
if (err) {
80+
this.logger.error('getBucketInformation too many failures', { err, bucketName });
81+
return next(err);
82+
}
83+
return next(null, res.raftSessionId);
84+
});
85+
return undefined;
86+
},
87+
/*
88+
* In this step we get the stored offset if we have it
89+
*/
90+
(raftId, next) => {
91+
let cseq = undefined;
92+
this.persist.load(bucketName, this.persistData, (err, offset) => {
93+
if (err) {
94+
return next(err);
95+
}
96+
cseq = offset;
97+
return next(null, raftId, cseq);
98+
});
99+
},
100+
/*
101+
* In this step we acquire the offset if we don't already have it
102+
*/
103+
(raftId, cseq, next) => {
104+
if (cseq !== undefined) {
105+
this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
106+
{ bucketName });
107+
return next(null, raftId, cseq, true);
108+
}
109+
this.logger.info('cseq acquisition',
110+
{ bucketName });
111+
async.retry(
112+
{
113+
times: this.backendRetryTimes,
114+
interval: this.backendRetryInterval,
115+
},
116+
done => {
117+
this.bkClient.getRaftLog(
118+
raftId,
119+
1,
120+
1,
121+
true,
122+
null,
123+
(err, stream) => {
124+
if (err) {
125+
this.logger.info('retrying getRaftLog', { err, bucketName });
126+
return done(err);
127+
}
128+
const chunks = [];
129+
stream.on('data', chunk => {
130+
chunks.push(chunk);
131+
});
132+
stream.on('end', () => {
133+
const info = JSON.parse(Buffer.concat(chunks));
134+
return done(null, info);
135+
});
136+
return undefined;
137+
});
138+
},
139+
(err, res) => {
140+
if (err) {
141+
this.logger.error('getRaftLog too many failures', { err, bucketName });
142+
return next(err);
143+
}
144+
return next(null, raftId, res.info.cseq, false);
145+
});
146+
return undefined;
147+
},
148+
/*
149+
* In this step we scan the bucket
150+
*/
151+
(raftId, cseq, skipListing, next) => {
152+
if (skipListing) {
153+
this.logger.info(`skipping listing cseq=${cseq}`,
154+
{ bucketName });
155+
return next(null, raftId, cseq);
156+
}
157+
this.logger.info(`listing cseq=${cseq}`,
158+
{ bucketName });
159+
this.persistData.initState(err => {
160+
if (err) {
161+
return next(err);
162+
}
163+
this.persist.save(
164+
bucketName, this.persistData, cseq, err => {
165+
if (err) {
166+
return next(err);
167+
}
168+
return next(null, raftId, cseq);
169+
});
170+
return undefined;
171+
});
172+
return undefined;
173+
},
174+
/*
175+
* In this step we loop over the oplog
176+
*/
177+
(raftId, cseq, next) => {
178+
this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
179+
{ bucketName });
180+
// only way to get out of the loop in all cases
181+
const nextOnce = jsutil.once(next);
182+
let doStop = false;
183+
// resume reading the oplog from cseq. changes are idempotent
184+
const logConsumer = new LogConsumer({
185+
bucketClient: this.bkClient,
186+
raftSession: raftId,
187+
});
188+
let _cseq = cseq;
189+
async.until(
190+
() => doStop,
191+
_next => {
192+
// console.error(
193+
// 'readRecords', _cseq, this.bucketdOplogQuerySize);
194+
logConsumer.readRecords({
195+
startSeq: _cseq,
196+
limit: this.bucketdOplogQuerySize,
197+
}, (err, record) => {
198+
if (err) {
199+
this.logger.error('readRecords error', { err, bucketName });
200+
// return _next(err);
201+
return setTimeout(() => _next(), 5000);
202+
}
203+
// console.error('record info', record.info);
204+
if (!record.log) {
205+
// nothing to read
206+
return setTimeout(() => _next(), 5000);
207+
}
208+
const seqs = [];
209+
record.log.on('data', chunk => {
210+
seqs.push(chunk);
211+
});
212+
record.log.on('end', () => {
213+
const addQueue = [];
214+
const delQueue = [];
215+
for (let i = 0; i < seqs.length; i++) {
216+
if (seqs[i].db === bucketName) {
217+
for (let j = 0; j < seqs[i].entries.length; j++) {
218+
// console.info(i, j, seqs[i].db, seqs[i].entries[j]);
219+
const _item = {};
220+
_item.bucketName = bucketName;
221+
_item.key = seqs[i].entries[j].key;
222+
if (seqs[i].entries[j].type !== undefined &&
223+
seqs[i].entries[j].type === 'del') {
224+
if (!isMasterKey(_item.key)) {
225+
// ignore for now
226+
return;
227+
}
228+
delQueue.push(_item);
229+
} else {
230+
_item.value = Object.assign({}, seqs[i].entries[j].value);
231+
addQueue.push(_item);
232+
}
233+
}
234+
}
235+
}
236+
this.persistData.updateState(
237+
addQueue, delQueue, err => {
238+
if (err) {
239+
return _next(err);
240+
}
241+
_cseq += seqs.length;
242+
this.persist.save(
243+
bucketName, this.persistData, _cseq, err => {
244+
if (err) {
245+
return _next(err);
246+
}
247+
if (_cseq > this.stopAt) {
248+
doStop = true;
249+
}
250+
return _next();
251+
});
252+
return undefined;
253+
});
254+
});
255+
return undefined;
256+
});
257+
}, err => {
258+
if (err) {
259+
return nextOnce(err);
260+
}
261+
return nextOnce();
262+
});
263+
},
264+
], err => {
265+
if (err) {
266+
return cb(err);
267+
}
268+
this.logger.info('returning',
269+
{ bucketName });
270+
return cb();
271+
});
272+
}
273+
}
274+
275+
module.exports = BucketdOplogInterface;

0 commit comments

Comments
 (0)