Commit 89df197

[r387] Revert ingester Kafka record timestamps for active series (#14611, #14744) (#15040)
Backport of #15036 to `r387`. On `r387` the ingester code hasn't been split into separate files (`ingester_compaction.go`, `ingester_metrics_update.go`, `ingester_push.go`) yet, so the automated cherry-pick failed with conflicts. The equivalent changes were applied directly to `ingester.go`.

---

> [!NOTE]
> **Medium Risk**
> Changes affect ingester active-series purging/loading timing and early-compaction decision inputs, which can alter memory/metric behavior under ingest-storage replay or Kafka lag; scope is contained to ingester/ingest-storage plumbing.
>
> **Overview**
> This PR reverts the ingest-storage behavior that drove *active series purging/loading and related compaction heuristics* off Kafka record timestamps, returning those paths to wall-clock time.
>
> It removes the ingester's Kafka-timestamp tracking fields and the inline, Kafka-time-based `updateActiveSeries` triggering during record consumption, and updates the ingest pusher to stop propagating record timestamps via context. Related tests and the changelog entry covering Kafka-timestamp-based active series behavior are removed/updated accordingly.
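For reviewers unfamiliar with the original change: the reverted code derived the active-series reference time from the newest Kafka record timestamp instead of the wall clock. Below is a minimal, self-contained sketch of that pattern, reconstructed from the removed `activeSeriesNow` helper in the diff; the `ingester` struct here is a simplified stand-in for the real `Ingester` type, not Mimir's API.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// ingester is a stripped-down stand-in for the real Ingester type,
// keeping only the fields relevant to the reverted behavior.
type ingester struct {
	ingestStorageEnabled bool
	// Newest Kafka record timestamp seen so far, in unix milliseconds
	// (0 until the first record is consumed).
	latestKafkaRecordTimestamp atomic.Int64
}

// activeSeriesNow mirrors the helper this commit deletes: with ingest
// storage enabled it returned Kafka time, so purging tracked the replay
// position rather than the wall clock. After the revert, callers pass
// time.Now() directly.
func (i *ingester) activeSeriesNow() time.Time {
	if i.ingestStorageEnabled {
		if tsMs := i.latestKafkaRecordTimestamp.Load(); tsMs > 0 {
			return time.UnixMilli(tsMs)
		}
	}
	return time.Now()
}

func main() {
	i := &ingester{ingestStorageEnabled: true}
	i.latestKafkaRecordTimestamp.Store(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC).UnixMilli())
	// During replay this can lag far behind the wall clock, which is the
	// behavior the revert removes from purging and compaction decisions.
	fmt.Println(i.activeSeriesNow())
}
```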
1 parent 155d9dc commit 89df197

6 files changed

Lines changed: 21 additions & 442 deletions


CHANGELOG.md

Lines changed: 0 additions & 1 deletion
```diff
@@ -139,7 +139,6 @@
 * [ENHANCEMENT] Querier: Add new config flag `querier.enable-delayed-name-removal-prometheus-engine` to enable delayed name removal for Prometheus engine. #14349
 * [ENHANCEMENT] Ingester: reduce heap usage during streaming chunk queries by releasing series label memory after each batch is sent rather than holding it until chunk streaming completes. #14422
 * [ENHANCEMENT] Ingester: Eliminate 20-minute active series metrics loading period when custom tracker or cost attribution configuration changes. Active series counts are now immediately correct after a config reload. #14537
-* [ENHANCEMENT] Ingester: Use Kafka record timestamps instead of wall-clock time for active series tracking, grace period validation, and purging when ingest storage is enabled. This prevents premature purging or stale series caused by lag between record production and consumption. #14611 #14744
 * [ENHANCEMENT] Ingester: Export `cortex_ingester_active_series_loading` gauge metric that is `1` while active series counts are still warming up after ingester startup, and `0` once they are accurate (after IdleTimeout has elapsed). #14783
 * [ENHANCEMENT] Ingest storage: Allow configuring multiple Kafka seed brokers via `-ingest-storage.kafka.address` (comma-separated). #14328
 * [ENHANCEMENT] MQE: Add experimental support for eliminating selectors that are a subset of another selector. Enable with `-querier.mimir-query-engine.enable-subset-selector-elimination=true`. #14456 #14457 #14546 #14559 #14561 #14621
```

pkg/ingester/ingester.go

Lines changed: 13 additions & 70 deletions
```diff
@@ -378,22 +378,6 @@ type Ingester struct {
 	ingestPartitionID         int32
 	ingestPartitionLifecycler *ring.PartitionInstanceLifecycler
 
-	// latestKafkaRecordTimestamp tracks the most recent Kafka record timestamp
-	// seen by the ingester (unix milliseconds). Used to provide a Kafka-time-aware
-	// "now" for active series purging when ingest storage is enabled.
-	latestKafkaRecordTimestamp atomic.Int64
-
-	// lastKafkaActiveSeriesUpdate tracks the last Kafka time (unix milliseconds) at which
-	// updateActiveSeries was triggered inline during record consumption. This ensures
-	// active series are purged at approximately UpdatePeriod intervals in Kafka time,
-	// even when records are replayed faster than real-time.
-	lastKafkaActiveSeriesUpdate atomic.Int64
-
-	// activeSeriesStartMs tracks when the ingester started receiving samples (unix milliseconds).
-	// For classic mode this is wall-clock time when the ticker starts; for Kafka mode it is the
-	// first Kafka record timestamp. Used to determine when active series counts become accurate.
-	activeSeriesStartMs atomic.Int64
-
 	circuitBreaker  ingesterCircuitBreaker
 	reactiveLimiter *ingesterReactiveLimiter
 }
@@ -880,8 +864,9 @@ func (i *Ingester) metricsUpdaterServiceRunning(ctx context.Context) error {
 	defer ingestionRateTicker.Stop()
 
 	var activeSeriesTickerChan <-chan time.Time
-	if i.cfg.ActiveSeriesMetrics.Enabled && !i.cfg.IngestStorageConfig.Enabled {
-		i.activeSeriesStartMs.Store(time.Now().UnixMilli())
+	var activeSeriesStartedAt time.Time
+	if i.cfg.ActiveSeriesMetrics.Enabled {
+		activeSeriesStartedAt = time.Now()
 		t := time.NewTicker(i.cfg.ActiveSeriesMetrics.UpdatePeriod)
 		activeSeriesTickerChan = t.C
 		defer t.Stop()
@@ -905,7 +890,12 @@ func (i *Ingester) metricsUpdaterServiceRunning(ctx context.Context) error {
 			}
 			i.tsdbsMtx.RUnlock()
 		case <-activeSeriesTickerChan:
-			i.updateActiveSeries(i.activeSeriesNow())
+			now := time.Now()
+			if !activeSeriesStartedAt.IsZero() && now.Sub(activeSeriesStartedAt) >= i.cfg.ActiveSeriesMetrics.IdleTimeout {
+				i.metrics.activeSeriesLoading.Set(0)
+				activeSeriesStartedAt = time.Time{} // Only flip once.
+			}
+			i.updateActiveSeries(now)
 		case <-usageStatsUpdateTicker.C:
 			i.updateUsageStats()
 		case <-limitMetricsUpdateTicker.C:
@@ -916,23 +906,7 @@ func (i *Ingester) metricsUpdaterServiceRunning(ctx context.Context) error {
 	}
 }
 
-func (i *Ingester) activeSeriesNow() time.Time {
-	if i.cfg.IngestStorageConfig.Enabled {
-		if tsMs := i.latestKafkaRecordTimestamp.Load(); tsMs > 0 {
-			return time.UnixMilli(tsMs)
-		}
-	}
-	return time.Now()
-}
-
 func (i *Ingester) updateActiveSeries(now time.Time) {
-	if startMs := i.activeSeriesStartMs.Load(); startMs > 0 &&
-		now.UnixMilli()-startMs >= i.cfg.ActiveSeriesMetrics.IdleTimeout.Milliseconds() {
-		// Active series counts has passed the warm up.
-		// Mark the loading phase off to signal the counts are now accurate.
-		i.metrics.activeSeriesLoading.Set(0)
-	}
-
 	for _, userID := range i.getTSDBUsers() {
 		userDB := i.getTSDB(userID)
 		if userDB == nil {
@@ -1387,8 +1361,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
 	spanlog.DebugLog("event", "acquired append lock")
 
 	var (
-		startAppend          = time.Now()
-		appendWallClockStart = startAppend
+		startAppend = time.Now()
 
 		// Keep track of some stats which are tracked only if the samples will be
 		// successfully committed
@@ -1492,10 +1465,6 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
 		)
 	)
 
-	if ts, ok := ingest.RecordTimestampFromContext(ctx); ok {
-		startAppend = ts
-	}
-
 	// Walk the samples, appending them to the users database
 	app := db.Appender(ctx).(extendedAppender)
 	spanlog.DebugLog("event", "got appender for timeseries", "series", len(req.Timeseries))
@@ -1529,7 +1498,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
 	}
 
 	// At this point all samples have been added to the appender, so we can track the time it took.
-	i.metrics.appenderAddDuration.Observe(time.Since(appendWallClockStart).Seconds())
+	i.metrics.appenderAddDuration.Observe(time.Since(startAppend).Seconds())
 
 	spanlog.DebugLog(
 		"event", "start commit",
@@ -1563,32 +1532,6 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
 	appendedSamplesStats.Inc(int64(stats.succeededSamplesCount))
 	appendedExemplarsStats.Inc(int64(stats.succeededExemplarsCount))
 
-	if ts, ok := ingest.RecordTimestampFromContext(ctx); ok {
-		tsMs := ts.UnixMilli()
-		for {
-			cur := i.latestKafkaRecordTimestamp.Load()
-			if tsMs <= cur || i.latestKafkaRecordTimestamp.CompareAndSwap(cur, tsMs) {
-				break
-			}
-		}
-
-		i.activeSeriesStartMs.CompareAndSwap(0, tsMs)
-
-		if i.cfg.ActiveSeriesMetrics.Enabled {
-			updatePeriodMs := i.cfg.ActiveSeriesMetrics.UpdatePeriod.Milliseconds()
-			for {
-				lastUpdate := i.lastKafkaActiveSeriesUpdate.Load()
-				if tsMs-lastUpdate < updatePeriodMs {
-					break
-				}
-				if i.lastKafkaActiveSeriesUpdate.CompareAndSwap(lastUpdate, tsMs) {
-					i.updateActiveSeries(ts)
-					break
-				}
-			}
-		}
-	}
-
 	group := i.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(i.limits, userID, req.Timeseries), startAppend)
 
 	i.updateMetricsFromPushStats(userID, group, &stats, req.Source, db, i.metrics.discarded)
@@ -3383,10 +3326,10 @@ func (i *Ingester) compactionServiceRunning(ctx context.Context) error {
 	i.compactBlocks(ctx, false, 0, nil)
 
 	// Check if any TSDB Head should be compacted to reduce the number of in-memory series.
-	i.compactBlocksToReduceInMemorySeries(ctx, i.activeSeriesNow())
+	i.compactBlocksToReduceInMemorySeries(ctx, time.Now())
 
 	// Check if any TSDB Head should be compacted based on per-tenant owned series thresholds.
-	i.compactBlocksToReducePerTenantOwnedSeries(ctx, i.activeSeriesNow())
+	i.compactBlocksToReducePerTenantOwnedSeries(ctx, time.Now())
 
 	// Decrement the counter after compaction is complete
 	i.numCompactionsInProgress.Dec()
```
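One detail worth noting in the removed push path above: the Kafka timestamp was advanced with a compare-and-swap loop rather than a plain `Store`, so a slow goroutine carrying an older record could never move the value backwards. A self-contained sketch of that lock-free monotonic-max pattern (generic names, not Mimir's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// storeMax advances v to ts only if ts is newer, retrying on races.
// A plain Store would let a concurrent, older timestamp overwrite a
// newer one; the CAS loop keeps the value monotonically non-decreasing.
func storeMax(v *atomic.Int64, ts int64) {
	for {
		cur := v.Load()
		if ts <= cur || v.CompareAndSwap(cur, ts) {
			return
		}
	}
}

func main() {
	var latest atomic.Int64
	storeMax(&latest, 100)
	storeMax(&latest, 50)      // Older value: ignored.
	fmt.Println(latest.Load()) // Prints 100.
}
```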

pkg/ingester/ingester_ingest_storage_test.go

Lines changed: 0 additions & 174 deletions
```diff
@@ -1143,180 +1143,6 @@ func createTestIngesterWithIngestStorage(
 	return ingester, kafkaCluster, prw
 }
 
-func TestIngester_ActiveSeriesPurgeWithKafkaTimestamps(t *testing.T) {
-	var (
-		ctx         = context.Background()
-		cfg         = defaultIngesterTestConfig(t)
-		reg         = prometheus.NewRegistry()
-		user1       = "user-1"
-		user2       = "user-2"
-		baseTime    = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
-		idleTimeout = 20 * time.Minute
-	)
-
-	cfg.ActiveSeriesMetrics.Enabled = true
-	cfg.ActiveSeriesMetrics.IdleTimeout = idleTimeout
-	// Use a long update period so that the wall-clock ticker never fires during this test.
-	// Only the inline Kafka-time-based purge trigger should update active series.
-	cfg.ActiveSeriesMetrics.UpdatePeriod = 10 * time.Minute
-
-	overrides := validation.NewOverrides(defaultLimitsTestConfig(), nil)
-	ingester, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, nil, reg, util_test.NewTestingLogger(t))
-
-	partitionID, err := ingest.IngesterPartitionID(cfg.IngesterRing.InstanceID)
-	require.NoError(t, err)
-
-	kafkaClient, err := kgo.NewClient(
-		kgo.SeedBrokers(cfg.IngestStorageConfig.KafkaConfig.Address...),
-		kgo.RecordPartitioner(kgo.ManualPartitioner()),
-	)
-	require.NoError(t, err)
-	t.Cleanup(kafkaClient.Close)
-
-	produceRecord := func(tenantID string, seriesName string, kafkaTimestamp time.Time) {
-		wreq := &mimirpb.WriteRequest{
-			Timeseries: []mimirpb.PreallocTimeseries{{
-				TimeSeries: &mimirpb.TimeSeries{
-					Labels:  mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(model.MetricNameLabel, seriesName)),
-					Samples: []mimirpb.Sample{{TimestampMs: kafkaTimestamp.UnixMilli(), Value: 1}},
-				},
-			}},
-			Source: mimirpb.API,
-		}
-		data, err := wreq.Marshal()
-		require.NoError(t, err)
-
-		results := kafkaClient.ProduceSync(ctx, &kgo.Record{
-			Topic:     cfg.IngestStorageConfig.KafkaConfig.Topic,
-			Key:       []byte(tenantID),
-			Value:     data,
-			Headers:   []kgo.RecordHeader{ingest.RecordVersionHeader(1)},
-			Partition: partitionID,
-			Timestamp: kafkaTimestamp,
-		})
-		require.NoError(t, results.FirstErr())
-	}
-
-	require.NoError(t, services.StartAndAwaitRunning(ctx, ingester))
-	t.Cleanup(func() {
-		require.NoError(t, services.StopAndAwaitTerminated(ctx, ingester))
-	})
-
-	// Phase 1: Produce 20 series for user1 at T0.
-	// The first record triggers an inline purge (Kafka time jumps from 0 to T0 > UpdatePeriod),
-	// but since no series exist yet, it has no effect on counts.
-	for i := 0; i < 20; i++ {
-		produceRecord(user1, fmt.Sprintf("series_%d", i), baseTime)
-	}
-
-	// Phase 2: Produce 20 series for user2 at T0+21min (past idle timeout for user1).
-	// The first record triggers an inline purge because Kafka time advanced by 21min > UpdatePeriod (10min).
-	// This purge runs with now=T0+21min, which purges user1's series (last seen at T0, idle for 21min > 20min timeout).
-	// After the purge, user2 has 1 active series (just added), but user1's 20 series are purged.
-	futureTime := baseTime.Add(idleTimeout + 1*time.Minute)
-	for i := 0; i < 20; i++ {
-		produceRecord(user2, fmt.Sprintf("series_%d", i), futureTime)
-	}
-
-	// Phase 3: Produce one more record at T0+32min to trigger another inline purge
-	// (advances by 11min from T0+21min > UpdatePeriod of 10min). This purge updates the
-	// active series counts to reflect all 20 of user2's series.
-	finalTime := futureTime.Add(cfg.ActiveSeriesMetrics.UpdatePeriod + 1*time.Minute)
-	produceRecord(user2, "series_final", finalTime)
-
-	// Wait for all records to be consumed and the inline purge to update metrics.
-	// user1's series should be purged (0 active), user2 should have 21 active series.
-	test.Poll(t, 10*time.Second, nil, func() interface{} {
-		return testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
-			# HELP cortex_ingester_active_series Number of currently active series per user.
-			# TYPE cortex_ingester_active_series gauge
-			cortex_ingester_active_series{user="%s"} 21
-		`, user2)), "cortex_ingester_active_series")
-	})
-}
-
-func TestIngester_ActiveSeriesLoadingWithKafkaTimestamps(t *testing.T) {
-	var (
-		ctx         = context.Background()
-		cfg         = defaultIngesterTestConfig(t)
-		reg         = prometheus.NewRegistry()
-		user1       = "user-1"
-		baseTime    = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
-		idleTimeout = 20 * time.Minute
-	)
-
-	cfg.ActiveSeriesMetrics.Enabled = true
-	cfg.ActiveSeriesMetrics.IdleTimeout = idleTimeout
-	cfg.ActiveSeriesMetrics.UpdatePeriod = 10 * time.Minute
-
-	overrides := validation.NewOverrides(defaultLimitsTestConfig(), nil)
-	ingester, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, nil, reg, util_test.NewTestingLogger(t))
-
-	partitionID, err := ingest.IngesterPartitionID(cfg.IngesterRing.InstanceID)
-	require.NoError(t, err)
-
-	kafkaClient, err := kgo.NewClient(
-		kgo.SeedBrokers(cfg.IngestStorageConfig.KafkaConfig.Address...),
-		kgo.RecordPartitioner(kgo.ManualPartitioner()),
-	)
-	require.NoError(t, err)
-	t.Cleanup(kafkaClient.Close)
-
-	produceRecord := func(tenantID string, seriesName string, kafkaTimestamp time.Time) {
-		wreq := &mimirpb.WriteRequest{
-			Timeseries: []mimirpb.PreallocTimeseries{{
-				TimeSeries: &mimirpb.TimeSeries{
-					Labels:  mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(model.MetricNameLabel, seriesName)),
-					Samples: []mimirpb.Sample{{TimestampMs: kafkaTimestamp.UnixMilli(), Value: 1}},
-				},
-			}},
-			Source: mimirpb.API,
-		}
-		data, err := wreq.Marshal()
-		require.NoError(t, err)
-
-		results := kafkaClient.ProduceSync(ctx, &kgo.Record{
-			Topic:     cfg.IngestStorageConfig.KafkaConfig.Topic,
-			Key:       []byte(tenantID),
-			Value:     data,
-			Headers:   []kgo.RecordHeader{ingest.RecordVersionHeader(1)},
-			Partition: partitionID,
-			Timestamp: kafkaTimestamp,
-		})
-		require.NoError(t, results.FirstErr())
-	}
-
-	require.NoError(t, services.StartAndAwaitRunning(ctx, ingester))
-	t.Cleanup(func() {
-		require.NoError(t, services.StopAndAwaitTerminated(ctx, ingester))
-	})
-
-	// Produce a record at baseTime. The loading metric should be 1 after the inline purge.
-	produceRecord(user1, "series_0", baseTime)
-
-	test.Poll(t, 10*time.Second, nil, func() interface{} {
-		return testutil.GatherAndCompare(reg, strings.NewReader(`
-			# HELP cortex_ingester_active_series_loading 1 if active series counts are still warming up and may be underreported, 0 once they are accurate.
-			# TYPE cortex_ingester_active_series_loading gauge
-			cortex_ingester_active_series_loading 1
-		`), "cortex_ingester_active_series_loading")
-	})
-
-	// Produce a record at baseTime + idleTimeout + UpdatePeriod + 1min.
-	// This advances Kafka time past the idle timeout from the first record's timestamp,
-	// and also triggers an inline purge (> UpdatePeriod from last purge).
-	futureTime := baseTime.Add(idleTimeout + cfg.ActiveSeriesMetrics.UpdatePeriod + 1*time.Minute)
-	produceRecord(user1, "series_1", futureTime)
-
-	test.Poll(t, 10*time.Second, nil, func() interface{} {
-		return testutil.GatherAndCompare(reg, strings.NewReader(`
-			# HELP cortex_ingester_active_series_loading 1 if active series counts are still warming up and may be underreported, 0 once they are accurate.
-			# TYPE cortex_ingester_active_series_loading gauge
-			cortex_ingester_active_series_loading 0
-		`), "cortex_ingester_active_series_loading")
-	})
-}
-
 // BenchmarkIngester_ReplayFromKafka tests the ingester replaying records from Kafka at startup.
 // Each scenario is derived from analysis of 1 partition in some production clusters in Grafana Cloud,
 // using "kafkatool dump analyse". The fixture generator simulates the WriteRequest patterns
```
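The deleted tests controlled Kafka time by producing records with explicit timestamps, so replay scenarios could be simulated without waiting on the wall clock. The core franz-go technique, reduced from the removed `produceRecord` helper, is sketched below; the broker address, topic, and payload are placeholders, not values from this repo.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker and manual partitioning, mirroring the deleted
	// tests, which took these from the ingester's Kafka config.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Setting Record.Timestamp decouples the record's Kafka time from
	// time.Now(), which is how the tests simulated consumer lag.
	rec := &kgo.Record{
		Topic:     "my-topic", // placeholder
		Value:     []byte("payload"),
		Partition: 0,
		Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
	}
	if err := client.ProduceSync(context.Background(), rec).FirstErr(); err != nil {
		log.Fatal(err)
	}
}
```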