Skip to content

Define a DLocker interface for distributed locker #18779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions client/v3/concurrency/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (m *Mutex) IsOwner() v3.Cmp {
}

func (m *Mutex) Key() string { return m.myKey }
func (m *Mutex) Rev() int64 { return m.myRev }

// Header is the response header received from etcd on acquiring the lock.
func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
Expand All @@ -178,3 +179,23 @@ func (lm *lockerMutex) Unlock() {
func NewLocker(s *Session, pfx string) sync.Locker {
return &lockerMutex{NewMutex(s, pfx)}
}

// DLocker represents an object that can be locked and unlocked
// in distributed environment.
//
// # Experimental
//
// Notice: This interface is EXPERIMENTAL and may be changed or removed
// in a later release.
type DLocker interface {
sync.Locker
// Rev returns a revision which is monotonically incremental. It can
// be used as a fencing token to prevent expired locker from operating
// the shared resource.
Rev() int64
}

// NewDLocker creates a DLocker backed by an etcd mutex.
func NewDLocker(s *Session, pfx string) DLocker {
return &lockerMutex{NewMutex(s, pfx)}
}
2 changes: 1 addition & 1 deletion contrib/lock/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ If things go well the second client process invoked as `./client 2` finishes soo
After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below:
```
resuming client 1
expected fail to write to storage with old lease version: error: given version (694d82254d5fa305) is different from the existing version (694d82254e18770a)
expected fail to write to storage with old lease version: error: given version (8) is smaller than the existing version (10)
```

[fencing]: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
Expand Down
13 changes: 7 additions & 6 deletions contrib/lock/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,30 @@ func main() {

log.Print("created etcd client and session")

locker := concurrency.NewLocker(session, "/lock")
locker := concurrency.NewDLocker(session, "/lock")
locker.Lock()
defer locker.Unlock()
version := session.Lease()
leaseID := session.Lease()
version := locker.Rev()
log.Printf("acquired lock, version: %x", version)

if mode == 1 {
log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", version)
log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", leaseID)
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadByte()
log.Print("resuming client 1")
} else {
log.Print("this is client 2, continuing\n")
}

err = write("key0", fmt.Sprintf("value from client %x", mode), int64(version))
err = write("key0", fmt.Sprintf("value from client %x", mode), version)
if err != nil {
if mode == 1 {
log.Printf("expected fail to write to storage with old lease version: %s\n", err) // client 1 should show this message
log.Printf("expected fail to write to storage with old version: %s\n", err) // client 1 should show this message
} else {
log.Fatalf("unexpected fail to write to storage: %s\n", err)
}
} else {
log.Printf("successfully write a key to storage using lease %x\n", int64(version))
log.Printf("successfully write a key to storage with version %x\n", version)
}
}
4 changes: 2 additions & 2 deletions contrib/lock/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func handler(w http.ResponseWriter, r *http.Request) {
}
} else if strings.Compare(req.Op, "write") == 0 {
if val, ok := data[req.Key]; ok {
if req.Version != val.version {
writeResponse(response{"", -1, fmt.Sprintf("given version (%x) is different from the existing version (%x)", req.Version, val.version)}, w)
if req.Version < val.version {
writeResponse(response{"", -1, fmt.Sprintf("given version (%d) is smaller than the existing version (%d)", req.Version, val.version)}, w)
} else {
data[req.Key].val = req.Val
data[req.Key].version = req.Version
Expand Down