Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improved multinode proxy #249

Merged
merged 6 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 160 additions & 28 deletions deps/apiinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@ package deps

import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"reflect"
"sync"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/curio/api"

"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/lib/retry"
)

var clog = logging.Logger("curio/chain")

func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, jsonrpc.ClientCloser, error) {
if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
return tn.(api.Chain), func() {}, nil
Expand Down Expand Up @@ -48,17 +55,15 @@ func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, json
for _, head := range httpHeads {
v1api, closer, err := newChainNodeRPCV1(ctx.Context, head.addr, head.header)
if err != nil {
log.Warnf("Not able to establish connection to node with addr: %s, Reason: %s", head.addr, err.Error())
clog.Warnf("Not able to establish connection to node with addr: %s, Reason: %s", head.addr, err.Error())
continue
}
fullNodes = append(fullNodes, v1api)
closers = append(closers, closer)
}

// When running in cluster mode and trying to establish connections to multiple nodes, fail
// if less than 2 lotus nodes are actually running
if len(httpHeads) > 1 && len(fullNodes) < 2 {
return nil, nil, xerrors.Errorf("Not able to establish connection to more than a single node")
if len(fullNodes) == 0 {
return nil, nil, xerrors.Errorf("failed to establish connection with all nodes")
}

finalCloser := func() {
Expand Down Expand Up @@ -96,54 +101,152 @@ func newChainNodeRPCV1(ctx context.Context, addr string, requestHeader http.Head
return &res, closer, err
}

// Tunables for the multi-node chain API proxy.
const (
	// initialBackoff is the delay before the first retry of a failed call.
	initialBackoff = time.Second

	// maxRetryAttempts is the total number of attempts made per proxied call.
	maxRetryAttempts = 5

	// maxBehindBestHealthy is how many epochs a provider's head may trail
	// the best known tipset height before it is marked unhealthy.
	maxBehindBestHealthy = 1

	// preferredAllBad is the sentinel index returned when every provider is
	// currently marked unhealthy.
	preferredAllBad = -1
)

// errorsToRetry lists the error types for which a proxied call is retried.
var errorsToRetry = []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}

// FullNodeProxy creates a proxy for the Chain API
// TODO: port improvements here from https://github.com/filecoin-project/lotus/pull/11470
func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {
providerCount := len(ins)

var healthyLk sync.Mutex
unhealthyProviders := make([]bool, providerCount)

nextHealthyProvider := func(start int) int {
healthyLk.Lock()
defer healthyLk.Unlock()

for i := 0; i < providerCount; i++ {
idx := (start + i) % providerCount
if !unhealthyProviders[idx] {
return idx
}
}
return preferredAllBad
}

// watch provider health
startWatch := func() {
if len(ins) == 1 {
// not like we have any other node to go to..
return
}

// don't bother for short-running commands
time.Sleep(250 * time.Millisecond)

var bestKnownTipset, nextBestKnownTipset *types.TipSet

for {
var wg sync.WaitGroup
wg.Add(providerCount)

for i := 0; i < providerCount; i++ {
go func(i int) {
defer wg.Done()

toctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // todo better timeout
ch, err := ins[i].ChainHead(toctx)
cancel()

// error is definitely not healthy
if err != nil {
healthyLk.Lock()
unhealthyProviders[i] = true
healthyLk.Unlock()

clog.Debugw("rpc check chain head call failed", "fail_type", "rpc_error", "provider", i, "error", err)
return
}

healthyLk.Lock()
// maybe set best next
if nextBestKnownTipset == nil || big.Cmp(ch.ParentWeight(), nextBestKnownTipset.ParentWeight()) > 0 || len(ch.Blocks()) > len(nextBestKnownTipset.Blocks()) {
nextBestKnownTipset = ch
}

if bestKnownTipset != nil {
// if we're behind the best tipset, mark as unhealthy
unhealthyProviders[i] = ch.Height() < bestKnownTipset.Height()-maxBehindBestHealthy
if unhealthyProviders[i] {
clog.Debugw("rpc check chain head call failed", "fail_type", "behind_best", "provider", i, "height", ch.Height(), "best_height", bestKnownTipset.Height())
}
}
healthyLk.Unlock()
}(i)
}

wg.Wait()
bestKnownTipset = nextBestKnownTipset

time.Sleep(5 * time.Second)
}
}
var starWatchOnce sync.Once

// populate output api proxy

outs := api.GetInternalStructs(outstr)

var rins []reflect.Value
var apiProviders []reflect.Value
for _, in := range ins {
rins = append(rins, reflect.ValueOf(in))
apiProviders = append(apiProviders, reflect.ValueOf(in))
}

for _, out := range outs {
rProxyInternal := reflect.ValueOf(out).Elem()
rOutStruct := reflect.ValueOf(out).Elem()

for f := 0; f < rProxyInternal.NumField(); f++ {
field := rProxyInternal.Type().Field(f)
for f := 0; f < rOutStruct.NumField(); f++ {
field := rOutStruct.Type().Field(f)

var fns []reflect.Value
for _, rin := range rins {
fns = append(fns, rin.MethodByName(field.Name))
var providerFuncs []reflect.Value
for _, rin := range apiProviders {
mv := rin.MethodByName(field.Name)
if !mv.IsValid() {
continue
}
providerFuncs = append(providerFuncs, mv)
}

rProxyInternal.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
errorsToRetry := []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}
initialBackoff, err := time.ParseDuration("1s")
if err != nil {
return nil
}
rOutStruct.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
starWatchOnce.Do(func() {
go startWatch()
})

ctx := args[0].Interface().(context.Context)

curr := -1
preferredProvider := new(int)
*preferredProvider = nextHealthyProvider(0)
if *preferredProvider == preferredAllBad {
// select at random, retry will do its best
*preferredProvider = rand.Intn(providerCount)
}

// for calls that need to be performed on the same node
// primarily for miner when calling create block and submit block subsequently
key := contextKey("retry-node")
if ctx.Value(key) != nil {
if (*ctx.Value(key).(**int)) == nil {
*ctx.Value(key).(**int) = &curr
*ctx.Value(key).(**int) = preferredProvider
} else {
curr = **ctx.Value(key).(**int) - 1
preferredProvider = *ctx.Value(key).(**int)
}
}

total := len(rins)
result, _ := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() ([]reflect.Value, error) {
curr = (curr + 1) % total
result, _ := Retry(ctx, maxRetryAttempts, initialBackoff, errorsToRetry, func(isRetry bool) ([]reflect.Value, error) {
if isRetry {
pp := nextHealthyProvider(*preferredProvider + 1)
if pp == -1 {
return nil, xerrors.Errorf("no healthy providers")
}
*preferredProvider = pp
}

result := fns[curr].Call(args)
result := providerFuncs[*preferredProvider].Call(args)
if result[len(result)-1].IsNil() {
return result, nil
}
Expand All @@ -155,3 +258,32 @@ func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {
}
}
}

// Retry calls f up to attempts times, backing off between attempts.
//
// The first call receives isRetry == false; every later call receives
// isRetry == true. Before each retry the function waits for the current
// backoff, which starts at initialBackoff and doubles after every wait.
//
// Retry stops and returns f's result as soon as f succeeds or fails with an
// error that ErrorIsIn does not find in errorTypes. If ctx is done after a
// failed attempt, or becomes done while waiting for the backoff, ctx.Err()
// is returned instead. When all attempts are exhausted the last result and
// error from f are returned.
func Retry[T any](ctx context.Context, attempts int, initialBackoff time.Duration, errorTypes []error, f func(isRetry bool) (T, error)) (result T, err error) {
	backoff := initialBackoff
	for i := 0; i < attempts; i++ {
		if i > 0 {
			// Debugw takes key/value pairs; the error must be given a key.
			clog.Debugw("Retrying after error:", "error", err)

			// Wait out the backoff, but give up right away if the caller
			// cancels instead of blocking for the full (doubling) delay.
			timer := time.NewTimer(backoff)
			select {
			case <-ctx.Done():
				timer.Stop()
				return result, ctx.Err()
			case <-timer.C:
			}
			backoff *= 2
		}

		result, err = f(i > 0)
		if err == nil || !ErrorIsIn(err, errorTypes) {
			return result, err
		}
		if ctx.Err() != nil {
			return result, ctx.Err()
		}
	}
	clog.Errorf("Failed after %d attempts, last error: %s", attempts, err)
	return result, err
}

// ErrorIsIn reports whether err, or any error in its Unwrap chain, has the
// same dynamic type as one of the entries in errorTypes.
//
// Only the dynamic type of each entry in errorTypes is used; its value is
// ignored. Nil entries are skipped. A nil err never matches.
func ErrorIsIn(err error, errorTypes []error) bool {
	if err == nil {
		return false
	}
	for _, etype := range errorTypes {
		if etype == nil {
			continue
		}
		// Build a fresh *T where T is etype's dynamic type and hand that
		// pointer itself to errors.As. Passing a pointer to an interface{}
		// variable instead would match every error, because any error is
		// assignable to interface{}.
		target := reflect.New(reflect.TypeOf(etype)).Interface()
		if errors.As(err, target) {
			return true
		}
	}
	return false
}
43 changes: 32 additions & 11 deletions web/api/webrpc/sync_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/BurntSushi/toml"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"

"github.com/filecoin-project/curio/api"
"github.com/filecoin-project/curio/build"

cliutil "github.com/filecoin-project/lotus/cli/util"
Expand Down Expand Up @@ -70,19 +73,18 @@ func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {
}
}

rpcInfos := map[string]minimalApiInfo{} // config name -> api info
confNameToAddr := map[string]string{} // config name -> api address
var rpcInfos []string
confNameToAddr := make(map[string][]string) // config name -> api addresses

err := forEachConfig[minimalApiInfo](a, func(name string, info minimalApiInfo) error {
if len(info.Apis.ChainApiInfo) == 0 {
return nil
}

rpcInfos[name] = info

for _, addr := range info.Apis.ChainApiInfo {
rpcInfos = append(rpcInfos, addr)
ai := cliutil.ParseApiInfo(addr)
confNameToAddr[name] = ai.Addr
confNameToAddr[name] = append(confNameToAddr[name], ai.Addr)
}

return nil
Expand All @@ -98,7 +100,7 @@ func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {

var wg sync.WaitGroup
for _, info := range rpcInfos {
ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0])
ai := cliutil.ParseApiInfo(info)
if dedup[ai.Addr] {
continue
}
Expand All @@ -107,9 +109,11 @@ func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {
go func() {
defer wg.Done()
var clayers []string
for layer, a := range confNameToAddr {
if a == ai.Addr {
clayers = append(clayers, layer)
for layer, adrs := range confNameToAddr {
for _, adr := range adrs {
if adr == ai.Addr {
clayers = append(clayers, layer)
}
}
}

Expand All @@ -123,13 +127,30 @@ func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {
defer infosLk.Unlock()
infos[ai.Addr] = myinfo
}()
ver, err := a.deps.Chain.Version(ctx)

addr, err := ai.DialArgs("v1")
if err != nil {
log.Warnf("could not get DialArgs: %w", err)
}

var res api.ChainStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), ai.AuthHeader(), []jsonrpc.Option{jsonrpc.WithErrors(jsonrpc.NewErrors())}...)
if err != nil {
log.Warnf("error creating jsonrpc client: %v", err)
return
}
defer closer()

full := &res

ver, err := full.Version(ctx)
if err != nil {
log.Warnw("Version", "error", err)
return
}

head, err := a.deps.Chain.ChainHead(ctx)
head, err := full.ChainHead(ctx)
if err != nil {
log.Warnw("ChainHead", "error", err)
return
Expand Down