Skip to content

Commit 4094fcf

Browse files
committed
Simplify interface, just fallback to noop
Signed-off-by: Oleg Zaytsev <[email protected]>
1 parent 346b79e commit 4094fcf

File tree

4 files changed

+61
-73
lines changed

4 files changed

+61
-73
lines changed

storage/interface.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -289,13 +289,14 @@ type SeriesWithRef struct {
289289
Ref SeriesRef
290290
}
291291

292-
type BatchAppender interface {
293-
Appender
294-
// BatchSeriesRefs will create series and return their SeriesRef and existing labels, reusing buf when possible.
295-
// Series that could not be created because of invalid labelsets will have their SeriesRef set to 0.
296-
// Series that have to be created will copy (calling Labels.Copy()) the labels before storing them,
297-
// so no references to series will be kept.
298-
// Series that already exist and didn't have to be created will return the labels stored in the TSDB.
292+
type BatchSeriesReferencer interface {
293+
// BatchSeriesRefs will attempt to, for each one of the series provided:
294+
// - If it does not exist, clone the labels and create a new series, returning the reference and the cloned labels.
295+
// - If it does exist, return the reference and the labels stored in the TSDB.
296+
// The result will contain the references and labels for each series in the same order as the input.
297+
// If the reference is 0, it means that series was not created,
298+
// this could happen because the input was not valid (e.g. empty labels),
299+
// or because the implementation does not support this feature (for example, initAppender when head is not initialized yet).
299300
BatchSeriesRefs(series []labels.Labels, buf []SeriesWithRef) []SeriesWithRef
300301
}
301302

tsdb/db.go

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,27 +1203,6 @@ func (db *DB) run(ctx context.Context) {
12031203
}
12041204
}
12051205

1206-
// HeadInitialized returns true if the Head has been initialized with a starting time.
1207-
func (db *DB) HeadInitialized() bool {
1208-
return db.head.initialized()
1209-
}
1210-
1211-
// InitializeHead the Head with a starting time.
1212-
// After Initialize() is called for the first time, Initialized() is always true.
1213-
func (db *DB) InitializeHead(t int64) {
1214-
db.head.initTime(t)
1215-
}
1216-
1217-
// BatchAppender returns a storage.BatchAppender, it requires the head to be initialized.
1218-
// If the Head is not initialized, it returns nil and false.
1219-
func (db *DB) BatchAppender(ctx context.Context) (storage.BatchAppender, bool) {
1220-
app, ok := db.head.batchAppender(ctx)
1221-
if ok {
1222-
return batchDBAppender{BatchAppender: app, dba: dbAppender{Appender: app, db: db}}, true
1223-
}
1224-
return nil, false
1225-
}
1226-
12271206
// Appender opens a new appender against the database.
12281207
func (db *DB) Appender(ctx context.Context) storage.Appender {
12291208
return dbAppender{db: db, Appender: db.head.Appender(ctx)}
@@ -1302,6 +1281,7 @@ type dbAppender struct {
13021281
}
13031282

13041283
var _ storage.GetRef = dbAppender{}
1284+
var _ storage.BatchSeriesReferencer = dbAppender{}
13051285

13061286
func (a dbAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
13071287
if g, ok := a.Appender.(storage.GetRef); ok {
@@ -1310,6 +1290,13 @@ func (a dbAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef,
13101290
return 0, labels.EmptyLabels()
13111291
}
13121292

1293+
func (a dbAppender) BatchSeriesRefs(series []labels.Labels, buf []storage.SeriesWithRef) []storage.SeriesWithRef {
1294+
if r, ok := a.Appender.(storage.BatchSeriesReferencer); ok {
1295+
return r.BatchSeriesRefs(series, buf)
1296+
}
1297+
return unknownBatchSeriesRefs(series, buf)
1298+
}
1299+
13131300
func (a dbAppender) Commit() error {
13141301
err := a.Appender.Commit()
13151302

@@ -1324,21 +1311,6 @@ func (a dbAppender) Commit() error {
13241311
return err
13251312
}
13261313

1327-
var _ storage.BatchAppender = batchDBAppender{}
1328-
1329-
type batchDBAppender struct {
1330-
storage.BatchAppender
1331-
dba dbAppender
1332-
}
1333-
1334-
func (a batchDBAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
1335-
return a.dba.GetRef(lset, hash)
1336-
}
1337-
1338-
func (a batchDBAppender) Commit() error {
1339-
return a.dba.Commit()
1340-
}
1341-
13421314
// waitingForCompactionDelay returns true if the DB is waiting for the Head compaction delay.
13431315
// This doesn't guarantee that the Head is really compactable.
13441316
func (db *DB) waitingForCompactionDelay() bool {

tsdb/db_test.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -335,12 +335,13 @@ func TestBatchDBAppenderAddRef(t *testing.T) {
335335
}()
336336

337337
ctx := context.Background()
338-
db.InitializeHead(123)
339-
app1, ok := db.BatchAppender(ctx)
338+
db.Head().Init(123)
339+
app1 := db.Appender(ctx)
340+
batchApp1, ok := app1.(storage.BatchSeriesReferencer)
340341
require.True(t, ok)
341342

342343
lset1 := labels.FromStrings("a", "b")
343-
entries1 := app1.BatchSeriesRefs([]labels.Labels{lset1}, nil)
344+
entries1 := batchApp1.BatchSeriesRefs([]labels.Labels{lset1}, nil)
344345
require.NotZero(t, entries1[0])
345346
require.True(t, labels.Equal(entries1[0].Labels, lset1))
346347

@@ -353,12 +354,13 @@ func TestBatchDBAppenderAddRef(t *testing.T) {
353354
err = app1.Commit()
354355
require.NoError(t, err)
355356

356-
app2, ok := db.BatchAppender(ctx)
357+
app2 := db.Appender(ctx)
358+
batchApp2, ok := app2.(storage.BatchSeriesReferencer)
357359
require.True(t, ok)
358360

359361
lset2 := labels.FromStrings("a", "c")
360362
lset3 := labels.FromStrings("x", "y")
361-
entries2 := app1.BatchSeriesRefs([]labels.Labels{lset1, lset2, lset3, labels.EmptyLabels()}, nil)
363+
entries2 := batchApp2.BatchSeriesRefs([]labels.Labels{lset1, lset2, lset3, labels.EmptyLabels()}, nil)
362364
require.Equal(t, entries1[0], entries2[0])
363365
require.NotEqual(t, entries1[0], entries2[2])
364366
require.Zero(t, entries2[3], "Empty labels should not create a new series")
@@ -398,15 +400,16 @@ func TestBatchDBAppenderBatchSeriesRefsCopiesLabels(t *testing.T) {
398400
}()
399401

400402
ctx := context.Background()
401-
db.InitializeHead(123)
402-
app1, ok := db.BatchAppender(ctx)
403+
db.Head().Init(123)
404+
app1 := db.Appender(ctx)
405+
batchApp1, ok := app1.(storage.BatchSeriesReferencer)
403406
require.True(t, ok)
404407

405408
b := labels.NewScratchBuilder(0)
406409
b.Add("a", "b")
407410
unsafeMutableLabels := b.Labels()
408411

409-
entries1 := app1.BatchSeriesRefs([]labels.Labels{unsafeMutableLabels}, nil)
412+
entries1 := batchApp1.BatchSeriesRefs([]labels.Labels{unsafeMutableLabels}, nil)
410413
require.NotZero(t, entries1[0])
411414
require.True(t, labels.Equal(entries1[0].Labels, unsafeMutableLabels))
412415

@@ -426,9 +429,10 @@ func TestBatchDBAppenderBatchSeriesRefsCopiesLabels(t *testing.T) {
426429
require.NoError(t, err)
427430

428431
// Check that overwritten labels create new series
429-
app2, ok := db.BatchAppender(ctx)
432+
app2 := db.Appender(ctx)
433+
batchApp2, ok := app2.(storage.BatchSeriesReferencer)
430434
require.True(t, ok)
431-
entries2 := app1.BatchSeriesRefs([]labels.Labels{unsafeMutableLabels}, nil)
435+
entries2 := batchApp2.BatchSeriesRefs([]labels.Labels{unsafeMutableLabels}, nil)
432436
require.NotEqual(t, entries2[0], entries1[0])
433437
require.True(t, labels.Equal(entries2[0].Labels, unsafeMutableLabels))
434438

@@ -442,24 +446,26 @@ func TestBatchDBAppenderBatchSeriesRefsReturnsLabelsFromStorage(t *testing.T) {
442446
}()
443447

444448
ctx := context.Background()
445-
db.InitializeHead(123)
446-
app1, ok := db.BatchAppender(ctx)
449+
db.Head().Init(123)
450+
app1 := db.Appender(ctx)
451+
batchApp1, ok := app1.(storage.BatchSeriesReferencer)
447452
require.True(t, ok)
448453

449454
b := labels.NewScratchBuilder(0)
450455
b.Add("a", "b")
451456
unsafeMutableLabels := b.Labels()
452457

453-
entries1 := app1.BatchSeriesRefs([]labels.Labels{unsafeMutableLabels}, nil)
458+
entries1 := batchApp1.BatchSeriesRefs([]labels.Labels{unsafeMutableLabels}, nil)
454459
require.NotZero(t, entries1[0])
455460
require.True(t, labels.Equal(entries1[0].Labels, unsafeMutableLabels))
456461

457462
require.NoError(t, app1.Commit())
458463

459464
// Retrieve same labels again, they should exist now.
460-
app2, ok := db.BatchAppender(ctx)
465+
app2 := db.Appender(ctx)
466+
batchApp2, ok := app2.(storage.BatchSeriesReferencer)
461467
require.True(t, ok)
462-
entries2 := app1.BatchSeriesRefs([]labels.Labels{unsafeMutableLabels}, nil)
468+
entries2 := batchApp2.BatchSeriesRefs([]labels.Labels{unsafeMutableLabels}, nil)
463469
require.Equal(t, entries2[0], entries1[0])
464470
require.True(t, labels.Equal(entries2[0].Labels, unsafeMutableLabels))
465471

@@ -481,11 +487,12 @@ func TestBatchDBAppenderTracksCreatedSeriesInWAL(t *testing.T) {
481487

482488
// Append to that DB and close it.
483489
{
484-
db.InitializeHead(123)
485-
app, ok := db.BatchAppender(ctx)
490+
db.Head().Init(123)
491+
app := db.Appender(ctx)
492+
batchApp, ok := app.(storage.BatchSeriesReferencer)
486493
require.True(t, ok)
487494

488-
refs := app.BatchSeriesRefs([]labels.Labels{lbls}, nil)
495+
refs := batchApp.BatchSeriesRefs([]labels.Labels{lbls}, nil)
489496
require.NotZero(t, refs[0])
490497
_, err := app.Append(refs[0].Ref, labels.EmptyLabels(), 0, 1)
491498
require.NoError(t, err)
@@ -595,14 +602,15 @@ func TestBatchSeriesRefEmptyLabelsIgnored(t *testing.T) {
595602
}()
596603

597604
ctx := context.Background()
598-
db.InitializeHead(123)
599-
app1, ok := db.BatchAppender(ctx)
605+
db.Head().Init(123)
606+
app1 := db.Appender(ctx)
607+
batchApp1, ok := app1.(storage.BatchSeriesReferencer)
600608
require.True(t, ok)
601609

602610
lset1 := labels.FromStrings("a", "b")
603611
lset2 := labels.FromStrings("a", "b", "c", "")
604612

605-
entries := app1.BatchSeriesRefs([]labels.Labels{lset1, lset2}, nil)
613+
entries := batchApp1.BatchSeriesRefs([]labels.Labels{lset1, lset2}, nil)
606614
require.Len(t, entries, 2)
607615
require.Equal(t, entries[0], entries[1])
608616

tsdb/head_append.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type initAppender struct {
4040
}
4141

4242
var _ storage.GetRef = &initAppender{}
43+
var _ storage.BatchSeriesReferencer = &initAppender{}
4344

4445
func (a *initAppender) SetOptions(opts *storage.AppendOptions) {
4546
if a.app != nil {
@@ -132,6 +133,13 @@ func (a *initAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
132133
return 0, labels.EmptyLabels()
133134
}
134135

136+
func (a *initAppender) BatchSeriesRefs(series []labels.Labels, buf []storage.SeriesWithRef) []storage.SeriesWithRef {
137+
if r, ok := a.app.(storage.BatchSeriesReferencer); ok {
138+
return r.BatchSeriesRefs(series, buf)
139+
}
140+
return unknownBatchSeriesRefs(series, buf)
141+
}
142+
135143
func (a *initAppender) Commit() error {
136144
if a.app == nil {
137145
a.head.metrics.activeAppenders.Dec()
@@ -148,15 +156,6 @@ func (a *initAppender) Rollback() error {
148156
return a.app.Rollback()
149157
}
150158

151-
// batchAppender returns a storage.BatchAppender, it requires the head to be initialized.
152-
// If the Head is not initialized, it returns nil and false.
153-
func (h *Head) batchAppender(_ context.Context) (app storage.BatchAppender, ok bool) {
154-
if h.initialized() {
155-
return h.appender(), true
156-
}
157-
return nil, false
158-
}
159-
160159
// Appender returns a new Appender on the database.
161160
func (h *Head) Appender(_ context.Context) storage.Appender {
162161
h.metrics.activeAppenders.Inc()
@@ -2096,6 +2095,14 @@ func (a *headAppender) Rollback() (err error) {
20962095
return a.log()
20972096
}
20982097

2098+
// unknownBatchSeriesRefs is a default non-implementation for storage.BatchSeriesReferencer
2099+
// that will return zero refs and labels for each series provided.
2100+
func unknownBatchSeriesRefs(series []labels.Labels, buf []storage.SeriesWithRef) []storage.SeriesWithRef {
2101+
result := reuseOrMake(&buf, len(series), len(series))
2102+
clear(result)
2103+
return result
2104+
}
2105+
20992106
// reuseOrMake will try to reuse the reusable slice if it has enough capacity, setting it to the desired len.
21002107
// If it can't be reused, it will make a new slice with the desired length and capacity.
21012108
// If a new slice is allocated, it will update the reusable slice to point to the new slice.

0 commit comments

Comments
 (0)