diff --git a/CHANGELOG.md b/CHANGELOG.md
index 90f7e56e4c6..757886da12e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,7 @@ It is recommend to upgrade the storage components first (Receive, Store, etc.) a
 
 ### Added
 
 - [#8691](https://github.com/thanos-io/thanos/pull/8691): Compactor: remove the directory marker objects for some s3 compatible object stores
+- [#8753](https://github.com/thanos-io/thanos/pull/8753): Receiver: add the --tsdb.flush-blocks-on-shutdown flag to control shutdown behaviour
 
 ### Changed
diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index 63896c6df7f..4cf52b09c9d 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -313,7 +313,7 @@ func runReceive(
 
 	level.Debug(logger).Log("msg", "setting up TSDB")
 	{
-		if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
+		if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.tsdbFlushOnShutdown); err != nil {
 			return err
 		}
 	}
@@ -662,6 +662,7 @@ func startTSDBAndUpload(g *run.Group,
 	statusProber prober.Probe,
 	bkt objstore.Bucket,
 	hashringAlgorithm receive.HashringAlgorithm,
+	flushOnShutdown bool,
 ) error {
 	log.With(logger, "component", "storage")
 
@@ -687,10 +688,14 @@
 		// Before quitting, ensure the WAL is flushed and the DBs are closed.
 		defer func() {
 			level.Info(logger).Log("msg", "shutting down storage")
-			if err := dbs.Flush(); err != nil {
-				level.Error(logger).Log("err", err, "msg", "failed to flush storage")
+			if flushOnShutdown {
+				if err := dbs.Flush(); err != nil {
+					level.Error(logger).Log("err", err, "msg", "failed to flush storage")
+				} else {
+					level.Info(logger).Log("msg", "storage is flushed successfully")
+				}
 			} else {
-				level.Info(logger).Log("msg", "storage is flushed successfully")
+				level.Info(logger).Log("msg", "flush on shutdown is disabled, skipping")
 			}
 			dbs.Close()
 			level.Info(logger).Log("msg", "storage is closed")
@@ -887,6 +892,7 @@ type receiveConfig struct {
 	tsdbMaxBytes                 units.Base2Bytes
 	tsdbWriteQueueSize           int64
 	tsdbMemorySnapshotOnShutdown bool
+	tsdbFlushOnShutdown          bool
 	tsdbEnableNativeHistograms   bool
 	walCompression               bool
 
@@ -1051,6 +1057,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 		"[EXPERIMENTAL] Enables feature to snapshot in-memory chunks on shutdown for faster restarts.").
 		Default("false").Hidden().BoolVar(&rc.tsdbMemorySnapshotOnShutdown)
 
+	cmd.Flag("tsdb.flush-blocks-on-shutdown",
+		"If true, receive will flush TSDB blocks to disk on shutdown.").
+		Default("true").BoolVar(&rc.tsdbFlushOnShutdown)
+
 	cmd.Flag("tsdb.enable-native-histograms",
 		"(Deprecated) Enables the ingestion of native histograms. This flag is a no-op now and will be removed in the future. Native histogram ingestion is always enabled.").
 		Default("true").BoolVar(&rc.tsdbEnableNativeHistograms)
diff --git a/docs/components/receive.md b/docs/components/receive.md
index 9cbd5443016..15eea091015 100644
--- a/docs/components/receive.md
+++ b/docs/components/receive.md
@@ -373,7 +373,6 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need
 The following formula is used for calculating quorum:
 
 ```go mdox-exec="sed -n '1068,1078p' pkg/receive/handler.go"
-// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
 func (h *Handler) writeQuorum() int {
 	// NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes
 	// would need to succeed all the time. Another way to think about it is when migrating
@@ -384,6 +383,7 @@ func (h *Handler) writeQuorum() int {
 	}
 	return int((h.options.ReplicationFactor / 2) + 1)
 }
+
 ```
 
 So, if the replication factor is 2 then at least one write must succeed. With RF=3, two writes must succeed, and so on.
@@ -634,6 +634,9 @@ Flags:
                                  ingesting a new exemplar will evict the oldest
                                  exemplar from storage. 0 (or less) value of
                                  this flag disables exemplars storage.
+      --[no-]tsdb.flush-blocks-on-shutdown
+                                 If true, receive will flush TSDB blocks to disk
+                                 on shutdown.
       --[no-]tsdb.enable-native-histograms
                                  (Deprecated) Enables the ingestion of native
                                  histograms. This flag is a no-op now and will
diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go
index 93ce1f78e16..eb03420017b 100644
--- a/pkg/receive/multitsdb_test.go
+++ b/pkg/receive/multitsdb_test.go
@@ -193,6 +193,63 @@ func TestMultiTSDB(t *testing.T) {
 	})
 }
 
+// TestFlushOnShutdown verifies that when flush-on-shutdown is enabled, a
+// sample in the head is compacted into a block during shutdown, and when
+// disabled, no block is produced.
+func TestFlushOnShutdown(t *testing.T) {
+	t.Parallel()
+
+	for _, tc := range []struct {
+		// name describes the test scenario.
+		name string
+		// flushOnShutdown controls whether Flush is called before Close.
+		flushOnShutdown bool
+		// expectedBlocks is the number of blocks expected after shutdown.
+		expectedBlocks int
+	}{
+		{
+			// Simulates shutdown with flush enabled: sample should be compacted into a block.
+			name:            "flush enabled produces block on shutdown",
+			flushOnShutdown: true,
+			expectedBlocks:  1,
+		},
+		{
+			// Simulates shutdown with flush disabled: sample stays in WAL, no block is produced.
+			name:            "flush disabled produces no block on shutdown",
+			flushOnShutdown: false,
+			expectedBlocks:  0,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			dir := t.TempDir()
+			logger := log.NewLogfmtLogger(os.Stderr)
+			const testTenant = "test_tenant"
+
+			m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
+				MinBlockDuration:  (2 * time.Hour).Milliseconds(),
+				MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
+				RetentionDuration: (6 * time.Hour).Milliseconds(),
+				NoLockfile:        true,
+			}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, false, metadata.NoneFunc)
+
+			testutil.Ok(t, m.Flush())
+			testutil.Ok(t, m.Open())
+			testutil.Ok(t, appendSample(m, testTenant, time.Now()))
+
+			tenant := m.testGetTenant(testTenant)
+			db := tenant.readyStorage().Get()
+			testutil.Equals(t, 0, len(db.Blocks()))
+
+			if tc.flushOnShutdown {
+				testutil.Ok(t, m.Flush())
+			}
+
+			testutil.Equals(t, tc.expectedBlocks, len(db.Blocks()))
+			m.Close()
+		})
+	}
+}
+
 var (
 	expectedFooResp = &storepb.Series{
 		Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "replica", Value: "01"}, {Name: "tenant_id", Value: "foo"}},
@@ -213,7 +270,6 @@ func testMulitTSDBSeries(t *testing.T, m *MultiTSDB) {
 	testutil.Equals(t, 2, len(ss))
 
 	for _, s := range ss {
-
 		switch isFoo := strings.Contains(labelpb.PromLabelSetsToString(s.LabelSets()), "foo"); isFoo {
 		case true:
 			g.Go(func() error {
@@ -522,7 +578,6 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
 		testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10))))
 		testutil.Equals(t, 1, len(m.TSDBLocalClients()))
 	})
-
 }
 
 func TestMultiTSDBAddNewTenant(t *testing.T) {