Rate limit apply for gateway connections #832
Walkthrough
Organization-level WebSocket connection limits are implemented by adding configuration support, tracking per-organization connections in the manager, validating against organization limits during upgrade, and introducing an error type for limit violations.
Changes
Sequence Diagram
sequenceDiagram
participant Client
participant Handler as WS Handler
participant Manager as WS Manager
participant Registry as Connection Registry
Client->>Handler: WebSocket upgrade request
Handler->>Manager: CanAcceptOrgConnection(orgID)
Manager->>Registry: Count connections for orgID
Registry-->>Manager: currentCount
alt Org limit exceeded
Manager-->>Handler: false
Handler-->>Client: 429 (ORG_CONNECTION_LIMIT_EXCEEDED)
else Org limit available
Manager-->>Handler: true
Handler->>Manager: Register(gatewayID, transport, apiKey, orgID)
Manager->>Manager: Create Connection with orgID
Manager->>Registry: Add connection to registry
Registry-->>Manager: Connection stored
Manager-->>Handler: Connection, nil
Handler-->>Client: 101 Switching Protocols
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@platform-api/src/internal/database/schema.sql`:
- Line 24: Add a DB migration that alters the organizations table to add the
tier column and backfill existing rows: create an ALTER TABLE migration which
adds column tier VARCHAR(20) (with default 'free' and NOT NULL), update existing
rows to set tier='free' where NULL, and then add the CHECK constraint limiting
tier to ('free','paid') (use a clear constraint name like
organizations_tier_check); target the organizations table and the tier column so
deployments with existing data will succeed.
🧹 Nitpick comments (6)
platform-api/src/internal/websocket/errors.go (1)
34-37: Use errors.As to detect wrapped errors.
Direct type assertion won’t match if the error is wrapped upstream.
♻️ Proposed fix
-import "fmt"
+import (
+	"errors"
+	"fmt"
+)

 func IsOrgConnectionLimitError(err error) bool {
-	_, ok := err.(*OrgConnectionLimitError)
-	return ok
+	var e *OrgConnectionLimitError
+	return errors.As(err, &e)
 }
platform-api/src/internal/dto/organization.go (1)
30-30: Consider validating tier values at the DTO boundary.
To prevent invalid tiers before hitting DB constraints, add a validator tag like oneof=free paid for the optional field.
♻️ Proposed tweak
-	Tier string `json:"tier,omitempty" yaml:"tier,omitempty"`
+	Tier string `json:"tier,omitempty" yaml:"tier,omitempty" binding:"omitempty,oneof=free paid"`
platform-api/src/internal/service/organization.go (1)
55-89: Consider normalizing tier input before validation.
This prevents accidental downgrades from values like "PAID" or trailing spaces while preserving the fallback-to-free behavior.
🔧 Suggested tweak
-	// Default tier to free if not specified or invalid
-	if tier == "" || !constants.ValidOrgTiers[tier] {
+	// Normalize and default tier to free if not specified or invalid
+	tier = strings.ToLower(strings.TrimSpace(tier))
+	if tier == "" || !constants.ValidOrgTiers[tier] {
 		tier = constants.OrgTierFree
 	}
platform-api/src/internal/handler/websocket.go (1)
100-114: Validate org tier before use to avoid unexpected limits.
If the stored tier is missing or invalid, you currently default only when empty. Consider validating against constants.ValidOrgTiers to keep the limiter behavior predictable.
🔧 Suggested tweak
-	orgTier := constants.OrgTierFree
-	if org != nil && org.Tier != "" {
-		orgTier = org.Tier
-	}
+	orgTier := constants.OrgTierFree
+	if org != nil && constants.ValidOrgTiers[org.Tier] {
+		orgTier = org.Tier
+	}
platform-api/src/internal/websocket/org_limiter.go (2)
45-52: Consider validating constructor parameters.
If freeOrgMaxConnections or paidOrgMaxConnections are passed as zero or negative values (e.g., due to misconfigured environment variables), the limiter will reject all connections without surfacing a clear error. Adding validation or defaulting to sensible minimums would make debugging easier.
💡 Optional defensive validation
 func NewOrgConnectionLimiter(freeOrgMaxConnections, paidOrgMaxConnections int) *OrgConnectionLimiter {
+	if freeOrgMaxConnections <= 0 {
+		freeOrgMaxConnections = 1 // or log a warning
+	}
+	if paidOrgMaxConnections <= 0 {
+		paidOrgMaxConnections = 1 // or log a warning
+	}
 	return &OrgConnectionLimiter{
 		connections:           make(map[string]map[string]bool),
 		freeOrgMaxConnections: freeOrgMaxConnections,
 		paidOrgMaxConnections: paidOrgMaxConnections,
 	}
 }
54-62: Unknown tier values silently default to free-tier limits.
The default case applies freeOrgMaxConnections for any tier that isn't "paid", including invalid or unknown values. While this is a conservative fail-safe, consider logging a warning for unrecognized tiers to help detect data integrity issues (e.g., typos in the database tier column).
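A minimal sketch of the tier lookup with the suggested warning, combined with the normalization proposed earlier in this round. The function name `maxConnectionsForTier`, the constants `tierFree` and `tierPaid`, and the log wording are assumptions for illustration; the limits 3 and 10 are the tiered defaults mentioned elsewhere in this review.

```go
package main

import (
	"fmt"
	"log"
	"strings"
)

// Hypothetical tier names mirroring the 'free' and 'paid' values in the review.
const (
	tierFree = "free"
	tierPaid = "paid"
)

// maxConnectionsForTier returns the connection limit for a tier.
// Unknown tiers fall back to the free limit and emit a warning so that
// bad data in the tier column becomes visible in the logs.
func maxConnectionsForTier(tier string, freeMax, paidMax int) int {
	switch strings.ToLower(strings.TrimSpace(tier)) {
	case tierPaid:
		return paidMax
	case tierFree, "":
		return freeMax
	default:
		log.Printf("[WARN] unrecognized organization tier %q, applying free-tier limit", tier)
		return freeMax
	}
}

func main() {
	fmt.Println(maxConnectionsForTier("paid", 3, 10))   // 10
	fmt.Println(maxConnectionsForTier(" PAID ", 3, 10)) // 10 after normalization
	fmt.Println(maxConnectionsForTier("gold", 3, 10))   // 3, with a warning logged
}
```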
22089f7 to
1b991dd
Compare
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@platform-api/src/internal/websocket/org_limiter.go`:
- Around line 64-73: CanAcceptConnection on OrgConnectionLimiter is a
TOCTOU/speculative check because it releases the lock before callers act; update
the method comment on CanAcceptConnection to explicitly state it is advisory and
does not guarantee AddConnection will succeed, referencing that AddConnection
performs the authoritative atomic check (and returns an error on contention), or
remove CanAcceptConnection entirely and update callers to call AddConnection
directly and handle its error; ensure references to getMaxConnectionsForTier
remain consistent and update any unit tests or call-sites accordingly.
🧹 Nitpick comments (3)
platform-api/src/internal/service/organization.go (1)
55-88: Avoid silently downgrading invalid tiers.
Right now a non-empty but invalid tier is coerced to free. Consider returning a validation error (or at least logging) when a caller supplies an invalid tier to avoid unintentional downgrades.
platform-api/src/internal/websocket/errors.go (1)
19-37: Use errors.As to detect wrapped OrgConnectionLimitError.
A direct type assertion only matches the outermost error and misses wrapped errors. errors.As is the idiomatic Go approach (supported since 1.13) and robustly unwraps the error chain to find the target type.
Proposed change
-import "fmt"
+import (
+	"errors"
+	"fmt"
+)
@@
 func IsOrgConnectionLimitError(err error) bool {
-	_, ok := err.(*OrgConnectionLimitError)
-	return ok
+	var target *OrgConnectionLimitError
+	return errors.As(err, &target)
 }
platform-api/src/internal/websocket/org_limiter.go (1)
45-52: Consider validating constructor inputs.
If negative values are passed for freeOrgMaxConnections or paidOrgMaxConnections, all connection attempts will be rejected since currentCount >= maxConnections will always be true for non-negative counts. Consider adding validation or at minimum documenting expected inputs.
🛠️ Optional: Add input validation
 func NewOrgConnectionLimiter(freeOrgMaxConnections, paidOrgMaxConnections int) *OrgConnectionLimiter {
+	if freeOrgMaxConnections < 0 {
+		freeOrgMaxConnections = 0
+	}
+	if paidOrgMaxConnections < 0 {
+		paidOrgMaxConnections = 0
+	}
 	return &OrgConnectionLimiter{
 		connections:           make(map[string]map[string]bool),
 		freeOrgMaxConnections: freeOrgMaxConnections,
 		paidOrgMaxConnections: paidOrgMaxConnections,
 	}
 }
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@platform-api/src/internal/websocket/manager.go`:
- Around line 117-131: The code unconditionally calls
m.orgLimiter.AddConnection(orgID, connectionID) which can leak counts when orgID
is empty because RemoveConnection is only executed for non-empty OrganizationID;
to fix, make the add/remove symmetric by guarding both calls with an explicit
orgID != "" check (or validate/return an error early if orgID is empty) so that
m.orgLimiter.AddConnection(orgID, connectionID) and
m.orgLimiter.RemoveConnection(orgID, connectionID) are only invoked when orgID
is non-empty; update the logic around connectionID, the org limiter calls, and
the rollback path in the function managing connections to ensure symmetry
(references: connectionID, orgID, m.orgLimiter.AddConnection,
m.orgLimiter.RemoveConnection, m.connectionCount, m.maxConnections, m.mu.Lock).
In `@platform-api/src/internal/websocket/org_limiter.go`:
- Around line 30-70: The limiter currently only uses a single
maxConnectionsPerOrg so tiered limits are not enforced; update
OrgConnectionLimiter to accept per-tier limits (or accept a resolved per-org
max) and wire that through NewOrgConnectionLimiter, CanAcceptConnection and
AddConnection: add a map or lookup (e.g., tierLimits map[string]int or a
function to resolve max by org/tier) to the OrgConnectionLimiter struct, change
NewOrgConnectionLimiter to accept the tiered defaults (free=3, paid=10) or a
resolver, update CanAcceptConnection(orgID string) and AddConnection(orgID,
connectionID string) to use the resolved max for that org when counting and when
constructing OrgConnectionLimitError (populate MaxAllowed with the tier-specific
value), and ensure AddConnection signature can accept or derive the org tier if
needed so paid orgs are allowed their higher limit.
type Manager struct {
	// connections maps gatewayID -> []*Connection
	// Supports multiple connections per gateway ID for clustering scenarios
	connections sync.Map
I would prefer using this map to determine the number of active connections, rather than keeping two different maps. For the rate limiter, maybe we can pass this object and perform the rate limiting.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
platform-api/src/internal/websocket/manager.go (1)
117-144: ⚠️ Potential issue | 🟠 Major
Race condition in per-org limit enforcement.
The per-org limit check at lines 118-125 is not atomic with the connection addition at lines 141-144. Under concurrent registrations for the same organization:
- Goroutine A calls countOrgConnections(orgID) → returns 2
- Goroutine B calls countOrgConnections(orgID) → returns 2
- Both pass the < maxConnectionsPerOrg check
- Both add connections → org now has 4 connections (exceeds limit of 3)
The global limit is protected by mu, but the per-org check is not. Consider holding a lock across the entire check-and-add operation, or maintaining a separate atomic counter per org.
🔧 Suggested approach
One option is to extend the mutex to cover both checks and the map update:
 func (m *Manager) Register(gatewayID string, transport Transport, authToken string, orgID string) (*Connection, error) {
+	m.mu.Lock()
 	// Check per-org limit first (count from main connections map)
 	orgCount := m.countOrgConnections(orgID)
 	if orgCount >= m.maxConnectionsPerOrg {
+		m.mu.Unlock()
 		return nil, &OrgConnectionLimitError{
 			OrganizationID: orgID,
 			CurrentCount:   orgCount,
 			MaxAllowed:     m.maxConnectionsPerOrg,
 		}
 	}

 	// Check global connection limit
-	m.mu.Lock()
 	if m.connectionCount >= m.maxConnections {
 		m.mu.Unlock()
 		return nil, fmt.Errorf("maximum connection limit reached (%d)", m.maxConnections)
 	}
 	m.connectionCount++
-	m.mu.Unlock()

 	// Create connection
 	connectionID := uuid.New().String()
 	conn := NewConnection(gatewayID, connectionID, transport, authToken, orgID)

 	// Add connection to registry
 	connsInterface, _ := m.connections.LoadOrStore(gatewayID, []*Connection{})
 	conns := connsInterface.([]*Connection)
 	conns = append(conns, conn)
 	m.connections.Store(gatewayID, conns)
+	m.mu.Unlock()
Note: This changes mu from protecting only connectionCount to a broader registration lock, which may impact concurrency. Alternatively, maintain a separate map[string]*atomic.Int32 for per-org counts.
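The alternative mentioned in the note, a per-org atomic counter, could look roughly like the sketch below. The names `orgCounters`, `tryAcquire`, and `release` are hypothetical, and a `sync.Map` holding `*atomic.Int32` values stands in for the `map[string]*atomic.Int32` so that counter creation is also safe under concurrency (`atomic.Int32` needs Go 1.19 or later).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// orgCounters keeps one atomic counter per organization (orgID -> *atomic.Int32).
type orgCounters struct {
	counts sync.Map
	max    int32
}

// tryAcquire reserves a slot for orgID, or reports false when the org is at
// its limit. The compare-and-swap loop makes check and increment one step.
func (o *orgCounters) tryAcquire(orgID string) bool {
	v, _ := o.counts.LoadOrStore(orgID, new(atomic.Int32))
	counter := v.(*atomic.Int32)
	for {
		cur := counter.Load()
		if cur >= o.max {
			return false
		}
		if counter.CompareAndSwap(cur, cur+1) {
			return true
		}
		// Another goroutine changed the counter first; re-read and retry.
	}
}

// release returns a slot; call it exactly once per successful tryAcquire.
func (o *orgCounters) release(orgID string) {
	if v, ok := o.counts.Load(orgID); ok {
		v.(*atomic.Int32).Add(-1)
	}
}

func main() {
	o := &orgCounters{max: 3}
	var wg sync.WaitGroup
	var admitted atomic.Int32
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if o.tryAcquire("org-1") {
				admitted.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(admitted.Load()) // 3, regardless of scheduling
}
```

Compared with widening `mu`, this keeps registrations for different organizations from contending on one lock, but the global limit still needs its own protection.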
🤖 Fix all issues with AI agents
In `@platform-api/src/internal/handler/websocket.go`:
- Around line 134-147: The JSON marshaling calls in the org-limit and generic
error branches misuse the return values (they name the bytes variable jsonErr
and ignore the actual error), so change both json.Marshal calls to capture
(payload, marshalErr := json.Marshal(errorMsg)), check marshalErr != nil and log
it (e.g., using log.Printf) instead of attempting to write nil, and only call
conn.WriteMessage(websocket.TextMessage, payload) when marshalErr is nil; update
variable names (payload, marshalErr) and apply this fix in the orgLimit error
handling and the generic error branch around conn.WriteMessage and json.Marshal.
In `@platform-api/src/internal/websocket/manager.go`:
- Line 147: The call m.wg.Go(...) is invalid because m.wg is a sync.WaitGroup;
replace it with the standard WaitGroup pattern: call m.wg.Add(1) before starting
a goroutine and inside the goroutine defer m.wg.Done() then call
m.monitorHeartbeat(conn); locate the usage of m.wg.Go in manager.go (the
goroutine launching monitorHeartbeat) and change it accordingly, or
alternatively change the m.wg field to an errgroup.Group (importing
golang.org/x/sync/errgroup) if you intended to use Go semantics.
🧹 Nitpick comments (2)
platform-api/src/internal/handler/websocket.go (1)
139-148: Consider sanitizing generic error messages sent to clients.
Line 143 sends err.Error() directly to the client, which could potentially expose internal implementation details (database errors, file paths, etc.). Consider using a generic message for non-specific errors.
🔧 Suggested fix
 } else {
 	// Generic error
 	errorMsg := map[string]string{
 		"type":    "error",
-		"message": err.Error(),
+		"message": "Connection registration failed",
 	}
platform-api/src/internal/websocket/manager.go (1)
240-254: Consider maintaining a per-org counter map for better performance.
countOrgConnections iterates through all connections (O(n) where n = total connections) and is called multiple times per registration. With the default max of 1000 connections, this could become a bottleneck under load.
Consider maintaining a map[string]int (or sync.Map) for per-org counts, updated atomically during Register and Unregister. This would make lookups O(1).
	if jsonErr, _ := json.Marshal(errorMsg); jsonErr != nil {
		conn.WriteMessage(websocket.TextMessage, jsonErr)
	}
	log.Printf("[WARN] Organization connection limit exceeded: orgID=%s count=%d max=%d",
		orgLimitErr.OrganizationID, orgLimitErr.CurrentCount, orgLimitErr.MaxAllowed)
} else {
	// Generic error
	errorMsg := map[string]string{
		"type":    "error",
		"message": err.Error(),
	}
	if jsonErr, _ := json.Marshal(errorMsg); jsonErr != nil {
		conn.WriteMessage(websocket.TextMessage, jsonErr)
	}
Misleading variable name and ignored errors in JSON marshaling.
json.Marshal returns ([]byte, error), so jsonErr is actually the marshaled bytes, not an error. While the code happens to work (failed marshals return nil), the naming is confusing and the marshal error is silently discarded.
🔧 Suggested fix
- if jsonErr, _ := json.Marshal(errorMsg); jsonErr != nil {
- conn.WriteMessage(websocket.TextMessage, jsonErr)
+ if jsonBytes, err := json.Marshal(errorMsg); err == nil {
+ _ = conn.WriteMessage(websocket.TextMessage, jsonBytes)
}
Apply the same fix for the generic error case at lines 145-147.
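Since both branches repeat the same marshal-then-write steps, one way to apply the fix is a small helper. This is a sketch under assumptions: `textWriter`, `sendError`, and `recorder` are hypothetical names, and `textMessage` is set to 1, the text-frame opcode from RFC 6455, so the example does not import a WebSocket library.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// textWriter is a hypothetical narrow interface covering the one method the
// handler uses on its connection in the snippet above.
type textWriter interface {
	WriteMessage(messageType int, data []byte) error
}

// textMessage is the text-frame opcode defined by RFC 6455.
const textMessage = 1

// sendError marshals msg and writes it as a text frame. Marshal and write
// failures are returned to the caller instead of being silently dropped.
func sendError(w textWriter, msg any) error {
	payload, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("marshal error message: %w", err)
	}
	if err := w.WriteMessage(textMessage, payload); err != nil {
		return fmt.Errorf("write error message: %w", err)
	}
	return nil
}

// recorder is a fake connection that remembers the last payload written.
type recorder struct{ last []byte }

func (r *recorder) WriteMessage(_ int, data []byte) error {
	r.last = data
	return nil
}

func main() {
	rec := &recorder{}
	msg := map[string]string{"type": "error", "message": "Connection registration failed"}
	if err := sendError(rec, msg); err != nil {
		log.Printf("[WARN] %v", err)
	}
	fmt.Println(string(rec.last))

	// Channels cannot be marshaled to JSON, so this returns an error and writes nothing.
	err := sendError(rec, map[string]any{"bad": make(chan int)})
	fmt.Println(err != nil)
}
```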
 	// Start heartbeat monitoring in background
-	m.wg.Add(1)
-	go m.monitorHeartbeat(conn)
+	m.wg.Go(func() { m.monitorHeartbeat(conn) })
🧩 Analysis chain
🏁 Script executed:
# First, let's locate the manager.go file and read it to see the wg declaration
find . -name "manager.go" -path "*/websocket/*" | head -5
Repository: wso2/api-platform
Length of output: 109
🏁 Script executed:
# Read the manager.go file to find the wg declaration around line 64
cat -n platform-api/src/internal/websocket/manager.go | sed -n '50,80p'
Repository: wso2/api-platform
Length of output: 1420
🏁 Script executed:
# Also check around line 147 for context
cat -n platform-api/src/internal/websocket/manager.go | sed -n '140,155p'
Repository: wso2/api-platform
Length of output: 751
🏁 Script executed:
# Check for any custom WaitGroup types or imports
rg -n "type.*WaitGroup|errgroup|sync\.WaitGroup" platform-api/src/internal/websocket/manager.go
Repository: wso2/api-platform
Length of output: 82
🏁 Script executed:
# Check imports in the file
head -30 platform-api/src/internal/websocket/manager.go
Repository: wso2/api-platform
Length of output: 945
Remove the Go method call: sync.WaitGroup does not have this method.
Standard sync.WaitGroup (declared at line 64) only has Add, Done, and Wait methods. The call m.wg.Go(...) at line 153 will not compile.
Use the standard pattern instead:
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.monitorHeartbeat(conn)
}()
Alternatively, if you intend to use errgroup.Group instead, import golang.org/x/sync/errgroup and change the field type accordingly.
https://pkg.go.dev/sync#WaitGroup.Go
it is available since go 1.25.0
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check the Go version specified in the project
fd -t f "go.mod|go.work" | xargs cat
Length of output: 324376
🏁 Script executed:
#!/bin/bash
# Also check if there are any version specifications in CI/build files
fd -t f "Dockerfile|.tool-versions|.go-version" | xargs cat 2>/dev/null || echo "No version files found"
Length of output: 28046
🌐 Web query:
Go 1.25.0 sync.WaitGroup.Go method
💡 Result:
In Go 1.25.0, sync.WaitGroup gained a convenience method:
func (wg *WaitGroup) Go(f func())
wg.Go(f) is equivalent to the common pattern:
wg.Add(1)
go func() {
	defer wg.Done()
	f()
}()
It starts f in a new goroutine, increments the WaitGroup counter before starting, and decrements it when f returns. It’s intended to reduce Add/Done mismatch bugs. [1][2]
Key rules/constraints from the docs:
- f must not panic. [2]
- If the WaitGroup counter is currently zero, calls to Go must happen before a Wait() (i.e., don’t race Go() against Wait() on an “empty” WaitGroup). If it’s non-zero, Go may happen at any time (including from goroutines started by Go). [2]
- Added as part of the Go 1.25 sync changes (“The new WaitGroup.Go method…”). [1]
- The proposal and rationale are tracked in the Go issue “sync: add WaitGroup.Go” (milestoned for Go 1.25). [3]
Example:
var wg sync.WaitGroup
for _, x := range workItems {
	x := x
	wg.Go(func() { doWork(x) })
}
wg.Wait()
Sources: Go 1.25 release notes [1], sync package docs for Go 1.25 [2], Go proposal issue [3].
@coderabbit-user You’re right — sync.WaitGroup.Go is available since Go 1.25. Thanks for the correction.
I double‑checked the repo setup from the project files we inspected:
- go.work declares go 1.25.1.
- Multiple Dockerfiles build with golang:1.25.5.
- The platform-api module is built under Go 1.25.x in its Docker build stages.
Given that, m.wg.Go(func() { m.monitorHeartbeat(conn) }) in platform-api/src/internal/websocket/manager.go is perfectly valid and will compile. I retract my earlier suggestion to replace it with Add/Done or to switch to errgroup. One small note: ensure any local/CI environments also use Go ≥ 1.25; I noticed the common module lists go 1.24.0 (with toolchain go1.24.5), but platform-api and the workspace are on 1.25.x, so this PR is fine.
Appreciate the nudge!
(_/) thoughtful bunny
(•‿•) thanks for the correction
/ づ 🎓
🧬 Code Graph Analysis Results
platform-api/src/internal/websocket/manager.go
- Field: WaitGroup used for tracking active connection handler goroutines
  Code snippet:
  // wg tracks active connection handler goroutines for graceful shutdown
  wg sync.WaitGroup
  (Note: This shows the presence of a sync.WaitGroup and its purpose in the Manager.)
- Usage: Starting a per-connection heartbeat monitor using WaitGroup.Go
  Code snippet:
  // Start heartbeat monitoring in background
  m.wg.Go(func() { m.monitorHeartbeat(conn) })
  (Note: Demonstrates the Go method on the WaitGroup, which aligns with Go 1.25+ behavior.)
- Usage: Waiting for all connection handler goroutines to exit during shutdown
  Code snippet:
  // Wait for all goroutines to exit
  m.wg.Wait()
  (Note: Shows synchronization to ensure all background workers finish before shutdown completes.)
If you would like, I can also provide exact surrounding context for each snippet.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@platform-api/src/internal/websocket/manager.go`:
- Around line 56-58: Add an orgCounts map[string]int field to the Manager struct
and make per-org limit checks atomic: in Register(), acquire the existing mutex
(Lock) before calling countOrgConnections (or replace that call) so you can
check orgCounts[orgID] and increment orgCounts[orgID] and the global
connectionCount together while still holding the lock; in Unregister(), while
holding the same lock decrement orgCounts[removed.OrganizationID] along with the
global connectionCount; change countOrgConnections() to simply read from
orgCounts under RLock instead of iterating over connections so reads are
consistent with the locked updates.
	// maxConnectionsPerOrg enforces per-organization connection limits
	maxConnectionsPerOrg int
🧩 Analysis chain
🏁 Script executed:
find . -name "manager.go" -path "*/websocket/*" | head -5
Repository: wso2/api-platform
Length of output: 109
🏁 Script executed:
wc -l ./platform-api/src/internal/websocket/manager.go
Repository: wso2/api-platform
Length of output: 113
🏁 Script executed:
cat -n ./platform-api/src/internal/websocket/manager.go
Repository: wso2/api-platform
Length of output: 14513
Per-org limit enforcement is not atomic under concurrency.
The per-org check at line 124 in Register() calls countOrgConnections() without holding the mutex, allowing concurrent calls to see the same snapshot count and both pass even when combined they would exceed the limit. For example: Thread A reads count=2 for org1 (max=3) and passes the check; Thread B reads the same count=2 in parallel and also passes; both then acquire the lock and register, resulting in 4 connections for org1.
Additionally, Unregister() only decrements the global connectionCount (line 205) and does not maintain any per-org counts, so stats and logging drift.
Fix by tracking per-org counts atomically under the existing mutex:
- Add orgCounts map[string]int field to Manager
- In Register(), acquire the lock before the per-org check and atomically increment orgCounts[orgID] alongside the global count
- In Unregister(), decrement orgCounts[removed.OrganizationID] under the same lock
- Update countOrgConnections() to read from orgCounts under RLock instead of iterating the map
This makes check+increment atomic and ensures stats are consistent with enforced limits.
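The four steps above can be sketched as follows. This is a reduced model rather than the real `Manager`: the lowercase `manager` type, its method signatures, and the error strings are assumptions, and connection objects are left out so that only the counting logic remains.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a reduced, hypothetical model of the WebSocket Manager.
type manager struct {
	mu              sync.RWMutex
	orgCounts       map[string]int // orgID -> active connections
	connectionCount int
	maxConnections  int
	maxPerOrg       int
}

// register checks both limits and updates both counters under one lock,
// so no other goroutine can observe a stale per-org count in between.
func (m *manager) register(orgID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.orgCounts[orgID] >= m.maxPerOrg {
		return fmt.Errorf("organization %s at connection limit (%d)", orgID, m.maxPerOrg)
	}
	if m.connectionCount >= m.maxConnections {
		return fmt.Errorf("maximum connection limit reached (%d)", m.maxConnections)
	}
	m.orgCounts[orgID]++
	m.connectionCount++
	return nil
}

// unregister decrements the per-org and global counters together.
func (m *manager) unregister(orgID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.orgCounts[orgID] == 0 {
		return // nothing registered for this org
	}
	m.orgCounts[orgID]--
	m.connectionCount--
	if m.orgCounts[orgID] == 0 {
		delete(m.orgCounts, orgID) // avoid keeping empty entries around
	}
}

// countOrgConnections is an O(1) read under the read lock.
func (m *manager) countOrgConnections(orgID string) int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.orgCounts[orgID]
}

func main() {
	m := &manager{orgCounts: map[string]int{}, maxConnections: 1000, maxPerOrg: 3}
	for i := 0; i < 4; i++ {
		fmt.Println(m.register("org-1") == nil) // true, true, true, false
	}
	m.unregister("org-1")
	fmt.Println(m.countOrgConnections("org-1")) // 2
}
```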
Summary
OrgConnectionLimiter component to track and enforce connection limits per organization
ORG_CONNECTION_LIMIT_EXCEEDED) when limits are exceeded
Test plan
ORG_CONNECTION_LIMIT_EXCEEDED error with currentCount and maxAllowed
WS_MAX_CONNECTIONS_PER_ORG
Changes
websocket/org_limiter.go
websocket/errors.go OrgConnectionLimitError type with detailed context
websocket/manager.go
websocket/connection.go OrganizationID field to track connection ownership
handler/websocket.go
config/config.go MaxConnectionsPerOrg configuration option
server/server.go
Documentation
Automation tests
Security checks
Samples
Related PRs
Test environment
Summary by CodeRabbit
Release Notes