Skip to content

Commit 945dd7b

Browse files
committed
Timeout directory opening
1 parent 28c9fcb commit 945dd7b

3 files changed

Lines changed: 52 additions & 19 deletions

File tree

pkg/drivers/fdb/fdb.go

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,28 +81,25 @@ func (f *FDB) Start(ctx context.Context) error {
8181
}
8282
f.db = db
8383

84-
etcd, err := directory.CreateOrOpen(db, []string{f.dirName}, nil)
85-
if err != nil {
84+
if err = f.openDirectory(); err != nil {
8685
return err
8786
}
88-
f.dir = etcd
8987

9088
if CleanDirOnStart {
91-
_, err = db.Transact(func(tr fdb.Transaction) (ret interface{}, e error) {
92-
tr.ClearRange(etcd)
93-
return
89+
_, err = transact(db, 0, func(tr fdb.Transaction) (interface{}, error) {
90+
tr.ClearRange(f.dir)
91+
return 0, nil
9492
})
93+
if err != nil {
94+
return err
95+
}
9596
}
9697

97-
if err != nil {
98-
return err
99-
}
100-
101-
f.byRevision = CreateByRevisionSubspace(etcd)
102-
f.byKeyAndRevision = CreateByKeyRevisionSubspace(etcd)
103-
f.watch = CreateWatchSubspace(etcd)
104-
f.compactRev = CreateCompactRevisionSubspace(etcd)
105-
f.rev = CreateRevisionSubspace(etcd)
98+
f.byRevision = CreateByRevisionSubspace(f.dir)
99+
f.byKeyAndRevision = CreateByKeyRevisionSubspace(f.dir)
100+
f.watch = CreateWatchSubspace(f.dir)
101+
f.compactRev = CreateCompactRevisionSubspace(f.dir)
102+
f.rev = CreateRevisionSubspace(f.dir)
106103

107104
// https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L97
108105
if !APITest {
@@ -118,6 +115,28 @@ func (f *FDB) Start(ctx context.Context) error {
118115
return nil
119116
}
120117

118+
// The FDB operations intentionally retry forever and there is no explicit way to timeout directory opening.
119+
// https://forums.foundationdb.org/t/golang-fdb-mustopendefault-does-not-fail-when-fdb-cluster-content-points-to-invalid-host/715/2
120+
// Wrapping directory opening in a goroutine.
121+
func (f *FDB) openDirectory() error {
122+
errCh := make(chan error, 1)
123+
go func() {
124+
dir, err := directory.CreateOrOpen(f.db, []string{f.dirName}, nil)
125+
f.dir = dir
126+
errCh <- err
127+
}()
128+
129+
select {
130+
case err := <-errCh:
131+
if err != nil {
132+
return err
133+
}
134+
case <-time.After(transactionTimeout):
135+
return errors.New("directory creation timed out")
136+
}
137+
return nil
138+
}
139+
121140
// https://apple.github.io/foundationdb/tls.html#configuring-tls
122141
func (f *FDB) setTLSConfig() error {
123142
if f.tlsConfig.CertFile != "" {

pkg/drivers/fdb/fdb_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,17 @@ func TestExceedSizeLarge(t *testing.T) {
517517
require.ErrorIs(t, err, rpctypes.ErrRequestTooLarge)
518518
}
519519

520+
func TestFailUnavailableServer(t *testing.T) {
521+
transactionTimeout = 1 * time.Second
522+
523+
f := NewFdbStructured("any:any@1.2.3.4:9999", tls.Config{}, "dir1")
524+
ctx, cancelCtx := context.WithTimeout(context.Background(), time.Duration(3)*time.Second)
525+
defer cancelCtx()
526+
527+
err := f.Start(ctx)
528+
require.Error(t, err)
529+
}
530+
520531
func createRecords(t *testing.T, f server.Backend, ctx context.Context, recordCount int, recordSize int) map[string][]byte {
521532
g := errgroup.Group{}
522533
g.SetLimit(50)

pkg/drivers/fdb/fdb_util.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ const (
1313
notCommittedErrorCode = 1020 // Transaction not committed due to conflict with another transaction
1414

1515
splitRangeAfterDuration = 1 * time.Second
16-
transactionTimeout = 10 * time.Second
1716
transactionMaxRetryCount = 1000
1817
)
1918

19+
// var for testing
20+
var transactionTimeout = 10 * time.Second
21+
2022
var forceRetryTransaction = func(i int) bool { return false }
2123

2224
type Processor[T any] interface {
@@ -60,9 +62,6 @@ func processBatch(db fdb.Database, selector fdb.SelectorRange, collector Process
6062

6163
res, err := transact(db, batchResult{}, func(tr fdb.Transaction) (batchResult, error) {
6264
res := batchResult{collectorNeedsMore: true}
63-
if err := tr.Options().SetTimeout(transactionTimeout.Milliseconds()); err != nil {
64-
return res, fmt.Errorf("failed to set timeout limit: %w", err)
65-
}
6665

6766
start := time.Now()
6867
// Snapshot read does not add read conflict ranges
@@ -104,6 +103,10 @@ func transact[T any](d fdb.Database, defaultValue T, f func(fdb.Transaction) (T,
104103
wrapped := func() (T, error) {
105104
defer panicToError(&e)
106105

106+
if err := tr.Options().SetTimeout(transactionTimeout.Milliseconds()); err != nil {
107+
return defaultValue, fmt.Errorf("failed to set timeout limit: %w", err)
108+
}
109+
107110
// https://forums.foundationdb.org/t/defaults-for-transaction-timeouts-and-retries/315/2
108111
e = tr.Options().SetRetryLimit(transactionMaxRetryCount)
109112
if e != nil {

0 commit comments

Comments
 (0)