Skip to content

Commit e53f75e

Browse files
fix: discard excessive errors (#22379)
The tsmBatchKeyIterator discards excessive errors to avoid out-of-memory crashes when compacting very corrupt files. Any error beyond DefaultMaxSavedErrors (100) will be discarded instead of appended to the error slice. closes #22328
1 parent 50d3bca commit e53f75e

File tree

2 files changed

+121
-5
lines changed

2 files changed

+121
-5
lines changed

tsdb/engine/tsm1/compact.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ const (
4040

4141
// TSMFileExtension is the extension used for TSM files.
4242
TSMFileExtension = "tsm"
43+
44+
// DefaultMaxSavedErrors is the number of errors that are stored by a TSMBatchKeyReader before
45+
// subsequent errors are discarded
46+
DefaultMaxSavedErrors = 100
4347
)
4448

4549
var (
@@ -954,7 +958,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([
954958
return nil, nil
955959
}
956960

957-
tsm, err := NewTSMBatchKeyIterator(size, fast, intC, tsmFiles, trs...)
961+
tsm, err := NewTSMBatchKeyIterator(size, fast, DefaultMaxSavedErrors, intC, tsmFiles, trs...)
958962
if err != nil {
959963
return nil, err
960964
}
@@ -1660,15 +1664,28 @@ type tsmBatchKeyIterator struct {
16601664
// without decode
16611665
merged blocks
16621666
interrupt chan struct{}
1667+
1668+
// maxErrors is the maximum number of errors to store before discarding.
1669+
maxErrors int
1670+
// overflowErrors is the number of errors we have ignored.
1671+
overflowErrors int
16631672
}
16641673

1665-
func (t *tsmBatchKeyIterator) AppendError(err error) {
1666-
t.errs = append(t.errs, err)
1674+
func (t *tsmBatchKeyIterator) AppendError(err error) bool {
1675+
if t.maxErrors > len(t.errs) {
1676+
t.errs = append(t.errs, err)
1677+
// Was the error stored?
1678+
return true
1679+
} else {
1680+
// Was the error dropped
1681+
t.overflowErrors++
1682+
return false
1683+
}
16671684
}
16681685

16691686
// NewTSMBatchKeyIterator returns a new TSM key iterator from readers.
16701687
// size indicates the maximum number of values to encode in a single block.
1671-
func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, tsmFiles []string, readers ...*TSMReader) (KeyIterator, error) {
1688+
func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan struct{}, tsmFiles []string, readers ...*TSMReader) (KeyIterator, error) {
16721689
var iter []*BlockIterator
16731690
for _, r := range readers {
16741691
iter = append(iter, r.BlockIterator())
@@ -1689,6 +1706,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, tsmFil
16891706
mergedBooleanValues: &tsdb.BooleanArray{},
16901707
mergedStringValues: &tsdb.StringArray{},
16911708
interrupt: interrupt,
1709+
maxErrors: maxErrors,
16921710
}, nil
16931711
}
16941712

@@ -1916,7 +1934,12 @@ func (k *tsmBatchKeyIterator) Err() error {
19161934
if len(k.errs) == 0 {
19171935
return nil
19181936
}
1919-
return k.errs
1937+
// Copy the errors before appending the dropped error count
1938+
var errs TSMErrors
1939+
errs = make([]error, 0, len(k.errs)+1)
1940+
errs = append(errs, k.errs...)
1941+
errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors))
1942+
return errs
19201943
}
19211944

19221945
type cacheKeyIterator struct {

tsdb/engine/tsm1/reader_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"path/filepath"
1111
"sort"
12+
"strings"
1213
"testing"
1314

1415
"github.com/stretchr/testify/require"
@@ -1866,6 +1867,98 @@ func TestTSMReader_References(t *testing.T) {
18661867
}
18671868
}
18681869

1870+
func TestBatchKeyIterator_Errors(t *testing.T) {
1871+
const MaxErrors = 10
1872+
1873+
dir, name := createTestTSM(t)
1874+
defer os.RemoveAll(dir)
1875+
fr, err := os.Open(name)
1876+
if err != nil {
1877+
t.Fatalf("unexpected error opening file %s: %v", name, err)
1878+
}
1879+
r, err := NewTSMReader(fr)
1880+
if err != nil {
1881+
// Only have a deferred close if we could not create the TSMReader
1882+
defer func() {
1883+
if e := fr.Close(); e != nil {
1884+
t.Fatalf("unexpected error closing %s: %v", name, e)
1885+
}
1886+
}()
1887+
1888+
t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err)
1889+
}
1890+
defer func() {
1891+
if e := r.Close(); e != nil {
1892+
t.Fatalf("error closing TSMReader for %s: %v", name, e)
1893+
}
1894+
}()
1895+
interrupts := make(chan struct{})
1896+
var iter KeyIterator
1897+
if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil {
1898+
t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err)
1899+
}
1900+
var i int
1901+
for i = 0; i < MaxErrors*2; i++ {
1902+
saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i))
1903+
if i < MaxErrors && !saved {
1904+
t.Fatalf("error unexpectedly not saved: %d", i)
1905+
}
1906+
if i >= MaxErrors && saved {
1907+
t.Fatalf("error unexpectedly saved: %d", i)
1908+
}
1909+
}
1910+
errs := iter.Err()
1911+
if errCnt := len(errs.(TSMErrors)); errCnt != (MaxErrors + 1) {
1912+
t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt)
1913+
}
1914+
expected := fmt.Sprintf("additional errors dropped: %d", i-MaxErrors)
1915+
if strings.Compare(errs.(TSMErrors)[MaxErrors].Error(), expected) != 0 {
1916+
t.Fatalf("expected: '%s', got: '%s", expected, errs.(TSMErrors)[MaxErrors].Error())
1917+
}
1918+
}
1919+
1920+
func createTestTSM(t *testing.T) (dir string, name string) {
1921+
dir = MustTempDir()
1922+
f := mustTempFile(dir)
1923+
name = f.Name()
1924+
w, err := NewTSMWriter(f)
1925+
if err != nil {
1926+
f.Close()
1927+
t.Fatalf("unexpected error creating writer for %s: %v", name, err)
1928+
}
1929+
defer func() {
1930+
if e := w.Close(); e != nil {
1931+
t.Fatalf("write TSM close of %s: %v", name, err)
1932+
}
1933+
}()
1934+
1935+
var data = map[string][]Value{
1936+
"float": []Value{NewValue(1, 1.0)},
1937+
"int": []Value{NewValue(1, int64(1))},
1938+
"uint": []Value{NewValue(1, ^uint64(0))},
1939+
"bool": []Value{NewValue(1, true)},
1940+
"string": []Value{NewValue(1, "foo")},
1941+
}
1942+
1943+
keys := make([]string, 0, len(data))
1944+
for k := range data {
1945+
keys = append(keys, k)
1946+
}
1947+
sort.Strings(keys)
1948+
1949+
for _, k := range keys {
1950+
if err := w.Write([]byte(k), data[k]); err != nil {
1951+
t.Fatalf("write TSM value: %v", err)
1952+
}
1953+
}
1954+
1955+
if err := w.WriteIndex(); err != nil {
1956+
t.Fatalf("write TSM index: %v", err)
1957+
}
1958+
1959+
return dir, name
1960+
}
1961+
18691962
func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) {
18701963
index := NewIndexWriter()
18711964
for i := 0; i < 100000; i++ {

0 commit comments

Comments
 (0)