Skip to content

Commit 93779e6

Browse files
author
Patrick Nagurny
committed
get rid of queue in sync and detect reorgs correctly
1 parent 46134e1 commit 93779e6

File tree

7 files changed

+114
-188
lines changed

7 files changed

+114
-188
lines changed

lib/services/bitcoind/index.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ Bitcoin.prototype._wrapRPCError = function(errObj) {
115115
};
116116

117117
Bitcoin.prototype._getGenesisBlock = function(callback) {
118-
119118
var self = this;
120119

121120
self.client.getBlockHash(0, function(err, response) {
@@ -290,7 +289,6 @@ Bitcoin.prototype._connectProcess = function(config) {
290289
};
291290

292291
Bitcoin.prototype.start = function(callback) {
293-
294292
var self = this;
295293

296294
if (!self.options.connect) {
@@ -303,7 +301,10 @@ Bitcoin.prototype.start = function(callback) {
303301
throw new Error('Could not connect to any servers in connect array.');
304302
}
305303

306-
self._initChain(function() {
304+
self._initChain(function(err) {
305+
if(err) {
306+
return callback(err);
307+
}
307308

308309
log.info('Bitcoin Daemon Ready');
309310
callback();

lib/services/db/index.js

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,10 @@ DB.prototype.start = function(callback) {
171171
log.info('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height);
172172
self._sync.sync();
173173

174+
self._sync.once('synced', function() {
175+
self.emit('synced');
176+
});
177+
174178
});
175179

176180
});
@@ -239,43 +243,13 @@ DB.prototype.createKeyStream = function(op) {
239243
return stream;
240244
};
241245

242-
DB.prototype.detectReorg = function(blocks) {
243-
244-
var self = this;
245-
246-
if (!blocks || blocks.length === 0) {
247-
return;
248-
}
249-
250-
var tipHash = self.reorgTipHash || self.tip.hash;
251-
var chainMembers = [];
252-
253-
var loopIndex = 0;
254-
var overallCounter = 0;
255-
256-
while(overallCounter < blocks.length) {
257-
258-
if (loopIndex >= blocks.length) {
259-
overallCounter++;
260-
loopIndex = 0;
261-
}
262-
263-
var prevHash = BufferUtil.reverse(blocks[loopIndex].header.prevHash).toString('hex');
264-
if (prevHash === tipHash) {
265-
tipHash = blocks[loopIndex].hash;
266-
chainMembers.push(blocks[loopIndex]);
267-
}
268-
loopIndex++;
269-
270-
}
271-
272-
for(var i = 0; i < blocks.length; i++) {
273-
if (chainMembers.indexOf(blocks[i]) === -1) {
274-
return blocks[i];
275-
}
276-
self.reorgTipHash = blocks[i].hash;
246+
DB.prototype.detectReorg = function(block) {
247+
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
248+
if(this.tip.hash !== prevHash) {
249+
return true;
277250
}
278251

252+
return false;
279253
};
280254

281255
DB.prototype.handleReorg = function(forkBlock, callback) {

lib/services/db/sync.js

Lines changed: 16 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ function BlockStream(highWaterMark, db, sync) {
1616
this.db = db;
1717
this.dbTip = this.db.tip;
1818
this.lastReadHeight = this.dbTip.__height;
19-
this.lastEmittedHash = this.dbTip.hash;
20-
this.queue = [];
19+
this.lastReadHash = this.dbTip.hash;
2120
this.processing = false;
2221
this.bitcoind = this.db.bitcoind;
2322
}
@@ -101,18 +100,9 @@ Sync.prototype.sync = function() {
101100
};
102101

103102
Sync.prototype._onFinish = function() {
104-
105103
var self = this;
106104
self.syncing = false;
107105

108-
if (self.forkBlock) {
109-
self.db.handleReorg(self.forkBlock, function() {
110-
self.forkBlock = null;
111-
self.sync();
112-
});
113-
return;
114-
}
115-
116106
self._startSubscriptions();
117107
self.emit('synced');
118108

@@ -147,91 +137,27 @@ Sync.prototype._handleErrors = function(stream) {
147137

148138

149139
BlockStream.prototype._read = function() {
140+
var self = this;
150141

151-
if (this.lastEmittedHash === this.bitcoind.tiphash) {
142+
if(this.lastReadHash === this.bitcoind.tiphash) {
152143
return this.push(null);
153144
}
154145

155-
this.queue.push(++this.lastReadHeight);
156-
this._process();
157-
};
158-
159-
BlockStream.prototype._process = function() {
160-
var self = this;
161-
162-
if(self.processing) {
163-
return;
146+
if(this.lastReadHeight >= this.bitcoind.height) {
147+
return this.push(null);
164148
}
165149

166-
this.processing = true;
167-
168-
async.whilst(
169-
function() {
170-
return self.queue.length;
171-
}, function(next) {
172-
173-
var blockArgs = self.queue.slice(0, Math.min(5, self.queue.length));
174-
self.queue = self.queue.slice(blockArgs.length);
175-
self._getBlocks(blockArgs, next);
176-
177-
}, function(err) {
178-
if(err) {
179-
return self.emit('error', err);
180-
}
181-
self.processing = false;
182-
}
183-
);
184-
};
185-
186-
187-
BlockStream.prototype._getBlocks = function(heights, callback) {
188-
189-
var self = this;
190-
async.map(heights, function(height, next) {
191-
192-
if (height === 0) {
193-
var block = new Block(self.bitcoind.genesisBuffer);
194-
block.__height = 0;
195-
return next(null, block);
196-
}
197-
198-
self.bitcoind.getBlock(height, function(err, block) {
199-
200-
if(err) {
201-
return next(err);
202-
}
203-
204-
block.__height = height;
205-
next(null, block);
206-
});
207-
208-
209-
}, function(err, blocks) {
210-
150+
self.bitcoind.getBlock(self.lastReadHeight + 1, function(err, block) {
211151
if(err) {
212-
return callback(err);
152+
// add new stack lines to err
153+
return self.emit('error', new Error(err));
213154
}
214155

215-
//at this point, we know that all blocks we've sent down the pipe
216-
//have not been reorg'ed, but the new batch here might have been
217-
self.sync.forkBlock = self.db.detectReorg(blocks);
218-
219-
if (!self.sync.forkBlock) {
220-
221-
for(var i = 0; i < blocks.length; i++) {
222-
223-
self.lastEmittedHash = blocks[i].hash;
224-
self.push(blocks[i]);
225-
226-
}
227-
228-
return callback();
229-
230-
}
231-
232-
self.push(null);
233-
callback();
156+
self.lastReadHeight++;
157+
self.lastReadHash = block.hash;
234158

159+
block.__height = self.lastReadHeight;
160+
self.push(block);
235161
});
236162
};
237163

@@ -266,6 +192,10 @@ ProcessSerial.prototype._write = function(block, enc, callback) {
266192
ProcessSerial.prototype._process = function(block, callback) {
267193
var self = this;
268194

195+
if(self.db.detectReorg(block)) {
196+
return self.db.handleReorg(block, callback);
197+
}
198+
269199
self.db.getSerialBlockOperations(block, true, function(err, operations) {
270200
if(err) {
271201
return callback(err);

test/data/blocks.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"genesis": "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000",
33
"block1a": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f69965a91e7fc9ccccbe4051b74d086114741b96678f5e491b5609b18962252fd2d12f858ffff7f20040000000102000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0200f2052a0100000023210372bfaa748e546ba784a4d1395f5cedf673f9f5a8160effbe0f595fe905fb3e59ac0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf900000000",
4-
"block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000"
4+
"block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000",
5+
"block2b": "00000020d459cd53b1e7fb9c9b7bdd7c48b571da9b0df08ff68b13cb6784b5b18761512e206072ebec8302cf43f859245d61dc304eaced26703e5d2fa035ae8068c6196ae0ede058ffff7f20010000000101000000000100f2052a010000001976a9142a48bf892a5461dffe8c68fe209be16a84289ca488ac00000000"
56
}

test/services/db/index.unit.js

Lines changed: 0 additions & 56 deletions
This file was deleted.

0 commit comments

Comments
 (0)