Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ It is recommend to upgrade the storage components first (Receive, Store, etc.) a
### Added

- [#8691](https://github.com/thanos-io/thanos/pull/8691): Compactor: remove the directory marker objects for some s3 compatible object stores
- [#8753](https://github.com/thanos-io/thanos/pull/8753): Receiver: add the --tsdb.flush-blocks-on-shutdown flag to control shutdown behaviour

### Changed

Expand Down
18 changes: 14 additions & 4 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up TSDB")
{
if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.tsdbFlushOnShutdown); err != nil {
return err
}
}
Expand Down Expand Up @@ -662,6 +662,7 @@ func startTSDBAndUpload(g *run.Group,
statusProber prober.Probe,
bkt objstore.Bucket,
hashringAlgorithm receive.HashringAlgorithm,
flushOnShutdown bool,
) error {

log.With(logger, "component", "storage")
Expand All @@ -687,10 +688,14 @@ func startTSDBAndUpload(g *run.Group,
// Before quitting, ensure the WAL is flushed and the DBs are closed.
defer func() {
level.Info(logger).Log("msg", "shutting down storage")
if err := dbs.Flush(); err != nil {
level.Error(logger).Log("err", err, "msg", "failed to flush storage")
if flushOnShutdown {
if err := dbs.Flush(); err != nil {
level.Error(logger).Log("err", err, "msg", "failed to flush storage")
} else {
level.Info(logger).Log("msg", "storage is flushed successfully")
}
} else {
level.Info(logger).Log("msg", "storage is flushed successfully")
level.Info(logger).Log("msg", "flush on shutdown is disabled, skipping")
}
dbs.Close()
level.Info(logger).Log("msg", "storage is closed")
Expand Down Expand Up @@ -887,6 +892,7 @@ type receiveConfig struct {
tsdbMaxBytes units.Base2Bytes
tsdbWriteQueueSize int64
tsdbMemorySnapshotOnShutdown bool
tsdbFlushOnShutdown bool
tsdbEnableNativeHistograms bool

walCompression bool
Expand Down Expand Up @@ -1051,6 +1057,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"[EXPERIMENTAL] Enables feature to snapshot in-memory chunks on shutdown for faster restarts.").
Default("false").Hidden().BoolVar(&rc.tsdbMemorySnapshotOnShutdown)

cmd.Flag("tsdb.flush-blocks-on-shutdown",
"If true, receive will flush TSDB blocks to disk on shutdown.").
Default("true").BoolVar(&rc.tsdbFlushOnShutdown)

cmd.Flag("tsdb.enable-native-histograms",
"(Deprecated) Enables the ingestion of native histograms. This flag is a no-op now and will be removed in the future. Native histogram ingestion is always enabled.").
Default("true").BoolVar(&rc.tsdbEnableNativeHistograms)
Expand Down
3 changes: 3 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,9 @@ Flags:
histograms. This flag is a no-op now and will
be removed in the future. Native histogram
ingestion is always enabled.
--tsdb.flush-blocks-on-shutdown
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to run make docs but that made a ton of changes to a lot of files.
Either my local clone did something wrong or all docs are very stale.

Copy link
Copy Markdown
Contributor

@MichaHoffmann MichaHoffmann Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets ignore the docs check, ill add a followup PR to sync the docs.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that was my laptop, had older thanos binary in my PATH and make build puts binaries in go/bin for some reason

If true, receive will flush TSDB blocks to disk
on shutdown.
--hash-func= Specify which hash function to use when
calculating the hashes of produced files.
If no function has been specified, it does not
Expand Down
59 changes: 57 additions & 2 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,63 @@ func TestMultiTSDB(t *testing.T) {
})
}

// TestFlushOnShutdown verifies that when flush-on-shutdown is enabled, a
// sample in the head is compacted into a block during shutdown, and when
// disabled, no block is produced.
func TestFlushOnShutdown(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
// name describes the test scenario.
name string
// flushOnShutdown controls whether Flush is called before Close.
flushOnShutdown bool
// expectedBlocks is the number of blocks expected after shutdown.
expectedBlocks int
}{
{
// Simulates shutdown with flush enabled: sample should be compacted into a block.
name: "flush enabled produces block on shutdown",
flushOnShutdown: true,
expectedBlocks: 1,
},
{
// Simulates shutdown with flush disabled: sample stays in WAL, no block is produced.
name: "flush disabled produces no block on shutdown",
flushOnShutdown: false,
expectedBlocks: 0,
},
} {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
logger := log.NewLogfmtLogger(os.Stderr)
const testTenant = "test_tenant"

m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, false, metadata.NoneFunc)

testutil.Ok(t, m.Flush())
testutil.Ok(t, m.Open())
testutil.Ok(t, appendSample(m, testTenant, time.Now()))

tenant := m.testGetTenant(testTenant)
db := tenant.readyStorage().Get()
testutil.Equals(t, 0, len(db.Blocks()))

if tc.flushOnShutdown {
testutil.Ok(t, m.Flush())
}

testutil.Equals(t, tc.expectedBlocks, len(db.Blocks()))
m.Close()
})
}
}

var (
expectedFooResp = &storepb.Series{
Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "replica", Value: "01"}, {Name: "tenant_id", Value: "foo"}},
Expand All @@ -213,7 +270,6 @@ func testMulitTSDBSeries(t *testing.T, m *MultiTSDB) {
testutil.Equals(t, 2, len(ss))

for _, s := range ss {

switch isFoo := strings.Contains(labelpb.PromLabelSetsToString(s.LabelSets()), "foo"); isFoo {
case true:
g.Go(func() error {
Expand Down Expand Up @@ -522,7 +578,6 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10))))
testutil.Equals(t, 1, len(m.TSDBLocalClients()))
})

}

func TestMultiTSDBAddNewTenant(t *testing.T) {
Expand Down
Loading