Skip to content

Commit 022ec8d

Browse files
authored
Add context to ErrLeaseNotAcquired (#87)
* clientlibrary/checkpoint: convert ErrLeaseAcquired to struct Signed-off-by: Aurélien Rainone <[email protected]> * clientlibrary/checkpoint: add context to ErrLeaseNotAcquired Signed-off-by: Aurélien Rainone <[email protected]> * Use errors.As to check for ErrLeaseNotAcquired error Signed-off-by: Aurélien Rainone <[email protected]>
1 parent ff6f70d commit 022ec8d

File tree

5 files changed

+19
-10
lines changed

5 files changed

+19
-10
lines changed

clientlibrary/checkpoint/checkpointer.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ package checkpoint
2929

3030
import (
3131
"errors"
32+
"fmt"
33+
3234
par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
3335
)
3436

@@ -41,11 +43,16 @@ const (
4143

4244
// We've completely processed all records in this shard.
4345
ShardEnd = "SHARD_END"
44-
45-
// ErrLeaseNotAcquired is returned when we failed to get a lock on the shard
46-
ErrLeaseNotAcquired = "lease is already held by another node"
4746
)
4847

48+
type ErrLeaseNotAcquired struct {
49+
cause string
50+
}
51+
52+
func (e ErrLeaseNotAcquired) Error() string {
53+
return fmt.Sprintf("lease not acquired: %s", e.cause)
54+
}
55+
4956
// Checkpointer handles checkpointing when a record has been processed
5057
type Checkpointer interface {
5158
// Init initialises the Checkpoint

clientlibrary/checkpoint/dynamodb-checkpointer.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
package checkpoint
2929

3030
import (
31-
"errors"
3231
"time"
3332

3433
"github.com/aws/aws-sdk-go/aws"
@@ -142,7 +141,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
142141
}
143142

144143
if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo {
145-
return errors.New(ErrLeaseNotAcquired)
144+
return ErrLeaseNotAcquired{"current lease timeout not yet expired"}
146145
}
147146

148147
checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
@@ -186,7 +185,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
186185
if err != nil {
187186
if awsErr, ok := err.(awserr.Error); ok {
188187
if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
189-
return errors.New(ErrLeaseNotAcquired)
188+
return ErrLeaseNotAcquired{dynamodb.ErrCodeConditionalCheckFailedException}
190189
}
191190
}
192191
return err

clientlibrary/checkpoint/dynamodb-checkpointer_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ package checkpoint
2929

3030
import (
3131
"errors"
32-
"github.com/stretchr/testify/assert"
3332
"sync"
3433
"testing"
3534
"time"
3635

36+
"github.com/stretchr/testify/assert"
37+
3738
"github.com/aws/aws-sdk-go/aws"
3839
"github.com/aws/aws-sdk-go/aws/awserr"
3940
"github.com/aws/aws-sdk-go/service/dynamodb"
@@ -85,7 +86,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
8586
Checkpoint: "",
8687
Mux: &sync.Mutex{},
8788
}, "ijkl-mnop")
88-
if err == nil || err.Error() != ErrLeaseNotAcquired {
89+
if err == nil || !errors.As(err, &ErrLeaseNotAcquired{}) {
8990
t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err)
9091
}
9192
}

clientlibrary/worker/shard-consumer.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
package worker
2929

3030
import (
31+
"errors"
3132
"math"
3233
"sync"
3334
"time"
@@ -162,7 +163,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
162163
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
163164
err = sc.checkpointer.GetLease(shard, sc.consumerID)
164165
if err != nil {
165-
if err.Error() == chk.ErrLeaseNotAcquired {
166+
if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
166167
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
167168
return nil
168169
}

clientlibrary/worker/worker.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
package worker
2929

3030
import (
31+
"errors"
3132
"math/rand"
3233
"sync"
3334
"time"
@@ -277,7 +278,7 @@ func (w *Worker) eventLoop() {
277278
err = w.checkpointer.GetLease(shard, w.workerID)
278279
if err != nil {
279280
// cannot get lease on the shard
280-
if err.Error() != chk.ErrLeaseNotAcquired {
281+
if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
281282
log.Errorf("Cannot get lease: %+v", err)
282283
}
283284
continue

0 commit comments

Comments
 (0)