1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@ It is recommended to upgrade the storage components first (Receive, Store, etc.) a
### Added

- [#8691](https://github.com/thanos-io/thanos/pull/8691): Compactor: remove the directory marker objects for some s3 compatible object stores
- [#8753](https://github.com/thanos-io/thanos/pull/8753): Receiver: add the --tsdb.flush-blocks-on-shutdown flag to control shutdown behaviour

### Changed

18 changes: 14 additions & 4 deletions cmd/thanos/receive.go
@@ -313,7 +313,7 @@ func runReceive(

	level.Debug(logger).Log("msg", "setting up TSDB")
	{
-		if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
+		if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.tsdbFlushOnShutdown); err != nil {
			return err
		}
	}
@@ -662,6 +662,7 @@ func startTSDBAndUpload(g *run.Group,
	statusProber prober.Probe,
	bkt objstore.Bucket,
	hashringAlgorithm receive.HashringAlgorithm,
	flushOnShutdown bool,
) error {

	log.With(logger, "component", "storage")
@@ -687,10 +688,14 @@
	// Before quitting, ensure the WAL is flushed and the DBs are closed.
	defer func() {
		level.Info(logger).Log("msg", "shutting down storage")
-		if err := dbs.Flush(); err != nil {
-			level.Error(logger).Log("err", err, "msg", "failed to flush storage")
+		if flushOnShutdown {
+			if err := dbs.Flush(); err != nil {
+				level.Error(logger).Log("err", err, "msg", "failed to flush storage")
+			} else {
+				level.Info(logger).Log("msg", "storage is flushed successfully")
+			}
		} else {
-			level.Info(logger).Log("msg", "storage is flushed successfully")
+			level.Info(logger).Log("msg", "flush on shutdown is disabled, skipping")
		}
		dbs.Close()
		level.Info(logger).Log("msg", "storage is closed")
@@ -887,6 +892,7 @@ type receiveConfig struct {
	tsdbMaxBytes                 units.Base2Bytes
	tsdbWriteQueueSize           int64
	tsdbMemorySnapshotOnShutdown bool
	tsdbFlushOnShutdown          bool
	tsdbEnableNativeHistograms   bool

	walCompression bool
@@ -1051,6 +1057,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"[EXPERIMENTAL] Enables feature to snapshot in-memory chunks on shutdown for faster restarts.").
Default("false").Hidden().BoolVar(&rc.tsdbMemorySnapshotOnShutdown)

cmd.Flag("tsdb.flush-blocks-on-shutdown",
"If true, receive will flush TSDB blocks to disk on shutdown.").
Default("true").BoolVar(&rc.tsdbFlushOnShutdown)

cmd.Flag("tsdb.enable-native-histograms",
"(Deprecated) Enables the ingestion of native histograms. This flag is a no-op now and will be removed in the future. Native histogram ingestion is always enabled.").
Default("true").BoolVar(&rc.tsdbEnableNativeHistograms)
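The flag is registered with `Default("true")`, so flushing remains the default and operators opt out explicitly. As a rough illustration of how such a flag behaves on the command line (a minimal sketch using kingpin directly rather than Thanos's `extkingpin` wrapper; the app name and output are made up), kingpin generates a `--no-` negation for boolean flags, which is why the generated docs below render it as `--[no-]tsdb.flush-blocks-on-shutdown`:

```go
package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("demo", "Sketch of a default-true boolean flag.")

	// Mirrors the registration above: Default("true") plus a bool target.
	// kingpin automatically accepts --no-tsdb.flush-blocks-on-shutdown to
	// turn the flag off again.
	flush := app.Flag("tsdb.flush-blocks-on-shutdown",
		"If true, receive will flush TSDB blocks to disk on shutdown.").
		Default("true").Bool()

	kingpin.MustParse(app.Parse(os.Args[1:]))
	fmt.Println("flush on shutdown:", *flush)
}
```

Running the sketch with `--no-tsdb.flush-blocks-on-shutdown` prints `flush on shutdown: false`; with no arguments it prints `true`.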
5 changes: 4 additions & 1 deletion docs/components/receive.md
@@ -373,7 +373,6 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need
The following formula is used for calculating quorum:

```go mdox-exec="sed -n '1068,1078p' pkg/receive/handler.go"
-// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
func (h *Handler) writeQuorum() int {
	// NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes
	// would need to succeed all the time. Another way to think about it is when migrating
@@ -384,6 +383,7 @@ func (h *Handler) writeQuorum() int {
	}
	return int((h.options.ReplicationFactor / 2) + 1)
}

```

So, if the replication factor is 2 then at least one write must succeed. With RF=3, two writes must succeed, and so on.
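For a concrete feel of that arithmetic, here is a minimal standalone sketch (the RF=2 special case is folded into a plain condition here; in the handler it is the branch behind the NOTE above, which also depends on handler options not shown in the excerpt):

```go
package main

import "fmt"

// writeQuorum reproduces the arithmetic above: RF=2 is special-cased to 1,
// otherwise a strict majority of replicas must confirm the write.
func writeQuorum(replicationFactor uint64) int {
	if replicationFactor == 2 {
		return 1
	}
	return int((replicationFactor / 2) + 1)
}

func main() {
	for rf := uint64(1); rf <= 5; rf++ {
		fmt.Printf("RF=%d needs %d successful write(s)\n", rf, writeQuorum(rf))
	}
}
```

It prints 1 for RF=1 and RF=2, 2 for RF=3, and 3 for RF=4 and RF=5.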
@@ -634,6 +634,9 @@ Flags:
                                 ingesting a new exemplar will evict the oldest
                                 exemplar from storage. 0 (or less) value of
                                 this flag disables exemplars storage.
      --[no-]tsdb.flush-blocks-on-shutdown
                                 If true, receive will flush TSDB blocks to disk
                                 on shutdown.
      --[no-]tsdb.enable-native-histograms
                                 (Deprecated) Enables the ingestion of native
                                 histograms. This flag is a no-op now and will
59 changes: 57 additions & 2 deletions pkg/receive/multitsdb_test.go
@@ -193,6 +193,63 @@ func TestMultiTSDB(t *testing.T) {
	})
}

// TestFlushOnShutdown verifies that when flush-on-shutdown is enabled, a
// sample in the head is compacted into a block during shutdown, and when
// disabled, no block is produced.
func TestFlushOnShutdown(t *testing.T) {
	t.Parallel()

	for _, tc := range []struct {
		// name describes the test scenario.
		name string
		// flushOnShutdown controls whether Flush is called before Close.
		flushOnShutdown bool
		// expectedBlocks is the number of blocks expected after shutdown.
		expectedBlocks int
	}{
		{
			// Simulates shutdown with flush enabled: the sample should be compacted into a block.
			name:            "flush enabled produces block on shutdown",
			flushOnShutdown: true,
			expectedBlocks:  1,
		},
		{
			// Simulates shutdown with flush disabled: the sample stays in the WAL and no block is produced.
			name:            "flush disabled produces no block on shutdown",
			flushOnShutdown: false,
			expectedBlocks:  0,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			dir := t.TempDir()
			logger := log.NewLogfmtLogger(os.Stderr)
			const testTenant = "test_tenant"

			m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
				MinBlockDuration:  (2 * time.Hour).Milliseconds(),
				MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
				RetentionDuration: (6 * time.Hour).Milliseconds(),
				NoLockfile:        true,
			}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, false, metadata.NoneFunc)

			testutil.Ok(t, m.Flush())
			testutil.Ok(t, m.Open())
			testutil.Ok(t, appendSample(m, testTenant, time.Now()))

			tenant := m.testGetTenant(testTenant)
			db := tenant.readyStorage().Get()
			testutil.Equals(t, 0, len(db.Blocks()))

			if tc.flushOnShutdown {
				testutil.Ok(t, m.Flush())
			}

			testutil.Equals(t, tc.expectedBlocks, len(db.Blocks()))
			testutil.Ok(t, m.Close())
		})
	}
}

var (
	expectedFooResp = &storepb.Series{
		Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "replica", Value: "01"}, {Name: "tenant_id", Value: "foo"}},
@@ -213,7 +270,6 @@ func testMulitTSDBSeries(t *testing.T, m *MultiTSDB) {
	testutil.Equals(t, 2, len(ss))

	for _, s := range ss {
-
		switch isFoo := strings.Contains(labelpb.PromLabelSetsToString(s.LabelSets()), "foo"); isFoo {
		case true:
			g.Go(func() error {
@@ -522,7 +578,6 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
		testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10))))
		testutil.Equals(t, 1, len(m.TSDBLocalClients()))
	})
-
}

func TestMultiTSDBAddNewTenant(t *testing.T) {
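The disabled case relies on the WAL for durability: the sample the test appends survives shutdown and is replayed into the head on the next startup, rather than being read from a block. A rough standalone sketch of that behaviour against the upstream `prometheus/tsdb` package (not Thanos's `MultiTSDB` wrapper; the exact `Open` signature varies slightly across Prometheus versions, and nil logger/registerer are assumed to be accepted):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	dir := "./tsdb-flush-demo"

	// First session: append one sample, then close WITHOUT flushing the head.
	db, err := tsdb.Open(dir, nil, nil, tsdb.DefaultOptions(), nil)
	if err != nil {
		panic(err)
	}
	app := db.Appender(context.Background())
	if _, err := app.Append(0, labels.FromStrings("job", "demo"), time.Now().UnixMilli(), 42); err != nil {
		panic(err)
	}
	if err := app.Commit(); err != nil {
		panic(err)
	}
	// Close alone does not cut a block; the sample survives only in the WAL.
	if err := db.Close(); err != nil {
		panic(err)
	}

	// Second session: the WAL is replayed into the head on Open, so the data
	// is back in memory even though no block was ever written.
	db, err = tsdb.Open(dir, nil, nil, tsdb.DefaultOptions(), nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()
	fmt.Printf("blocks on disk: %d, series replayed into head: %d\n",
		len(db.Blocks()), db.Head().NumSeries())
}
```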