Skip to content

Commit c9d9747

Browse files
committed
Improved multinode proxy
1 parent 6390f94 commit c9d9747

File tree

1 file changed

+154
-23
lines changed

1 file changed

+154
-23
lines changed

deps/apiinfo.go

Lines changed: 154 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,24 @@ package deps
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"math/rand"
68
"net/http"
79
"reflect"
10+
"sync"
811
"time"
912

1013
"github.com/urfave/cli/v2"
1114
"golang.org/x/xerrors"
1215

1316
"github.com/filecoin-project/go-jsonrpc"
17+
"github.com/filecoin-project/go-state-types/big"
1418

1519
"github.com/filecoin-project/curio/api"
1620

21+
"github.com/filecoin-project/lotus/chain/types"
1722
cliutil "github.com/filecoin-project/lotus/cli/util"
18-
"github.com/filecoin-project/lotus/lib/retry"
1923
)
2024

2125
func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, jsonrpc.ClientCloser, error) {
@@ -96,54 +100,152 @@ func newChainNodeRPCV1(ctx context.Context, addr string, requestHeader http.Head
96100
return &res, closer, err
97101
}
98102

103+
const initialBackoff = time.Second // delay before the first retry; handed to Retry, which doubles it after each wait
104+
const maxRetryAttempts = 5 // maximum number of calls Retry makes for one proxied request
105+
const maxBehinhBestHealthy = 1 // a provider whose head height is below the best known height minus this value is marked unhealthy
106+
107+
var errorsToRetry = []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}} // sample errors handed to Retry as the retryable error types
108+
109+
const preferredAllBad = -1 // returned by nextHealthyProvider when every provider is marked unhealthy
110+
99111
// FullNodeProxy creates a proxy for the Chain API
100-
// TODO: port improvements here from https://github.com/filecoin-project/lotus/pull/11470
101112
func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {
113+
providerCount := len(ins)
114+
115+
var healthyLk sync.Mutex
116+
unhealthyProviders := make([]bool, providerCount)
117+
118+
nextHealthyProvider := func(start int) int {
119+
healthyLk.Lock()
120+
defer healthyLk.Unlock()
121+
122+
for i := 0; i < providerCount; i++ {
123+
idx := (start + i) % providerCount
124+
if !unhealthyProviders[idx] {
125+
return idx
126+
}
127+
}
128+
return preferredAllBad
129+
}
130+
131+
// watch provider health
132+
startWatch := func() {
133+
if len(ins) == 1 {
134+
// not like we have any other node to go to..
135+
return
136+
}
137+
138+
// don't bother for short-running commands
139+
time.Sleep(250 * time.Millisecond)
140+
141+
var bestKnownTipset, nextBestKnownTipset *types.TipSet
142+
143+
for {
144+
var wg sync.WaitGroup
145+
wg.Add(providerCount)
146+
147+
for i := 0; i < providerCount; i++ {
148+
go func(i int) {
149+
defer wg.Done()
150+
151+
toctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // todo better timeout
152+
ch, err := ins[i].ChainHead(toctx)
153+
cancel()
154+
155+
// error is definitely not healthy
156+
if err != nil {
157+
healthyLk.Lock()
158+
unhealthyProviders[i] = true
159+
healthyLk.Unlock()
160+
161+
log.Errorw("rpc check chain head call failed", "fail_type", "rpc_error", "provider", i, "error", err)
162+
return
163+
}
164+
165+
healthyLk.Lock()
166+
// maybe set best next
167+
if nextBestKnownTipset == nil || big.Cmp(ch.ParentWeight(), nextBestKnownTipset.ParentWeight()) > 0 || len(ch.Blocks()) > len(nextBestKnownTipset.Blocks()) {
168+
nextBestKnownTipset = ch
169+
}
170+
171+
if bestKnownTipset != nil {
172+
// if we're behind the best tipset, mark as unhealthy
173+
unhealthyProviders[i] = ch.Height() < bestKnownTipset.Height()-maxBehinhBestHealthy
174+
if unhealthyProviders[i] {
175+
log.Errorw("rpc check chain head call failed", "fail_type", "behind_best", "provider", i, "height", ch.Height(), "best_height", bestKnownTipset.Height())
176+
}
177+
}
178+
healthyLk.Unlock()
179+
}(i)
180+
}
181+
182+
wg.Wait()
183+
bestKnownTipset = nextBestKnownTipset
184+
185+
time.Sleep(5 * time.Second)
186+
}
187+
}
188+
var starWatchOnce sync.Once
189+
190+
// populate output api proxy
191+
102192
outs := api.GetInternalStructs(outstr)
103193

104-
var rins []reflect.Value
194+
var apiProviders []reflect.Value
105195
for _, in := range ins {
106-
rins = append(rins, reflect.ValueOf(in))
196+
apiProviders = append(apiProviders, reflect.ValueOf(in))
107197
}
108198

109199
for _, out := range outs {
110-
rProxyInternal := reflect.ValueOf(out).Elem()
200+
rOutStruct := reflect.ValueOf(out).Elem()
111201

112-
for f := 0; f < rProxyInternal.NumField(); f++ {
113-
field := rProxyInternal.Type().Field(f)
202+
for f := 0; f < rOutStruct.NumField(); f++ {
203+
field := rOutStruct.Type().Field(f)
114204

115-
var fns []reflect.Value
116-
for _, rin := range rins {
117-
fns = append(fns, rin.MethodByName(field.Name))
205+
var providerFuncs []reflect.Value
206+
for _, rin := range apiProviders {
207+
mv := rin.MethodByName(field.Name)
208+
if !mv.IsValid() {
209+
continue
210+
}
211+
providerFuncs = append(providerFuncs, mv)
118212
}
119213

120-
rProxyInternal.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
121-
errorsToRetry := []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}
122-
initialBackoff, err := time.ParseDuration("1s")
123-
if err != nil {
124-
return nil
125-
}
214+
rOutStruct.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
215+
starWatchOnce.Do(func() {
216+
go startWatch()
217+
})
126218

127219
ctx := args[0].Interface().(context.Context)
128220

129-
curr := -1
221+
preferredProvider := new(int)
222+
*preferredProvider = nextHealthyProvider(0)
223+
if *preferredProvider == preferredAllBad {
224+
// select at random, retry will do it's best
225+
*preferredProvider = rand.Intn(providerCount)
226+
}
130227

131228
// for calls that need to be performed on the same node
132229
// primarily for miner when calling create block and submit block subsequently
133230
key := contextKey("retry-node")
134231
if ctx.Value(key) != nil {
135232
if (*ctx.Value(key).(**int)) == nil {
136-
*ctx.Value(key).(**int) = &curr
233+
*ctx.Value(key).(**int) = preferredProvider
137234
} else {
138-
curr = **ctx.Value(key).(**int) - 1
235+
preferredProvider = *ctx.Value(key).(**int)
139236
}
140237
}
141238

142-
total := len(rins)
143-
result, _ := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() ([]reflect.Value, error) {
144-
curr = (curr + 1) % total
239+
result, _ := Retry(ctx, maxRetryAttempts, initialBackoff, errorsToRetry, func(isRetry bool) ([]reflect.Value, error) {
240+
if isRetry {
241+
pp := nextHealthyProvider(*preferredProvider + 1)
242+
if pp == -1 {
243+
return nil, xerrors.Errorf("no healthy providers")
244+
}
245+
*preferredProvider = pp
246+
}
145247

146-
result := fns[curr].Call(args)
248+
result := providerFuncs[*preferredProvider].Call(args)
147249
if result[len(result)-1].IsNil() {
148250
return result, nil
149251
}
@@ -155,3 +257,32 @@ func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {
155257
}
156258
}
157259
}
260+
261+
func Retry[T any](ctx context.Context, attempts int, initialBackoff time.Duration, errorTypes []error, f func(isRetry bool) (T, error)) (result T, err error) {
262+
for i := 0; i < attempts; i++ {
263+
if i > 0 {
264+
log.Info("Retrying after error:", err)
265+
time.Sleep(initialBackoff)
266+
initialBackoff *= 2
267+
}
268+
result, err = f(i > 0)
269+
if err == nil || !ErrorIsIn(err, errorTypes) {
270+
return result, err
271+
}
272+
if ctx.Err() != nil {
273+
return result, ctx.Err()
274+
}
275+
}
276+
log.Errorf("Failed after %d attempts, last error: %s", attempts, err)
277+
return result, err
278+
}
279+
280+
// ErrorIsIn reports whether err, or any error in its Unwrap chain, matches the
// concrete type of one of the sample values in errorTypes.
//
// Only the dynamic type of each sample is used; its contents are ignored.
// Matching is delegated to errors.As, so wrapped errors are found as well.
// A nil err never matches, and nil entries in errorTypes are skipped.
func ErrorIsIn(err error, errorTypes []error) bool {
	if err == nil {
		return false
	}
	for _, etype := range errorTypes {
		if etype == nil {
			continue
		}
		// errors.As wants a pointer to a variable of the wanted type, so
		// allocate one via reflection and pass it as is. Passing the address
		// of the interface that holds it would make the target a *any, which
		// every non-nil error satisfies.
		target := reflect.New(reflect.TypeOf(etype)).Interface()
		if errors.As(err, target) {
			return true
		}
	}
	return false
}

0 commit comments

Comments
 (0)