Skip to content

Commit 7116184

Browse files
authored
Merge pull request #2736 from cfromknecht/config-num-workers
lncfg: add CLI worker configuration
2 parents 158a32c + d81ce61 commit 7116184

File tree

4 files changed

+167
-4
lines changed

4 files changed

+167
-4
lines changed

config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ type config struct {
252252
net tor.Net
253253

254254
Routing *routing.Conf `group:"routing" namespace:"routing"`
255+
256+
Workers *lncfg.Workers `group:"workers" namespace:"workers"`
255257
}
256258

257259
// loadConfig initializes and parses the config using a config file and command
@@ -334,6 +336,11 @@ func loadConfig() (*config, error) {
334336
Control: defaultTorControl,
335337
},
336338
net: &tor.ClearNet{},
339+
Workers: &lncfg.Workers{
340+
Read: lncfg.DefaultReadWorkers,
341+
Write: lncfg.DefaultWriteWorkers,
342+
Sig: lncfg.DefaultSigWorkers,
343+
},
337344
}
338345

339346
// Pre-parse the command line options to pick up an alternative config
@@ -968,6 +975,12 @@ func loadConfig() (*config, error) {
968975
"minbackoff")
969976
}
970977

978+
// Assert that all worker pools will have a positive number of
979+
// workers, otherwise the pools will rendered useless.
980+
if err := cfg.Workers.Validate(); err != nil {
981+
return nil, err
982+
}
983+
971984
// Finally, ensure that the user's color is correctly formatted,
972985
// otherwise the server will not be able to start after the unlocking
973986
// the wallet.

lncfg/workers.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package lncfg
2+
3+
import "fmt"
4+
5+
const (
6+
// DefaultReadWorkers is the default maximum number of concurrent
7+
// workers used by the daemon's read pool.
8+
DefaultReadWorkers = 16
9+
10+
// DefaultWriteWorkers is the default maximum number of concurrent
11+
// workers used by the daemon's write pool.
12+
DefaultWriteWorkers = 16
13+
14+
// DefaultSigWorkers is the default maximum number of concurrent workers
15+
// used by the daemon's sig pool.
16+
DefaultSigWorkers = 8
17+
)
18+
19+
// Workers exposes CLI configuration for turning resources consumed by worker
20+
// pools.
21+
type Workers struct {
22+
// Read is the maximum number of concurrent read pool workers.
23+
Read int `long:"read" description:"Maximum number of concurrent read pool workers."`
24+
25+
// Write is the maximum number of concurrent write pool workers.
26+
Write int `long:"write" description:"Maximum number of concurrent write pool workers."`
27+
28+
// Sig is the maximum number of concurrent sig pool workers.
29+
Sig int `long:"sig" description:"Maximum number of concurrent sig pool workers."`
30+
}
31+
32+
// Validate checks the Workers configuration to ensure that the input values are
33+
// sane.
34+
func (w *Workers) Validate() error {
35+
if w.Read <= 0 {
36+
return fmt.Errorf("number of read workers (%d) must be "+
37+
"positive", w.Read)
38+
}
39+
if w.Write <= 0 {
40+
return fmt.Errorf("number of write workers (%d) must be "+
41+
"positive", w.Write)
42+
}
43+
if w.Sig <= 0 {
44+
return fmt.Errorf("number of sig workers (%d) must be "+
45+
"positive", w.Sig)
46+
}
47+
48+
return nil
49+
}

lncfg/workers_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package lncfg_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/lightningnetwork/lnd/lncfg"
7+
)
8+
9+
const (
10+
maxUint = ^uint(0)
11+
maxInt = int(maxUint >> 1)
12+
minInt = -maxInt - 1
13+
)
14+
15+
// TestValidateWorkers asserts that validating the Workers config only succeeds
16+
// if all fields specify a positive number of workers.
17+
func TestValidateWorkers(t *testing.T) {
18+
tests := []struct {
19+
name string
20+
cfg *lncfg.Workers
21+
valid bool
22+
}{
23+
{
24+
name: "min valid",
25+
cfg: &lncfg.Workers{
26+
Read: 1,
27+
Write: 1,
28+
Sig: 1,
29+
},
30+
valid: true,
31+
},
32+
{
33+
name: "max valid",
34+
cfg: &lncfg.Workers{
35+
Read: maxInt,
36+
Write: maxInt,
37+
Sig: maxInt,
38+
},
39+
valid: true,
40+
},
41+
{
42+
name: "read max invalid",
43+
cfg: &lncfg.Workers{
44+
Read: 0,
45+
Write: 1,
46+
Sig: 1,
47+
},
48+
},
49+
{
50+
name: "write max invalid",
51+
cfg: &lncfg.Workers{
52+
Read: 1,
53+
Write: 0,
54+
Sig: 1,
55+
},
56+
},
57+
{
58+
name: "sig max invalid",
59+
cfg: &lncfg.Workers{
60+
Read: 1,
61+
Write: 1,
62+
Sig: 0,
63+
},
64+
},
65+
{
66+
name: "read min invalid",
67+
cfg: &lncfg.Workers{
68+
Read: minInt,
69+
Write: 1,
70+
Sig: 1,
71+
},
72+
},
73+
{
74+
name: "write min invalid",
75+
cfg: &lncfg.Workers{
76+
Read: 1,
77+
Write: minInt,
78+
Sig: 1,
79+
},
80+
},
81+
{
82+
name: "sig min invalid",
83+
cfg: &lncfg.Workers{
84+
Read: 1,
85+
Write: 1,
86+
Sig: minInt,
87+
},
88+
},
89+
}
90+
91+
for _, test := range tests {
92+
t.Run(test.name, func(t *testing.T) {
93+
err := test.cfg.Validate()
94+
switch {
95+
case test.valid && err != nil:
96+
t.Fatalf("valid config was invalid: %v", err)
97+
case !test.valid && err == nil:
98+
t.Fatalf("invalid config was valid")
99+
}
100+
})
101+
}
102+
}

server.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"net"
1212
"path/filepath"
1313
"regexp"
14-
"runtime"
1514
"strconv"
1615
"sync"
1716
"sync/atomic"
@@ -273,7 +272,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
273272
)
274273

275274
writePool := pool.NewWrite(
276-
writeBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout,
275+
writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
277276
)
278277

279278
readBufferPool := pool.NewReadBuffer(
@@ -282,7 +281,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
282281
)
283282

284283
readPool := pool.NewRead(
285-
readBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout,
284+
readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
286285
)
287286

288287
decodeFinalCltvExpiry := func(payReq string) (uint32, error) {
@@ -296,7 +295,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
296295
s := &server{
297296
chanDB: chanDB,
298297
cc: cc,
299-
sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
298+
sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.signer),
300299
writePool: writePool,
301300
readPool: readPool,
302301

0 commit comments

Comments
 (0)