Skip to content

Commit 1b4c984

Browse files
Merge pull request #53 from coinbase/patrick/encounter-optimization
[indexer] Syncing Optimizations
2 parents 7b5393d + 8eaa063 commit 1b4c984

File tree

5 files changed

+155
-35
lines changed

5 files changed

+155
-35
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.13
55
require (
66
github.com/btcsuite/btcd v0.21.0-beta
77
github.com/btcsuite/btcutil v1.0.2
8-
github.com/coinbase/rosetta-sdk-go v0.6.3
8+
github.com/coinbase/rosetta-sdk-go v0.6.5
99
github.com/dgraph-io/badger/v2 v2.2007.2
1010
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
1111
github.com/stretchr/testify v1.6.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJ
6666
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
6767
github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U=
6868
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
69-
github.com/coinbase/rosetta-sdk-go v0.6.3 h1:PPj14tPJ7SFc8sY/hlwK8zddT7PKwWU2wicxyerDxlg=
70-
github.com/coinbase/rosetta-sdk-go v0.6.3/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo=
69+
github.com/coinbase/rosetta-sdk-go v0.6.5 h1:RytFDCPXS64vEYwIOsxsoQGlZZyP9RQvzyYikxymI4w=
70+
github.com/coinbase/rosetta-sdk-go v0.6.5/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo=
7171
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
7272
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
7373
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=

indexer/indexer.go

Lines changed: 150 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"runtime"
22+
"sync"
2123
"time"
2224

2325
"github.com/coinbase/rosetta-bitcoin/bitcoin"
@@ -34,6 +36,7 @@ import (
3436
sdkUtils "github.com/coinbase/rosetta-sdk-go/utils"
3537
"github.com/dgraph-io/badger/v2"
3638
"github.com/dgraph-io/badger/v2/options"
39+
"golang.org/x/sync/semaphore"
3740
)
3841

3942
const (
@@ -58,6 +61,15 @@ const (
5861

5962
// zeroValue is 0 as a string
6063
zeroValue = "0"
64+
65+
// overclockMultiplier is the amount
66+
// we multiply runtime.NumCPU by to determine
67+
// how many goroutines we should
68+
// spwan to handle block data sequencing.
69+
overclockMultiplier = 16
70+
71+
// semaphoreWeight is the weight of each semaphore request.
72+
semaphoreWeight = int64(1)
6173
)
6274

6375
var (
@@ -97,6 +109,20 @@ type Indexer struct {
97109
workers []modules.BlockWorker
98110

99111
waiter *waitTable
112+
113+
// Store coins created in pre-store before persisted
114+
// in add block so we can optimistically populate
115+
// blocks before committed.
116+
coinCache map[string]*types.AccountCoin
117+
coinCacheMutex *sdkUtils.PriorityMutex
118+
119+
// When populating blocks using pre-stored blocks,
120+
// we should retry if a new block was seen (similar
121+
// to trying again if head block changes).
122+
seen int64
123+
seenMutex sync.Mutex
124+
125+
seenSemaphore *semaphore.Weighted
100126
}
101127

102128
// CloseDatabase closes a storage.Database. This should be called
@@ -178,7 +204,7 @@ func Initialize(
178204
return nil, fmt.Errorf("%w: unable to initialize storage", err)
179205
}
180206

181-
blockStorage := modules.NewBlockStorage(localStore)
207+
blockStorage := modules.NewBlockStorage(localStore, runtime.NumCPU()*overclockMultiplier)
182208
asserter, err := asserter.NewClientWithOptions(
183209
config.Network,
184210
config.GenesisBlockIdentifier,
@@ -192,14 +218,17 @@ func Initialize(
192218
}
193219

194220
i := &Indexer{
195-
cancel: cancel,
196-
network: config.Network,
197-
pruningConfig: config.Pruning,
198-
client: client,
199-
database: localStore,
200-
blockStorage: blockStorage,
201-
waiter: newWaitTable(),
202-
asserter: asserter,
221+
cancel: cancel,
222+
network: config.Network,
223+
pruningConfig: config.Pruning,
224+
client: client,
225+
database: localStore,
226+
blockStorage: blockStorage,
227+
waiter: newWaitTable(),
228+
asserter: asserter,
229+
coinCache: map[string]*types.AccountCoin{},
230+
coinCacheMutex: new(sdkUtils.PriorityMutex),
231+
seenSemaphore: semaphore.NewWeighted(int64(runtime.NumCPU())),
203232
}
204233

205234
coinStorage := modules.NewCoinStorage(
@@ -330,35 +359,31 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
330359
}
331360

332361
ops := 0
333-
334-
// Close channels of all blocks waiting.
335-
i.waiter.Lock()
336362
for _, transaction := range block.Transactions {
337363
ops += len(transaction.Operations)
338-
txHash := transaction.TransactionIdentifier.Hash
339-
val, ok := i.waiter.Get(txHash, false)
340-
if !ok {
341-
continue
342-
}
364+
}
343365

344-
if val.channelClosed {
345-
logger.Debugw(
346-
"channel already closed",
347-
"hash", block.BlockIdentifier.Hash,
348-
"index", block.BlockIdentifier.Index,
349-
"channel", txHash,
350-
)
351-
continue
352-
}
366+
// clean cache intermediate
367+
i.coinCacheMutex.Lock(true)
368+
for _, tx := range block.Transactions {
369+
for _, op := range tx.Operations {
370+
if op.CoinChange == nil {
371+
continue
372+
}
353373

354-
// Closing channel will cause all listeners to continue
355-
val.channelClosed = true
356-
close(val.channel)
374+
if op.CoinChange.CoinAction != types.CoinCreated {
375+
continue
376+
}
377+
378+
delete(i.coinCache, op.CoinChange.CoinIdentifier.Identifier)
379+
}
357380
}
381+
i.coinCacheMutex.Unlock()
358382

359383
// Look for all remaining waiting transactions associated
360384
// with the next block that have not yet been closed. We should
361385
// abort these waits as they will never be closed by a new transaction.
386+
i.waiter.Lock()
362387
for txHash, val := range i.waiter.table {
363388
if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed {
364389
logger.Debugw(
@@ -385,6 +410,91 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
385410
return nil
386411
}
387412

413+
// BlockSeen is called by the syncer when a block is encountered.
414+
func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error {
415+
if err := i.seenSemaphore.Acquire(ctx, semaphoreWeight); err != nil {
416+
return err
417+
}
418+
defer i.seenSemaphore.Release(semaphoreWeight)
419+
420+
logger := utils.ExtractLogger(ctx, "indexer")
421+
422+
// load intermediate
423+
i.coinCacheMutex.Lock(false)
424+
for _, tx := range block.Transactions {
425+
for _, op := range tx.Operations {
426+
if op.CoinChange == nil {
427+
continue
428+
}
429+
430+
// We only care about newly accessible coins.
431+
if op.CoinChange.CoinAction != types.CoinCreated {
432+
continue
433+
}
434+
435+
i.coinCache[op.CoinChange.CoinIdentifier.Identifier] = &types.AccountCoin{
436+
Account: op.Account,
437+
Coin: &types.Coin{
438+
CoinIdentifier: op.CoinChange.CoinIdentifier,
439+
Amount: op.Amount,
440+
},
441+
}
442+
}
443+
}
444+
i.coinCacheMutex.Unlock()
445+
446+
// Update so that lookers know it exists
447+
i.seenMutex.Lock()
448+
i.seen++
449+
i.seenMutex.Unlock()
450+
451+
err := i.blockStorage.SeeBlock(ctx, block)
452+
if err != nil {
453+
return fmt.Errorf(
454+
"%w: unable to encounter block to storage %s:%d",
455+
err,
456+
block.BlockIdentifier.Hash,
457+
block.BlockIdentifier.Index,
458+
)
459+
}
460+
461+
ops := 0
462+
463+
// Close channels of all blocks waiting.
464+
i.waiter.Lock()
465+
for _, transaction := range block.Transactions {
466+
ops += len(transaction.Operations)
467+
txHash := transaction.TransactionIdentifier.Hash
468+
val, ok := i.waiter.Get(txHash, false)
469+
if !ok {
470+
continue
471+
}
472+
473+
if val.channelClosed {
474+
logger.Debugw(
475+
"channel already closed",
476+
"hash", block.BlockIdentifier.Hash,
477+
"index", block.BlockIdentifier.Index,
478+
"channel", txHash,
479+
)
480+
continue
481+
}
482+
483+
// Closing channel will cause all listeners to continue
484+
val.channelClosed = true
485+
close(val.channel)
486+
}
487+
i.waiter.Unlock()
488+
489+
logger.Debugw(
490+
"block seen",
491+
"hash", block.BlockIdentifier.Hash,
492+
"index", block.BlockIdentifier.Index,
493+
)
494+
495+
return nil
496+
}
497+
388498
// BlockRemoved is called by the syncer when a block is removed.
389499
func (i *Indexer) BlockRemoved(
390500
ctx context.Context,
@@ -424,6 +534,7 @@ func (i *Indexer) findCoin(
424534
coinIdentifier string,
425535
) (*types.Coin, *types.AccountIdentifier, error) {
426536
for ctx.Err() == nil {
537+
startSeen := i.seen
427538
databaseTransaction := i.database.ReadTransaction(ctx)
428539
defer databaseTransaction.Discard(ctx)
429540

@@ -461,6 +572,14 @@ func (i *Indexer) findCoin(
461572
return nil, nil, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier)
462573
}
463574

575+
// Check seen CoinCache
576+
i.coinCacheMutex.Lock(false)
577+
accCoin, ok := i.coinCache[coinIdentifier]
578+
i.coinCacheMutex.Unlock()
579+
if ok {
580+
return accCoin.Coin, accCoin.Account, nil
581+
}
582+
464583
// Locking here prevents us from adding sending any done
465584
// signals while we are determining whether or not to add
466585
// to the WaitTable.
@@ -470,12 +589,13 @@ func (i *Indexer) findCoin(
470589
// we created our databaseTransaction.
471590
currHeadBlock, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
472591
if err != nil {
592+
i.waiter.Unlock()
473593
return nil, nil, fmt.Errorf("%w: unable to get head block identifier", err)
474594
}
475595

476596
// If the block has changed, we try to look up the transaction
477597
// again.
478-
if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) {
598+
if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.seen != startSeen {
479599
i.waiter.Unlock()
480600
continue
481601
}

services/network_service_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
)
2828

2929
var (
30-
middlewareVersion = "0.0.8"
30+
middlewareVersion = "0.0.9"
3131
defaultNetworkOptions = &types.NetworkOptionsResponse{
3232
Version: &types.Version{
3333
RosettaVersion: types.RosettaAPIVersion,

services/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ const (
4545
// variable instead of a constant because
4646
// we typically need the pointer of this
4747
// value.
48-
MiddlewareVersion = "0.0.8"
48+
MiddlewareVersion = "0.0.9"
4949
)
5050

5151
// Client is used by the servicers to get Peer information

0 commit comments

Comments
 (0)