Skip to content

Commit e739693

Browse files
authored
node: High usage monitor (#4506)
### Description Implemented abuse detection for event RPCs: - Introduced a configurable CallRateMonitor that tracks per-user call rates across sliding windows via ring buffers, plus helper wiring to instantiate it from config.AbuseDetection. - Hooked AddEvent, AddMediaEvent, and CreateMediaStream so every successful call records the creator’s address under the appropriate call type - Exposed recent usage on the /status endpoint via a new field, detailing which accounts exceeded which thresholds over 1-minute and 30-minute windows by default. ### Checklist - [ ] Tests added where required - [ ] Documentation updated where applicable - [ ] Changes adhere to the repository's contribution guidelines
1 parent 0d1319d commit e739693

File tree

12 files changed

+1107
-6
lines changed

12 files changed

+1107
-6
lines changed

core/config/config.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,18 @@ func GetDefaultConfig() *Config {
5353
Metrics: MetricsConfig{
5454
Enabled: true,
5555
},
56+
HighUsageDetection: HighUsageDetectionConfig{
57+
Enabled: false,
58+
MaxResults: 10,
59+
Thresholds: HighUsageThresholdFields{
60+
ThresholdAddEventWindow: time.Minute,
61+
ThresholdAddEventCount: 100,
62+
ThresholdAddMediaEventWindow: time.Minute,
63+
ThresholdAddMediaEventCount: 50,
64+
ThresholdCreateMediaStreamWindow: time.Minute,
65+
ThresholdCreateMediaStreamCount: 10,
66+
},
67+
},
5668
// TODO: Network: NetworkConfig{},
5769
StandByOnStart: true,
5870
StandByPollPeriod: 500 * time.Millisecond,
@@ -122,6 +134,7 @@ type Config struct {
122134
// Metrics
123135
Metrics MetricsConfig
124136
PerformanceTracking PerformanceTrackingConfig
137+
HighUsageDetection HighUsageDetectionConfig
125138

126139
// Scrubbing
127140
Scrubbing ScrubbingConfig
@@ -602,6 +615,60 @@ type MetricsConfig struct {
602615
Interface string
603616
}
604617

618+
type HighUsageDetectionConfig struct {
619+
// Enabled toggles the high-usage detection tracker logic.
620+
Enabled bool
621+
622+
// MaxResults limits the number of high-usage accounts exposed via /status.
623+
MaxResults int
624+
625+
// Thresholds captures explicit per-call-type threshold definitions.
626+
Thresholds HighUsageThresholdFields
627+
}
628+
629+
// HighUsageThresholds flattens the configured threshold_* fields into a standard
630+
// map keyed by call type.
631+
func (cfg HighUsageDetectionConfig) HighUsageThresholds() map[string][]HighUsageThreshold {
632+
return cfg.Thresholds.effectiveThresholds()
633+
}
634+
635+
type HighUsageThresholdFields struct {
636+
ThresholdAddEventWindow time.Duration `mapstructure:"threshold_add_event_window"`
637+
ThresholdAddEventCount uint32 `mapstructure:"threshold_add_event_count"`
638+
ThresholdAddMediaEventWindow time.Duration `mapstructure:"threshold_add_media_event_window"`
639+
ThresholdAddMediaEventCount uint32 `mapstructure:"threshold_add_media_event_count"`
640+
ThresholdCreateMediaStreamWindow time.Duration `mapstructure:"threshold_create_media_stream_window"`
641+
ThresholdCreateMediaStreamCount uint32 `mapstructure:"threshold_create_media_stream_count"`
642+
}
643+
644+
func (fields HighUsageThresholdFields) effectiveThresholds() map[string][]HighUsageThreshold {
645+
result := make(map[string][]HighUsageThreshold)
646+
647+
addThreshold := func(name string, window time.Duration, count uint32) {
648+
if window <= 0 || count == 0 || name == "" {
649+
return
650+
}
651+
result[name] = append(result[name], HighUsageThreshold{
652+
Window: window,
653+
Count: count,
654+
})
655+
}
656+
657+
addThreshold("event", fields.ThresholdAddEventWindow, fields.ThresholdAddEventCount)
658+
addThreshold("media_event", fields.ThresholdAddMediaEventWindow, fields.ThresholdAddMediaEventCount)
659+
addThreshold("create_media_stream", fields.ThresholdCreateMediaStreamWindow, fields.ThresholdCreateMediaStreamCount)
660+
661+
if len(result) == 0 {
662+
return nil
663+
}
664+
return result
665+
}
666+
667+
type HighUsageThreshold struct {
668+
Window time.Duration
669+
Count uint32
670+
}
671+
605672
type DebugEndpointsConfig struct {
606673
Cache bool
607674
Memory bool

core/config/config_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config_test
33
import (
44
"os"
55
"testing"
6+
"time"
67

78
"github.com/stretchr/testify/require"
89

@@ -134,3 +135,64 @@ func TestConfig_ChainProvidersDoNotLog(t *testing.T) {
134135
)
135136
require.NotContains(logOutput, "CHAINS", "Expected CHAINS to be omitted from logOutput `%v`", logOutput)
136137
}
138+
139+
func TestHighUsageDetectionConfig_DefaultThresholds(t *testing.T) {
140+
cfg := config.HighUsageDetectionConfig{
141+
Thresholds: config.HighUsageThresholdFields{
142+
ThresholdAddEventWindow: time.Minute,
143+
ThresholdAddEventCount: 100,
144+
},
145+
}
146+
147+
values := cfg.HighUsageThresholds()
148+
require.Len(t, values, 1)
149+
require.Len(t, values["event"], 1)
150+
require.Equal(t, time.Minute, values["event"][0].Window)
151+
require.Equal(t, uint32(100), values["event"][0].Count)
152+
}
153+
154+
func TestHighUsageDetectionConfig_MultipleCallTypes(t *testing.T) {
155+
cfg := config.HighUsageDetectionConfig{
156+
Thresholds: config.HighUsageThresholdFields{
157+
ThresholdAddEventWindow: 2 * time.Minute,
158+
ThresholdAddEventCount: 200,
159+
ThresholdAddMediaEventWindow: 45 * time.Second,
160+
ThresholdAddMediaEventCount: 15,
161+
ThresholdCreateMediaStreamWindow: 90 * time.Second,
162+
ThresholdCreateMediaStreamCount: 25,
163+
},
164+
}
165+
166+
values := cfg.HighUsageThresholds()
167+
168+
require.Len(t, values["event"], 1)
169+
require.Equal(t, 2*time.Minute, values["event"][0].Window)
170+
require.Equal(t, uint32(200), values["event"][0].Count)
171+
require.Len(t, values["media_event"], 1)
172+
require.Equal(t, 45*time.Second, values["media_event"][0].Window)
173+
require.Equal(t, uint32(15), values["media_event"][0].Count)
174+
require.Len(t, values["create_media_stream"], 1)
175+
require.Equal(t, 90*time.Second, values["create_media_stream"][0].Window)
176+
require.Equal(t, uint32(25), values["create_media_stream"][0].Count)
177+
}
178+
179+
func TestHighUsageDetectionConfig_InvalidEntriesIgnored(t *testing.T) {
180+
cfg := config.HighUsageDetectionConfig{
181+
Thresholds: config.HighUsageThresholdFields{
182+
ThresholdAddEventWindow: 0,
183+
ThresholdAddEventCount: 100,
184+
ThresholdAddMediaEventWindow: time.Minute,
185+
ThresholdAddMediaEventCount: 0,
186+
ThresholdCreateMediaStreamWindow: 30 * time.Second,
187+
ThresholdCreateMediaStreamCount: 5,
188+
},
189+
}
190+
191+
values := cfg.HighUsageThresholds()
192+
193+
require.Equal(t, map[string][]config.HighUsageThreshold{
194+
"create_media_stream": {
195+
{Window: 30 * time.Second, Count: 5},
196+
},
197+
}, values)
198+
}

core/node/rpc/add_event.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/towns-protocol/towns/core/node/logging"
1414
. "github.com/towns-protocol/towns/core/node/protocol"
1515
rpcHeaders "github.com/towns-protocol/towns/core/node/rpc/headers"
16+
"github.com/towns-protocol/towns/core/node/rpc/highusage"
1617
"github.com/towns-protocol/towns/core/node/rules"
1718
. "github.com/towns-protocol/towns/core/node/shared"
1819
)
@@ -50,11 +51,13 @@ func (s *Service) localAddEvent(
5051

5152
if err != nil {
5253
return nil, err
53-
} else {
54-
return connect.NewResponse(&AddEventResponse{
55-
NewEvents: newEvents,
56-
}), nil
5754
}
55+
56+
s.callRateMonitor.RecordCall(parsedEvent.Event.CreatorAddress, time.Now(), highusage.CallTypeEvent)
57+
58+
return connect.NewResponse(&AddEventResponse{
59+
NewEvents: newEvents,
60+
}), nil
5861
}
5962

6063
// ensureStreamIsUpToDate returns the StreamView for the given StreamId that is up to date enough to

core/node/rpc/add_media_event.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rpc
33
import (
44
"bytes"
55
"context"
6+
"time"
67

78
"connectrpc.com/connect"
89
"github.com/ethereum/go-ethereum/common"
@@ -12,6 +13,7 @@ import (
1213
. "github.com/towns-protocol/towns/core/node/events"
1314
"github.com/towns-protocol/towns/core/node/logging"
1415
. "github.com/towns-protocol/towns/core/node/protocol"
16+
"github.com/towns-protocol/towns/core/node/rpc/highusage"
1517
. "github.com/towns-protocol/towns/core/node/shared"
1618
)
1719

@@ -73,6 +75,8 @@ func (s *Service) localAddMediaEvent(
7375
return nil, AsRiverError(err).Func("localAddMediaEvent")
7476
}
7577

78+
s.callRateMonitor.RecordCall(parsedEvent.Event.CreatorAddress, time.Now(), highusage.CallTypeMediaEvent)
79+
7680
return connect.NewResponse(&AddMediaEventResponse{
7781
CreationCookie: &CreationCookie{
7882
StreamId: streamId[:],

core/node/rpc/create_media_stream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/towns-protocol/towns/core/node/logging"
1414
. "github.com/towns-protocol/towns/core/node/nodes"
1515
. "github.com/towns-protocol/towns/core/node/protocol"
16+
"github.com/towns-protocol/towns/core/node/rpc/highusage"
1617
"github.com/towns-protocol/towns/core/node/rules"
1718
. "github.com/towns-protocol/towns/core/node/shared"
1819
"github.com/towns-protocol/towns/core/node/storage"
@@ -148,6 +149,8 @@ func (s *Service) createMediaStream(ctx context.Context, req *CreateMediaStreamR
148149
return nil, AsRiverError(err).Func("createMediaStream")
149150
}
150151

152+
s.callRateMonitor.RecordCall(parsedEvents[0].Event.CreatorAddress, time.Now(), highusage.CallTypeCreateMediaStream)
153+
151154
// add derived events
152155
for _, de := range csRules.DerivedEvents {
153156
_, err = s.AddEventPayload(ctx, de.StreamId, de.Payload, de.Tags)

0 commit comments

Comments
 (0)