Skip to content

Commit bebac8e

Browse files
lupin012 and claude authored
[3.4] cherry-pick protect execution TIP under RPC load (#19905) (#20297)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b174bea commit bebac8e

12 files changed

Lines changed: 179 additions & 12 deletions

File tree

.github/workflows/qa-rpc-performance-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ jobs:
121121
if: matrix.client == 'erigon' || needs.setup.outputs.run_geth == 'true'
122122
run: |
123123
rm -rf ${{runner.workspace}}/rpc-tests
124-
git -c advice.detachedHead=false clone --depth 1 --branch v1.115.0 https://github.com/erigontech/rpc-tests ${{runner.workspace}}/rpc-tests
124+
git -c advice.detachedHead=false clone --depth 1 --branch v1.124.0 https://github.com/erigontech/rpc-tests ${{runner.workspace}}/rpc-tests
125125
cd ${{runner.workspace}}/rpc-tests
126126
127127
- name: Clean Erigon Build Directory

.github/workflows/qa-tip-tracking-with-load.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ jobs:
8282
if: matrix.client == 'erigon' || (matrix.client == 'geth' && github.event.inputs.run_geth == 'true')
8383
run: |
8484
rm -rf ${{runner.workspace}}/rpc-tests
85-
git -c advice.detachedHead=false clone --depth 1 --branch v1.78.0 https://github.com/erigontech/rpc-tests ${{runner.workspace}}/rpc-tests
85+
git -c advice.detachedHead=false clone --depth 1 --branch v1.124.0 https://github.com/erigontech/rpc-tests ${{runner.workspace}}/rpc-tests
8686
cd ${{runner.workspace}}/rpc-tests
8787
8888
- name: Clean Erigon Build Directory

cmd/rpcdaemon/cli/config.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
129129
rootCmd.PersistentFlags().BoolVar(&cfg.RpcStreamingDisable, utils.RpcStreamingDisableFlag.Name, false, utils.RpcStreamingDisableFlag.Usage)
130130
rootCmd.PersistentFlags().BoolVar(&cfg.DebugSingleRequest, utils.HTTPDebugSingleFlag.Name, false, utils.HTTPDebugSingleFlag.Usage)
131131
rootCmd.PersistentFlags().IntVar(&cfg.DBReadConcurrency, utils.DBReadConcurrencyFlag.Name, utils.DBReadConcurrencyFlag.Value, utils.DBReadConcurrencyFlag.Usage)
132+
rootCmd.PersistentFlags().IntVar(&cfg.RpcMaxConcurrentRequests, utils.RpcMaxConcurrentRequestsFlag.Name, utils.RpcMaxConcurrentRequestsFlag.Value, utils.RpcMaxConcurrentRequestsFlag.Usage)
132133
rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines")
133134
rootCmd.PersistentFlags().BoolVar(&cfg.GethCompatibility, "rpc.gethcompat", false, "Enables Geth-compatible storage iteration order for debug_storageRangeAt (sorted by keccak256 hash). Disabled by default for performance.")
134135
rootCmd.PersistentFlags().BoolVar(&cfg.TestingEnabled, "rpc.testing", false, "Enables the testing_ RPC namespace (testing_buildBlockV1). WARNING: do not enable on production networks.")
@@ -744,7 +745,17 @@ func startRegularRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []r
744745
logger.Info("Socket Endpoint opened", "url", socketUrl)
745746
}
746747

747-
httpHandler := node.NewHTTPHandlerStack(srv, cfg.HttpCORSDomain, cfg.HttpVirtualHost, cfg.HttpCompression)
748+
// RPC admission limit: -1 = unlimited, 0 = use db.read.concurrency, >0 = explicit limit.
749+
var rpcConcurrencyLimit int64
750+
switch {
751+
case cfg.RpcMaxConcurrentRequests == -1:
752+
rpcConcurrencyLimit = 0 // disabled
753+
case cfg.RpcMaxConcurrentRequests > 0:
754+
rpcConcurrencyLimit = int64(cfg.RpcMaxConcurrentRequests)
755+
default:
756+
rpcConcurrencyLimit = int64(cfg.DBReadConcurrency)
757+
}
758+
httpHandler := node.NewHTTPHandlerStack(srv, cfg.HttpCORSDomain, cfg.HttpVirtualHost, cfg.HttpCompression, rpcConcurrencyLimit, true)
748759
var wsHandler http.Handler
749760
if cfg.WebsocketEnabled {
750761
wsHandler = srv.WebsocketHandler([]string{"*"}, nil, cfg.WebsocketCompression, logger)
@@ -955,7 +966,9 @@ func createEngineListener(cfg *httpcfg.HttpCfg, engineApi []rpc.API, logger log.
955966

956967
wsHandler := engineSrv.WebsocketHandler([]string{"*"}, jwtSecret, cfg.WebsocketCompression, logger)
957968

958-
engineHttpHandler := node.NewHTTPHandlerStack(engineSrv, nil /* authCors */, cfg.AuthRpcVirtualHost, cfg.HttpCompression)
969+
// Engine API (auth) is the CL↔EL protocol — not user RPC. Do not tag with TxPriorityRPC
970+
// so execution-engine DB operations use blocking Acquire instead of fail-fast TryAcquire.
971+
engineHttpHandler := node.NewHTTPHandlerStack(engineSrv, nil /* authCors */, cfg.AuthRpcVirtualHost, cfg.HttpCompression, 0, false)
959972

960973
graphQLHandler := graphql.CreateHandler(engineApi)
961974

cmd/rpcdaemon/cli/httpcfg/http_cfg.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type HttpCfg struct {
7171
RpcStreamingDisable bool
7272
RpcFiltersConfig rpchelper.FiltersConfig
7373
DBReadConcurrency int
74+
RpcMaxConcurrentRequests int // HTTP admission control limit; 0 = use DBReadConcurrency, -1 = unlimited, >0 = explicit limit
7475
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
7576
GethCompatibility bool // Geth-compatible storage iteration order for debug_storageRangeAt
7677
TxPoolApiAddr string

cmd/utils/flags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,11 @@ var (
389389
Usage: "Does limit amount of parallel db reads. Default: equal to GOMAXPROCS (or number of CPU)",
390390
Value: min(max(10, runtime.GOMAXPROCS(-1)*64), 9_000),
391391
}
392+
RpcMaxConcurrentRequestsFlag = cli.IntFlag{
393+
Name: "rpc.max.concurrency",
394+
Usage: "Maximum number of concurrent HTTP RPC requests (HTTP admission control). 0 = use db.read.concurrency, -1 = unlimited (no admission control)",
395+
Value: 0,
396+
}
392397
RpcAccessListFlag = cli.StringFlag{
393398
Name: "rpc.accessList",
394399
Usage: "Specify granular (method-by-method) API allowlist",

db/kv/kv_interface.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,23 @@ var (
696696
//DbGcSelfPnlMergeCalls = metrics.NewCounter(`db_gc_pnl{phase="slef_merge_calls"}`) //nolint
697697
)
698698

699+
// ErrServerOverloaded is returned by BeginRo when the read-tx semaphore is full and ctx was tagged with WithNonBlockingAcquire (fail-fast path intended for RPC handlers).
700+
var ErrServerOverloaded = errors.New("server overloaded, retry later")
701+
702+
type nonBlockingAcquireKey struct{}
703+
704+
// WithNonBlockingAcquire tags ctx to request fail-fast semaphore acquisition in BeginRo.
705+
// When set, BeginRo uses TryAcquire and returns ErrServerOverloaded immediately if the
706+
// read-tx semaphore is full, instead of blocking until a slot is available.
707+
func WithNonBlockingAcquire(ctx context.Context) context.Context {
708+
return context.WithValue(ctx, nonBlockingAcquireKey{}, struct{}{})
709+
}
710+
711+
// IsNonBlockingAcquire reports whether ctx was tagged by WithNonBlockingAcquire.
712+
func IsNonBlockingAcquire(ctx context.Context) bool {
713+
return ctx.Value(nonBlockingAcquireKey{}) != nil
714+
}
715+
699716
type Closer interface {
700717
Close()
701718
}

db/kv/mdbx/kv_mdbx.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,11 @@ import (
4646
"github.com/erigontech/erigon/db/kv/dbcfg"
4747
"github.com/erigontech/erigon/db/kv/order"
4848
"github.com/erigontech/erigon/db/kv/stream"
49+
"github.com/erigontech/erigon/diagnostics/metrics"
4950
)
5051

52+
var dbRoTxOverloaded = metrics.GetOrCreateCounter(`db_rotx_overloaded_total`)
53+
5154
func init() {
5255
mdbx.MapFullErrorMessage += " You can try remove the database files (e.g., by running rm -rf /path/to/db)"
5356
}
@@ -582,8 +585,13 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
582585
return nil, errors.New("db closed")
583586
}
584587

585-
// will return nil err if context is cancelled (may appear to acquire the semaphore)
586-
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
588+
if kv.IsNonBlockingAcquire(ctx) {
589+
if !db.roTxsLimiter.TryAcquire(1) {
590+
db.trackTxEnd()
591+
dbRoTxOverloaded.Inc()
592+
return nil, kv.ErrServerOverloaded
593+
}
594+
} else if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
587595
db.trackTxEnd()
588596
return nil, fmt.Errorf("mdbx.MdbxKV.BeginRo: roTxsLimiter error %w", semErr)
589597
}

db/kv/remotedbserver/remotedbserver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (s *KvServer) begin(ctx context.Context) (id uint64, err error) {
135135
}
136136
s.txsMapLock.Lock()
137137
defer s.txsMapLock.Unlock()
138-
tx, errBegin := s.kv.BeginTemporalRo(ctx) //nolint:gocritic
138+
tx, errBegin := s.kv.BeginTemporalRo(ctx) //nolint:gocritic // tx is stored in s.txs and rolled back by rollback(); defer would close it prematurely
139139
if errBegin != nil {
140140
return 0, errBegin
141141
}
@@ -157,7 +157,7 @@ func (s *KvServer) renew(ctx context.Context, id uint64) (err error) {
157157
defer tx.Unlock()
158158
tx.Rollback()
159159
}
160-
newTx, errBegin := s.kv.BeginTemporalRo(ctx) //nolint:gocritic
160+
newTx, errBegin := s.kv.BeginTemporalRo(ctx) //nolint:gocritic // tx is stored in s.txs and rolled back by rollback(); defer would close it prematurely
161161
if errBegin != nil {
162162
return fmt.Errorf("kvserver: %w", errBegin)
163163
}

node/cli/default_flags.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ var DefaultFlags = []cli.Flag{
8383
&utils.RpcBatchConcurrencyFlag,
8484
&utils.RpcStreamingDisableFlag,
8585
&utils.DBReadConcurrencyFlag,
86+
&utils.RpcMaxConcurrentRequestsFlag,
8687
&utils.RpcAccessListFlag,
8788
&utils.RpcTraceCompatFlag,
8889
&utils.RpcGethCompatFlag,

node/cli/flags.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
447447
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
448448
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
449449
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
450+
RpcMaxConcurrentRequests: ctx.Int(utils.RpcMaxConcurrentRequestsFlag.Name),
450451
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
451452
RpcFiltersConfig: rpchelper.FiltersConfig{
452453
RpcSubscriptionFiltersMaxLogs: ctx.Int(RpcSubscriptionFiltersMaxLogsFlag.Name),

0 commit comments

Comments
 (0)