-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathwrite_request.go
More file actions
72 lines (63 loc) · 1.51 KB
/
write_request.go
File metadata and controls
72 lines (63 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package NoKV
import (
"sync"
"sync/atomic"
"time"
"github.com/feichai0017/NoKV/kv"
)
// request is a reference-counted, poolable batch of entries submitted for
// writing. Instances are recycled through requestPool: DecrRef puts a request
// back once its reference count reaches zero.
type request struct {
// Entries are the entries carried by this request. loadEntries copies them
// in, and DecrRef calls DecrRef on each one when the last reference drops.
Entries []*kv.Entry
// Ptrs holds value pointers for this request.
// NOTE(review): presumably one per entry, filled in by the write path —
// nothing in this file populates it; confirm against the caller.
Ptrs []kv.ValuePtr
// Err is the outcome of processing the request; Wait returns it.
Err error
// ref is the lifecycle reference count, changed via IncrRef and DecrRef.
ref atomic.Int32
// enqueueAt is presumably the time the request was queued (not set in this
// file) — TODO confirm where it is written and read.
enqueueAt time.Time
// wg is what Wait blocks on until processing of the request has finished.
wg sync.WaitGroup
}
// requestPool recycles request values so that a fresh one does not have to be
// allocated for every write. DecrRef puts a request back into the pool when
// its reference count reaches zero.
var requestPool = sync.Pool{
	New: func() any {
		return &request{}
	},
}
// reset puts req back into a blank state: the result error, enqueue time,
// reference count and wait group are zeroed, and both slices are truncated to
// length zero (their backing arrays, if any, are kept).
func (req *request) reset() {
	// Scalar and synchronization state.
	req.Err = nil
	req.enqueueAt = time.Time{}
	req.ref.Store(0)
	req.wg = sync.WaitGroup{}

	// Truncate rather than nil out, so existing capacity stays available.
	req.Entries, req.Ptrs = req.Entries[:0], req.Ptrs[:0]
}
// IncrRef atomically adds one to the lifecycle reference count of this batched
// write request. Every reference taken here is expected to be released with a
// matching DecrRef; DecrRef panics if the count ever drops below zero.
func (req *request) IncrRef() {
req.ref.Add(1)
}
// loadEntries replaces the contents of req.Entries with a copy of entries.
// The existing backing array is reused when it has enough capacity; otherwise
// a new slice of exactly len(entries) elements is allocated. Only the pointers
// are copied, so the request and the caller refer to the same kv.Entry values.
func (req *request) loadEntries(entries []*kv.Entry) {
	n := len(entries)
	dst := req.Entries
	if n > cap(dst) {
		// Not enough room: allocate a right-sized slice.
		dst = make([]*kv.Entry, n)
	} else {
		dst = dst[:n]
	}
	req.Entries = dst
	copy(dst, entries)
}
// DecrRef drops one lifecycle reference. While other references remain it does
// nothing further. When the count reaches zero it calls DecrRef on every entry,
// clears the Entries and Ptrs slices, and puts the request back into
// requestPool. A negative count means DecrRef was called more often than
// IncrRef; that is treated as a lifecycle bug and triggers a panic.
func (req *request) DecrRef() {
	remaining := req.ref.Add(-1)
	switch {
	case remaining > 0:
		// Still referenced elsewhere.
		return
	case remaining < 0:
		panic("request.DecrRef: refcount underflow")
	}

	// Last reference is gone: release each entry, then recycle the request.
	held := req.Entries
	for i := range held {
		held[i].DecrRef()
	}
	req.Entries, req.Ptrs = nil, nil
	requestPool.Put(req)
}
// Wait blocks on the request's wait group until processing has finished, then
// returns the request's Err. It also releases one reference via DecrRef, so
// the request must not be used by the caller after Wait returns.
func (req *request) Wait() error {
	req.wg.Wait()
	// The deferred DecrRef runs only after req.Err has been evaluated as the
	// return value, so the error is read before the request can be recycled.
	defer req.DecrRef()
	return req.Err
}