1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -81,6 +81,7 @@
 * [BUGFIX] Query-frontend: Fix incorrect annotation position information when running sharding inside MQE. #13484
 * [BUGFIX] Query-frontend: Fix incorrect query results when evaluating some sharded aggregations with `without` when running sharding inside MQE. #13484
 * [BUGFIX]: Ingester: Panic when push and read reactive limiters are enabled with prioritization. #13482
+* [BUGFIX] Usage-tracker: Prevent tracking requests from being handled by partition handlers that are not in the Running state. #13532
 
 ### Mixin
 
32 changes: 21 additions & 11 deletions pkg/usagetracker/tracker.go
@@ -665,12 +665,12 @@ func (t *UsageTracker) stop(_ error) error {
 
 // TrackSeries implements usagetrackerpb.UsageTrackerServer.
 func (t *UsageTracker) TrackSeries(_ context.Context, req *usagetrackerpb.TrackSeriesRequest) (*usagetrackerpb.TrackSeriesResponse, error) {
-	t.partitionsMtx.RLock()
-	p, ok := t.partitions[req.Partition]
-	t.partitionsMtx.RUnlock()
-	if !ok {
-		return nil, fmt.Errorf("partition handler %d not found", req.Partition)
+	partition := req.Partition
+	p, err := t.runningPartition(partition)
+	if err != nil {
+		return nil, err
 	}
 
 	rejected, err := p.store.trackSeries(context.Background(), req.UserID, req.SeriesHashes, time.Now())
 	if err != nil {
 		return nil, err
@@ -681,12 +681,9 @@ func (t *UsageTracker) TrackSeries(_ context.Context, req *usagetrackerpb.TrackSeriesRequest) (*usagetrackerpb.TrackSeriesResponse, error) {
 // GetUsersCloseToLimit implements usagetrackerpb.UsageTrackerServer.
 func (t *UsageTracker) GetUsersCloseToLimit(_ context.Context, req *usagetrackerpb.GetUsersCloseToLimitRequest) (*usagetrackerpb.GetUsersCloseToLimitResponse, error) {
 	partition := req.Partition
-
-	t.partitionsMtx.RLock()
-	p, ok := t.partitions[partition]
-	t.partitionsMtx.RUnlock()
-	if !ok {
-		return nil, fmt.Errorf("partition handler %d not found", partition)
+	p, err := t.runningPartition(partition)
+	if err != nil {
+		return nil, err
 	}
 
 	userIDs := p.store.getSortedUsersCloseToLimit()
@@ -696,6 +693,19 @@ func (t *UsageTracker) GetUsersCloseToLimit(_ context.Context, req *usagetrackerpb.GetUsersCloseToLimitRequest) (*usagetrackerpb.GetUsersCloseToLimitResponse, error) {
 	}, nil
 }

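+// runningPartition returns the partition handler for the given partition, or an
+// error if the handler doesn't exist or is not in the Running state.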
+func (t *UsageTracker) runningPartition(partition int32) (*partitionHandler, error) {
+	t.partitionsMtx.RLock()
+	p, ok := t.partitions[partition]
+	t.partitionsMtx.RUnlock()
+	if !ok {
+		return nil, fmt.Errorf("partition handler %d not found", partition)
+	}
+	if p.State() != services.Running {
+		return nil, fmt.Errorf("partition handler %d is not running (state: %s)", partition, p.State())
+	}
+	return p, nil
+}
+
 // CheckReady performs a readiness check.
 // An instance is ready when it has instantiated all the partitions that should belong to it according to the ring.
 func (t *UsageTracker) CheckReady(_ context.Context) error {
101 changes: 70 additions & 31 deletions pkg/usagetracker/tracker_test.go
@@ -60,6 +60,28 @@ func TestUsageTracker_Tracking(t *testing.T) {
 		require.Len(t, resp.RejectedSeriesHashes, 1)
 	})
 
+	t.Run("should not use partitions that are not in running state", func(t *testing.T) {
+		t.Parallel()
+
+		tracker := newReadyTestUsageTracker(t, map[string]*validation.Limits{
+			"tenant": {
+				MaxActiveSeriesPerUser: testPartitionsCount, // one series per partition.
+				MaxGlobalSeriesPerUser: testPartitionsCount * 100,
+			},
+		})
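+		// Stop partition handler 0 so that it's no longer in the Running state.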
+		withRLock(&tracker.partitionsMtx, func() {
+			require.NoError(t, services.StopAndAwaitTerminated(context.Background(), tracker.partitions[0]))
+		})
+
+		_, err := tracker.TrackSeries(t.Context(), &usagetrackerpb.TrackSeriesRequest{
+			UserID:       "tenant",
+			Partition:    0,
+			SeriesHashes: []uint64{0, 1},
+		})
+		require.Error(t, err)
+		require.ErrorContains(t, err, "partition handler 0 is not running (state: Terminated)")
+	})
+
 	t.Run("applies global series limit when configured", func(t *testing.T) {
 		t.Parallel()
 
@@ -530,44 +552,61 @@ func TestUsageTracker_PartitionAssignment(t *testing.T) {
 }
 
 func TestUsageTracker_GetUsersCloseToLimit(t *testing.T) {
-	makeSeries := func(n int) []uint64 {
-		series := make([]uint64, n)
-		for i := range series {
-			series[i] = uint64(i)
-		}
-		return series
-	}
-
-	tracker := newReadyTestUsageTracker(t, map[string]*validation.Limits{
-		"a": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
-		"b": {MaxActiveSeriesPerUser: 2000 * testPartitionsCount},
-		"c": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
-		"d": {MaxActiveSeriesPerUser: 2000 * testPartitionsCount},
-		"e": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
-	})
-
-	for _, tenant := range []string{"a", "b", "c", "d", "e"} {
-		resp, err := tracker.TrackSeries(t.Context(), &usagetrackerpb.TrackSeriesRequest{
-			UserID:       tenant,
-			Partition:    0,
-			SeriesHashes: makeSeries(900),
-		})
-		require.NoError(t, err)
-		require.Empty(t, resp.RejectedSeriesHashes)
-	}
-
-	// Call updateLimits (on all partitions, although we only need partition 0.
-	withRLock(&tracker.partitionsMtx, func() {
-		for _, p := range tracker.partitions {
-			done := make(chan struct{})
-			p.forceUpdateLimitsForTests <- done
-			<-done
-		}
-	})
-
-	resp, err := tracker.GetUsersCloseToLimit(t.Context(), &usagetrackerpb.GetUsersCloseToLimitRequest{Partition: 0})
-	require.NoError(t, err)
-	require.Equal(t, []string{"a", "c", "e"}, resp.SortedUserIds, "List of users close to the limit should be sorted lexicographically")
+	t.Run("happy case", func(t *testing.T) {
+		makeSeries := func(n int) []uint64 {
+			series := make([]uint64, n)
+			for i := range series {
+				series[i] = uint64(i)
+			}
+			return series
+		}
+
+		tracker := newReadyTestUsageTracker(t, map[string]*validation.Limits{
+			"a": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
+			"b": {MaxActiveSeriesPerUser: 2000 * testPartitionsCount},
+			"c": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
+			"d": {MaxActiveSeriesPerUser: 2000 * testPartitionsCount},
+			"e": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
+		})
+
+		for _, tenant := range []string{"a", "b", "c", "d", "e"} {
+			resp, err := tracker.TrackSeries(t.Context(), &usagetrackerpb.TrackSeriesRequest{
+				UserID:       tenant,
+				Partition:    0,
+				SeriesHashes: makeSeries(900),
+			})
+			require.NoError(t, err)
+			require.Empty(t, resp.RejectedSeriesHashes)
+		}
+
+		// Call updateLimits on all partitions, although we only need partition 0.
+		withRLock(&tracker.partitionsMtx, func() {
+			for _, p := range tracker.partitions {
+				done := make(chan struct{})
+				p.forceUpdateLimitsForTests <- done
+				<-done
+			}
+		})
+
+		resp, err := tracker.GetUsersCloseToLimit(t.Context(), &usagetrackerpb.GetUsersCloseToLimitRequest{Partition: 0})
+		require.NoError(t, err)
+		require.Equal(t, []string{"a", "c", "e"}, resp.SortedUserIds, "List of users close to the limit should be sorted lexicographically")
+	})
+
+	t.Run("partition handler is not running", func(t *testing.T) {
+		tracker := newReadyTestUsageTracker(t, map[string]*validation.Limits{
+			"a": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
+		})
+
+		// Stop partition handler 0 so that it's no longer in the Running state.
+		withRLock(&tracker.partitionsMtx, func() {
+			require.NoError(t, services.StopAndAwaitTerminated(context.Background(), tracker.partitions[0]))
+		})
+
+		_, err := tracker.GetUsersCloseToLimit(t.Context(), &usagetrackerpb.GetUsersCloseToLimitRequest{Partition: 0})
+		require.Error(t, err)
+		require.ErrorContains(t, err, "partition handler 0 is not running (state: Terminated)")
+	})
 }
 
 func callPrepareDownscaleEndpoint(t *testing.T, ut *UsageTracker, method string) {