
Commit 489bc83

feat(dataset): hybrid IP storage with parallel batch processing (#128)
* feat(dataset): hybrid IP storage with parallel batch processing

  Implement memory-efficient hybrid storage that separates individual IPs from CIDR ranges, significantly reducing memory usage for typical workloads.

  ## Changes
  - Add IPMap using sync.Map for individual IP addresses (O(1) lookups)
  - Keep BART (RangeSet) only for CIDR ranges requiring LPM
  - Parallelize Add/Remove operations using sync.WaitGroup.Go() (Go 1.24+)
  - CheckIP() checks IPMap first, falls back to RangeSet for range matching

  ## Memory Optimization
  Most real-world deployments receive individual IP decisions, not ranges. BART's radix trie has significant per-entry overhead optimized for prefix matching. By storing individual IPs in a simple map:
  - **1K IPs**: BART ~598KB → Map significantly less
  - **10K IPs**: BART ~5.6MB → Map significantly less
  - **50K IPs**: BART ~26.5MB → Map significantly less

  ## Performance
  - IPMap lookup: ~72 ns/op, 0 allocs
  - RangeSet lookup: ~69 ns/op, 0 allocs
  - Parallel batch processing for Add/Remove operations

  ## Breaking Changes
  None - API unchanged, internal storage optimization only.

* fix(ipmap): prevent data races with concurrent readers

  Address Copilot review comments:
  1. Clone RemediationIdsMap before modifying in add() and remove()
     - Prevents race between Contains() readers and Add/Remove modifiers
     - sync.Map's Load→Modify→Store pattern is unsafe with mutable values
  2. Fix version comment: sync.WaitGroup.Go was added in Go 1.23, not 1.24
  3. Remove trailing whitespace in benchmark_test.go

* refactor(dataset): rename BartUnifiedIPSet to BartRangeSet

  More descriptive name since it now only handles CIDR ranges, while individual IPs are stored in IPMap.

* feat(dataset): Consistent COW for lock free reads (#129)

* perf: lock-free reads for IPMap and CNSet

  Use atomic pointers for completely lock-free reads across all data structures. SPOA handlers never block, even during batch updates.

  Changes:
  1. IPMap: atomic.Pointer per entry for individual IPs
  2. CNSet: atomic.Pointer for entire country map (small, cloning is cheap)
     - Added NewCNSet() constructor for consistency
     - Added defensive nil checks in Add/Remove/Contains
  3. BartRangeSet: already uses atomic pointer (unchanged)

  Concurrency model (consistent across all):
  - Reads: atomic pointer load (instant, never blocks)
  - Writes: clone → modify → atomic store (copy-on-write)
  - Readers always see consistent state (old or new, never partial)

  This ensures SPOA always has immediate access to decision data, regardless of ongoing batch updates from the stream bouncer.

* refactor(ipmap): simplify addLocked - remove unreachable LoadOrStore fallback

  Since writeMu is held for the entire batch, no other writer can race. The LoadOrStore fallback code was unreachable - simplified to use Store directly.
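The concurrency model described above reduces to one pattern: readers perform a single atomic pointer load, while the single writer clones, modifies, and atomically swaps. The following is a minimal, self-contained sketch of that copy-on-write pattern only — the type and method names are generic placeholders, and the real IPMap/CNSet/BartRangeSet types in this commit are shaped differently (per-entry pointers, bart table, etc.):

```go
// Generic sketch of the clone → modify → atomic store pattern (not commit code).
package main

import (
	"fmt"
	"net/netip"
	"sync/atomic"
)

type cowSet struct {
	// Readers load this pointer atomically and never block.
	snapshot atomic.Pointer[map[netip.Addr]string]
}

// Contains is the lock-free read path: one atomic load, then a plain map lookup.
func (s *cowSet) Contains(ip netip.Addr) (string, bool) {
	m := s.snapshot.Load()
	if m == nil {
		return "", false
	}
	r, ok := (*m)[ip]
	return r, ok
}

// AddBatch is the single-writer path: clone the snapshot, modify the clone,
// then atomically store it. Readers see either the old or the new snapshot,
// never a partially updated one.
func (s *cowSet) AddBatch(decisions map[netip.Addr]string) {
	old := s.snapshot.Load()
	size := len(decisions)
	if old != nil {
		size += len(*old)
	}
	next := make(map[netip.Addr]string, size)
	if old != nil {
		for k, v := range *old {
			next[k] = v
		}
	}
	for k, v := range decisions {
		next[k] = v
	}
	s.snapshot.Store(&next)
}

func main() {
	var s cowSet
	s.AddBatch(map[netip.Addr]string{netip.MustParseAddr("192.0.2.1"): "ban"})
	fmt.Println(s.Contains(netip.MustParseAddr("192.0.2.1"))) // ban true
}
```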
* Simplify RemediationMap: remove ID tracking, optimize cloning

  - Changed from map[Remediation][]RemediationDetails to map[Remediation]string
  - Removed ID tracking (LAPI only sends longest decisions)
  - Simplified Add/Remove methods (no ID parameters)
  - Optimized IPMap cloning: skip clone for empty maps and single-entry removals
  - Updated all callers to use new simplified API
  - Reduced memory allocations and GC pressure

* Remove writeMu mutex from IPMap - single writer guarantee

  - Removed writeMu mutex (unnecessary with single writer thread)
  - Renamed addLocked/removeLocked to add/remove
  - Updated comments to reflect single-writer, multiple-reader model
  - Stream handler processes decisions sequentially, no concurrent writes
  - Atomic pointer operations handle reader-writer synchronization safely

* Address Copilot review comments: remove unused ID parameters

  - Remove unused 'id' parameter from CNSet.Add and CNSet.Remove methods
  - Update addCN/removeCN helper functions to match new signatures
  - Update outdated comments: 'merging IDs' -> 'merging remediations'
  - Optimize CNSet.Add preallocation: remove +1 to avoid overallocation
  - Fix map initialization syntax in CNSet.Add

* Remove unused ID fields from operation structs

  - Remove ID field from IPAddOp, IPRemoveOp, BartAddOp, BartRemoveOp
  - Remove id field from internal cnOp struct
  - Update all operation struct literals to remove ID assignments
  - Update benchmark tests to match new struct definitions
  - IDs are no longer tracked since LAPI behavior ensures only longest decisions

* Add comprehensive metrics tests and optimize no-op handling

  - Add comprehensive test suite for metrics tracking (IPMap, BartRangeSet, CNSet)
  - Fix BartRangeSet no-op check to use exact prefix lookup (Get) instead of LPM
  - Add HasRemediation and GetOriginForRemediation methods to BartRangeSet for exact prefix matching
  - Optimize ipType assignment to only occur after no-op checks
  - Remove pre-allocation of operation slices to avoid wasting memory when many decisions are no-ops
  - Replace hardcoded metric increments with len(decisions) for better readability

* Fix outdated comment: replace 'ID' with 'remediation'

  - Update comment in bart_types.go to reflect that IDs are no longer tracked
  - Addresses Copilot review feedback from PR #129

* Refactor Remove() to return error for cleaner duplicate delete handling

  - Add ErrRemediationNotFound sentinel error
  - Update RemediationMap.Remove() to return error instead of silently ignoring
  - Update all call sites to use errors.Is() for cleaner error checking
  - Simplify RemoveBatch logic - no need to check existence before calling Remove()
  - Metrics are only updated for actual removals, not duplicate deletes

  This makes the code easier to follow and more explicit about error handling.
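The Remove() refactor above swaps silent no-ops for a sentinel error that callers test with errors.Is. A rough sketch of that shape, assuming the map[Remediation]string layout described earlier; the commit's actual method also takes a log entry, which is omitted here:

```go
// Hypothetical sketch of the sentinel-error pattern from the Remove() refactor.
package dataset

import "errors"

// ErrRemediationNotFound signals a duplicate delete: the remediation was not present.
var ErrRemediationNotFound = errors.New("remediation not found")

// Remediation stands in for the real remediation.Remediation type (assumption).
type Remediation int

// RemediationMap maps a remediation to the origin that set it, per the commit message.
type RemediationMap map[Remediation]string

// Remove deletes a remediation and reports whether it actually existed.
func (m RemediationMap) Remove(r Remediation) error {
	if _, ok := m[r]; !ok {
		return ErrRemediationNotFound
	}
	delete(m, r)
	return nil
}

// Callers check with errors.Is and only update metrics for real removals.
func removeAndCount(m RemediationMap, r Remediation) bool {
	if err := m.Remove(r); errors.Is(err, ErrRemediationNotFound) {
		return false // duplicate delete: metrics stay untouched
	}
	return true
}
```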
* refactor(metrics): use WithLabelValues instead of With for better performance

  Replace prometheus.With(Labels{...}) with WithLabelValues() to avoid map allocation overhead. This aligns with the optimization from main branch.

  - Remove unused prometheus import
  - Update all metrics calls to use WithLabelValues
  - Add comments indicating label order for clarity

* fix(lint): remove unused nolint directive in metrics tests

* fix: verify origin matches before removing decisions

  - Add origin verification in IPMap.remove() to prevent removing decisions when the origin has been overwritten (e.g., by CAPI)
  - Add the same origin check in BartRangeSet.RemoveBatch() for range removals
  - Remove the optimization that deleted an IP when only one remediation existed, to handle edge cases where the same origin changes remediation types
  - Ensures metrics are decremented correctly and prevents orphaned decisions

  Fixes issue where unsubscribing from blocklists didn't properly remove decisions when origins were overwritten by other sources.

* fix: correct Go version in comments and add origin overwrite test

  - Fix version documentation: sync.WaitGroup.Go was added in Go 1.22, not 1.23
  - Add TestMetrics_OriginOverwriteAndDelete to verify metrics correctness when the same origin changes remediation types (ban -> captcha -> delete ban)
  - Test ensures origin verification works correctly and metrics are accurate
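For the WithLabelValues change described in the metrics commit above, here is an illustrative before/after against the standard prometheus/client_golang API; the metric name and label set are invented for the example and are not the project's actual metrics:

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

// Illustrative metric; the real project defines its own vectors and labels.
var activeDecisions = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "example_active_decisions",
		Help: "Number of active decisions.",
	},
	[]string{"origin", "ip_type", "scope"},
)

func recordAdd(origin, ipType, scope string) {
	// Before: allocates a prometheus.Labels map on every call.
	// activeDecisions.With(prometheus.Labels{"origin": origin, "ip_type": ipType, "scope": scope}).Inc()

	// After: positional label values, no map allocation.
	// Label order: origin, ip_type, scope (matches the []string above).
	activeDecisions.WithLabelValues(origin, ipType, scope).Inc()
}
```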
1 parent 2b0de18 commit 489bc83

File tree

8 files changed (+1781 / −218 lines)


go.mod

Lines changed: 1 addition & 0 deletions
```diff
@@ -44,6 +44,7 @@ require (
 	github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
 	github.com/google/go-querystring v1.1.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
+	github.com/kylelemons/godebug v1.1.0 // indirect
 	github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
 	github.com/mailru/easyjson v0.9.0 // indirect
 	github.com/mitchellh/mapstructure v1.5.0 // indirect
```

pkg/dataset/bart_types.go

Lines changed: 105 additions & 35 deletions
```diff
@@ -1,6 +1,7 @@
 package dataset
 
 import (
+	"errors"
 	"net/netip"
 	"sync"
 	"sync/atomic"
@@ -15,7 +16,6 @@ type BartAddOp struct {
 	Prefix netip.Prefix
 	Origin string
 	R      remediation.Remediation
-	ID     int64
 	IPType string
 	Scope  string
 }
@@ -24,27 +24,26 @@ type BartAddOp struct {
 type BartRemoveOp struct {
 	Prefix netip.Prefix
 	R      remediation.Remediation
-	ID     int64
 	Origin string
 	IPType string
 	Scope  string
 }
 
-// BartUnifiedIPSet provides a unified interface for IP and CIDR operations using bart library.
+// BartRangeSet provides a unified interface for IP and CIDR operations using bart library.
 // Uses atomic pointer for lock-free reads and mutex-protected writes
 // following the pattern recommended in bart's documentation.
-type BartUnifiedIPSet struct {
-	tableAtomicPtr atomic.Pointer[bart.Table[RemediationIdsMap]]
+type BartRangeSet struct {
+	tableAtomicPtr atomic.Pointer[bart.Table[RemediationMap]]
 	writeMutex     sync.Mutex // Protects writers only
 	logger         *log.Entry
 }
 
-// NewBartUnifiedIPSet creates a new BartUnifiedIPSet
+// NewBartRangeSet creates a new BartRangeSet
 // The table starts as nil and will be created on first use for better memory efficiency
 // Initialize with nil; the table will be allocated during the first AddBatch operation.
 // This approach enables using Insert for the initial table population, which is more memory efficient than incremental updates.
-func NewBartUnifiedIPSet(logAlias string) *BartUnifiedIPSet {
-	return &BartUnifiedIPSet{
+func NewBartRangeSet(logAlias string) *BartRangeSet {
+	return &BartRangeSet{
 		logger: log.WithField("alias", logAlias),
 	}
 }
@@ -55,7 +54,7 @@ func NewBartUnifiedIPSet(logAlias string) *BartUnifiedIPSet {
 // For the initial load (when table is nil), uses Insert for better memory efficiency.
 // For subsequent updates, uses ModifyPersist for incremental changes.
 // All operations always succeed (duplicates are merged, new entries are created).
-func (s *BartUnifiedIPSet) AddBatch(operations []BartAddOp) {
+func (s *BartRangeSet) AddBatch(operations []BartAddOp) {
 	if len(operations) == 0 {
 		return
 	}
@@ -78,14 +77,14 @@ func (s *BartUnifiedIPSet) AddBatch(operations []BartAddOp) {
 
 // initializeBatch creates a new table and initializes it with the given operations using Insert.
 // This is more memory efficient than using ModifyPersist for the initial load.
-// Handles duplicate prefixes by merging IDs before inserting.
+// Handles duplicate prefixes by merging remediations before inserting.
 // All operations always succeed.
-func (s *BartUnifiedIPSet) initializeBatch(operations []BartAddOp) {
+func (s *BartRangeSet) initializeBatch(operations []BartAddOp) {
 	// Create a new table for the initial load
-	next := &bart.Table[RemediationIdsMap]{}
+	next := &bart.Table[RemediationMap]{}
 
 	// First, collect all operations by prefix to handle duplicates
-	prefixMap := make(map[netip.Prefix]RemediationIdsMap)
+	prefixMap := make(map[netip.Prefix]RemediationMap)
 	for _, op := range operations {
 		prefix := op.Prefix.Masked()
 
@@ -99,10 +98,10 @@ func (s *BartUnifiedIPSet) initializeBatch(operations []BartAddOp) {
 		// Get or create the data for this prefix
 		data, exists := prefixMap[prefix]
 		if !exists {
-			data = RemediationIdsMap{}
+			data = RemediationMap{}
 		}
-		// Add the ID (this handles merging if prefix already seen)
-		data.AddID(valueLog, op.R, op.ID, op.Origin)
+		// Add the remediation (this handles merging if prefix already seen)
+		data.Add(valueLog, op.R, op.Origin)
 		prefixMap[prefix] = data
 	}
 
@@ -126,7 +125,7 @@ func (s *BartUnifiedIPSet) initializeBatch(operations []BartAddOp) {
 // updateBatch updates an existing table with the given operations using ModifyPersist.
 // This handles incremental updates efficiently.
 // All operations always succeed.
-func (s *BartUnifiedIPSet) updateBatch(cur *bart.Table[RemediationIdsMap], operations []BartAddOp) {
+func (s *BartRangeSet) updateBatch(cur *bart.Table[RemediationMap], operations []BartAddOp) {
 	// Process all operations, chaining the table updates
 	next := cur
 	for _, op := range operations {
@@ -141,21 +140,21 @@ func (s *BartUnifiedIPSet) updateBatch(cur *bart.Table[RemediationIdsMap], opera
 
 		// Use ModifyPersist to atomically update or create the prefix entry
 		// This is more efficient than DeletePersist + InsertPersist as it only traverses once
-		next, _, _ = next.ModifyPersist(prefix, func(existingData RemediationIdsMap, exists bool) (RemediationIdsMap, bool) {
+		next, _, _ = next.ModifyPersist(prefix, func(existingData RemediationMap, exists bool) (RemediationMap, bool) {
 			if exists {
 				if valueLog != nil {
-					valueLog.Trace("exact prefix exists, merging IDs")
+					valueLog.Trace("exact prefix exists, merging remediations")
 				}
 				// bart already cloned via our Cloner interface, modify directly
-				existingData.AddID(valueLog, op.R, op.ID, op.Origin)
+				existingData.Add(valueLog, op.R, op.Origin)
 				return existingData, false // false = don't delete
 			}
 			if valueLog != nil {
 				valueLog.Trace("creating new entry")
 			}
 			// Create new data
-			newData := make(RemediationIdsMap)
-			newData.AddID(valueLog, op.R, op.ID, op.Origin)
+			newData := make(RemediationMap)
+			newData.Add(valueLog, op.R, op.Origin)
 			return newData, false // false = don't delete
 		})
 	}
@@ -165,10 +164,11 @@ func (s *BartUnifiedIPSet) updateBatch(cur *bart.Table[RemediationIdsMap], opera
 }
 
 // RemoveBatch removes multiple prefixes from the bart table in a single atomic operation.
-// Returns a slice of pointers to successfully removed operations (nil for failures).
+// Returns a slice of pointers to actually removed operations (nil for duplicate deletes or non-existent prefixes).
 // This allows callers to access operation metadata (Origin, IPType, Scope) for metrics.
+// Since Remove() never fails, we only return the operation pointer if the remediation actually existed.
 // IPs should be converted to /32 or /128 prefixes before calling this method.
-func (s *BartUnifiedIPSet) RemoveBatch(operations []BartRemoveOp) []*BartRemoveOp {
+func (s *BartRangeSet) RemoveBatch(operations []BartRemoveOp) []*BartRemoveOp {
 	if len(operations) == 0 {
 		return nil
 	}
@@ -199,7 +199,7 @@ func (s *BartUnifiedIPSet) RemoveBatch(operations []BartRemoveOp) []*BartRemoveO
 
 		// Use ModifyPersist to atomically update or remove the prefix entry
 		// This is more efficient than DeletePersist + InsertPersist as it only traverses once
-		next, _, _ = next.ModifyPersist(prefix, func(existingData RemediationIdsMap, exists bool) (RemediationIdsMap, bool) {
+		next, _, _ = next.ModifyPersist(prefix, func(existingData RemediationMap, exists bool) (RemediationMap, bool) {
 			if !exists {
 				if valueLog != nil {
 					valueLog.Trace("exact prefix not found")
@@ -208,19 +208,38 @@ func (s *BartUnifiedIPSet) RemoveBatch(operations []BartRemoveOp) []*BartRemoveO
 				return existingData, false // false = don't delete (prefix doesn't exist anyway)
 			}
 
-			// bart already cloned via our Cloner interface, modify directly
-			err := existingData.RemoveID(valueLog, op.R, op.ID)
-			if err != nil {
+			// Check if the remediation exists with the matching origin before removing
+			// This prevents removing decisions when the origin has been overwritten (e.g., by CAPI)
+			if !existingData.HasRemediationWithOrigin(op.R, op.Origin) {
+				// Origin doesn't match - this decision was likely overwritten by another origin
+				// Don't remove it, as it's not the decision we're trying to delete
 				if valueLog != nil {
-					valueLog.Trace("ID not found")
+					storedOrigin, exists := existingData[op.R]
+					if exists {
+						valueLog.Tracef("remediation exists but origin mismatch (stored: %s, requested: %s), skipping removal", storedOrigin, op.Origin)
+					} else {
+						valueLog.Tracef("remediation not found, skipping removal")
+					}
 				}
 				results[i] = nil
-				return existingData, false // false = don't delete, keep data unchanged
+				return existingData, false // false = don't delete, keep existing data
 			}
 
-			// ID was successfully removed - return pointer to the operation for metadata access
-			// Use index to get pointer to original operation (safe since we don't modify the slice)
-			results[i] = &operations[i]
+			// bart already cloned via our Cloner interface, modify directly
+			// Remove returns an error if remediation doesn't exist (duplicate delete)
+			// We already checked origin above, so this should succeed
+			err := existingData.Remove(valueLog, op.R)
+			if errors.Is(err, ErrRemediationNotFound) {
+				// This shouldn't happen since we checked above, but handle it gracefully
+				if valueLog != nil {
+					valueLog.Trace("remediation not found after origin check, duplicate delete")
+				}
+				results[i] = nil
+			} else {
+				// Remediation was successfully removed - return pointer for metrics
+				// Use index to get pointer to original operation (safe since we don't modify the slice)
+				results[i] = &operations[i]
+			}
 
 			if existingData.IsEmpty() {
 				if valueLog != nil {
@@ -229,7 +248,7 @@ func (s *BartUnifiedIPSet) RemoveBatch(operations []BartRemoveOp) []*BartRemoveO
 				return existingData, true // true = delete the prefix (it's now empty)
 			}
 			if valueLog != nil {
-				valueLog.Trace("removed ID from existing prefix")
+				valueLog.Trace("removed remediation from existing prefix")
 			}
 			return existingData, false // false = don't delete, keep modified data
 		})
@@ -244,7 +263,7 @@ func (s *BartUnifiedIPSet) RemoveBatch(operations []BartRemoveOp) []*BartRemoveO
 // Contains checks if an IP address matches any prefix in the bart table.
 // Returns the longest matching prefix's remediation and origin.
 // This method uses lock-free reads via atomic pointer for optimal performance.
-func (s *BartUnifiedIPSet) Contains(ip netip.Addr) (remediation.Remediation, string) {
+func (s *BartRangeSet) Contains(ip netip.Addr) (remediation.Remediation, string) {
 	// Lock-free read: atomically load the current table pointer
 	table := s.tableAtomicPtr.Load()
 
@@ -275,3 +294,54 @@ func (s *BartUnifiedIPSet) Contains(ip netip.Addr) (remediation.Remediation, str
 	}
 	return remediationResult, origin
 }
+
+// HasRemediation checks if an exact prefix has a specific remediation with a specific origin.
+// Uses Get() for exact prefix lookup (not LPM like Contains/Lookup).
+// Returns true if the exact prefix exists and has the given remediation with the given origin.
+// This method uses lock-free reads via atomic pointer for optimal performance.
+func (s *BartRangeSet) HasRemediation(prefix netip.Prefix, r remediation.Remediation, origin string) bool {
+	// Lock-free read: atomically load the current table pointer
+	table := s.tableAtomicPtr.Load()
+
+	// Check for nil table (not yet initialized)
+	if table == nil {
+		return false
+	}
+
+	// Use Get() for exact prefix lookup (not LPM)
+	maskedPrefix := prefix.Masked()
+	data, found := table.Get(maskedPrefix)
+	if !found {
+		return false
+	}
+
+	return data.HasRemediationWithOrigin(r, origin)
+}
+
+// GetOriginForRemediation returns the origin for a specific remediation on an exact prefix.
+// Uses Get() for exact prefix lookup (not LPM).
+// Returns the origin and true if the exact prefix exists and has the given remediation, false otherwise.
+// This method uses lock-free reads via atomic pointer for optimal performance.
+func (s *BartRangeSet) GetOriginForRemediation(prefix netip.Prefix, r remediation.Remediation) (string, bool) {
+	// Lock-free read: atomically load the current table pointer
+	table := s.tableAtomicPtr.Load()
+
+	// Check for nil table (not yet initialized)
+	if table == nil {
+		return "", false
+	}
+
+	// Use Get() for exact prefix lookup (not LPM)
+	maskedPrefix := prefix.Masked()
+	data, found := table.Get(maskedPrefix)
+	if !found {
+		return "", false
+	}
+
+	// Check if the remediation exists and return its origin
+	if existingOrigin, ok := data[r]; ok {
+		return existingOrigin, true
+	}
+
+	return "", false
+}
```
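As a usage illustration only (not code from this commit): the exact-prefix helpers added above could back the no-op check mentioned in the commit message, skipping adds that are already stored for the same prefix, remediation, and origin. The filterNoOps helper and its signature are assumptions; only BartRangeSet, BartAddOp, and HasRemediation come from the diff above.

```go
// Hypothetical caller-side no-op filter built on the exact-prefix helper.
func filterNoOps(set *BartRangeSet, ops []BartAddOp) []BartAddOp {
	var toApply []BartAddOp // no pre-allocation: many decisions may be no-ops
	for _, op := range ops {
		// Exact-prefix check via Get(), not longest-prefix match: an identical
		// decision for the same prefix, remediation, and origin is a no-op.
		if set.HasRemediation(op.Prefix, op.R, op.Origin) {
			continue
		}
		toApply = append(toApply, op)
	}
	return toApply
}
```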
