Skip to content

Commit 164bc0b

Browse files
fix(distributor): Keep stream sharding enabled with empty overrides (#21829)
The rate store now treats empty tenant overrides the same as missing tenant overrides when deciding whether any stream sharding is enabled. This keeps default limits_config.shard_streams settings active when the runtime overrides file is empty.
1 parent 3a3245b commit 164bc0b

2 files changed

Lines changed: 33 additions & 3 deletions

File tree

pkg/distributor/ratestore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func (s *rateStore) wasUpdated(tenantID string, streamID uint64, lastUpdated map
219219

220220
func (s *rateStore) anyShardingEnabled() bool {
221221
limits := s.limits.AllByUserID()
222-
if limits == nil {
222+
if len(limits) == 0 {
223223
// There aren't any tenant limits, check the default
224224
return s.limits.ShardStreams("fake").Enabled
225225
}

pkg/distributor/ratestore_test.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,27 @@ func TestRateStore(t *testing.T) {
140140
requireRatesAndPushesEqual(t, 0, 0, tc.rateStore, "tenant 1", 0)
141141
})
142142

143+
t.Run("it uses default sharding limits when overrides are empty", func(t *testing.T) {
144+
tc := setupWithOverrides(&fakeOverrides{
145+
enabled: true,
146+
tenantLimits: map[string]*validation.Limits{},
147+
})
148+
tc.ring.replicationSet = ring.ReplicationSet{
149+
Instances: []ring.InstanceDesc{
150+
{Addr: "ingester0"},
151+
},
152+
}
153+
154+
tc.clientPool.clients = map[string]client.PoolClient{
155+
"ingester0": newRateClient([]*logproto.StreamRate{
156+
{Tenant: "tenant 1", StreamHash: 1, StreamHashNoShard: 0, Rate: 25},
157+
}),
158+
}
159+
160+
require.NoError(t, tc.rateStore.instrumentedUpdateAllRates(context.Background()))
161+
requireRatesAndPushesEqual(t, 25, 0, tc.rateStore, "tenant 1", 0)
162+
})
163+
143164
t.Run("it clears the rate after an interval", func(t *testing.T) {
144165
tc := setup(true)
145166
tc.ring.replicationSet = ring.ReplicationSet{
@@ -335,10 +356,15 @@ func (c *fakeStreamDataClient) GetStreamRates(_ context.Context, _ *logproto.Str
335356

336357
type fakeOverrides struct {
337358
Limits
338-
enabled bool
359+
enabled bool
360+
tenantLimits map[string]*validation.Limits
339361
}
340362

341363
func (c *fakeOverrides) AllByUserID() map[string]*validation.Limits {
364+
if c.tenantLimits != nil {
365+
return c.tenantLimits
366+
}
367+
342368
return map[string]*validation.Limits{
343369
"ingester0": {
344370
ShardStreams: shardstreams.Config{
@@ -361,13 +387,17 @@ type testContext struct {
361387
}
362388

363389
func setup(shardingEnabled bool) *testContext {
390+
return setupWithOverrides(&fakeOverrides{enabled: shardingEnabled})
391+
}
392+
393+
func setupWithOverrides(overrides *fakeOverrides) *testContext {
364394
ring := newFakeRing()
365395
cp := newFakeClientPool()
366396
cfg := RateStoreConfig{MaxParallelism: 5, IngesterReqTimeout: time.Second, StreamRateUpdateInterval: 10 * time.Millisecond}
367397

368398
return &testContext{
369399
ring: ring,
370400
clientPool: cp,
371-
rateStore: NewRateStore(cfg, ring, cp, &fakeOverrides{enabled: shardingEnabled}, nil),
401+
rateStore: NewRateStore(cfg, ring, cp, overrides, nil),
372402
}
373403
}

0 commit comments

Comments
 (0)