From bbb134ea2fe84d363fdb9136469ab0578e002903 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 11 Apr 2025 12:21:40 +0530 Subject: [PATCH] fix timeout and series progress marker for same requests with different shards --- .../deletion/delete_requests_manager.go | 22 ++++++---- .../deletion/delete_requests_manager_test.go | 44 +++++++++++++++++-- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/pkg/compactor/deletion/delete_requests_manager.go b/pkg/compactor/deletion/delete_requests_manager.go index 55a566679bc38..1ed791f1270e7 100644 --- a/pkg/compactor/deletion/delete_requests_manager.go +++ b/pkg/compactor/deletion/delete_requests_manager.go @@ -343,7 +343,7 @@ func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels, } // The delete request touches the series. Do not skip if the series is not processed yet. - if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; !ok { + if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; !ok { return false } } @@ -366,7 +366,7 @@ func (d *DeleteRequestsManager) Expired(userID []byte, chk retention.Chunk, lbls var filterFuncs []filter.Func for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests { - if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; ok { + if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; ok { continue } isDeleted, ff := deleteRequest.IsDeleted(userID, lbls, chk) @@ -478,9 +478,13 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() { level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err) } - d.processedSeries = map[string]struct{}{} - if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) { - level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err) + // When we hit a timeout, MarkPhaseTimedOut is called to clear the list of delete requests to avoid marking delete requests as processed. + // Since this method is still called when we hit a timeout, we do not want to drop the progress so that deletion skips the already processed streams. + if len(d.deleteRequestsToProcess) > 0 { + d.processedSeries = map[string]struct{}{} + if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) { + level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err) + } } } @@ -513,9 +517,9 @@ func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, l if !labels.Selector(req.matchers).Matches(lbls) { continue } - processedSeriesKey := buildProcessedSeriesKey(req.RequestID, seriesID, tableName) + processedSeriesKey := buildProcessedSeriesKey(req.RequestID, req.StartTime, req.EndTime, seriesID, tableName) if _, ok := d.processedSeries[processedSeriesKey]; ok { - return fmt.Errorf("series for [table: %s, series: %s, user: %s, req: %s]", tableName, seriesID, userID, req.RequestID) + return fmt.Errorf("series already marked as processed: [table: %s, user: %s, req_id: %s, start: %d, end: %d, series: %s]", tableName, userID, req.RequestID, req.StartTime, req.EndTime, seriesID) } d.processedSeries[processedSeriesKey] = struct{}{} } @@ -523,8 +527,8 @@ func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, l return nil } -func buildProcessedSeriesKey(requestID string, seriesID []byte, tableName string) string { - return fmt.Sprintf("%s/%s/%s", requestID, tableName, seriesID) +func buildProcessedSeriesKey(requestID string, startTime, endTime model.Time, seriesID []byte, tableName string) string { + return fmt.Sprintf("%s/%d/%d/%s/%s", requestID, startTime, endTime, tableName, seriesID) } func getMaxRetentionInterval(userID string, limits Limits) time.Duration { diff --git a/pkg/compactor/deletion/delete_requests_manager_test.go b/pkg/compactor/deletion/delete_requests_manager_test.go index f586cdf0e1923..628bfcc6a3a2e 100644 --- a/pkg/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/compactor/deletion/delete_requests_manager_test.go @@ -1019,10 +1019,6 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) { user2 := []byte("user2") lblFooBar := mustParseLabel(`{foo="bar"}`) lblFizzBuzz := mustParseLabel(`{fizz="buzz"}`) - deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{ - {RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived}, - {RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived}, - }} type markSeriesProcessed struct { userID, seriesID []byte lbls labels.Labels @@ -1189,6 +1185,11 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { workingDir := t.TempDir() + deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{ + {RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived}, + {RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived}, + }} + mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) require.NoError(t, err) require.NoError(t, mgr.loadDeleteRequestsToProcess()) @@ -1207,6 +1208,7 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) { mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) require.NoError(t, err) require.Equal(t, storedSeriesProgress, mgr.processedSeries) + require.NoError(t, mgr.loadDeleteRequestsToProcess()) // when the mark phase ends, series progress should get cleared mgr.MarkPhaseFinished() @@ -1216,6 +1218,40 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) { } } +func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) { + workingDir := t.TempDir() + + user1 := []byte("user1") + lblFooBar := mustParseLabel(`{foo="bar"}`) + deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{ + {RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived}, + {RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 100, EndTime: 200, Status: StatusReceived}, + }} + + mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) + require.NoError(t, err) + require.NoError(t, mgr.loadDeleteRequestsToProcess()) + + require.NoError(t, mgr.MarkSeriesAsProcessed(user1, []byte(lblFooBar.String()), lblFooBar, "t1")) + + // timeout the retention processing + mgr.MarkPhaseTimedOut() + + // timeout should not clear the series progress + mgr.MarkPhaseFinished() + require.Len(t, mgr.processedSeries, 2) + require.NoError(t, mgr.storeSeriesProgress()) + require.FileExists(t, filepath.Join(workingDir, seriesProgressFilename)) + + // load the requests again for processing + require.NoError(t, mgr.loadDeleteRequestsToProcess()) + + // not hitting the timeout should clear the series progress + mgr.MarkPhaseFinished() + require.Len(t, mgr.processedSeries, 0) + require.NoFileExists(t, filepath.Join(workingDir, seriesProgressFilename)) +} + type storeAddReqDetails struct { userID, query string startTime, endTime model.Time