Skip to content

Commit 2f1e3f8

Browse files
[OCC] perform validateAll after execution (#437)
## Describe your changes and provide context - This performs validateAll after execution ## Testing performed to validate your change - Unit Tests
1 parent 5a05027 commit 2f1e3f8

File tree

2 files changed

+59
-44
lines changed

2 files changed

+59
-44
lines changed

tasks/scheduler.go

Lines changed: 31 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type deliverTxTask struct {
4141
Ctx sdk.Context
4242
AbortCh chan occ.Abort
4343

44+
mx sync.RWMutex
4445
Status status
4546
Dependencies []int
4647
Abort *occ.Abort
@@ -52,8 +53,20 @@ type deliverTxTask struct {
5253
ValidateCh chan status
5354
}
5455

56+
func (dt *deliverTxTask) IsStatus(s status) bool {
57+
dt.mx.RLock()
58+
defer dt.mx.RUnlock()
59+
return dt.Status == s
60+
}
61+
62+
func (dt *deliverTxTask) SetStatus(s status) {
63+
dt.mx.Lock()
64+
defer dt.mx.Unlock()
65+
dt.Status = s
66+
}
67+
5568
func (dt *deliverTxTask) Reset() {
56-
dt.Status = statusPending
69+
dt.SetStatus(statusPending)
5770
dt.Response = nil
5871
dt.Abort = nil
5972
dt.AbortCh = nil
@@ -182,7 +195,7 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) {
182195

183196
func indexesValidated(tasks []*deliverTxTask, idx []int) bool {
184197
for _, i := range idx {
185-
if tasks[i].Status != statusValidated {
198+
if !tasks[i].IsStatus(statusValidated) {
186199
return false
187200
}
188201
}
@@ -191,7 +204,7 @@ func indexesValidated(tasks []*deliverTxTask, idx []int) bool {
191204

192205
func allValidated(tasks []*deliverTxTask) bool {
193206
for _, t := range tasks {
194-
if t.Status != statusValidated {
207+
if !t.IsStatus(statusValidated) {
195208
return false
196209
}
197210
}
@@ -220,7 +233,7 @@ type schedulerMetrics struct {
220233

221234
func (s *scheduler) emitMetrics() {
222235
telemetry.IncrCounter(float32(s.metrics.retries), "scheduler", "retries")
223-
telemetry.SetGauge(float32(s.metrics.maxIncarnation), "scheduler", "max_incarnation")
236+
telemetry.IncrCounter(float32(s.metrics.maxIncarnation), "scheduler", "incarnations")
224237
}
225238

226239
func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
@@ -296,12 +309,12 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
296309
} else {
297310
// otherwise, wait for completion
298311
task.Dependencies = conflicts
299-
task.Status = statusWaiting
312+
task.SetStatus(statusWaiting)
300313
return false
301314
}
302315
} else if len(conflicts) == 0 {
303316
// mark as validated, which will avoid re-validating unless a lower-index re-validates
304-
task.Status = statusValidated
317+
task.SetStatus(statusValidated)
305318
return false
306319
}
307320
// conflicts and valid, so it'll validate next time
@@ -346,18 +359,18 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del
346359
return nil, nil
347360
}
348361

349-
wg := sync.WaitGroup{}
362+
wg := &sync.WaitGroup{}
350363
for i := startIdx; i < len(tasks); i++ {
351-
t := tasks[i]
352364
wg.Add(1)
365+
t := tasks[i]
353366
s.DoValidate(func() {
354367
defer wg.Done()
355368
if !s.validateTask(ctx, t) {
369+
mx.Lock()
370+
defer mx.Unlock()
356371
t.Reset()
357372
t.Increment()
358-
mx.Lock()
359373
res = append(res, t)
360-
mx.Unlock()
361374
}
362375
})
363376
}
@@ -373,53 +386,28 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
373386

374387
// validationWg waits for all validations to complete
375388
// validations happen in separate goroutines in order to wait on previous index
376-
validationWg := &sync.WaitGroup{}
377-
validationWg.Add(len(tasks))
389+
wg := &sync.WaitGroup{}
390+
wg.Add(len(tasks))
378391

379392
for _, task := range tasks {
380393
t := task
381394
s.DoExecute(func() {
382-
s.prepareAndRunTask(validationWg, ctx, t)
395+
s.prepareAndRunTask(wg, ctx, t)
383396
})
384397
}
385398

386-
validationWg.Wait()
399+
wg.Wait()
387400

388401
return nil
389402
}
390403

391-
func (s *scheduler) waitOnPreviousAndValidate(wg *sync.WaitGroup, task *deliverTxTask) {
392-
defer wg.Done()
393-
defer close(task.ValidateCh)
394-
// wait on previous task to finish validation
395-
// if a previous task fails validation, then subsequent should fail too (cascade)
396-
if task.Index > 0 {
397-
res, ok := <-s.allTasks[task.Index-1].ValidateCh
398-
if ok && res != statusValidated {
399-
task.Reset()
400-
task.ValidateCh <- task.Status
401-
return
402-
}
403-
}
404-
// if not validated, reset the task
405-
if !s.validateTask(task.Ctx, task) {
406-
task.Reset()
407-
}
408-
409-
// notify next task of this one's status
410-
task.ValidateCh <- task.Status
411-
}
412-
413404
func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task *deliverTxTask) {
414405
eCtx, eSpan := s.traceSpan(ctx, "SchedulerExecute", task)
415406
defer eSpan.End()
416-
task.Ctx = eCtx
417407

408+
task.Ctx = eCtx
418409
s.executeTask(task)
419-
420-
s.DoValidate(func() {
421-
s.waitOnPreviousAndValidate(wg, task)
422-
})
410+
wg.Done()
423411
}
424412

425413
func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) (sdk.Context, trace.Span) {
@@ -509,12 +497,12 @@ func (s *scheduler) executeTask(task *deliverTxTask) {
509497

510498
// If abort has occurred, return, else set the response and status
511499
if abortOccurred {
512-
task.Status = statusAborted
500+
task.SetStatus(statusAborted)
513501
task.Abort = abort
514502
return
515503
}
516504

517-
task.Status = statusExecuted
505+
task.SetStatus(statusExecuted)
518506
task.Response = &resp
519507

520508
// write from version store to multiversion stores

tasks/scheduler_test.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestProcessAll(t *testing.T) {
9999
name: "Test tx writing to a store that another tx is iterating",
100100
workers: 50,
101101
runs: 1,
102-
requests: requestList(500),
102+
requests: requestList(100),
103103
addStores: true,
104104
before: func(ctx sdk.Context) {
105105
kv := ctx.MultiStore().GetKVStore(testStoreKey)
@@ -208,6 +208,33 @@ func TestProcessAll(t *testing.T) {
208208
},
209209
expectedErr: nil,
210210
},
211+
{
212+
name: "Test some tx accesses same key",
213+
workers: 50,
214+
runs: 1,
215+
addStores: true,
216+
requests: requestList(2000),
217+
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
218+
if ctx.TxIndex()%10 != 0 {
219+
return types.ResponseDeliverTx{
220+
Info: "none",
221+
}
222+
}
223+
// all txs read and write to the same key to maximize conflicts
224+
kv := ctx.MultiStore().GetKVStore(testStoreKey)
225+
val := string(kv.Get(itemKey))
226+
227+
// write to the store with this tx's index
228+
kv.Set(itemKey, req.Tx)
229+
230+
// return what was read from the store (final attempt should be index-1)
231+
return types.ResponseDeliverTx{
232+
Info: val,
233+
}
234+
},
235+
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {},
236+
expectedErr: nil,
237+
},
211238
{
212239
name: "Test no stores on context should not panic",
213240
workers: 50,

0 commit comments

Comments
 (0)