fix: fix timeout and series progress marker for same requests with different shards #17125

Merged
merged 1 commit into from Apr 14, 2025
22 changes: 13 additions & 9 deletions pkg/compactor/deletion/delete_requests_manager.go
@@ -343,7 +343,7 @@ func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels,
}

// The delete request touches the series. Do not skip if the series is not processed yet.
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; !ok {
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; !ok {
return false
}
}
@@ -366,7 +366,7 @@ func (d *DeleteRequestsManager) Expired(userID []byte, chk retention.Chunk, lbls
var filterFuncs []filter.Func

for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests {
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; ok {
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; ok {
continue
}
isDeleted, ff := deleteRequest.IsDeleted(userID, lbls, chk)
@@ -478,9 +478,13 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}

d.processedSeries = map[string]struct{}{}
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
// When we hit a timeout, MarkPhaseTimedOut is called to clear the list of delete requests to avoid marking delete requests as processed.
// Since this method is still called when we hit a timeout, we do not want to drop the progress so that deletion skips the already processed streams.
if len(d.deleteRequestsToProcess) > 0 {
d.processedSeries = map[string]struct{}{}
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
}
}
}

@@ -513,18 +517,18 @@ func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, l
if !labels.Selector(req.matchers).Matches(lbls) {
continue
}
processedSeriesKey := buildProcessedSeriesKey(req.RequestID, seriesID, tableName)
processedSeriesKey := buildProcessedSeriesKey(req.RequestID, req.StartTime, req.EndTime, seriesID, tableName)
if _, ok := d.processedSeries[processedSeriesKey]; ok {
return fmt.Errorf("series for [table: %s, series: %s, user: %s, req: %s]", tableName, seriesID, userID, req.RequestID)
return fmt.Errorf("series already marked as processed: [table: %s, user: %s, req_id: %s, start: %d, end: %d, series: %s]", tableName, userID, req.RequestID, req.StartTime, req.EndTime, seriesID)
}
d.processedSeries[processedSeriesKey] = struct{}{}
}

return nil
}

func buildProcessedSeriesKey(requestID string, seriesID []byte, tableName string) string {
return fmt.Sprintf("%s/%s/%s", requestID, tableName, seriesID)
func buildProcessedSeriesKey(requestID string, startTime, endTime model.Time, seriesID []byte, tableName string) string {
return fmt.Sprintf("%s/%d/%d/%s/%s", requestID, startTime, endTime, tableName, seriesID)
}

func getMaxRetentionInterval(userID string, limits Limits) time.Duration {
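For readers of the diff above, here is a minimal standalone sketch (not the Loki code itself; the shard values are made up) of why the processed-series key now includes the request's start and end times. Shards of one delete request share a RequestID, so a key built only from RequestID, table, and series collides across shards, and the second shard would wrongly see the series as already processed.

```go
package main

import "fmt"

// shard stands in for a sharded DeleteRequest: shards of the same request
// share a RequestID but cover different time ranges.
type shard struct {
	requestID          string
	startTime, endTime int64 // model.Time in the real code
}

// oldKey mirrors the previous key format: RequestID, table, and series only.
func oldKey(s shard, table, series string) string {
	return fmt.Sprintf("%s/%s/%s", s.requestID, table, series)
}

// newKey mirrors the fixed format: the shard's start/end times are part of the key.
func newKey(s shard, table, series string) string {
	return fmt.Sprintf("%s/%d/%d/%s/%s", s.requestID, s.startTime, s.endTime, table, series)
}

func main() {
	shardA := shard{requestID: "1", startTime: 0, endTime: 100}
	shardB := shard{requestID: "1", startTime: 100, endTime: 200}
	series := `{foo="bar"}`

	// Old keys collide: marking the series processed for shard A also covers shard B.
	fmt.Println(oldKey(shardA, "t1", series) == oldKey(shardB, "t1", series)) // true

	// New keys differ: each shard tracks its own progress for the series.
	fmt.Println(newKey(shardA, "t1", series) == newKey(shardB, "t1", series)) // false
}
```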
44 changes: 40 additions & 4 deletions pkg/compactor/deletion/delete_requests_manager_test.go
@@ -1019,10 +1019,6 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
user2 := []byte("user2")
lblFooBar := mustParseLabel(`{foo="bar"}`)
lblFizzBuzz := mustParseLabel(`{fizz="buzz"}`)
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
{RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived},
}}
type markSeriesProcessed struct {
userID, seriesID []byte
lbls labels.Labels
@@ -1189,6 +1185,11 @@
} {
t.Run(tc.name, func(t *testing.T) {
workingDir := t.TempDir()
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
{RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived},
}}

mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
@@ -1207,6 +1208,7 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.Equal(t, storedSeriesProgress, mgr.processedSeries)
require.NoError(t, mgr.loadDeleteRequestsToProcess())

// when the mark phase ends, series progress should get cleared
mgr.MarkPhaseFinished()
@@ -1216,6 +1218,40 @@
}
}

func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) {
workingDir := t.TempDir()

user1 := []byte("user1")
lblFooBar := mustParseLabel(`{foo="bar"}`)
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 100, EndTime: 200, Status: StatusReceived},
}}

mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())

require.NoError(t, mgr.MarkSeriesAsProcessed(user1, []byte(lblFooBar.String()), lblFooBar, "t1"))

// timeout the retention processing
mgr.MarkPhaseTimedOut()

// timeout should not clear the series progress
mgr.MarkPhaseFinished()
require.Len(t, mgr.processedSeries, 2)
require.NoError(t, mgr.storeSeriesProgress())
require.FileExists(t, filepath.Join(workingDir, seriesProgressFilename))

// load the requests again for processing
require.NoError(t, mgr.loadDeleteRequestsToProcess())

// not hitting the timeout should clear the series progress
mgr.MarkPhaseFinished()
require.Len(t, mgr.processedSeries, 0)
require.NoFileExists(t, filepath.Join(workingDir, seriesProgressFilename))
}

type storeAddReqDetails struct {
userID, query string
startTime, endTime model.Time
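The new TestDeleteRequestsManager_SeriesProgressWithTimeout above exercises the guard added to MarkPhaseFinished. As a rough sketch of that interplay (field types simplified; not the actual DeleteRequestsManager), a timed-out mark phase drops its pending requests, so the subsequent MarkPhaseFinished call leaves the recorded series progress in place for the next run:

```go
package main

import "fmt"

// manager is an assumed, stripped-down stand-in for DeleteRequestsManager.
type manager struct {
	deleteRequestsToProcess map[string]struct{}
	processedSeries         map[string]struct{}
}

func (m *manager) MarkPhaseTimedOut() {
	// On timeout, drop the pending requests so none of them get marked as processed.
	m.deleteRequestsToProcess = map[string]struct{}{}
}

func (m *manager) MarkPhaseFinished() {
	// Only a run that still had requests to process clears the recorded series progress;
	// after a timeout the progress is kept so the next run can skip already processed streams.
	if len(m.deleteRequestsToProcess) > 0 {
		m.processedSeries = map[string]struct{}{}
	}
}

func main() {
	m := &manager{
		deleteRequestsToProcess: map[string]struct{}{"user1/1": {}},
		processedSeries:         map[string]struct{}{`1/0/100/t1/{foo="bar"}`: {}},
	}

	m.MarkPhaseTimedOut()
	m.MarkPhaseFinished()
	fmt.Println(len(m.processedSeries)) // 1: progress survives the timed-out cycle

	m.deleteRequestsToProcess = map[string]struct{}{"user1/1": {}}
	m.MarkPhaseFinished()
	fmt.Println(len(m.processedSeries)) // 0: a normal cycle clears the progress
}
```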