Skip to content

Commit 1e768c3

Browse files
committed
Go: introduce context for Transact/ReadTransact
Use a goroutine to automatically cancel transaction when context has an error. Correctly close transactions by returning a function to caller.
1 parent 534b1fe commit 1e768c3

File tree

11 files changed

+243
-129
lines changed

11 files changed

+243
-129
lines changed

bindings/go/src/fdb/database.go

Lines changed: 64 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package fdb
2727
import "C"
2828

2929
import (
30+
"context"
3031
"errors"
3132
)
3233

@@ -162,23 +163,52 @@ func (d Database) GetClientStatus() ([]byte, error) {
162163
return b, nil
163164
}
164165

165-
func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNil) (ret interface{}, err error) {
166+
func autoCancel(ctx context.Context, tr CancellableTransaction) chan<- struct{} {
167+
exitCh := make(chan struct{})
168+
// This goroutine ensures that as soon as context is canceled the FoundationDB transaction will be canceled as well;
169+
// this can in turn cause FoundationDB's `transaction_canceled` errors.
170+
go func() {
171+
select {
172+
case <-exitCh:
173+
// transaction finished before any context cancellation was acknowledged
174+
return
175+
case <-ctx.Done():
176+
// context cancellation happened before transaction finished
177+
// NOTE: canceling the transaction does not mean that transaction state changes did not happen,
178+
179+
tr.Cancel()
180+
181+
return
182+
}
183+
}()
184+
185+
return exitCh
186+
}
187+
188+
// retryable is responsible for retrying the wrapped function for as long as the returned error is retryable.
189+
// In case of a non-retryable error, it is returned and transaction is closed.
190+
func retryable(ctx context.Context, tr Transaction, wrapped func() (interface{}, error)) (ret interface{}, txClose func(), err error) {
191+
// NOTE: 'autoCancel' must be called outside of the defer function, so that the goroutine is started
192+
ch := autoCancel(ctx, tr)
193+
defer close(ch)
194+
166195
for {
167196
ret, err = wrapped()
168-
169-
// No error means success!
170197
if err == nil {
198+
// No error means success!
199+
// Caller is responsible for closing transaction after finishing using any future created within the transaction
200+
// This return value is always nil in case of errors
201+
txClose = tr.Close
202+
171203
return
172204
}
173205

174-
// Check if the error chain contains an
175-
// fdb.Error
206+
// Check if the error chain contains an fdb.Error
176207
var ep Error
177208
if errors.As(err, &ep) {
178-
f := onError(ep)
179-
defer f.Close()
180-
209+
f := tr.OnError(ep)
181210
processedErr := f.Get()
211+
f.Close()
182212
var newEp Error
183213
if !errors.As(processedErr, &newEp) || newEp.Code != ep.Code {
184214
// override original error only if not an Error or code changed
@@ -190,6 +220,10 @@ func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNi
190220
// If OnError returns an error, then it's not
191221
// retryable; otherwise take another pass at things
192222
if err != nil {
223+
224+
// destroy transaction; this will cancel all futures created within it
225+
tr.Close()
226+
193227
return
194228
}
195229
}
@@ -207,40 +241,37 @@ func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNi
207241
// error.
208242
//
209243
// The transaction is retried if the error is or wraps a retryable Error.
210-
// The error is unwrapped.
211244
//
212-
// Do not return Future objects from the function provided to Transact. The
213-
// Transaction created by Transact may be finalized at any point after Transact
214-
// returns, resulting in the cancellation of any outstanding
215-
// reads. Additionally, any errors returned or panicked by the Future will no
216-
// longer be able to trigger a retry of the caller-provided function.
245+
// In case of success a transaction close function is returned and caller must
246+
// call it after all futures have been used.
247+
// Any errors returned or panicked by the Future outside of the Transact() call
248+
// will no longer be able to trigger a retry of the caller-provided function.
217249
//
218250
// See the Transactor interface for an example of using Transact with
219251
// Transaction and Database objects.
220-
func (d Database) Transact(f func(Transaction) (interface{}, error)) (interface{}, error) {
252+
func (d Database) Transact(ctx context.Context, f func(Transaction) (interface{}, error)) (interface{}, func(), error) {
221253
tr, err := d.CreateTransaction()
222254
if err != nil {
223255
// Any error here is non-retryable
224-
return nil, err
256+
return nil, nil, err
225257
}
226-
defer tr.Close()
227258

228259
wrapped := func() (ret interface{}, err error) {
229260
defer panicToError(&err)
230261

231262
ret, err = f(tr)
232-
233-
if err == nil {
234-
f := tr.Commit()
235-
defer f.Close()
236-
237-
err = f.Get()
263+
if err != nil {
264+
return
238265
}
239266

267+
f := tr.Commit()
268+
err = f.Get()
269+
f.Close()
270+
240271
return
241272
}
242273

243-
return retryable(wrapped, tr.OnError)
274+
return retryable(ctx, tr, wrapped)
244275
}
245276

246277
// ReadTransact runs a caller-provided function inside a retry loop, providing
@@ -254,37 +285,33 @@ func (d Database) Transact(f func(Transaction) (interface{}, error)) (interface{
254285
// transaction or return the error.
255286
//
256287
// The transaction is retried if the error is or wraps a retryable Error.
257-
// The error is unwrapped.
258-
// Read transactions are never committed and destroyed automatically via GC,
259-
// once all their futures go out of scope.
288+
// Read transactions are never committed.
260289
//
261-
// Do not return Future objects from the function provided to ReadTransact. The
262-
// Transaction created by ReadTransact may be finalized at any point after
263-
// ReadTransact returns, resulting in the cancellation of any outstanding
264-
// reads. Additionally, any errors returned or panicked by the Future will no
265-
// longer be able to trigger a retry of the caller-provided function.
290+
// In case of success a transaction close function is returned and caller must
291+
// call it after all futures have been used.
292+
// Any errors returned or panicked by the Future outside of the Transact() call
293+
// will no longer be able to trigger a retry of the caller-provided function.
266294
//
267295
// See the ReadTransactor interface for an example of using ReadTransact with
268296
// Transaction, Snapshot and Database objects.
269-
func (d Database) ReadTransact(f func(ReadTransaction) (interface{}, error)) (interface{}, error) {
297+
func (d Database) ReadTransact(ctx context.Context, f func(ReadTransaction) (interface{}, error)) (interface{}, func(), error) {
270298
tr, err := d.CreateTransaction()
271299
if err != nil {
272300
// Any error here is non-retryable
273-
return nil, err
301+
return nil, nil, err
274302
}
275-
defer tr.Close()
276303

277304
wrapped := func() (ret interface{}, err error) {
278305
defer panicToError(&err)
279306

280307
ret, err = f(tr)
281308

282-
// read-only transactions are not committed and will be destroyed automatically by the deferred Close()
309+
// read-only transactions are not committed and will be destroyed automatically when Transaction's Close() is called
283310

284311
return
285312
}
286313

287-
return retryable(wrapped, tr.OnError)
314+
return retryable(ctx, tr, wrapped)
288315
}
289316

290317
// Options returns a DatabaseOptions instance suitable for setting options

bindings/go/src/fdb/directory/directory.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
package directory
4141

4242
import (
43+
"context"
4344
"errors"
4445

4546
"github.com/apple/foundationdb/bindings/go/src/fdb"
@@ -79,7 +80,7 @@ type Directory interface {
7980
// recorded as the layer; if layer is specified and the directory already
8081
// exists, it is compared against the layer specified when the directory was
8182
// created, and an error is returned if they differ.
82-
CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error)
83+
CreateOrOpen(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error)
8384

8485
// Open opens the directory specified by path (relative to this Directory),
8586
// and returns the directory and its contents as a DirectorySubspace (or ErrDirNotExists
@@ -89,15 +90,15 @@ type Directory interface {
8990
// If the byte slice layer is specified, it is compared against the layer
9091
// specified when the directory was created, and an error is returned if
9192
// they differ.
92-
Open(rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error)
93+
Open(ctx context.Context, rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error)
9394

9495
// Create creates a directory specified by path (relative to this
9596
// Directory), and returns the directory and its contents as a
9697
// DirectorySubspace (or ErrDirAlreadyExists if the directory already exists).
9798
//
9899
// If the byte slice layer is specified, it is recorded as the layer and
99100
// will be checked when opening the directory in the future.
100-
Create(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error)
101+
Create(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error)
101102

102103
// CreatePrefix behaves like Create, but uses a manually specified byte
103104
// slice prefix to physically store the contents of this directory, rather
@@ -106,7 +107,7 @@ type Directory interface {
106107
// If this Directory was created in a root directory that does not allow
107108
// manual prefixes, CreatePrefix will return an error. The default root
108109
// directory does not allow manual prefixes.
109-
CreatePrefix(t fdb.Transactor, path []string, layer []byte, prefix []byte) (DirectorySubspace, error)
110+
CreatePrefix(ctx context.Context, t fdb.Transactor, path []string, layer []byte, prefix []byte) (DirectorySubspace, error)
110111

111112
// Move moves the directory at oldPath to newPath (both relative to this
112113
// Directory), and returns the directory (at its new location) and its
@@ -116,7 +117,7 @@ type Directory interface {
116117
//
117118
// There is no effect on the physical prefix of the given directory or on
118119
// clients that already have the directory open.
119-
Move(t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error)
120+
Move(ctx context.Context, t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error)
120121

121122
// MoveTo moves this directory to newAbsolutePath (relative to the root
122123
// directory of this Directory), and returns the directory (at its new
@@ -126,7 +127,7 @@ type Directory interface {
126127
//
127128
// There is no effect on the physical prefix of the given directory or on
128129
// clients that already have the directory open.
129-
MoveTo(t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error)
130+
MoveTo(ctx context.Context, t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error)
130131

131132
// Remove removes the directory at path (relative to this Directory), its
132133
// content, and all subdirectories. Remove returns true if a directory
@@ -135,16 +136,16 @@ type Directory interface {
135136
//
136137
// Note that clients that have already opened this directory might still
137138
// insert data into its contents after removal.
138-
Remove(t fdb.Transactor, path []string) (bool, error)
139+
Remove(ctx context.Context, t fdb.Transactor, path []string) (bool, error)
139140

140141
// Exists returns true if the directory at path (relative to this Directory)
141142
// exists, and false otherwise.
142-
Exists(rt fdb.ReadTransactor, path []string) (bool, error)
143+
Exists(ctx context.Context, rt fdb.ReadTransactor, path []string) (bool, error)
143144

144145
// List returns the names of the immediate subdirectories of the directory
145146
// at path (relative to this Directory) as a slice of strings. Each string
146147
// is the name of the last component of a subdirectory's path.
147-
List(rt fdb.ReadTransactor, path []string) ([]string, error)
148+
List(ctx context.Context, rt fdb.ReadTransactor, path []string) ([]string, error)
148149

149150
// GetLayer returns the layer specified when this Directory was created.
150151
GetLayer() []byte
@@ -165,14 +166,14 @@ func stringsEqual(a, b []string) bool {
165166
return true
166167
}
167168

168-
func moveTo(t fdb.Transactor, dl directoryLayer, path, newAbsolutePath []string) (DirectorySubspace, error) {
169+
func moveTo(ctx context.Context, t fdb.Transactor, dl directoryLayer, path, newAbsolutePath []string) (DirectorySubspace, error) {
169170
partition_len := len(dl.path)
170171

171172
if !stringsEqual(newAbsolutePath[:partition_len], dl.path) {
172173
return nil, errors.New("cannot move between partitions")
173174
}
174175

175-
return dl.Move(t, path[partition_len:], newAbsolutePath[partition_len:])
176+
return dl.Move(ctx, t, path[partition_len:], newAbsolutePath[partition_len:])
176177
}
177178

178179
var root = NewDirectoryLayer(subspace.FromBytes([]byte{0xFE}), subspace.AllKeys(), false)
@@ -186,8 +187,8 @@ var root = NewDirectoryLayer(subspace.FromBytes([]byte{0xFE}), subspace.AllKeys(
186187
// as the layer; if layer is specified and the directory already exists, it is
187188
// compared against the layer specified when the directory was created, and an
188189
// error is returned if they differ.
189-
func CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) {
190-
return root.CreateOrOpen(t, path, layer)
190+
func CreateOrOpen(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) {
191+
return root.CreateOrOpen(ctx, t, path, layer)
191192
}
192193

193194
// Open opens the directory specified by path (resolved relative to the default
@@ -197,8 +198,8 @@ func CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubsp
197198
// If the byte slice layer is specified, it is compared against the layer
198199
// specified when the directory was created, and an error is returned if they
199200
// differ.
200-
func Open(rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) {
201-
return root.Open(rt, path, layer)
201+
func Open(ctx context.Context, rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) {
202+
return root.Open(ctx, rt, path, layer)
202203
}
203204

204205
// Create creates a directory specified by path (resolved relative to the
@@ -207,8 +208,8 @@ func Open(rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace
207208
//
208209
// If the byte slice layer is specified, it is recorded as the layer and will be
209210
// checked when opening the directory in the future.
210-
func Create(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) {
211-
return root.Create(t, path, layer)
211+
func Create(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) {
212+
return root.Create(ctx, t, path, layer)
212213
}
213214

214215
// Move moves the directory at oldPath to newPath (both resolved relative to the
@@ -219,21 +220,21 @@ func Create(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, e
219220
//
220221
// There is no effect on the physical prefix of the given directory or on
221222
// clients that already have the directory open.
222-
func Move(t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) {
223-
return root.Move(t, oldPath, newPath)
223+
func Move(ctx context.Context, t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) {
224+
return root.Move(ctx, t, oldPath, newPath)
224225
}
225226

226227
// Exists returns true if the directory at path (relative to the default root
227228
// directory) exists, and false otherwise.
228-
func Exists(rt fdb.ReadTransactor, path []string) (bool, error) {
229-
return root.Exists(rt, path)
229+
func Exists(ctx context.Context, rt fdb.ReadTransactor, path []string) (bool, error) {
230+
return root.Exists(ctx, rt, path)
230231
}
231232

232233
// List returns the names of the immediate subdirectories of the default root
233234
// directory as a slice of strings. Each string is the name of the last
234235
// component of a subdirectory's path.
235-
func List(rt fdb.ReadTransactor, path []string) ([]string, error) {
236-
return root.List(rt, path)
236+
func List(ctx context.Context, rt fdb.ReadTransactor, path []string) ([]string, error) {
237+
return root.List(ctx, rt, path)
237238
}
238239

239240
// Root returns the default root directory. Any attempt to move or remove the

0 commit comments

Comments
 (0)