From 3d837b39debd5ecd32468c33418c2ffda4e684b3 Mon Sep 17 00:00:00 2001 From: Lukasz Mierzwa Date: Tue, 7 Apr 2026 10:04:11 +0100 Subject: [PATCH 1/3] Add --tsdb.flush-blocks-on-shutdown flag By default Receive will flush (write to disk and/or upload to S3) on shutdown. This takes ages and we don't need it at all because we run Thanos on bare metal. Allow this to be configurable since this feature seems to be aimed mostly at environments like k8s, where instances are ephemeral. Signed-off-by: Lukasz Mierzwa --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 18 ++++++++--- pkg/receive/multitsdb_test.go | 57 +++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90f7e56e4c6..757886da12e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ It is recommend to upgrade the storage components first (Receive, Store, etc.) a ### Added - [#8691](https://github.com/thanos-io/thanos/pull/8691): Compactor: remove the directory marker objects for some s3 compatible object stores +- [#8753](https://github.com/thanos-io/thanos/pull/8753): Receiver: add the --tsdb.flush-blocks-on-shutdown flag to control shutdown behaviour ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 63896c6df7f..4cf52b09c9d 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -313,7 +313,7 @@ func runReceive( level.Debug(logger).Log("msg", "setting up TSDB") { - if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil { + if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.tsdbFlushOnShutdown); err != nil { return err } } @@ -662,6 +662,7 @@ func startTSDBAndUpload(g *run.Group, statusProber prober.Probe, bkt objstore.Bucket, hashringAlgorithm 
receive.HashringAlgorithm, + flushOnShutdown bool, ) error { log.With(logger, "component", "storage") @@ -687,10 +688,14 @@ func startTSDBAndUpload(g *run.Group, // Before quitting, ensure the WAL is flushed and the DBs are closed. defer func() { level.Info(logger).Log("msg", "shutting down storage") - if err := dbs.Flush(); err != nil { - level.Error(logger).Log("err", err, "msg", "failed to flush storage") + if flushOnShutdown { + if err := dbs.Flush(); err != nil { + level.Error(logger).Log("err", err, "msg", "failed to flush storage") + } else { + level.Info(logger).Log("msg", "storage is flushed successfully") + } } else { - level.Info(logger).Log("msg", "storage is flushed successfully") + level.Info(logger).Log("msg", "flush on shutdown is disabled, skipping") } dbs.Close() level.Info(logger).Log("msg", "storage is closed") @@ -887,6 +892,7 @@ type receiveConfig struct { tsdbMaxBytes units.Base2Bytes tsdbWriteQueueSize int64 tsdbMemorySnapshotOnShutdown bool + tsdbFlushOnShutdown bool tsdbEnableNativeHistograms bool walCompression bool @@ -1051,6 +1057,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { "[EXPERIMENTAL] Enables feature to snapshot in-memory chunks on shutdown for faster restarts."). Default("false").Hidden().BoolVar(&rc.tsdbMemorySnapshotOnShutdown) + cmd.Flag("tsdb.flush-blocks-on-shutdown", + "If true, receive will flush TSDB blocks to disk on shutdown."). + Default("true").BoolVar(&rc.tsdbFlushOnShutdown) + cmd.Flag("tsdb.enable-native-histograms", "(Deprecated) Enables the ingestion of native histograms. This flag is a no-op now and will be removed in the future. Native histogram ingestion is always enabled."). 
Default("true").BoolVar(&rc.tsdbEnableNativeHistograms) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 93ce1f78e16..6ac0d8862fe 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -193,6 +193,63 @@ func TestMultiTSDB(t *testing.T) { }) } +// TestFlushOnShutdown verifies that when flush-on-shutdown is enabled, a +// sample in the head is compacted into a block during shutdown, and when +// disabled, no block is produced. +func TestFlushOnShutdown(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + // name describes the test scenario. + name string + // flushOnShutdown controls whether Flush is called before Close. + flushOnShutdown bool + // expectedBlocks is the number of blocks expected after shutdown. + expectedBlocks int + }{ + { + // Simulates shutdown with flush enabled: sample should be compacted into a block. + name: "flush enabled produces block on shutdown", + flushOnShutdown: true, + expectedBlocks: 1, + }, + { + // Simulates shutdown with flush disabled: sample stays in WAL, no block is produced. 
+ name: "flush disabled produces no block on shutdown", + flushOnShutdown: false, + expectedBlocks: 0, + }, + } { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + logger := log.NewLogfmtLogger(os.Stderr) + const testTenant = "test_tenant" + + m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + NoLockfile: true, + }, labels.FromStrings("replica", "01"), "tenant_id", nil, false, false, metadata.NoneFunc) + + testutil.Ok(t, m.Flush()) + testutil.Ok(t, m.Open()) + testutil.Ok(t, appendSample(m, testTenant, time.Now())) + + tenant := m.testGetTenant(testTenant) + db := tenant.readyStorage().Get() + testutil.Equals(t, 0, len(db.Blocks())) + + if tc.flushOnShutdown { + testutil.Ok(t, m.Flush()) + } + + testutil.Equals(t, tc.expectedBlocks, len(db.Blocks())) + testutil.Ok(t, m.Close()) + }) + } +} + var ( expectedFooResp = &storepb.Series{ Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "replica", Value: "01"}, {Name: "tenant_id", Value: "foo"}}, From c3b54bd9024c4288e401cc87336252b4be73538d Mon Sep 17 00:00:00 2001 From: Lukasz Mierzwa Date: Tue, 7 Apr 2026 10:23:44 +0100 Subject: [PATCH 2/3] Fix errors reported by CI Signed-off-by: Lukasz Mierzwa --- docs/components/receive.md | 3 +++ pkg/receive/multitsdb_test.go | 4 +--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/components/receive.md b/docs/components/receive.md index 9cbd5443016..5597ce7a279 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -639,6 +639,9 @@ Flags: histograms. This flag is a no-op now and will be removed in the future. Native histogram ingestion is always enabled. + --tsdb.flush-blocks-on-shutdown + If true, receive will flush TSDB blocks to disk + on shutdown. 
--hash-func= Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 6ac0d8862fe..eb03420017b 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -245,7 +245,7 @@ func TestFlushOnShutdown(t *testing.T) { } testutil.Equals(t, tc.expectedBlocks, len(db.Blocks())) - testutil.Ok(t, m.Close()) + m.Close() }) } } @@ -270,7 +270,6 @@ func testMulitTSDBSeries(t *testing.T, m *MultiTSDB) { testutil.Equals(t, 2, len(ss)) for _, s := range ss { - switch isFoo := strings.Contains(labelpb.PromLabelSetsToString(s.LabelSets()), "foo"); isFoo { case true: g.Go(func() error { @@ -579,7 +578,6 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10)))) testutil.Equals(t, 1, len(m.TSDBLocalClients())) }) - } func TestMultiTSDBAddNewTenant(t *testing.T) { From 489550b0fc12cac3dd48f2f1d7154732d364f7d5 Mon Sep 17 00:00:00 2001 From: Lukasz Mierzwa Date: Tue, 7 Apr 2026 12:19:11 +0100 Subject: [PATCH 3/3] Run make docs --- docs/components/receive.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/components/receive.md b/docs/components/receive.md index 5597ce7a279..15eea091015 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -373,7 +373,6 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need The following formula is used for calculating quorum: ```go mdox-exec="sed -n '1068,1078p' pkg/receive/handler.go" -// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success. func (h *Handler) writeQuorum() int { // NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes // would need to succeed all the time. 
Another way to think about it is when migrating @@ -384,6 +383,7 @@ func (h *Handler) writeQuorum() int { } return int((h.options.ReplicationFactor / 2) + 1) } + ``` So, if the replication factor is 2 then at least one write must succeed. With RF=3, two writes must succeed, and so on. @@ -634,14 +634,14 @@ Flags: ingesting a new exemplar will evict the oldest exemplar from storage. 0 (or less) value of this flag disables exemplars storage. + --[no-]tsdb.flush-blocks-on-shutdown + If true, receive will flush TSDB blocks to disk + on shutdown. --[no-]tsdb.enable-native-histograms (Deprecated) Enables the ingestion of native histograms. This flag is a no-op now and will be removed in the future. Native histogram ingestion is always enabled. - --tsdb.flush-blocks-on-shutdown - If true, receive will flush TSDB blocks to disk - on shutdown. --hash-func= Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not