Skip to content

Commit 5badc84

Browse files
committed
GOCBC-1827: Range scan - Use atomic types for variables intended to only be used with atomic operations
Motivation ========== We use a number of variables in the range scan code which are only ever supposed to be used with atomic operations. We should use the more ergonomic atomic types, to avoid any accidental non-atomic usage of these variables. This also allows us to remove the direct dependency of gocb on the "unsafe" package. These were added with Go 1.19 that we still supported when range scan was added, but since we no longer maintain compatibility with Go 1.19 we can now make use of these types. Changes ======= * Use appropriate atomic types for the various counters that are used with atomic operations * Replace use of unsafe.Pointer for the peeked scan item with atomic.Pointer[ScanResultItem] Change-Id: If0ef05dfc322c3ef3d2b1f42ba366918cedffda6 Reviewed-on: https://review.couchbase.org/c/gocb/+/245686 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Charles Dixon <chvckd@gmail.com>
1 parent 9a8c1dc commit 5badc84

3 files changed

Lines changed: 25 additions & 29 deletions

File tree

rangescanopmanager.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"sync"
1111
"sync/atomic"
1212
"time"
13-
"unsafe"
1413

1514
"github.com/couchbase/gocbcore/v10"
1615
)
@@ -60,7 +59,7 @@ type rangeScanOpManager struct {
6059

6160
result *ScanResult
6261

63-
cancelled uint32
62+
cancelled atomic.Bool
6463
}
6564

6665
type rangeScanVbucket struct {
@@ -344,7 +343,7 @@ func (m *rangeScanOpManager) IsRangeScan() bool {
344343
}
345344

346345
func (m *rangeScanOpManager) cancelScan(err error) {
347-
if atomic.CompareAndSwapUint32(&m.cancelled, 0, 1) {
346+
if m.cancelled.CompareAndSwap(false, true) {
348347
if err != nil {
349348
m.result.setErr(err)
350349
}
@@ -373,27 +372,26 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
373372

374373
balancer := m.createLoadBalancer()
375374

376-
var complete uint32
377-
var seenData uint32
378-
379375
// We keep separate counts of running and completed to simplify shutdown of the scan.
380-
scansRunning := int32(m.maxConcurrency)
376+
var complete atomic.Uint32
377+
var scansRunning atomic.Int32
378+
scansRunning.Store(int32(m.maxConcurrency))
381379

382380
isRangeScan := m.IsRangeScan()
383381

384382
var i uint16
385383
for i = 0; i < m.maxConcurrency; i++ {
386384
go func() {
387385
defer func() {
388-
if atomic.AddUint32(&complete, 1) == uint32(m.maxConcurrency) {
386+
if complete.Add(1) == uint32(m.maxConcurrency) {
389387
m.Finish()
390388
balancer.close()
391389
close(resultCh)
392390
}
393391
}()
394392

395393
for vbucket, ok := balancer.selectVbucket(); ok; vbucket, ok = balancer.selectVbucket() {
396-
if atomic.LoadUint32(&m.cancelled) == 1 {
394+
if m.cancelled.Load() {
397395
return
398396
}
399397

@@ -415,7 +413,7 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
415413
if errors.Is(err, gocbcore.ErrBusy) {
416414
// Busy indicates that the server is reporting too many active scans.
417415
// Shut ourselves down if we're not the only runner remaining.
418-
running := atomic.AddInt32(&scansRunning, -1)
416+
running := scansRunning.Add(-1)
419417
if running >= 1 {
420418
// Shutdown this worker.
421419
logDebugf("Shutting down scan runner, remaining %d", running)
@@ -479,8 +477,7 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
479477
case item, more := <-resultCh:
480478
// more could be false if no sampling scans returned any data, but that isn't an error case.
481479
if more {
482-
atomic.StoreUint32(&seenData, 1)
483-
r.peeked = unsafe.Pointer(item) //nolint:gosec
480+
r.peeked.Store(item)
484481
}
485482
}
486483

@@ -567,7 +564,7 @@ func (m *rangeScanOpManager) scanPartition(ctx context.Context, deadline time.Ti
567564
return 0, nil
568565
}
569566
}
570-
if atomic.LoadUint32(&m.cancelled) == 1 {
567+
if m.cancelled.Load() {
571568
m.cancelStream(ctx, span, deadline, createRes)
572569
return 0, nil
573570
}
@@ -775,15 +772,15 @@ func (b *rangeScanLoadBalancer) selectVbucket() (rangeScanVbucket, bool) {
775772

776773
var selectedServer int
777774
selected := false
778-
min := uint32(math.MaxUint32)
775+
minimum := uint32(math.MaxUint32)
779776

780777
for s := range b.servers {
781778
if len(b.vbucketChannels[s]) == 0 {
782779
continue
783780
}
784781
activeScans := b.numActiveScans(s).Load()
785-
if activeScans < min {
786-
min = activeScans
782+
if activeScans < minimum {
783+
minimum = activeScans
787784
selectedServer = s
788785
selected = true
789786
}

rangescanopmanager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,11 @@ func (r *rangeScanCreateResult) RangeScanCancel(opts gocbcore.RangeScanCancelOpt
229229
func (suite *UnitTestSuite) TestScanFirstCreateFailsUnknownError() {
230230
expectedErr := errors.New("unknown error")
231231
test := func(scan ScanType) (*ScanResult, error) {
232-
var calls uint32
232+
var calls atomic.Uint32
233233
provider := makeRangeScanProvider(func(args mock.Arguments) {
234234
cb := args.Get(2).(gocbcore.RangeScanCreateCallback)
235235

236-
if atomic.AddUint32(&calls, 1) == 1 {
236+
if calls.Add(1) == 1 {
237237
cb(nil, expectedErr)
238238
return
239239
}
@@ -285,11 +285,11 @@ func (suite *UnitTestSuite) TestScanFirstCreateFailsUnknownError() {
285285
func (suite *UnitTestSuite) TestScanSecondCreateFailsUnknownError() {
286286
expectedErr := errors.New("unknown error")
287287
test := func(scan ScanType) (*ScanResult, error) {
288-
var calls uint32
288+
var calls atomic.Uint32
289289
provider := makeRangeScanProvider(func(args mock.Arguments) {
290290
cb := args.Get(2).(gocbcore.RangeScanCreateCallback)
291291

292-
if atomic.AddUint32(&calls, 1) == 2 {
292+
if calls.Add(1) == 2 {
293293
cb(nil, expectedErr)
294294
return
295295
}
@@ -349,15 +349,15 @@ func (suite *UnitTestSuite) TestScanNMV() {
349349
keyAfter := "keyafter"
350350
test := func(scan ScanType) (map[string]struct{}, string, error) {
351351
var firstTermAfter string
352-
var calls uint32
352+
var calls atomic.Uint32
353353
var vbID int16 = -1
354354
var createCalls uint32
355355
provider := makeRangeScanProvider(func(args mock.Arguments) {
356356
thisVBID := int16(args.Get(0).(uint16))
357357
opts := args.Get(1).(gocbcore.RangeScanCreateOptions)
358358
cb := args.Get(2).(gocbcore.RangeScanCreateCallback)
359359

360-
if atomic.AddUint32(&calls, 1) == 2 || thisVBID == vbID {
360+
if calls.Add(1) == 2 || thisVBID == vbID {
361361
vbID = thisVBID
362362
if createCalls == 0 {
363363
createCalls++

results.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"sync"
77
"sync/atomic"
88
"time"
9-
"unsafe"
109

1110
"github.com/couchbase/gocbcore/v10/memd"
1211
)
@@ -461,9 +460,9 @@ type ScanResult struct {
461460
errLock sync.Mutex
462461

463462
limit uint64
464-
numItems uint64
463+
numItems atomic.Uint64
465464

466-
peeked unsafe.Pointer
465+
peeked atomic.Pointer[ScanResultItem]
467466
}
468467

469468
func (sr *ScanResult) setErr(err error) {
@@ -474,10 +473,10 @@ func (sr *ScanResult) setErr(err error) {
474473

475474
// Next returns the next item on the stream, if there are no items remaining then nil is returned.
476475
func (sr *ScanResult) Next() *ScanResultItem {
477-
peeked := atomic.SwapPointer(&sr.peeked, nil)
476+
peeked := sr.peeked.Swap(nil)
478477
if peeked != nil {
479-
atomic.AddUint64(&sr.numItems, 1)
480-
return (*ScanResultItem)(peeked)
478+
sr.numItems.Add(1)
479+
return peeked
481480
}
482481

483482
item, more := <-sr.resultChan
@@ -488,7 +487,7 @@ func (sr *ScanResult) Next() *ScanResultItem {
488487
// If we're doing a sampling scan then we need to only write data into the channel
489488
// if we haven't seen the number of items that the user requested. Otherwise
490489
// we need to cancel the streams
491-
numItems := atomic.AddUint64(&sr.numItems, 1)
490+
numItems := sr.numItems.Add(1)
492491
if sr.limit == 0 || numItems <= sr.limit {
493492
return item
494493
}

0 commit comments

Comments
 (0)