Skip to content

Commit cac21cf

Browse files
authored
Accept Task List (#854)
* Accept Task List * oops unlimited headroom isn't zero * lint * oops * reduce workload to the accepted IDs * not every task has storage * limit maxAcceptable to storage partial * order in tasks * cleanup * fix sql * nil fix * fun with yb-not-pg type strictness * ffi update and harmonytask fn * fix merge issues * pull
1 parent 3dc0131 commit cac21cf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+437
-408
lines changed

alertmanager/task_alert.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,8 @@ func (a *AlertTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
164164

165165
}
166166

167-
func (a *AlertTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
168-
id := ids[0]
169-
return &id, nil
167+
// CanAccept returns every offered task ID unchanged together with a nil
// error; the engine argument is not consulted.
func (a *AlertTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) ([]harmonytask.TaskID, error) {
168+
return ids, nil
170169
}
171170

172171
func (a *AlertTask) TypeDetails() harmonytask.TaskTypeDetails {

harmony/harmonydb/harmonydb.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -309,19 +309,11 @@ func ensureSchemaExists(connString, schema string) error {
309309
return xerrors.New("schema must be of the form " + schemaREString + "\n Got: " + schema)
310310
}
311311

312-
retryWait := InitialSerializationErrorRetryWait
312+
_, err = backoffForSerializationError(func() (pgconn.CommandTag, error) {
313+
return p.Exec(context.Background(), "CREATE SCHEMA IF NOT EXISTS "+schema)
314+
})
313315

314-
retry:
315-
_, err = p.Exec(context.Background(), "CREATE SCHEMA IF NOT EXISTS "+schema)
316-
if err != nil && IsErrSerialization(err) {
317-
time.Sleep(retryWait)
318-
retryWait *= 2
319-
goto retry
320-
}
321-
if err != nil {
322-
return xerrors.Errorf("cannot create schema: %w", err)
323-
}
324-
return nil
316+
return err
325317
}
326318

327319
//go:embed sql

harmony/harmonydb/userfuncs.go

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,25 @@ import (
1212
"github.com/samber/lo"
1313
"github.com/yugabyte/pgx/v5"
1414
"github.com/yugabyte/pgx/v5/pgconn"
15+
"golang.org/x/xerrors"
1516
)
1617

1718
var errTx = errors.New("cannot use a non-transaction func in a transaction")
1819

19-
const InitialSerializationErrorRetryWait = 5 * time.Second
20+
var backoffs = lo.Map([]int{200, 400, 800, 1600, 2400, 5000, 8000}, func(item int, _ int) time.Duration {
21+
return time.Duration(item) * time.Millisecond
22+
})
23+
24+
func backoffForSerializationError[T any](f func() (T, error)) (whatever T, err error) {
25+
for _, backoff := range backoffs {
26+
res, err := f()
27+
if !IsErrSerialization(err) {
28+
return res, nil
29+
}
30+
time.Sleep(backoff)
31+
}
32+
return whatever, xerrors.Errorf("failed to execute function: %w", err)
33+
}
2034

2135
// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
2236
// In any package, raw strings will satisfy compilation. Ex:
@@ -34,17 +48,14 @@ func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (co
3448
return 0, errTx
3549
}
3650

37-
retryWait := InitialSerializationErrorRetryWait
38-
39-
retry:
40-
res, err := db.pgx.Exec(ctx, string(sql), arguments...)
41-
if err != nil && IsErrSerialization(err) {
42-
time.Sleep(retryWait)
43-
retryWait *= 2
44-
goto retry
51+
res, err := backoffForSerializationError(func() (pgconn.CommandTag, error) {
52+
return db.pgx.Exec(ctx, string(sql), arguments...)
53+
})
54+
if err != nil {
55+
return 0, err
4556
}
4657

47-
return int(res.RowsAffected()), err
58+
return int(res.RowsAffected()), nil
4859
}
4960

5061
type Qry interface {
@@ -79,7 +90,9 @@ func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*
7990
if db.usedInTransaction() {
8091
return &Query{}, errTx
8192
}
82-
q, err := db.pgx.Query(ctx, string(sql), arguments...)
93+
q, err := backoffForSerializationError(func() (pgx.Rows, error) {
94+
return db.pgx.Query(ctx, string(sql), arguments...)
95+
})
8396
return &Query{q}, err
8497
}
8598

@@ -148,7 +161,9 @@ func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnl
148161
if db.usedInTransaction() {
149162
return errTx
150163
}
151-
rows, err := db.pgx.Query(ctx, string(sql), arguments...)
164+
rows, err := backoffForSerializationError(func() (pgx.Rows, error) {
165+
return db.pgx.Query(ctx, string(sql), arguments...)
166+
})
152167
if err != nil {
153168
return err
154169
}
@@ -171,8 +186,7 @@ func (db *DB) usedInTransaction() bool {
171186
}
172187

173188
type TransactionOptions struct {
174-
RetrySerializationError bool
175-
InitialSerializationErrorRetryWait time.Duration
189+
RetrySerializationError bool
176190
}
177191

178192
type TransactionOption func(*TransactionOptions)
@@ -183,12 +197,6 @@ func OptionRetry() TransactionOption {
183197
}
184198
}
185199

186-
func OptionSerialRetryTime(d time.Duration) TransactionOption {
187-
return func(o *TransactionOptions) {
188-
o.InitialSerializationErrorRetryWait = d
189-
}
190-
}
191-
192200
// BeginTransaction is how you can access transactions using this library.
193201
// The entire transaction happens in the function passed in.
194202
// The return must be true or a rollback will occur.
@@ -208,23 +216,19 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, er
208216
}
209217

210218
opts := TransactionOptions{
211-
RetrySerializationError: false,
212-
InitialSerializationErrorRetryWait: InitialSerializationErrorRetryWait,
219+
RetrySerializationError: false,
213220
}
214221

215222
for _, o := range opt {
216223
o(&opts)
217224
}
218225

219-
retry:
220-
comm, err := db.transactionInner(ctx, f)
221-
if err != nil && opts.RetrySerializationError && IsErrSerialization(err) {
222-
time.Sleep(opts.InitialSerializationErrorRetryWait)
223-
opts.InitialSerializationErrorRetryWait *= 2
224-
goto retry
226+
if opts.RetrySerializationError {
227+
return backoffForSerializationError(func() (bool, error) {
228+
return db.transactionInner(ctx, f)
229+
})
225230
}
226-
227-
return comm, err
231+
return db.transactionInner(ctx, f)
228232
}
229233

230234
func (db *DB) transactionInner(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
@@ -256,8 +260,13 @@ func (db *DB) transactionInner(ctx context.Context, f func(*Tx) (commit bool, er
256260

257261
// Exec in a transaction.
258262
func (t *Tx) Exec(sql rawStringOnly, arguments ...any) (count int, err error) {
259-
res, err := t.Tx.Exec(t.ctx, string(sql), arguments...)
260-
return int(res.RowsAffected()), err
263+
res, err := backoffForSerializationError(func() (pgconn.CommandTag, error) {
264+
return t.Tx.Exec(t.ctx, string(sql), arguments...)
265+
})
266+
if err != nil {
267+
return 0, err
268+
}
269+
return int(res.RowsAffected()), nil
261270
}
262271

263272
// Query in a transaction.

harmony/harmonytask/harmonytask.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ type TaskInterface interface {
8686
// return null if the task type is not allowed on this machine.
8787
// It should select the task it most wants to accomplish.
8888
// It is also responsible for determining & reserving disk space (including scratch).
89-
CanAccept([]TaskID, *TaskEngine) (*TaskID, error)
89+
CanAccept([]TaskID, *TaskEngine) ([]TaskID, error)
9090

9191
// TypeDetails() returns static details about how this task behaves and
9292
// how this machine will run it. Read once at the beginning.
@@ -217,7 +217,7 @@ func New(
217217
// edge-case: if old assignments are not available tasks, unlock them.
218218
h := e.taskMap[w.Name]
219219
if h == nil || !h.considerWork(WorkSourceRecover, []TaskID{TaskID(w.ID)}) {
220-
_, err := db.Exec(e.ctx, `UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, w.ID)
220+
_, err := db.Exec(e.ctx, `UPDATE harmony_task SET owner_id=NULL WHERE id=$1 AND owner_id=$2`, w.ID, e.ownerID)
221221
if err != nil {
222222
log.Errorw("Cannot remove self from owner field", "error", err)
223223
continue // not really fatal, but not great
@@ -420,7 +420,7 @@ func (e *TaskEngine) pollerTryAllWork(schedulable bool) bool {
420420
}
421421
}
422422

423-
if err := v.AssertMachineHasCapacity(); err != nil {
423+
if _, err := v.AssertMachineHasCapacity(); err != nil {
424424
log.Debugf("skipped scheduling %s type tasks on due to %s", v.Name, err.Error())
425425
continue
426426
}
@@ -468,7 +468,7 @@ func (e *TaskEngine) pollerTryAllWork(schedulable bool) bool {
468468
// if no work was accepted, are we bored? Then find work in priority order.
469469
for _, v := range e.handlers {
470470
v := v
471-
if v.AssertMachineHasCapacity() != nil {
471+
if _, err := v.AssertMachineHasCapacity(); err != nil {
472472
continue
473473
}
474474
if v.IAmBored != nil {
@@ -579,3 +579,7 @@ func Reg(t TaskInterface) bool {
579579

580580
return true
581581
}
582+
583+
func (e *TaskEngine) RunningCount(name string) int {
584+
return int(e.taskMap[name].Max.Active())
585+
}

0 commit comments

Comments
 (0)