Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTxs, "rpc.subscription.filters.maxtxs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, "Maximum number of transactions to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "rpc.subscription.filters.maxaddresses", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "Maximum number of addresses per subscription to filter logs by.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTopics, "rpc.subscription.filters.maxtopics", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, "Maximum number of topics per subscription to filter logs by.")
rootCmd.PersistentFlags().DurationVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersTimeout, "rpc.subscription.filters.timeout", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersTimeout, "Timeout before idle filters are evicted. Defaults to 0 to disable eviction.")
rootCmd.PersistentFlags().IntVar(&cfg.BatchLimit, utils.RpcBatchLimit.Name, utils.RpcBatchLimit.Value, utils.RpcBatchLimit.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.AllowUnprotectedTxs, utils.AllowUnprotectedTxs.Name, utils.AllowUnprotectedTxs.Value, utils.AllowUnprotectedTxs.Usage)
Expand Down
2 changes: 1 addition & 1 deletion execution/tests/execution-spec-tests
1 change: 1 addition & 0 deletions node/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ var DefaultFlags = []cli.Flag{
&RpcSubscriptionFiltersMaxTxsFlag,
&RpcSubscriptionFiltersMaxAddressesFlag,
&RpcSubscriptionFiltersMaxTopicsFlag,
&RpcSubscriptionFiltersTimeoutFlag,

&utils.SnapKeepBlocksFlag,
&utils.SnapStopFlag,
Expand Down
6 changes: 6 additions & 0 deletions node/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ var (
Usage: "Maximum number of topics per subscription to filter logs by.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics,
}
RpcSubscriptionFiltersTimeoutFlag = cli.DurationFlag{
Name: "rpc.subscription.filters.timeout",
Usage: "Timeout before idle filters are evicted. Set to 0 to disable eviction.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersTimeout,
}
)

func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config, logger log.Logger) {
Expand Down Expand Up @@ -443,6 +448,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
RpcSubscriptionFiltersMaxTxs: ctx.Int(RpcSubscriptionFiltersMaxTxsFlag.Name),
RpcSubscriptionFiltersMaxAddresses: ctx.Int(RpcSubscriptionFiltersMaxAddressesFlag.Name),
RpcSubscriptionFiltersMaxTopics: ctx.Int(RpcSubscriptionFiltersMaxTopicsFlag.Name),
RpcSubscriptionFiltersTimeout: ctx.Duration(RpcSubscriptionFiltersTimeoutFlag.Name),
},
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name),
Expand Down
12 changes: 12 additions & 0 deletions rpc/jsonrpc/eth_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (api *APIImpl) NewPendingTransactionFilter(_ context.Context) (string, erro
return "", rpc.ErrNotificationsUnsupported
}
txsCh, id := api.filters.SubscribePendingTxs(32)
api.filters.TrackSubscription(rpchelper.SubscriptionID(id), rpchelper.FilterTypePendingTxs, rpchelper.ProtocolHTTP)
go func() {
for txs := range txsCh {
api.filters.AddPendingTxs(id, txs)
Expand All @@ -49,6 +50,7 @@ func (api *APIImpl) NewBlockFilter(_ context.Context) (string, error) {
return "", rpc.ErrNotificationsUnsupported
}
ch, id := api.filters.SubscribeNewHeads(32)
api.filters.TrackSubscription(rpchelper.SubscriptionID(id), rpchelper.FilterTypeHeads, rpchelper.ProtocolHTTP)
go func() {
for block := range ch {
api.filters.AddPendingBlock(id, block)
Expand All @@ -63,6 +65,7 @@ func (api *APIImpl) NewFilter(_ context.Context, crit filters.FilterCriteria) (s
return "", rpc.ErrNotificationsUnsupported
}
logs, id := api.filters.SubscribeLogs(256, crit)
api.filters.TrackSubscription(rpchelper.SubscriptionID(id), rpchelper.FilterTypeLogs, rpchelper.ProtocolHTTP)
go func() {
for lg := range logs {
api.filters.AddLogs(id, lg)
Expand Down Expand Up @@ -110,12 +113,14 @@ func (api *APIImpl) GetFilterChanges(_ context.Context, index string) ([]any, er

// Identify the subscription type by probing each store; if none have data yet, return empty slice
if blocks, ok := api.filters.ReadPendingBlocks(rpchelper.HeadsSubID(cutIndex)); ok {
api.filters.TouchSubscription(rpchelper.SubscriptionID(cutIndex), rpchelper.FilterTypeHeads)
for _, v := range blocks {
stub = append(stub, v.Hash())
}
return stub, nil
}
if txs, ok := api.filters.ReadPendingTxs(rpchelper.PendingTxsSubID(cutIndex)); ok {
api.filters.TouchSubscription(rpchelper.SubscriptionID(cutIndex), rpchelper.FilterTypePendingTxs)
if len(txs) > 0 {
for _, txn := range txs[0] {
stub = append(stub, txn.Hash())
Expand All @@ -125,6 +130,7 @@ func (api *APIImpl) GetFilterChanges(_ context.Context, index string) ([]any, er
return stub, nil
}
if logs, ok := api.filters.ReadLogs(rpchelper.LogsSubID(cutIndex)); ok {
api.filters.TouchSubscription(rpchelper.SubscriptionID(cutIndex), rpchelper.FilterTypeLogs)
for _, v := range logs {
stub = append(stub, v)
}
Expand All @@ -144,6 +150,8 @@ func (api *APIImpl) GetFilterLogs(_ context.Context, index string) ([]*types.Log
if found := api.filters.HasSubscription(rpchelper.LogsSubID(cutIndex)); !found {
return nil, rpc.ErrFilterNotFound
}
// Reset the filter deadline since it was just accessed
api.filters.TouchSubscription(rpchelper.SubscriptionID(cutIndex), rpchelper.FilterTypeLogs)
if logs, ok := api.filters.ReadLogs(rpchelper.LogsSubID(cutIndex)); ok {
return logs, nil
}
Expand All @@ -165,6 +173,7 @@ func (api *APIImpl) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
go func() {
defer dbg.LogPanic()
headers, id := api.filters.SubscribeNewHeads(32)
api.filters.SetSubscriptionProtocol(rpchelper.SubscriptionID(id), rpchelper.FilterTypeHeads, rpchelper.ProtocolWS)
defer api.filters.UnsubscribeHeads(id)
for {
select {
Expand Down Expand Up @@ -203,6 +212,7 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context, fullTx *bool) (*
go func() {
defer dbg.LogPanic()
txsCh, id := api.filters.SubscribePendingTxs(256)
api.filters.SetSubscriptionProtocol(rpchelper.SubscriptionID(id), rpchelper.FilterTypePendingTxs, rpchelper.ProtocolWS)
defer api.filters.UnsubscribePendingTxs(id)

for {
Expand Down Expand Up @@ -250,6 +260,7 @@ func (api *APIImpl) NewPendingTransactionsWithBody(ctx context.Context) (*rpc.Su
go func() {
defer dbg.LogPanic()
txsCh, id := api.filters.SubscribePendingTxs(512)
api.filters.SetSubscriptionProtocol(rpchelper.SubscriptionID(id), rpchelper.FilterTypePendingTxs, rpchelper.ProtocolWS)
defer api.filters.UnsubscribePendingTxs(id)

for {
Expand Down Expand Up @@ -291,6 +302,7 @@ func (api *APIImpl) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc
go func() {
defer dbg.LogPanic()
logs, id := api.filters.SubscribeLogs(api.SubscribeLogsChannelSize, crit)
api.filters.SetSubscriptionProtocol(rpchelper.SubscriptionID(id), rpchelper.FilterTypeLogs, rpchelper.ProtocolWS)
defer api.filters.UnsubscribeLogs(id)

for {
Expand Down
26 changes: 16 additions & 10 deletions rpc/rpchelper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@

package rpchelper

import "time"

const DefaultFilterTimeout = 0 * time.Minute

// FiltersConfig defines the configuration settings for RPC subscription filters.
// Each field represents a limit on the number of respective items that can be stored per subscription.
type FiltersConfig struct {
RpcSubscriptionFiltersMaxLogs int // Maximum number of logs to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxHeaders int // Maximum number of block headers to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxTxs int // Maximum number of transactions to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxAddresses int // Maximum number of addresses per subscription to filter logs by. Default: 0 (no limit)
RpcSubscriptionFiltersMaxTopics int // Maximum number of topics per subscription to filter logs by. Default: 0 (no limit)
RpcSubscriptionFiltersMaxLogs int // Maximum number of logs to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxHeaders int // Maximum number of block headers to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxTxs int // Maximum number of transactions to store per subscription. Default: 0 (no limit)
RpcSubscriptionFiltersMaxAddresses int // Maximum number of addresses per subscription to filter logs by. Default: 0 (no limit)
RpcSubscriptionFiltersMaxTopics int // Maximum number of topics per subscription to filter logs by. Default: 0 (no limit)
RpcSubscriptionFiltersTimeout time.Duration // Timeout before idle filters are evicted. Default: 0 (no eviction)
}

// DefaultFiltersConfig defines the default settings for filter configurations.
// These default values set no limits on the number of logs, block headers, transactions,
// addresses, or topics that can be stored per subscription.
var DefaultFiltersConfig = FiltersConfig{
RpcSubscriptionFiltersMaxLogs: 0, // No limit on the number of logs per subscription
RpcSubscriptionFiltersMaxHeaders: 0, // No limit on the number of block headers per subscription
RpcSubscriptionFiltersMaxTxs: 0, // No limit on the number of transactions per subscription
RpcSubscriptionFiltersMaxAddresses: 0, // No limit on the number of addresses per subscription to filter logs by
RpcSubscriptionFiltersMaxTopics: 0, // No limit on the number of topics per subscription to filter logs by
RpcSubscriptionFiltersMaxLogs: 0, // No limit on the number of logs per subscription
RpcSubscriptionFiltersMaxHeaders: 0, // No limit on the number of block headers per subscription
RpcSubscriptionFiltersMaxTxs: 0, // No limit on the number of transactions per subscription
RpcSubscriptionFiltersMaxAddresses: 0, // No limit on the number of addresses per subscription to filter logs by
RpcSubscriptionFiltersMaxTopics: 0, // No limit on the number of topics per subscription to filter logs by
RpcSubscriptionFiltersTimeout: DefaultFilterTimeout, // Evict filters not polled within this duration
}
Loading
Loading