Skip to content

Commit be69ea2

Browse files
committed
[lbry] claimtrie: created node cache
1 parent 987a533 commit be69ea2

File tree

5 files changed

+212
-98
lines changed

5 files changed

+212
-98
lines changed

claimtrie/claimtrie.go

Lines changed: 23 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import (
44
"bytes"
55
"fmt"
66
"path/filepath"
7-
"runtime"
87
"sort"
9-
"sync"
108

119
"github.com/pkg/errors"
1210

@@ -249,17 +247,17 @@ func (ct *ClaimTrie) AppendBlock(temporary bool) error {
249247
names = append(names, expirations...)
250248
names = removeDuplicates(names)
251249

252-
nhns := ct.makeNameHashNext(names, false, nil)
253-
for nhn := range nhns {
250+
for _, name := range names {
254251

255-
ct.merkleTrie.Update(nhn.Name, nhn.Hash, true)
256-
if nhn.Next <= 0 {
252+
hash, next := ct.nodeManager.Hash(name)
253+
ct.merkleTrie.Update(name, hash, true)
254+
if next <= 0 {
257255
continue
258256
}
259257

260-
newName := normalization.NormalizeIfNecessary(nhn.Name, nhn.Next)
258+
newName := normalization.NormalizeIfNecessary(name, next)
261259
updateNames = append(updateNames, newName)
262-
updateHeights = append(updateHeights, nhn.Next)
260+
updateHeights = append(updateHeights, next)
263261
}
264262
if !temporary && len(updateNames) > 0 {
265263
err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights)
@@ -356,22 +354,29 @@ func (ct *ClaimTrie) ResetHeight(height int32) error {
356354
}
357355

358356
func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) {
359-
var nhns chan NameHashNext
360357
if names == nil {
361358
node.Log("Building the entire claim trie in RAM...")
362359
ct.claimLogger = newClaimProgressLogger("Processed", node.GetLogger())
363-
nhns = ct.makeNameHashNext(nil, true, interrupt)
364-
} else {
365-
ct.claimLogger = nil
366-
nhns = ct.makeNameHashNext(names, false, interrupt)
367-
}
368360

369-
for nhn := range nhns {
370-
ct.merkleTrie.Update(nhn.Name, nhn.Hash, false)
371-
if ct.claimLogger != nil {
372-
ct.claimLogger.LogName(nhn.Name)
361+
ct.nodeManager.IterateNames(func(name []byte) bool {
362+
if interruptRequested(interrupt) {
363+
return false
364+
}
365+
clone := make([]byte, len(name))
366+
copy(clone, name)
367+
hash, _ := ct.nodeManager.Hash(clone)
368+
ct.merkleTrie.Update(clone, hash, false)
369+
ct.claimLogger.LogName(name)
370+
return true
371+
})
372+
373+
} else {
374+
for _, name := range names {
375+
hash, _ := ct.nodeManager.Hash(name)
376+
ct.merkleTrie.Update(name, hash, false)
373377
}
374378
}
379+
375380
}
376381

377382
// MerkleHash returns the Merkle Hash of the claimTrie.
@@ -437,12 +442,6 @@ func (ct *ClaimTrie) FlushToDisk() {
437442
}
438443
}
439444

440-
type NameHashNext struct {
441-
Name []byte
442-
Hash *chainhash.Hash
443-
Next int32
444-
}
445-
446445
func interruptRequested(interrupted <-chan struct{}) bool {
447446
select {
448447
case <-interrupted: // should never block on nil
@@ -452,53 +451,3 @@ func interruptRequested(interrupted <-chan struct{}) bool {
452451

453452
return false
454453
}
455-
456-
func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool, interrupt <-chan struct{}) chan NameHashNext {
457-
inputs := make(chan []byte, 512)
458-
outputs := make(chan NameHashNext, 512)
459-
460-
var wg sync.WaitGroup
461-
hashComputationWorker := func() {
462-
for name := range inputs {
463-
hash, next := ct.nodeManager.Hash(name)
464-
outputs <- NameHashNext{name, hash, next}
465-
}
466-
wg.Done()
467-
}
468-
469-
threads := int(0.8 * float32(runtime.GOMAXPROCS(0)))
470-
if threads < 1 {
471-
threads = 1
472-
}
473-
for threads > 0 {
474-
threads--
475-
wg.Add(1)
476-
go hashComputationWorker()
477-
}
478-
go func() {
479-
if all {
480-
ct.nodeManager.IterateNames(func(name []byte) bool {
481-
if interruptRequested(interrupt) {
482-
return false
483-
}
484-
clone := make([]byte, len(name))
485-
copy(clone, name) // iteration name buffer is reused on future loops
486-
inputs <- clone
487-
return true
488-
})
489-
} else {
490-
for _, name := range names {
491-
if interruptRequested(interrupt) {
492-
break
493-
}
494-
inputs <- name
495-
}
496-
}
497-
close(inputs)
498-
}()
499-
go func() {
500-
wg.Wait()
501-
close(outputs)
502-
}()
503-
return outputs
504-
}

claimtrie/node/cache.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package node
2+
3+
import (
4+
"container/list"
5+
6+
"github.com/lbryio/lbcd/claimtrie/change"
7+
)
8+
9+
type cacheLeaf struct {
10+
node *Node
11+
element *list.Element
12+
changes []change.Change
13+
height int32
14+
}
15+
16+
type Cache struct {
17+
nodes map[string]*cacheLeaf
18+
order *list.List
19+
limit int
20+
}
21+
22+
func (nc *Cache) insert(name []byte, n *Node, height int32) {
23+
key := string(name)
24+
25+
existing := nc.nodes[key]
26+
if existing != nil {
27+
existing.node = n
28+
existing.height = height
29+
existing.changes = nil
30+
nc.order.MoveToFront(existing.element)
31+
return
32+
}
33+
34+
for nc.order.Len() >= nc.limit {
35+
// TODO: maybe ensure that we don't remove nodes that have a lot of changes?
36+
delete(nc.nodes, nc.order.Back().Value.(string))
37+
nc.order.Remove(nc.order.Back())
38+
}
39+
40+
element := nc.order.PushFront(key)
41+
nc.nodes[key] = &cacheLeaf{node: n, element: element, height: height}
42+
}
43+
44+
func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32) {
45+
key := string(name)
46+
47+
existing := nc.nodes[key]
48+
if existing != nil && existing.height <= height {
49+
nc.order.MoveToFront(existing.element)
50+
return existing.node, existing.changes, existing.height
51+
}
52+
return nil, nil, -1
53+
}
54+
55+
func (nc *Cache) addChanges(changes []change.Change, height int32) {
56+
for _, c := range changes {
57+
key := string(c.Name)
58+
existing := nc.nodes[key]
59+
if existing != nil && existing.height <= height {
60+
existing.changes = append(existing.changes, c)
61+
}
62+
}
63+
}
64+
65+
func (nc *Cache) drop(names [][]byte) {
66+
for _, name := range names {
67+
key := string(name)
68+
existing := nc.nodes[key]
69+
if existing != nil {
70+
// we can't roll it backwards because we don't know its previous height value; just toast it
71+
delete(nc.nodes, key)
72+
nc.order.Remove(existing.element)
73+
}
74+
}
75+
}
76+
77+
func (nc *Cache) clear() {
78+
nc.nodes = map[string]*cacheLeaf{}
79+
nc.order = list.New()
80+
// we'll let the GC sort out the remains...
81+
}
82+
83+
func NewCache(limit int) *Cache {
84+
return &Cache{limit: limit, nodes: map[string]*cacheLeaf{}, order: list.New()}
85+
}

claimtrie/node/manager.go

Lines changed: 75 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Manager interface {
2121
IterateNames(predicate func(name []byte) bool)
2222
Hash(name []byte) (*chainhash.Hash, int32)
2323
Flush() error
24+
ClearCache()
2425
}
2526

2627
type BaseManager struct {
@@ -30,31 +31,62 @@ type BaseManager struct {
3031
changes []change.Change
3132

3233
tempChanges map[string][]change.Change
34+
35+
cache *Cache
3336
}
3437

3538
func NewBaseManager(repo Repo) (*BaseManager, error) {
3639

3740
nm := &BaseManager{
38-
repo: repo,
41+
repo: repo,
42+
cache: NewCache(10000), // TODO: how many should we cache?
3943
}
4044

4145
return nm, nil
4246
}
4347

48+
func (nm *BaseManager) ClearCache() {
49+
nm.cache.clear()
50+
}
51+
4452
func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {
4553

46-
changes, err := nm.repo.LoadChanges(name)
47-
if err != nil {
48-
return nil, errors.Wrap(err, "in load changes")
49-
}
54+
n, changes, oldHeight := nm.cache.fetch(name, height)
55+
if n == nil {
56+
changes, err := nm.repo.LoadChanges(name)
57+
if err != nil {
58+
return nil, errors.Wrap(err, "in load changes")
59+
}
5060

51-
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
52-
changes = append(changes, nm.tempChanges[string(name)]...)
53-
}
61+
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
62+
changes = append(changes, nm.tempChanges[string(name)]...)
63+
}
5464

55-
n, err := nm.newNodeFromChanges(changes, height)
56-
if err != nil {
57-
return nil, errors.Wrap(err, "in new node")
65+
n, err = nm.newNodeFromChanges(changes, height)
66+
if err != nil {
67+
return nil, errors.Wrap(err, "in new node")
68+
}
69+
// TODO: how can we tell what needs to be cached?
70+
if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 4 || len(name) < 12) {
71+
nm.cache.insert(name, n, height)
72+
}
73+
} else {
74+
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
75+
changes = append(changes, nm.tempChanges[string(name)]...)
76+
n = n.Clone()
77+
} else if height != nm.height {
78+
n = n.Clone()
79+
}
80+
updated, err := nm.updateFromChanges(n, changes, height)
81+
if err != nil {
82+
return nil, errors.Wrap(err, "in update from changes")
83+
}
84+
if !updated {
85+
n.AdjustTo(oldHeight, height, name)
86+
}
87+
if nm.tempChanges == nil && height == nm.height {
88+
nm.cache.insert(name, n, height)
89+
}
5890
}
5991

6092
return n, nil
@@ -66,17 +98,13 @@ func (nm *BaseManager) node(name []byte) (*Node, error) {
6698
return nm.NodeAt(nm.height, name)
6799
}
68100

69-
// newNodeFromChanges returns a new Node constructed from the changes.
70-
// The changes must preserve their order received.
71-
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
101+
func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, height int32) (bool, error) {
72102

73-
if len(changes) == 0 {
74-
return nil, nil
103+
count := len(changes)
104+
if count == 0 {
105+
return false, nil
75106
}
76-
77-
n := New()
78107
previous := changes[0].Height
79-
count := len(changes)
80108

81109
for i, chg := range changes {
82110
if chg.Height < previous {
@@ -95,15 +123,37 @@ func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32)
95123
delay := nm.getDelayForName(n, chg)
96124
err := n.ApplyChange(chg, delay)
97125
if err != nil {
98-
return nil, errors.Wrap(err, "in apply change")
126+
return false, errors.Wrap(err, "in apply change")
99127
}
100128
}
101129

102130
if count <= 0 {
103-
return nil, nil
131+
// we applied no changes, which means we shouldn't exist if we had all the changes
132+
// or might mean nothing significant if we are applying a partial changeset
133+
return false, nil
104134
}
105135
lastChange := changes[count-1]
106-
return n.AdjustTo(lastChange.Height, height, lastChange.Name), nil
136+
n.AdjustTo(lastChange.Height, height, lastChange.Name)
137+
return true, nil
138+
}
139+
140+
// newNodeFromChanges returns a new Node constructed from the changes.
141+
// The changes must preserve their order received.
142+
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
143+
144+
if len(changes) == 0 {
145+
return nil, nil
146+
}
147+
148+
n := New()
149+
updated, err := nm.updateFromChanges(n, changes, height)
150+
if err != nil {
151+
return nil, errors.Wrap(err, "in update from changes")
152+
}
153+
if updated {
154+
return n, nil
155+
}
156+
return nil, nil
107157
}
108158

109159
func (nm *BaseManager) AppendChange(chg change.Change) {
@@ -220,6 +270,7 @@ func (nm *BaseManager) IncrementHeightTo(height int32, temporary bool) ([][]byte
220270
}
221271

222272
if !temporary {
273+
nm.cache.addChanges(nm.changes, height)
223274
if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names
224275
return nil, errors.Wrap(err, "in append changes")
225276
}
@@ -255,6 +306,8 @@ func (nm *BaseManager) DecrementHeightTo(affectedNames [][]byte, height int32) (
255306
return affectedNames, errors.Wrap(err, "in drop changes")
256307
}
257308
}
309+
310+
nm.cache.drop(affectedNames)
258311
}
259312
nm.height = height
260313

0 commit comments

Comments
 (0)