Skip to content

Commit f513fca

Browse files
committed
lbcdblocknotify: reorganize the code with a few updates
1. Fixed a bug, which reads certs even TLS is disabled 2. Persists Stratum TCP connection with auto-reconnect. (retry backoff increases from 1s to 60s maximum) 3. Stratum update jobs on previous notifications are canceled when a new notification arrives. Usually, the jobs are so short and completed immediately. However, if the Stratum connection is broken, this prevents the bridge from accumulating stale jobs.
1 parent 6728bf4 commit f513fca

6 files changed

Lines changed: 364 additions & 119 deletions

File tree

rpcclient/examples/lbcdblocknotify/README.md

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
1-
# lbcd Websockets Example
1+
# lbcdbloknotify
22

3-
This example shows how to use the rpcclient package to connect to a btcd RPC
4-
server using TLS-secured websockets, register for block connected and block
5-
disconnected notifications, and get the current block count.
3+
This bridge program subscribes to lbcd's notifications over websockets using the rpcclient package.
4+
Users can specify supported actions upon receiving this notifications.
65

7-
## Running the Example
6+
## Building(or Running) the Program
87

9-
The first step is to clone the lbcd package:
8+
Clone the lbcd package:
109

1110
```bash
1211
$ git clone github.com/lbryio/lbcd
12+
$ cd lbcd/rpcclient/examples
13+
14+
# build the program
15+
$ go build .
16+
17+
# or directly run it (build implicitly behind the scene)
18+
$ go run .
1319
```
1420

1521
Display available options:
@@ -30,19 +36,30 @@ $ go run . -h
3036
-stratumpass string
3137
Stratum server password (default "password")
3238
-quiet
33-
Do not print logs
39+
Do not print periodic logs
3440
```
3541

36-
Start the program:
42+
Running the program:
3743

3844
```bash
39-
$ go run . -stratumpass <STRATUM PASSWD> -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD>
45+
# Send stratum mining.update_block mesage upon receving block connected notifiations.
46+
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -stratum <STRATUM SERVER> -stratumpass <STRATUM PASSWD>
4047

41-
2022/01/10 23:16:21 NotifyBlocks: Registration Complete
42-
2022/01/10 23:16:21 Block count: 1093112
48+
2022/01/10 23:16:21 Current block count: 1093112
4349
...
50+
51+
# Execute a custome command (with blockhash) upon receving block connected notifiations.
52+
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -run "echo %s"
4453
```
4554

55+
## Notes
56+
57+
* Stratum TCP connection is persisted with auto-reconnect. (retry backoff increases from 1s to 60s maximum)
58+
59+
* Stratum update_block jobs on previous notifications are canceled when a new notification arrives.
60+
Usually, the jobs are so short and completed immediately. However, if the Stratum connection is broken, this
61+
prevents the bridge from accumulating stale jobs.
62+
4663
## License
4764

4865
This example is licensed under the [copyfree](http://copyfree.org) ISC License.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package main
2+
3+
import (
4+
"github.com/lbryio/lbcd/wire"
5+
"github.com/lbryio/lbcutil"
6+
)
7+
8+
type eventBlockConected struct {
9+
height int32
10+
header *wire.BlockHeader
11+
txns []*lbcutil.Tx
12+
}
13+
14+
type adapter struct {
15+
*bridge
16+
}
17+
18+
func (a *adapter) onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) {
19+
a.eventCh <- &eventBlockConected{height, header, txns}
20+
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"net"
9+
"os"
10+
"os/exec"
11+
"strings"
12+
"sync"
13+
"syscall"
14+
"time"
15+
)
16+
17+
type bridge struct {
18+
ctx context.Context
19+
20+
prevJobContext context.Context
21+
prevJobCancel context.CancelFunc
22+
23+
eventCh chan interface{}
24+
errorc chan error
25+
wg sync.WaitGroup
26+
27+
stratum *stratumClient
28+
29+
customCmd string
30+
}
31+
32+
func newBridge(stratumServer, stratumPass, coinid string) *bridge {
33+
34+
s := &bridge{
35+
ctx: context.Background(),
36+
eventCh: make(chan interface{}),
37+
errorc: make(chan error),
38+
}
39+
40+
if len(stratumServer) > 0 {
41+
s.stratum = newStratumClient(stratumServer, stratumPass, coinid)
42+
}
43+
44+
return s
45+
}
46+
47+
func (b *bridge) start() {
48+
49+
if b.stratum != nil {
50+
backoff := time.Second
51+
for {
52+
err := b.stratum.dial()
53+
if err == nil {
54+
break
55+
}
56+
log.Printf("WARN: stratum.dial() error: %s, retry in %s", err, backoff)
57+
time.Sleep(backoff)
58+
if backoff < 60*time.Second {
59+
backoff += time.Second
60+
}
61+
}
62+
}
63+
64+
for e := range b.eventCh {
65+
switch e := e.(type) {
66+
case *eventBlockConected:
67+
b.handleFilteredBlockConnected(e)
68+
default:
69+
b.errorc <- fmt.Errorf("unknown event type: %T", e)
70+
return
71+
}
72+
}
73+
}
74+
75+
func (b *bridge) handleFilteredBlockConnected(e *eventBlockConected) {
76+
77+
if !*quiet {
78+
log.Printf("Block connected: %s (%d) %v", e.header.BlockHash(), e.height, e.header.Timestamp)
79+
}
80+
81+
hash := e.header.BlockHash().String()
82+
height := e.height
83+
84+
// Cancel jobs on previous block. It's safe if they are already done.
85+
if b.prevJobContext != nil {
86+
select {
87+
case <-b.prevJobContext.Done():
88+
log.Printf("prev one canceled")
89+
default:
90+
b.prevJobCancel()
91+
}
92+
}
93+
94+
// Wait until all previous jobs are done or canceled.
95+
b.wg.Wait()
96+
97+
// Create and save cancelable subcontext for new jobs.
98+
ctx, cancel := context.WithCancel(b.ctx)
99+
b.prevJobContext, b.prevJobCancel = ctx, cancel
100+
101+
if len(b.customCmd) > 0 {
102+
go b.execCustomCommand(ctx, hash, height)
103+
}
104+
105+
// Send stratum update block message
106+
if b.stratum != nil {
107+
go b.stratumUpdateBlock(ctx, hash, height)
108+
}
109+
}
110+
111+
func (s *bridge) stratumUpdateBlock(ctx context.Context, hash string, height int32) {
112+
s.wg.Add(1)
113+
defer s.wg.Done()
114+
115+
backoff := time.Second
116+
retry := func(err error) {
117+
if backoff < 60*time.Second {
118+
backoff += time.Second
119+
}
120+
log.Printf("WARN: stratum.send() on block %d error: %s", height, err)
121+
time.Sleep(backoff)
122+
s.stratum.dial()
123+
}
124+
125+
msg := stratumUpdateBlockMsg(*stratumPass, *coinid, hash)
126+
127+
for {
128+
switch err := s.stratum.send(ctx, msg); {
129+
case err == nil:
130+
return
131+
case errors.Is(err, context.Canceled):
132+
log.Printf("INFO: stratum.send() on block %d: %s.", height, err)
133+
return
134+
case errors.Is(err, syscall.EPIPE):
135+
errClose := s.stratum.conn.Close()
136+
if errClose != nil {
137+
log.Printf("WARN: stratum.conn.Close() on block %d: %s.", height, errClose)
138+
}
139+
retry(err)
140+
case errors.Is(err, net.ErrClosed):
141+
retry(err)
142+
default:
143+
retry(err)
144+
}
145+
}
146+
147+
}
148+
149+
func (s *bridge) execCustomCommand(ctx context.Context, hash string, height int32) {
150+
s.wg.Add(1)
151+
defer s.wg.Done()
152+
153+
cmd := strings.ReplaceAll(s.customCmd, "%s", hash)
154+
err := doExecCustomCommand(ctx, cmd)
155+
if err != nil {
156+
log.Printf("ERROR: execCustomCommand on block %s(%d): %s", hash, height, err)
157+
}
158+
}
159+
160+
func doExecCustomCommand(ctx context.Context, cmd string) error {
161+
strs := strings.Split(cmd, " ")
162+
path, err := exec.LookPath(strs[0])
163+
if errors.Is(err, exec.ErrDot) {
164+
err = nil
165+
}
166+
if err != nil {
167+
return err
168+
}
169+
c := exec.CommandContext(ctx, path, strs[1:]...)
170+
c.Stdout = os.Stdout
171+
return c.Run()
172+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"io/ioutil"
5+
"log"
6+
"path/filepath"
7+
8+
"github.com/lbryio/lbcd/rpcclient"
9+
)
10+
11+
func newLbcdClient(server, user, pass string, notls bool, adpt adapter) *rpcclient.Client {
12+
13+
ntfnHandlers := rpcclient.NotificationHandlers{
14+
OnFilteredBlockConnected: adpt.onFilteredBlockConnected,
15+
}
16+
17+
// Config lbcd RPC client with websockets.
18+
connCfg := &rpcclient.ConnConfig{
19+
Host: server,
20+
Endpoint: "ws",
21+
User: user,
22+
Pass: pass,
23+
DisableTLS: true,
24+
}
25+
26+
if !notls {
27+
cert, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert"))
28+
if err != nil {
29+
log.Fatalf("can't read lbcd certificate: %s", err)
30+
}
31+
connCfg.Certificates = cert
32+
connCfg.DisableTLS = false
33+
}
34+
35+
client, err := rpcclient.New(connCfg, &ntfnHandlers)
36+
if err != nil {
37+
log.Fatalf("can't create rpc client: %s", err)
38+
}
39+
40+
// Register for block connect and disconnect notifications.
41+
if err = client.NotifyBlocks(); err != nil {
42+
log.Fatalf("can't register block notification: %s", err)
43+
}
44+
45+
// Get the current block count.
46+
blockCount, err := client.GetBlockCount()
47+
if err != nil {
48+
log.Fatalf("can't get block count: %s", err)
49+
}
50+
log.Printf("Current block count: %d", blockCount)
51+
52+
return client
53+
}

0 commit comments

Comments
 (0)