Skip to content

Commit 7f18605

Browse files
authored
Merge pull request #146 from ipfs/head-processing-parallel
Disable multi-head processing by default and add option
2 parents b18eb77 + 3b312dc commit 7f18605

File tree

1 file changed

+24
-9
lines changed

1 file changed

+24
-9
lines changed

crdt.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ type Options struct {
101101
// RepairInterval specifies how often to walk the full DAG until
102102
// the root(s) if it has been marked dirty. 0 to disable.
103103
RepairInterval time.Duration
104+
// MultiHeadProcessing lets several new heads to be processed in
105+
// parallel. This results in more branching in general. More
106+
// branching is not necessarily a bad thing and may improve
107+
// throughput, but everything depends on usage.
108+
MultiHeadProcessing bool
104109
}
105110

106111
func (opts *Options) verify() error {
@@ -147,8 +152,9 @@ func DefaultOptions() *Options {
147152
// always keeping
148153
// https://github.com/libp2p/go-libp2p-core/blob/master/network/network.go#L23
149154
// in sight
150-
MaxBatchDeltaSize: 1 * 1024 * 1024, // 1MB,
151-
RepairInterval: time.Hour,
155+
MaxBatchDeltaSize: 1 * 1024 * 1024, // 1MB,
156+
RepairInterval: time.Hour,
157+
MultiHeadProcessing: false,
152158
}
153159
}
154160

@@ -353,15 +359,24 @@ func (store *Datastore) handleNext() {
353359
continue
354360
}
355361

362+
processHead := func(c cid.Cid) {
363+
err = store.handleBlock(c) //handleBlock blocks
364+
if err != nil {
365+
store.logger.Error(err)
366+
store.markDirty()
367+
}
368+
}
369+
356370
// For each head, we process it.
357371
for _, head := range bCastHeads {
358-
go func(c cid.Cid) {
359-
err = store.handleBlock(c) //handleBlock blocks
360-
if err != nil {
361-
store.logger.Error(err)
362-
store.markDirty()
363-
}
364-
}(head)
372+
// A thing to try here would be to process heads in
373+
// the same broadcast in parallel, but do not process
374+
// heads from multiple broadcasts in parallel.
375+
if store.opts.MultiHeadProcessing {
376+
go processHead(head)
377+
} else {
378+
processHead(head)
379+
}
365380
store.seenHeadsMux.Lock()
366381
store.seenHeads[head] = struct{}{}
367382
store.seenHeadsMux.Unlock()

0 commit comments

Comments
 (0)