Skip to content

Commit ba2e2fb

Browse files
[r389] Revert ingester Kafka record timestamps for active series (#14611, #14744) (#15038)
Backport 1989c26 from #15036 <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes the time basis for active-series purging/loading in ingest-storage clusters, which can affect metrics accuracy and early-compaction decisions under Kafka lag or replay. > > **Overview** > Reverts the prior **Kafka record timestamp–driven** active-series logic for ingest-storage mode, removing ingester state used to track latest Kafka time and inline Kafka-time purges. > > Active-series purging/loading now uses **wall-clock `time.Now()` only** (including early-compaction estimations), and the `cortex_ingester_active_series_loading` transition is handled by the metrics updater ticker rather than Kafka-time bookkeeping. > > Removes Kafka record timestamp propagation via context in the ingest `PusherConsumer` and deletes the related unit/integration tests and changelog entry for the reverted enhancement. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 8604f22. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY --> Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com>
1 parent 1d1582f commit ba2e2fb

9 files changed

Lines changed: 21 additions & 447 deletions

File tree

CHANGELOG.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@
148148
* [ENHANCEMENT] Querier: Add new config flag `querier.enable-delayed-name-removal-prometheus-engine` to enable delayed name removal for Prometheus engine. #14349
149149
* [ENHANCEMENT] Ingester: reduce heap usage during streaming chunk queries by releasing series label memory after each batch is sent rather than holding it until chunk streaming completes. #14422
150150
* [ENHANCEMENT] Ingester: Eliminate 20-minute active series metrics loading period when custom tracker or cost attribution configuration changes. Active series counts are now immediately correct after a config reload. #14537
151-
* [ENHANCEMENT] Ingester: Use Kafka record timestamps instead of wall-clock time for active series tracking, grace period validation, and purging when ingest storage is enabled. This prevents premature purging or stale series caused by lag between record production and consumption. #14611 #14744
152151
* [ENHANCEMENT] Ingester: Export `cortex_ingester_active_series_loading` gauge metric that is `1` while active series counts are still warming up after ingester startup, and `0` once they are accurate (after IdleTimeout has elapsed). #14783
153152
* [ENHANCEMENT] Ingest storage: Skip kotel tracing hooks for unsampled traces in the franz-go Kafka client, significantly reducing CPU and memory overhead. #14852
154153
* [ENHANCEMENT] Distributor: Reduced CPU utilization when writing to ingest storage with a large number of partitions by batching all partitions into a single Kafka produce call instead of one per partition. #14898

pkg/ingester/ingester.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -349,22 +349,6 @@ type Ingester struct {
349349
ingestPartitionID int32
350350
ingestPartitionLifecycler *ring.PartitionInstanceLifecycler
351351

352-
// latestKafkaRecordTimestamp tracks the most recent Kafka record timestamp
353-
// seen by the ingester (unix milliseconds). Used to provide a Kafka-time-aware
354-
// "now" for active series purging when ingest storage is enabled.
355-
latestKafkaRecordTimestamp atomic.Int64
356-
357-
// lastKafkaActiveSeriesUpdate tracks the last Kafka time (unix milliseconds) at which
358-
// updateActiveSeries was triggered inline during record consumption. This ensures
359-
// active series are purged at approximately UpdatePeriod intervals in Kafka time,
360-
// even when records are replayed faster than real-time.
361-
lastKafkaActiveSeriesUpdate atomic.Int64
362-
363-
// activeSeriesStartMs tracks when the ingester started receiving samples (unix milliseconds).
364-
// For classic mode this is wall-clock time when the ticker starts; for Kafka mode it is the
365-
// first Kafka record timestamp. Used to determine when active series counts become accurate.
366-
activeSeriesStartMs atomic.Int64
367-
368352
circuitBreaker ingesterCircuitBreaker
369353
reactiveLimiter *ingesterReactiveLimiter
370354
}

pkg/ingester/ingester_compaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ func (i *Ingester) compactionServiceRunning(ctx context.Context) error {
140140
i.compactBlocks(ctx, false, 0, nil)
141141

142142
// Check if any TSDB Head should be compacted to reduce the number of in-memory series.
143-
i.compactBlocksToReduceInMemorySeries(ctx, i.activeSeriesNow())
143+
i.compactBlocksToReduceInMemorySeries(ctx, time.Now())
144144

145145
// Check if any TSDB Head should be compacted based on per-tenant owned series thresholds.
146-
i.compactBlocksToReducePerTenantOwnedSeries(ctx, i.activeSeriesNow())
146+
i.compactBlocksToReducePerTenantOwnedSeries(ctx, time.Now())
147147

148148
// Decrement the counter after compaction is complete
149149
i.numCompactionsInProgress.Dec()

pkg/ingester/ingester_ingest_storage_test.go

Lines changed: 0 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,180 +1143,6 @@ func createTestIngesterWithIngestStorage(
11431143
return ingester, kafkaCluster, prw
11441144
}
11451145

1146-
func TestIngester_ActiveSeriesPurgeWithKafkaTimestamps(t *testing.T) {
1147-
var (
1148-
ctx = context.Background()
1149-
cfg = defaultIngesterTestConfig(t)
1150-
reg = prometheus.NewRegistry()
1151-
user1 = "user-1"
1152-
user2 = "user-2"
1153-
baseTime = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
1154-
idleTimeout = 20 * time.Minute
1155-
)
1156-
1157-
cfg.ActiveSeriesMetrics.Enabled = true
1158-
cfg.ActiveSeriesMetrics.IdleTimeout = idleTimeout
1159-
// Use a long update period so that the wall-clock ticker never fires during this test.
1160-
// Only the inline Kafka-time-based purge trigger should update active series.
1161-
cfg.ActiveSeriesMetrics.UpdatePeriod = 10 * time.Minute
1162-
1163-
overrides := validation.NewOverrides(defaultLimitsTestConfig(), nil)
1164-
ingester, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, nil, reg, util_test.NewTestingLogger(t))
1165-
1166-
partitionID, err := ingest.IngesterPartitionID(cfg.IngesterRing.InstanceID)
1167-
require.NoError(t, err)
1168-
1169-
kafkaClient, err := kgo.NewClient(
1170-
kgo.SeedBrokers(cfg.IngestStorageConfig.KafkaConfig.Address...),
1171-
kgo.RecordPartitioner(kgo.ManualPartitioner()),
1172-
)
1173-
require.NoError(t, err)
1174-
t.Cleanup(kafkaClient.Close)
1175-
1176-
produceRecord := func(tenantID string, seriesName string, kafkaTimestamp time.Time) {
1177-
wreq := &mimirpb.WriteRequest{
1178-
Timeseries: []mimirpb.PreallocTimeseries{{
1179-
TimeSeries: &mimirpb.TimeSeries{
1180-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(model.MetricNameLabel, seriesName)),
1181-
Samples: []mimirpb.Sample{{TimestampMs: kafkaTimestamp.UnixMilli(), Value: 1}},
1182-
},
1183-
}},
1184-
Source: mimirpb.API,
1185-
}
1186-
data, err := wreq.Marshal()
1187-
require.NoError(t, err)
1188-
1189-
results := kafkaClient.ProduceSync(ctx, &kgo.Record{
1190-
Topic: cfg.IngestStorageConfig.KafkaConfig.Topic,
1191-
Key: []byte(tenantID),
1192-
Value: data,
1193-
Headers: []kgo.RecordHeader{ingest.RecordVersionHeader(1)},
1194-
Partition: partitionID,
1195-
Timestamp: kafkaTimestamp,
1196-
})
1197-
require.NoError(t, results.FirstErr())
1198-
}
1199-
1200-
require.NoError(t, services.StartAndAwaitRunning(ctx, ingester))
1201-
t.Cleanup(func() {
1202-
require.NoError(t, services.StopAndAwaitTerminated(ctx, ingester))
1203-
})
1204-
1205-
// Phase 1: Produce 20 series for user1 at T0.
1206-
// The first record triggers an inline purge (Kafka time jumps from 0 to T0 > UpdatePeriod),
1207-
// but since no series exist yet, it has no effect on counts.
1208-
for i := 0; i < 20; i++ {
1209-
produceRecord(user1, fmt.Sprintf("series_%d", i), baseTime)
1210-
}
1211-
1212-
// Phase 2: Produce 20 series for user2 at T0+21min (past idle timeout for user1).
1213-
// The first record triggers an inline purge because Kafka time advanced by 21min > UpdatePeriod (10min).
1214-
// This purge runs with now=T0+21min, which purges user1's series (last seen at T0, idle for 21min > 20min timeout).
1215-
// After the purge, user2 has 1 active series (just added), but user1's 20 series are purged.
1216-
futureTime := baseTime.Add(idleTimeout + 1*time.Minute)
1217-
for i := 0; i < 20; i++ {
1218-
produceRecord(user2, fmt.Sprintf("series_%d", i), futureTime)
1219-
}
1220-
1221-
// Phase 3: Produce one more record at T0+32min to trigger another inline purge
1222-
// (advances by 11min from T0+21min > UpdatePeriod of 10min). This purge updates the
1223-
// active series counts to reflect all 20 of user2's series.
1224-
finalTime := futureTime.Add(cfg.ActiveSeriesMetrics.UpdatePeriod + 1*time.Minute)
1225-
produceRecord(user2, "series_final", finalTime)
1226-
1227-
// Wait for all records to be consumed and the inline purge to update metrics.
1228-
// user1's series should be purged (0 active), user2 should have 21 active series.
1229-
test.Poll(t, 10*time.Second, nil, func() interface{} {
1230-
return testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
1231-
# HELP cortex_ingester_active_series Number of currently active series per user.
1232-
# TYPE cortex_ingester_active_series gauge
1233-
cortex_ingester_active_series{user="%s"} 21
1234-
`, user2)), "cortex_ingester_active_series")
1235-
})
1236-
}
1237-
1238-
func TestIngester_ActiveSeriesLoadingWithKafkaTimestamps(t *testing.T) {
1239-
var (
1240-
ctx = context.Background()
1241-
cfg = defaultIngesterTestConfig(t)
1242-
reg = prometheus.NewRegistry()
1243-
user1 = "user-1"
1244-
baseTime = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
1245-
idleTimeout = 20 * time.Minute
1246-
)
1247-
1248-
cfg.ActiveSeriesMetrics.Enabled = true
1249-
cfg.ActiveSeriesMetrics.IdleTimeout = idleTimeout
1250-
cfg.ActiveSeriesMetrics.UpdatePeriod = 10 * time.Minute
1251-
1252-
overrides := validation.NewOverrides(defaultLimitsTestConfig(), nil)
1253-
ingester, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, nil, reg, util_test.NewTestingLogger(t))
1254-
1255-
partitionID, err := ingest.IngesterPartitionID(cfg.IngesterRing.InstanceID)
1256-
require.NoError(t, err)
1257-
1258-
kafkaClient, err := kgo.NewClient(
1259-
kgo.SeedBrokers(cfg.IngestStorageConfig.KafkaConfig.Address...),
1260-
kgo.RecordPartitioner(kgo.ManualPartitioner()),
1261-
)
1262-
require.NoError(t, err)
1263-
t.Cleanup(kafkaClient.Close)
1264-
1265-
produceRecord := func(tenantID string, seriesName string, kafkaTimestamp time.Time) {
1266-
wreq := &mimirpb.WriteRequest{
1267-
Timeseries: []mimirpb.PreallocTimeseries{{
1268-
TimeSeries: &mimirpb.TimeSeries{
1269-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(model.MetricNameLabel, seriesName)),
1270-
Samples: []mimirpb.Sample{{TimestampMs: kafkaTimestamp.UnixMilli(), Value: 1}},
1271-
},
1272-
}},
1273-
Source: mimirpb.API,
1274-
}
1275-
data, err := wreq.Marshal()
1276-
require.NoError(t, err)
1277-
1278-
results := kafkaClient.ProduceSync(ctx, &kgo.Record{
1279-
Topic: cfg.IngestStorageConfig.KafkaConfig.Topic,
1280-
Key: []byte(tenantID),
1281-
Value: data,
1282-
Headers: []kgo.RecordHeader{ingest.RecordVersionHeader(1)},
1283-
Partition: partitionID,
1284-
Timestamp: kafkaTimestamp,
1285-
})
1286-
require.NoError(t, results.FirstErr())
1287-
}
1288-
1289-
require.NoError(t, services.StartAndAwaitRunning(ctx, ingester))
1290-
t.Cleanup(func() {
1291-
require.NoError(t, services.StopAndAwaitTerminated(ctx, ingester))
1292-
})
1293-
1294-
// Produce a record at baseTime. The loading metric should be 1 after the inline purge.
1295-
produceRecord(user1, "series_0", baseTime)
1296-
1297-
test.Poll(t, 10*time.Second, nil, func() interface{} {
1298-
return testutil.GatherAndCompare(reg, strings.NewReader(`
1299-
# HELP cortex_ingester_active_series_loading 1 if active series counts are still warming up and may be underreported, 0 once they are accurate.
1300-
# TYPE cortex_ingester_active_series_loading gauge
1301-
cortex_ingester_active_series_loading 1
1302-
`), "cortex_ingester_active_series_loading")
1303-
})
1304-
1305-
// Produce a record at baseTime + idleTimeout + UpdatePeriod + 1min.
1306-
// This advances Kafka time past the idle timeout from the first record's timestamp,
1307-
// and also triggers an inline purge (> UpdatePeriod from last purge).
1308-
futureTime := baseTime.Add(idleTimeout + cfg.ActiveSeriesMetrics.UpdatePeriod + 1*time.Minute)
1309-
produceRecord(user1, "series_1", futureTime)
1310-
1311-
test.Poll(t, 10*time.Second, nil, func() interface{} {
1312-
return testutil.GatherAndCompare(reg, strings.NewReader(`
1313-
# HELP cortex_ingester_active_series_loading 1 if active series counts are still warming up and may be underreported, 0 once they are accurate.
1314-
# TYPE cortex_ingester_active_series_loading gauge
1315-
cortex_ingester_active_series_loading 0
1316-
`), "cortex_ingester_active_series_loading")
1317-
})
1318-
}
1319-
13201146
// BenchmarkIngester_ReplayFromKafka tests the ingester replaying records from Kafka at startup.
13211147
// Each scenario is derived from analysis of 1 partition in some production clusters in Grafana Cloud,
13221148
// using "kafkatool dump analyse". The fixture generator simulates the WriteRequest patterns

pkg/ingester/ingester_metrics_update.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ func (i *Ingester) metricsUpdaterServiceRunning(ctx context.Context) error {
3939
defer ingestionRateTicker.Stop()
4040

4141
var activeSeriesTickerChan <-chan time.Time
42-
if i.cfg.ActiveSeriesMetrics.Enabled && !i.cfg.IngestStorageConfig.Enabled {
43-
i.activeSeriesStartMs.Store(time.Now().UnixMilli())
42+
var activeSeriesStartedAt time.Time
43+
if i.cfg.ActiveSeriesMetrics.Enabled {
44+
activeSeriesStartedAt = time.Now()
4445
t := time.NewTicker(i.cfg.ActiveSeriesMetrics.UpdatePeriod)
4546
activeSeriesTickerChan = t.C
4647
defer t.Stop()
@@ -64,7 +65,12 @@ func (i *Ingester) metricsUpdaterServiceRunning(ctx context.Context) error {
6465
}
6566
i.tsdbsMtx.RUnlock()
6667
case <-activeSeriesTickerChan:
67-
i.updateActiveSeries(i.activeSeriesNow())
68+
now := time.Now()
69+
if !activeSeriesStartedAt.IsZero() && now.Sub(activeSeriesStartedAt) >= i.cfg.ActiveSeriesMetrics.IdleTimeout {
70+
i.metrics.activeSeriesLoading.Set(0)
71+
activeSeriesStartedAt = time.Time{} // Only flip once.
72+
}
73+
i.updateActiveSeries(now)
6874
case <-usageStatsUpdateTicker.C:
6975
i.updateUsageStats()
7076
case <-limitMetricsUpdateTicker.C:
@@ -75,23 +81,7 @@ func (i *Ingester) metricsUpdaterServiceRunning(ctx context.Context) error {
7581
}
7682
}
7783

78-
func (i *Ingester) activeSeriesNow() time.Time {
79-
if i.cfg.IngestStorageConfig.Enabled {
80-
if tsMs := i.latestKafkaRecordTimestamp.Load(); tsMs > 0 {
81-
return time.UnixMilli(tsMs)
82-
}
83-
}
84-
return time.Now()
85-
}
86-
8784
func (i *Ingester) updateActiveSeries(now time.Time) {
88-
if startMs := i.activeSeriesStartMs.Load(); startMs > 0 &&
89-
now.UnixMilli()-startMs >= i.cfg.ActiveSeriesMetrics.IdleTimeout.Milliseconds() {
90-
// Active series counts has passed the warm up.
91-
// Mark the loading phase off to signal the counts are now accurate.
92-
i.metrics.activeSeriesLoading.Set(0)
93-
}
94-
9585
for _, userID := range i.getTSDBUsers() {
9686
userDB := i.getTSDB(userID)
9787
if userDB == nil {

pkg/ingester/ingester_push.go

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/grafana/mimir/pkg/ingester/activeseries"
2525
"github.com/grafana/mimir/pkg/mimirpb"
2626
mimir_storage "github.com/grafana/mimir/pkg/storage"
27-
"github.com/grafana/mimir/pkg/storage/ingest"
2827
"github.com/grafana/mimir/pkg/util/globalerror"
2928
util_log "github.com/grafana/mimir/pkg/util/log"
3029
"github.com/grafana/mimir/pkg/util/spanlogger"
@@ -284,8 +283,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
284283
spanlog.DebugLog("event", "acquired append lock")
285284

286285
var (
287-
startAppend = time.Now()
288-
appendWallClockStart = startAppend
286+
startAppend = time.Now()
289287

290288
// Keep track of some stats which are tracked only if the samples will be
291289
// successfully committed
@@ -389,10 +387,6 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
389387
)
390388
)
391389

392-
if ts, ok := ingest.RecordTimestampFromContext(ctx); ok {
393-
startAppend = ts
394-
}
395-
396390
// Walk the samples, appending them to the users database
397391
app := db.Appender(ctx).(extendedAppender)
398392
spanlog.DebugLog("event", "got appender for timeseries", "series", len(req.Timeseries))
@@ -426,7 +420,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
426420
}
427421

428422
// At this point all samples have been added to the appender, so we can track the time it took.
429-
i.metrics.appenderAddDuration.Observe(time.Since(appendWallClockStart).Seconds())
423+
i.metrics.appenderAddDuration.Observe(time.Since(startAppend).Seconds())
430424

431425
spanlog.DebugLog(
432426
"event", "start commit",
@@ -460,32 +454,6 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
460454
appendedSamplesStats.Inc(int64(stats.succeededSamplesCount))
461455
appendedExemplarsStats.Inc(int64(stats.succeededExemplarsCount))
462456

463-
if ts, ok := ingest.RecordTimestampFromContext(ctx); ok {
464-
tsMs := ts.UnixMilli()
465-
for {
466-
cur := i.latestKafkaRecordTimestamp.Load()
467-
if tsMs <= cur || i.latestKafkaRecordTimestamp.CompareAndSwap(cur, tsMs) {
468-
break
469-
}
470-
}
471-
472-
i.activeSeriesStartMs.CompareAndSwap(0, tsMs)
473-
474-
if i.cfg.ActiveSeriesMetrics.Enabled {
475-
updatePeriodMs := i.cfg.ActiveSeriesMetrics.UpdatePeriod.Milliseconds()
476-
for {
477-
lastUpdate := i.lastKafkaActiveSeriesUpdate.Load()
478-
if tsMs-lastUpdate < updatePeriodMs {
479-
break
480-
}
481-
if i.lastKafkaActiveSeriesUpdate.CompareAndSwap(lastUpdate, tsMs) {
482-
i.updateActiveSeries(ts)
483-
break
484-
}
485-
}
486-
}
487-
}
488-
489457
group := i.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(i.limits, userID, req.Timeseries), startAppend)
490458

491459
i.updateMetricsFromPushStats(userID, group, &stats, req.Source, db, i.metrics.discarded)

0 commit comments

Comments
 (0)