Skip to content

Commit 9786c9e

Browse files
committed
Define a DLocker interface for distributed locker
Also updated the contrib/lock example to demo how to use the DLocker Signed-off-by: Benjamin Wang <[email protected]>
1 parent 18eb5c6 commit 9786c9e

File tree

4 files changed

+31
-9
lines changed

4 files changed

+31
-9
lines changed

client/v3/concurrency/mutex.go

+21
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func (m *Mutex) IsOwner() v3.Cmp {
154154
}
155155

156156
func (m *Mutex) Key() string { return m.myKey }
157+
func (m *Mutex) Rev() int64 { return m.myRev }
157158

158159
// Header is the response header received from etcd on acquiring the lock.
159160
func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
@@ -178,3 +179,23 @@ func (lm *lockerMutex) Unlock() {
178179
func NewLocker(s *Session, pfx string) sync.Locker {
179180
return &lockerMutex{NewMutex(s, pfx)}
180181
}
182+
183+
// DLocker represents an object that can be locked and unlocked
184+
// in distributed environment.
185+
//
186+
// # Experimental
187+
//
188+
// Notice: This interface is EXPERIMENTAL and may be changed or removed
189+
// in a later release.
190+
type DLocker interface {
191+
sync.Locker
192+
// Rev returns a revision which is monotonically incremental. It can
193+
// be used as a fencing token to prevent expired locker from operating
194+
// the shared resource.
195+
Rev() int64
196+
}
197+
198+
// NewDLocker creates a DLocker backed by an etcd mutex.
199+
func NewDLocker(s *Session, pfx string) DLocker {
200+
return &lockerMutex{NewMutex(s, pfx)}
201+
}

contrib/lock/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ If things go well the second client process invoked as `./client 2` finishes soo
6464
After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below:
6565
```
6666
resuming client 1
67-
expected fail to write to storage with old lease version: error: given version (694d82254d5fa305) is different from the existing version (694d82254e18770a)
67+
expected fail to write to storage with old lease version: error: given version (8) is smaller than the existing version (10)
6868
```
6969

7070
[fencing]: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

contrib/lock/client/client.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -118,29 +118,30 @@ func main() {
118118

119119
log.Print("created etcd client and session")
120120

121-
locker := concurrency.NewLocker(session, "/lock")
121+
locker := concurrency.NewDLocker(session, "/lock")
122122
locker.Lock()
123123
defer locker.Unlock()
124-
version := session.Lease()
124+
leaseID := session.Lease()
125+
version := locker.Rev()
125126
log.Printf("acquired lock, version: %x", version)
126127

127128
if mode == 1 {
128-
log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", version)
129+
log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", leaseID)
129130
reader := bufio.NewReader(os.Stdin)
130131
_, _ = reader.ReadByte()
131132
log.Print("resuming client 1")
132133
} else {
133134
log.Print("this is client 2, continuing\n")
134135
}
135136

136-
err = write("key0", fmt.Sprintf("value from client %x", mode), int64(version))
137+
err = write("key0", fmt.Sprintf("value from client %x", mode), version)
137138
if err != nil {
138139
if mode == 1 {
139-
log.Printf("expected fail to write to storage with old lease version: %s\n", err) // client 1 should show this message
140+
log.Printf("expected fail to write to storage with old version: %s\n", err) // client 1 should show this message
140141
} else {
141142
log.Fatalf("unexpected fail to write to storage: %s\n", err)
142143
}
143144
} else {
144-
log.Printf("successfully write a key to storage using lease %x\n", int64(version))
145+
log.Printf("successfully write a key to storage with version %x\n", version)
145146
}
146147
}

contrib/lock/storage/storage.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ func handler(w http.ResponseWriter, r *http.Request) {
7878
}
7979
} else if strings.Compare(req.Op, "write") == 0 {
8080
if val, ok := data[req.Key]; ok {
81-
if req.Version != val.version {
82-
writeResponse(response{"", -1, fmt.Sprintf("given version (%x) is different from the existing version (%x)", req.Version, val.version)}, w)
81+
if req.Version < val.version {
82+
writeResponse(response{"", -1, fmt.Sprintf("given version (%d) is smaller than the existing version (%d)", req.Version, val.version)}, w)
8383
} else {
8484
data[req.Key].val = req.Val
8585
data[req.Key].version = req.Version

0 commit comments

Comments
 (0)