Skip to content

Commit d8bd915

Browse files
committed
fix(store): deduplicate flags across flagSetIds in incremental update mode
Add a membership table to track which flagSetIds reference which flags, enabling per-flagSetId queries without storing duplicate flag entries. Flags are stored once per (key, source) in the flags table while the membership table records (flagSetId, key, source) triples. Key behaviors: - Get iterates all membership entries for a (flagSetId, key) and selects the highest-priority flag, consistent with GetAll/collectViaMembership. - updateIncremental upserts flag content when a (key, source) already exists at equal or lower priority, so content changes (targeting rules, variants) are not silently dropped. - Metadata always includes flagSetId even when the original metadata is nil, ensuring consistent serialization for clients. - Watch registers scoped per-(key, source) watch channels from memdb instead of watching the entire flags table, avoiding O(N_watchers x N_updates) spurious wake-ups.
1 parent 5fa86c6 commit d8bd915

4 files changed

Lines changed: 544 additions & 40 deletions

File tree

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package store
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"runtime"
7+
"testing"
8+
9+
"github.com/open-feature/flagd/core/pkg/logger"
10+
"github.com/open-feature/flagd/core/pkg/model"
11+
)
12+
13+
// TestMemoryUsage simulates the production workload
14+
// It measures actual heap usage for both dedup (incremental) and no-dedup (full snapshot) modes.
15+
func TestMemoryUsage(t *testing.T) {
16+
const (
17+
numProjects = 1000
18+
commonFlagCount = 1000
19+
totalUnique = 3000
20+
source = "grpc://features-api:443"
21+
)
22+
23+
// Build a realistic flag with variants, targeting, and metadata (~800B each)
24+
makeFlag := func(key string) model.Flag {
25+
return model.Flag{
26+
Key: key,
27+
State: "ENABLED",
28+
DefaultVariant: "off",
29+
Variants: map[string]any{
30+
"on": true,
31+
"off": false,
32+
},
33+
Targeting: json.RawMessage(`{"if":[{"in":["@example.com",{"var":"email"}]},"on","off"]}`),
34+
}
35+
}
36+
37+
// Pre-generate all unique flags
38+
allFlags := make([]model.Flag, totalUnique)
39+
for i := range totalUnique {
40+
allFlags[i] = makeFlag(fmt.Sprintf("flag-%04d", i))
41+
}
42+
43+
commonFlags := allFlags[:commonFlagCount]
44+
45+
// Distribute remaining flags across projects
46+
uniquePerProject := (totalUnique - commonFlagCount) / numProjects
47+
if uniquePerProject < 1 {
48+
uniquePerProject = 1
49+
}
50+
51+
// Build per-project flag sets
52+
type projectPayload struct {
53+
flagSetId string
54+
flags []model.Flag
55+
}
56+
projects := make([]projectPayload, numProjects)
57+
uniqueIdx := commonFlagCount
58+
for i := range numProjects {
59+
fsi := fmt.Sprintf("project-%04d", i)
60+
// Each project gets all common flags + a slice of the unique ones
61+
pFlags := make([]model.Flag, 0, commonFlagCount+uniquePerProject+1)
62+
pFlags = append(pFlags, commonFlags...)
63+
end := uniqueIdx + uniquePerProject
64+
if i < (totalUnique-commonFlagCount)%numProjects {
65+
end++ // distribute remainder across the first N projects
66+
}
67+
if end > totalUnique {
68+
end = totalUnique
69+
}
70+
for ; uniqueIdx < end; uniqueIdx++ {
71+
pFlags = append(pFlags, allFlags[uniqueIdx])
72+
}
73+
projects[i] = projectPayload{flagSetId: fsi, flags: pFlags}
74+
}
75+
76+
measure := func() uint64 {
77+
runtime.GC()
78+
runtime.GC()
79+
var m runtime.MemStats
80+
runtime.ReadMemStats(&m)
81+
return m.HeapAlloc
82+
}
83+
84+
t.Run("with_dedup_incremental", func(t *testing.T) {
85+
s, err := NewStore(logger.NewLogger(nil, false), []string{source})
86+
if err != nil {
87+
t.Fatal(err)
88+
}
89+
90+
before := measure()
91+
92+
for _, p := range projects {
93+
s.Update(source, p.flags, model.Metadata{"flagSetId": p.flagSetId}, true)
94+
}
95+
96+
after := measure()
97+
heapMB := float64(after-before) / (1024 * 1024)
98+
t.Logf("DEDUP: heap delta = %.1f MB (before=%d, after=%d)", heapMB, before, after)
99+
100+
// Sanity: verify flag count
101+
allF, _, _ := s.GetAll(nil, nil)
102+
t.Logf(" flags in store: %d (expected ~%d unique)", len(allF), totalUnique)
103+
})
104+
105+
t.Run("without_dedup_full_snapshot", func(t *testing.T) {
106+
s, err := NewStore(logger.NewLogger(nil, false), []string{source})
107+
if err != nil {
108+
t.Fatal(err)
109+
}
110+
111+
before := measure()
112+
113+
// Full snapshot: each project replaces the whole source, so we simulate
114+
// what the store would look like if all projects' flags were loaded at once
115+
// with unique flagSetIds (no dedup). Load all at once since full-snapshot
116+
// replaces per source.
117+
var allWithFSI []model.Flag
118+
for _, p := range projects {
119+
for _, f := range p.flags {
120+
f.Metadata = model.Metadata{"flagSetId": p.flagSetId}
121+
allWithFSI = append(allWithFSI, f)
122+
}
123+
}
124+
s.Update(source, allWithFSI, nil, false)
125+
126+
after := measure()
127+
heapMB := float64(after-before) / (1024 * 1024)
128+
t.Logf("NO DEDUP: heap delta = %.1f MB (before=%d, after=%d)", heapMB, before, after)
129+
130+
allF, _, _ := s.GetAll(nil, nil)
131+
t.Logf(" flags in store: %d (expected ~%d with duplicates)", len(allF), numProjects*(commonFlagCount+uniquePerProject))
132+
})
133+
}

core/pkg/store/query.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ const flagSetIdSourceCompoundIndex = flagSetIdIndex + "+" + sourceIndex
2424
const keySourceCompoundIndex = keyIndex + "+" + sourceIndex
2525
const flagSetIdKeySourceCompoundIndex = flagSetIdIndex + "+" + keyIndex + "+" + sourceIndex
2626

27+
// membership table for incremental update deduplication
28+
const membershipTable = "membership"
29+
const membershipFlagSetIdKeyIndex = flagSetIdIndex + "+" + keyIndex
30+
2731
// flagSetId defaults to a UUID generated at startup to make our queries consistent
2832
// any flag without a "flagSetId" is assigned this one; it's never exposed externally
2933
var nilFlagSetId = uuid.New().String()
@@ -83,6 +87,15 @@ func (s *Selector) IsEmpty() bool {
8387
return s == nil || len(s.indexMap) == 0
8488
}
8589

90+
// HasFlagSetId returns the flagSetId value and true if the selector includes a flagSetId constraint.
91+
func (s *Selector) HasFlagSetId() (string, bool) {
92+
if s == nil || s.indexMap == nil {
93+
return "", false
94+
}
95+
v, ok := s.indexMap[flagSetIdIndex]
96+
return v, ok && v != ""
97+
}
98+
8699
// ToQuery converts the Selector map to an indexId and constraints for querying the Store.
87100
// For a given index, a specific order and number of constraints are required.
88101
// Both the indexId and constraints are generated based on the keys present in the selector's internal map.

0 commit comments

Comments
 (0)