Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
481ef0d
db, rpc, node: protect execution TIP under RPC load
lupin012 Mar 15, 2026
288c341
db/kv/remotedb: revert TryAcquire on client-side limiter
lupin012 Mar 15, 2026
090267c
node/rpcstack: skip TxPriorityRPC tagging on engine API port
lupin012 Mar 15, 2026
0a03c25
use branch for rpc-test:perf
lupin012 Mar 15, 2026
88212b6
update rpc version on TIP tracking workflow
lupin012 Mar 17, 2026
12bf73e
db/kv/remotedb: apply TryAcquire for RPC on client-side limiter
lupin012 Mar 17, 2026
e90eb8e
debug counter
lupin012 Mar 18, 2026
61ea4e0
db/kv/mdbx: debug goroutine for chaindata only — remove before merge
lupin012 Mar 18, 2026
7bcb52b
node, db/kv, rpc: clean up debug counters, restore 503 on admission r…
lupin012 Mar 18, 2026
83c6b2c
cmd, node: remove --rpc.max.concurrent flag, use DBReadConcurrency di…
lupin012 Mar 18, 2026
f82b9d8
db/kv, node, execution: remove TxPriority system and executionLimiter
lupin012 Mar 18, 2026
13bbfe5
db/kv, node: remove debug goroutine and HTTP counters
lupin012 Mar 18, 2026
82ffb5d
cmd, node: re-add --rpc.max.concurrentRequests for HTTP admission con…
lupin012 Mar 18, 2026
82bc8ab
cmd: rename --rpc.max.concurrentRequests to --rpc.max.concurrent-requ…
lupin012 Mar 18, 2026
ea9c29a
db/kv: restore kv_interface.go to main (no functional changes)
lupin012 Mar 18, 2026
d0b5224
db/kv/mdbx: restore kv_mdbx.go to main (no functional changes)
lupin012 Mar 18, 2026
bfe80e6
rpc: restore http.go to main (no functional changes)
lupin012 Mar 18, 2026
c760754
Merge branch 'main' into lupin012/protect-tip-under-load
lupin012 Mar 18, 2026
fe5bea3
db/kv, node, cmd: protect tip under RPC load with TryAcquire at BeginRo
lupin012 Mar 19, 2026
7b047ab
update rpc version
lupin012 Mar 20, 2026
179b611
Merge branch 'main' into lupin012/protect-tip-under-load
lupin012 Mar 20, 2026
3e7e070
db/kv, node: address review comments on RPC admission control
lupin012 Mar 24, 2026
5ae86a7
node/rpcstack_test: populate RpcConcurrencyLimit in createAndStartServer
lupin012 Mar 24, 2026
1c2134d
Merge branch 'main' into lupin012/protect-tip-under-load
lupin012 Mar 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/qa-rpc-performance-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ jobs:
if: matrix.client == 'erigon' || needs.setup.outputs.run_geth == 'true'
run: |
rm -rf ${{runner.workspace}}/rpc-tests
git -c advice.detachedHead=false clone --depth 1 --branch v1.115.0 https://github.com/erigontech/rpc-tests ${{runner.workspace}}/rpc-tests
git -c advice.detachedHead=false clone --depth 1 --branch v1.124.0 https://github.com/erigontech/rpc-tests ${{runner.workspace}}/rpc-tests
cd ${{runner.workspace}}/rpc-tests

- name: Clean Erigon Build Directory
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/qa-tip-tracking-with-load.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
if: matrix.client == 'erigon' || (matrix.client == 'geth' && github.event.inputs.run_geth == 'true')
run: |
rm -rf ${{runner.workspace}}/rpc-tests
git -c advice.detachedHead=false clone --depth 1 --branch v1.78.0 https://github.com/erigontech/rpc-tests ${{runner.workspace}}/rpc-tests
git -c advice.detachedHead=false clone --depth 1 --branch v1.124.0 https://github.com/erigontech/rpc-tests ${{runner.workspace}}/rpc-tests
cd ${{runner.workspace}}/rpc-tests

- name: Clean Erigon Build Directory
Expand Down
17 changes: 15 additions & 2 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().BoolVar(&cfg.RpcStreamingDisable, utils.RpcStreamingDisableFlag.Name, false, utils.RpcStreamingDisableFlag.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.DebugSingleRequest, utils.HTTPDebugSingleFlag.Name, false, utils.HTTPDebugSingleFlag.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.DBReadConcurrency, utils.DBReadConcurrencyFlag.Name, utils.DBReadConcurrencyFlag.Value, utils.DBReadConcurrencyFlag.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.RpcMaxConcurrentRequests, utils.RpcMaxConcurrentRequestsFlag.Name, utils.RpcMaxConcurrentRequestsFlag.Value, utils.RpcMaxConcurrentRequestsFlag.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines")
rootCmd.PersistentFlags().BoolVar(&cfg.GethCompatibility, "rpc.gethcompat", false, "Enables Geth-compatible storage iteration order for debug_storageRangeAt (sorted by keccak256 hash). Disabled by default for performance.")
rootCmd.PersistentFlags().BoolVar(&cfg.TestingEnabled, "rpc.testing", false, "Enables the testing_ RPC namespace (testing_buildBlockV1). WARNING: do not enable on production networks.")
Expand Down Expand Up @@ -747,7 +748,17 @@ func startRegularRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []r
logger.Info("Socket Endpoint opened", "url", socketUrl)
}

httpHandler := node.NewHTTPHandlerStack(srv, cfg.HttpCORSDomain, cfg.HttpVirtualHost, cfg.HttpCompression)
// RPC admission limit: -1 = unlimited, 0 = use db.read.concurrency, >0 = explicit limit.
var rpcConcurrencyLimit int64
switch {
case cfg.RpcMaxConcurrentRequests == -1:
rpcConcurrencyLimit = 0 // disabled
case cfg.RpcMaxConcurrentRequests > 0:
rpcConcurrencyLimit = int64(cfg.RpcMaxConcurrentRequests)
default:
rpcConcurrencyLimit = int64(cfg.DBReadConcurrency)
}
httpHandler := node.NewHTTPHandlerStack(srv, cfg.HttpCORSDomain, cfg.HttpVirtualHost, cfg.HttpCompression, rpcConcurrencyLimit, true)
var wsHandler http.Handler
if cfg.WebsocketEnabled {
wsHandler = srv.WebsocketHandler([]string{"*"}, nil, cfg.WebsocketCompression, logger)
Expand Down Expand Up @@ -958,7 +969,9 @@ func createEngineListener(cfg *httpcfg.HttpCfg, engineApi []rpc.API, logger log.

wsHandler := engineSrv.WebsocketHandler([]string{"*"}, jwtSecret, cfg.WebsocketCompression, logger)

engineHttpHandler := node.NewHTTPHandlerStack(engineSrv, nil /* authCors */, cfg.AuthRpcVirtualHost, cfg.HttpCompression)
// Engine API (auth) is the CL↔EL protocol — not user RPC. Do not tag with TxPriorityRPC
// so execution-engine DB operations use blocking Acquire instead of fail-fast TryAcquire.
engineHttpHandler := node.NewHTTPHandlerStack(engineSrv, nil /* authCors */, cfg.AuthRpcVirtualHost, cfg.HttpCompression, 0, false)

graphQLHandler := graphql.CreateHandler(engineApi)

Expand Down
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type HttpCfg struct {
RpcStreamingDisable bool
RpcFiltersConfig rpchelper.FiltersConfig
DBReadConcurrency int
RpcMaxConcurrentRequests int // HTTP admission control limit; -1 = unlimited
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
GethCompatibility bool // Geth-compatible storage iteration order for debug_storageRangeAt
TxPoolApiAddr string
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,11 @@ var (
Usage: "Does limit amount of parallel db reads. Default: equal to GOMAXPROCS (or number of CPU)",
Value: min(max(10, runtime.GOMAXPROCS(-1)*64), 9_000),
}
RpcMaxConcurrentRequestsFlag = cli.IntFlag{
	// Name matches the variable and the final rename commit
	// ("rpc.max.concurrentRequests" -> "rpc.max.concurrent-requests");
	// "rpc.max.concurrency" was misleadingly close to db.read.concurrency.
	Name:  "rpc.max.concurrent-requests",
	Usage: "Maximum number of concurrent HTTP RPC requests (HTTP admission control). 0 = use db.read.concurrency, -1 = unlimited (no admission control)",
	Value: 0,
}
RpcAccessListFlag = cli.StringFlag{
Name: "rpc.accessList",
Usage: "Specify granular (method-by-method) API allowlist",
Expand Down
17 changes: 17 additions & 0 deletions db/kv/kv_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,23 @@ var (
//DbGcSelfPnlMergeCalls = metrics.NewCounter(`db_gc_pnl{phase="slef_merge_calls"}`) //nolint
)

// ErrServerOverloaded is returned by BeginRo when the DB semaphore is full and the caller is an RPC handler.
var ErrServerOverloaded = errors.New("server overloaded, retry later")

type nonBlockingAcquireKey struct{}

// WithNonBlockingAcquire tags ctx to request fail-fast semaphore acquisition in BeginRo.
// When set, BeginRo uses TryAcquire and returns ErrServerOverloaded immediately if the
// read-tx semaphore is full, instead of blocking until a slot is available.
func WithNonBlockingAcquire(ctx context.Context) context.Context {
return context.WithValue(ctx, nonBlockingAcquireKey{}, struct{}{})
}

// IsNonBlockingAcquire reports whether ctx was tagged by WithNonBlockingAcquire.
func IsNonBlockingAcquire(ctx context.Context) bool {
return ctx.Value(nonBlockingAcquireKey{}) != nil
}

// Closer is implemented by resources that must be explicitly released
// (e.g. databases and transactions) via a single Close call.
type Closer interface {
Close()
}
Expand Down
12 changes: 10 additions & 2 deletions db/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ import (
"github.com/erigontech/erigon/db/kv/dbcfg"
"github.com/erigontech/erigon/db/kv/order"
"github.com/erigontech/erigon/db/kv/stream"
"github.com/erigontech/erigon/diagnostics/metrics"
)

var dbRoTxOverloaded = metrics.GetOrCreateCounter(`db_rotx_overloaded_total`)

func init() {
mdbx.MapFullErrorMessage += " You can try remove the database files (e.g., by running rm -rf /path/to/db)"
}
Expand Down Expand Up @@ -582,8 +585,13 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
return nil, errors.New("db closed")
}

// will return nil err if context is cancelled (may appear to acquire the semaphore)
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
if kv.IsNonBlockingAcquire(ctx) {
if !db.roTxsLimiter.TryAcquire(1) {
db.trackTxEnd()
dbRoTxOverloaded.Inc()
return nil, kv.ErrServerOverloaded
}
} else if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
db.trackTxEnd()
return nil, fmt.Errorf("mdbx.MdbxKV.BeginRo: roTxsLimiter error %w", semErr)
}
Expand Down
4 changes: 2 additions & 2 deletions db/kv/remotedbserver/remotedbserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *KvServer) begin(ctx context.Context) (id uint64, err error) {
}
s.txsMapLock.Lock()
defer s.txsMapLock.Unlock()
tx, errBegin := s.kv.BeginTemporalRo(ctx) //nolint:gocritic
tx, errBegin := s.kv.BeginTemporalRo(ctx) //nolint:gocritic // tx is stored in s.txs and rolled back by rollback(); defer would close it prematurely
if errBegin != nil {
return 0, errBegin
}
Expand All @@ -157,7 +157,7 @@ func (s *KvServer) renew(ctx context.Context, id uint64) (err error) {
defer tx.Unlock()
tx.Rollback()
}
newTx, errBegin := s.kv.BeginTemporalRo(ctx) //nolint:gocritic
newTx, errBegin := s.kv.BeginTemporalRo(ctx) //nolint:gocritic // tx is stored in s.txs and rolled back by rollback(); defer would close it prematurely
if errBegin != nil {
return fmt.Errorf("kvserver: %w", errBegin)
}
Expand Down
1 change: 1 addition & 0 deletions node/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ var DefaultFlags = []cli.Flag{
&utils.RpcBatchConcurrencyFlag,
&utils.RpcStreamingDisableFlag,
&utils.DBReadConcurrencyFlag,
&utils.RpcMaxConcurrentRequestsFlag,
&utils.RpcAccessListFlag,
&utils.RpcTraceCompatFlag,
&utils.RpcGethCompatFlag,
Expand Down
1 change: 1 addition & 0 deletions node/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcMaxConcurrentRequests: ctx.Int(utils.RpcMaxConcurrentRequestsFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
RpcFiltersConfig: rpchelper.FiltersConfig{
RpcSubscriptionFiltersMaxLogs: ctx.Int(RpcSubscriptionFiltersMaxLogsFlag.Name),
Expand Down
62 changes: 58 additions & 4 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/rs/cors"

"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/diagnostics/metrics"
"github.com/erigontech/erigon/rpc"
"github.com/erigontech/erigon/rpc/rpccfg"
)
Expand All @@ -46,6 +48,10 @@ type httpConfig struct {
Vhosts []string
Compression bool
prefix string // path prefix on which to mount http handler
// RpcConcurrencyLimit is the maximum number of concurrent HTTP RPC requests.
// Requests beyond this limit receive an immediate 503 before touching any middleware.
// 0 means unlimited (admission control disabled).
RpcConcurrencyLimit int64
}

// wsConfig is the JSON-RPC/Websocket configuration
Expand Down Expand Up @@ -193,6 +199,11 @@ func (h *httpServer) start() error {

func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// check if ws request and serve if ws enabled
// Note: WebSocket connections bypass rpcAdmissionHandler intentionally.
// HTTP admission control limits inflight requests per connection, but WebSocket
// is a persistent long-lived connection where the relevant limit is the number
// of concurrent connections, not inflight requests. A dedicated WebSocket
// connection limiter will be addressed in a separate PR.
ws := h.wsHandler.Load().(*rpcHandler)
if ws != nil && isWebsocket(r) {
if checkPath(r, h.wsConfig.prefix) {
Expand Down Expand Up @@ -280,7 +291,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, allowList rpc.
}
h.httpConfig = config
h.httpHandler.Store(&rpcHandler{
Handler: NewHTTPHandlerStack(srv, config.CorsAllowedOrigins, config.Vhosts, config.Compression),
Handler: NewHTTPHandlerStack(srv, config.CorsAllowedOrigins, config.Vhosts, config.Compression, config.RpcConcurrencyLimit, true),
server: srv,
})
return nil
Expand Down Expand Up @@ -345,17 +356,60 @@ func isWebsocket(r *http.Request) bool {
strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade")
}

// NewHTTPHandlerStack returns wrapped http-related handlers
func NewHTTPHandlerStack(srv http.Handler, cors []string, vhosts []string, compression bool) http.Handler {
// Wrap the CORS-handler within a host-handler
// NewHTTPHandlerStack returns the wrapped http-related handlers.
// When tagAsRPC is true and rpcConcurrencyLimit > 0, the outermost wrapper
// enforces admission control (immediate 503 once inflight requests exceed
// the limit) to prevent goroutine pile-up under load.
func NewHTTPHandlerStack(srv http.Handler, cors []string, vhosts []string, compression bool, rpcConcurrencyLimit int64, tagAsRPC bool) http.Handler {
	// Build the chain inside-out: CORS first, then vhost filtering, then
	// optional gzip compression.
	wrapped := newCorsHandler(srv, cors)
	wrapped = newVHostHandler(vhosts, wrapped)
	if compression {
		wrapped = newGzipHandler(wrapped)
	}
	if !tagAsRPC {
		return wrapped
	}
	// Admission control sits outermost so a rejected request never touches
	// the rest of the middleware chain.
	return newRPCAdmissionHandler(rpcConcurrencyLimit, wrapped)
}

// rpcAdmissionHandler limits the number of concurrent HTTP RPC requests.
// Requests that exceed the limit receive an immediate HTTP 503 without going
// through CORS, gzip, or JSON decoding.
type rpcAdmissionHandler struct {
inflight atomic.Int64 // requests currently being served; incremented on entry, decremented on exit
limit int64 // maximum concurrent requests; <= 0 disables admission control entirely
next http.Handler // downstream handler chain invoked for admitted requests
}

// rpcAdmissionRejected counts HTTP requests rejected with 503 by the admission gate.
var rpcAdmissionRejected = metrics.GetOrCreateCounter(`rpc_admission_rejected_total`)

// newRPCAdmissionHandler wraps next with concurrent-request admission
// control. A limit of zero or less disables the gate, in which case every
// request is forwarded unconditionally.
func newRPCAdmissionHandler(limit int64, next http.Handler) http.Handler {
	gate := &rpcAdmissionHandler{next: next, limit: limit}
	return gate
}

// ServeHTTP admits the request while the inflight count stays within the
// limit and rejects it with HTTP 503 otherwise. Admitted requests have their
// context tagged so BeginRo fails fast (TryAcquire) instead of queueing on
// the DB read semaphore.
func (h *rpcAdmissionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if h.limit <= 0 {
		// Admission control disabled: forward untouched and untagged.
		h.next.ServeHTTP(w, r)
		return
	}
	if h.inflight.Add(1) > h.limit {
		h.inflight.Add(-1)
		rpcAdmissionRejected.Inc()
		w.Header().Set("Retry-After", "1")
		// TODO: the 503 response here is plain text, not a JSON-RPC envelope.
		// Similarly, when BeginRo returns ErrServerOverloaded (inner gate), the
		// response is HTTP 200 with JSON-RPC code -32000 instead of -32005.
		// Both paths should return HTTP 503 + JSON-RPC {"error":{"code":-32005,...}}.
		// This requires buffering the response in rpc/http.go and will be
		// addressed in a separate PR.
		http.Error(w, "server overloaded, retry later", http.StatusServiceUnavailable)
		return
	}
	defer h.inflight.Add(-1)
	h.next.ServeHTTP(w, r.WithContext(kv.WithNonBlockingAcquire(r.Context())))
}

func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler {
// disable CORS support if user has not specified a custom CORS configuration
if len(allowedOrigins) == 0 {
Expand Down
62 changes: 62 additions & 0 deletions node/rpcstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"strings"
"sync"
"testing"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -375,3 +377,63 @@ func TestHTTP2H2C(t *testing.T) {
require.NoError(t, err)
assert.Contains(t, string(result), "jsonrpc", "expected JSON-RPC response")
}

// TestRPCAdmissionHandler verifies that rpcAdmissionHandler correctly limits
// concurrent requests and returns HTTP 503 when the limit is exceeded.
func TestRPCAdmissionHandler(t *testing.T) {
	okHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	t.Run("disabled when limit is zero", func(t *testing.T) {
		h := newRPCAdmissionHandler(0, okHandler)
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/", nil))
		assert.Equal(t, http.StatusOK, rec.Code)
	})

	t.Run("allows requests under the limit", func(t *testing.T) {
		h := newRPCAdmissionHandler(5, okHandler)
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/", nil))
		assert.Equal(t, http.StatusOK, rec.Code)
	})

	t.Run("returns 503 when limit is exceeded", func(t *testing.T) {
		// started signals each request's entry into the handler (after the
		// admission gate has counted it as inflight); gate holds those requests
		// open until the over-limit probe has been rejected. Channel-based
		// synchronization replaces the previous busy-wait on the inflight
		// counter, which burned a CPU and poked at handler internals.
		const limit = 2
		started := make(chan struct{}, limit)
		gate := make(chan struct{})
		blockingHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			started <- struct{}{}
			<-gate
			w.WriteHeader(http.StatusOK)
		})

		h := newRPCAdmissionHandler(limit, blockingHandler)

		var wg sync.WaitGroup
		for i := 0; i < limit; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodPost, "/", nil))
			}()
		}

		// Wait until all `limit` requests have passed the admission gate and
		// are blocked inside the handler.
		for i := 0; i < limit; i++ {
			<-started
		}

		// Now the limit is reached — the next request must be rejected.
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/", nil))
		assert.Equal(t, http.StatusServiceUnavailable, rec.Code)

		// Release the held requests.
		close(gate)
		wg.Wait()
	})
}
Loading