-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsemaphore.go
156 lines (139 loc) · 4.98 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package shopifysemaphore
import (
"context"
"sync"
"time"
)
var (
DefaultAquireBuffer = 200 * time.Millisecond // Default aquire throttle duration.
DefaultPauseBuffer = 1 * time.Second // Default pause buffer to append to pause duration calculation.
)
// Semaphore is responsible regulating when to pause and resume processing of Goroutines.
// Points remaining, point thresholds, and point refill rates are taken into
// consideration. If remaining points go below the threshold, a pause is initiated
// which will also calculate how long a pause should happen based on the refill rate.
// Once pause is completed, the processing will resume. A PauceFunc and ResumeFunc
// can optionally be passed in which will fire respectively when a pause happens
// and when a resume happens.
type Semaphore struct {
*Balance // Point information and tracking.
PauseFunc func(int32, time.Duration) // Optional callback for when pause happens.
ResumeFunc func() // Optional callback for when resume happens.
PauseBuffer time.Duration // Buffer of time to wait before attempting to re-aquire a spot.
AquireBuffer time.Duration // Buffer of time to extend the pause with.
pausedAt time.Time // When paused last happened.
sema chan struct{} // Semaphore for controlling the number of Goroutines running.
mu sync.Mutex // For handling paused flag control.
paused bool // Pause flag.
}
// NewSemaphore returns a pointer to Semaphore. It accepts a cap which represents the
// capacity of how many Goroutines can run at a time, it also accepts information
// about the point balance and lastly, optional parameters.
func NewSemaphore(cap int, b *Balance, opts ...func(*Semaphore)) *Semaphore {
sem := &Semaphore{
Balance: b,
sema: make(chan struct{}, cap),
}
for _, opt := range opts {
opt(sem)
}
if sem.PauseFunc == nil {
// Provide default PauseFunc.
WithPauseFunc(func(_ int32, _ time.Duration) {})(sem)
}
if sem.ResumeFunc == nil {
// Provide default ResumeFunc.
WithResumeFunc(func() {})(sem)
}
if sem.AquireBuffer == 0 {
WithAquireBuffer(DefaultAquireBuffer)(sem)
}
return sem
}
// Aquire will attempt to aquire a spot to run the Goroutine.
// It will continue in a loop until it does aquire also pausing
// if the pause flag has been enabled. Aquiring is throttled at
// the value of AquireBuffer.
func (sem *Semaphore) Aquire(ctx context.Context) (err error) {
for aquired := false; !aquired; {
for {
if !sem.paused {
// Not paused. Break loop.
break
}
}
// Attempt to aquire a spot, if not we will throttle the next loop.
select {
case <-ctx.Done():
// Context cancelled. Break loop and return error.
aquired = true
err = ctx.Err()
case sem.sema <- struct{}{}:
// Spot aquired. Break loop.
aquired = true
default:
// Can not yet aquire a spot. Throttle for a set duration.
time.Sleep(sem.AquireBuffer)
}
}
return
}
// Release will release a spot for another Goroutine to take.
// It accepts a current value of remaining point balance, to which the
// remaining point balance will only be updated if the count is greater than -1.
// If the remaining points is below the set threshold, a pause will be
// initiated and a duration of this pause will be calculated based
// upon several factors surrouding the point information such as limit,
// threshold, and the refull rate.
func (sem *Semaphore) Release(pts int32) {
defer sem.mu.Unlock()
sem.mu.Lock()
sem.Update(pts)
if sem.AtThreshold() {
// Calculate the duration required to refill and that duration time
// has passed before we call for a pause.
ra := sem.RefillDuration() + sem.PauseBuffer
if sem.pausedAt.Add(ra).Before(time.Now()) {
sem.paused = true
sem.pausedAt = time.Now()
go sem.PauseFunc(pts, ra)
// Unflag as paused after the determined duration and run the ResumeFunc.
go func() {
time.Sleep(ra)
sem.paused = false
sem.ResumeFunc()
}()
}
}
// Perform the actual release.
<-sem.sema
}
// withPauseFunc is a functional option for Semaphore to call when
// a pause happens. The point balance remaining and the duration of
// the pause will passed into the function.
func WithPauseFunc(fn func(int32, time.Duration)) func(*Semaphore) {
return func(sem *Semaphore) {
sem.PauseFunc = fn
}
}
// withResumeFunc is a functional option for Semaphore to call when
// resume from a pause happens.
func WithResumeFunc(fn func()) func(*Semaphore) {
return func(sem *Semaphore) {
sem.ResumeFunc = fn
}
}
// WithAquireBuffer is a functional option for Semaphore which
// will set the throttle duration for attempting to re-aquire a spot.
func WithAquireBuffer(dur time.Duration) func(*Semaphore) {
return func(sem *Semaphore) {
sem.AquireBuffer = dur
}
}
// WithPauseBuffer is a functional option for Semaphore which
// will set an additional duration to append to the pause duration.
func WithPauseBuffer(dur time.Duration) func(*Semaphore) {
return func(sem *Semaphore) {
sem.PauseBuffer = dur
}
}