Skip to content

Commit d100151

Browse files
window9uhackerwins
authored andcommitted
Introduce EventTimingControl with Throttling and Debouncing (#1166)
Added a new rate-limiter component that implements both throttling and debouncing mechanisms to control event timing and execution frequency. Key behaviors: - **Immediate Execution**: Executes the callback immediately if allowed by the rate limiter. - **Delayed Execution**: If a token is unavailable, blocks until the next token is available, then executes the callback. - **Debouncing**: Prevents scheduling multiple callbacks if one is already pending, ensuring that the final event's callback is eventually executed.
1 parent 8fde19c commit d100151

File tree

13 files changed

+1202
-208
lines changed

13 files changed

+1202
-208
lines changed

api/types/event_webhook.go

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@
1616

1717
package types
1818

19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"time"
23+
)
24+
25+
// DateFormat defines the standard format used for timestamps.
26+
const DateFormat = "2006-01-02T15:04:05.000Z"
27+
1928
// EventWebhookType represents event webhook type
2029
type EventWebhookType string
2130

@@ -29,14 +38,63 @@ func IsValidEventType(eventType string) bool {
2938
return eventType == string(DocRootChanged)
3039
}
3140

32-
// EventWebhookAttribute represents the attribute of the webhook.
41+
// EventWebhookAttribute represents metadata associated with a webhook event.
3342
type EventWebhookAttribute struct {
3443
Key string `json:"key"`
3544
IssuedAt string `json:"issuedAt"`
3645
}
3746

38-
// EventWebhookRequest represents the request of the webhook.
47+
// EventWebhookRequest represents a webhook event request payload.
3948
type EventWebhookRequest struct {
4049
Type EventWebhookType `json:"type"`
4150
Attributes EventWebhookAttribute `json:"attributes"`
4251
}
52+
53+
// NewRequestBody builds the JSON request body for a webhook event.
54+
func NewRequestBody(docKey string, event EventWebhookType) ([]byte, error) {
55+
req := EventWebhookRequest{
56+
Type: event,
57+
Attributes: EventWebhookAttribute{
58+
Key: docKey,
59+
IssuedAt: time.Now().UTC().Format(DateFormat),
60+
},
61+
}
62+
63+
body, err := json.Marshal(req)
64+
if err != nil {
65+
return nil, fmt.Errorf("marshal event webhook request: %w", err)
66+
}
67+
return body, nil
68+
}
69+
70+
// EventWebhookInfo holds the webhook EventRefKey and its associated Attribute.
71+
type EventWebhookInfo struct {
72+
EventRefKey EventRefKey
73+
Attribute WebhookAttribute
74+
}
75+
76+
// NewEventWebhookInfo initializes an EventWebhookInfo with the given parameters.
77+
func NewEventWebhookInfo(
78+
docRefKey DocRefKey,
79+
event EventWebhookType,
80+
signingKey, url, docKey string,
81+
) EventWebhookInfo {
82+
return EventWebhookInfo{
83+
EventRefKey: EventRefKey{
84+
DocRefKey: docRefKey,
85+
EventWebhookType: event,
86+
},
87+
Attribute: WebhookAttribute{
88+
SigningKey: signingKey,
89+
URL: url,
90+
DocKey: docKey,
91+
},
92+
}
93+
}
94+
95+
// WebhookAttribute defines attributes necessary for webhook handling.
96+
type WebhookAttribute struct {
97+
SigningKey string
98+
URL string
99+
DocKey string
100+
}

api/types/resource_ref_key.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,14 @@ type SnapshotRefKey struct {
5252
func (r SnapshotRefKey) String() string {
5353
return fmt.Sprintf("Snapshot (%s.%s.%d)", r.ProjectID, r.DocID, r.ServerSeq)
5454
}
55+
56+
// EventRefKey represents an identifier used to reference an event.
57+
type EventRefKey struct {
58+
DocRefKey
59+
EventWebhookType
60+
}
61+
62+
// String returns the string representation of the given EventRefKey.
63+
func (r EventRefKey) String() string {
64+
return fmt.Sprintf("DocEvent (%s.%s.%s)", r.ProjectID, r.DocID, r.EventWebhookType)
65+
}

pkg/limit/bucket.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package limit
2+
3+
import "time"
4+
5+
// Bucket represents a single-token bucket that refills every specified time window.
6+
type Bucket struct {
7+
window time.Duration // The interval at which the bucket refills.
8+
last time.Time // The last time a token was granted.
9+
}
10+
11+
// NewBucket creates a new Bucket with the given initial time and refill window.
12+
func NewBucket(now time.Time, window time.Duration) Bucket {
13+
return Bucket{
14+
window: window,
15+
last: now,
16+
}
17+
}
18+
19+
// Allow checks if a token can be granted at the given time.
20+
// It returns true if the time has advanced past the refill window, otherwise false.
21+
func (b *Bucket) Allow(now time.Time) bool {
22+
if now.Before(b.last.Add(b.window)) {
23+
return false
24+
}
25+
26+
b.last = now
27+
return true
28+
}

pkg/limit/limiter.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Copyright 2025 The Yorkie Authors. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// Package limit provides rate-limiting functionality with debouncing support.
18+
package limit
19+
20+
import (
21+
"container/list"
22+
"sync"
23+
"time"
24+
)
25+
26+
// Limiter provides rate limiting functionality with a debouncing callback.
27+
// It maintains a single token bucket.
28+
type Limiter[K comparable] struct {
29+
mu sync.Mutex
30+
wg sync.WaitGroup
31+
closing chan struct{}
32+
33+
expireInterval time.Duration
34+
throttleWindow time.Duration
35+
debouncingTime time.Duration
36+
expireBatchSize int
37+
38+
// evictionList holds the limiter entries in order of recency.
39+
evictionList *list.List
40+
// entries maps keys to their corresponding list element for quick lookup.
41+
entries map[K]*list.Element
42+
}
43+
44+
// NewLimiter creates and returns a new Limiter instance.
45+
// Parameters:
46+
//
47+
// expireInterval: How often to check for expired entries.
48+
// throttleWindow: The time window for rate limiting.
49+
// debouncingTime: The time-to-live for each rate bucket entry.
50+
func NewLimiter[K comparable](expireNum int, expire, throttle, debouncing time.Duration) *Limiter[K] {
51+
lim := &Limiter[K]{
52+
closing: make(chan struct{}),
53+
expireInterval: expire,
54+
throttleWindow: throttle,
55+
debouncingTime: debouncing,
56+
expireBatchSize: expireNum,
57+
evictionList: list.New(),
58+
entries: make(map[K]*list.Element),
59+
}
60+
61+
// Start the background expiration process.
62+
lim.wg.Add(1)
63+
go lim.expirationLoop()
64+
return lim
65+
}
66+
67+
// limiterEntry represents an entry in the Limiter for a specific key.
68+
type limiterEntry[K comparable] struct {
69+
key K
70+
bucket Bucket
71+
expireTime time.Time
72+
debouncingCallback func()
73+
}
74+
75+
// Allow checks if an event is allowed for the given key based on the rate bucket.
76+
// If allowed, it clears any pending debouncing callback; otherwise, it stores the provided callback.
77+
// It returns true if the event is allowed immediately.
78+
func (l *Limiter[K]) Allow(key K, callback func()) bool {
79+
l.mu.Lock()
80+
defer l.mu.Unlock()
81+
82+
now := time.Now()
83+
if elem, exists := l.entries[key]; exists {
84+
entry := elem.Value.(*limiterEntry[K])
85+
allowed := entry.bucket.Allow(now)
86+
if allowed {
87+
entry.debouncingCallback = nil
88+
} else {
89+
entry.debouncingCallback = callback
90+
}
91+
// Update recency and extend TTL.
92+
l.evictionList.MoveToFront(elem)
93+
entry.expireTime = now.Add(l.throttleWindow + l.debouncingTime)
94+
return allowed
95+
}
96+
97+
// Create a new rate bucket for a new key.
98+
bucket := NewBucket(now, l.throttleWindow)
99+
entry := &limiterEntry[K]{
100+
key: key,
101+
bucket: bucket,
102+
expireTime: now.Add(l.throttleWindow + l.debouncingTime),
103+
}
104+
elem := l.evictionList.PushFront(entry)
105+
l.entries[key] = elem
106+
return true
107+
}
108+
109+
// expirationLoop runs in a separate goroutine to periodically remove expired entries.
110+
func (l *Limiter[K]) expirationLoop() {
111+
ticker := time.NewTicker(l.expireInterval)
112+
defer func() {
113+
ticker.Stop()
114+
l.wg.Done()
115+
}()
116+
117+
for {
118+
select {
119+
case <-ticker.C:
120+
expiredEntries := l.collectEntries(true)
121+
l.runDebounce(expiredEntries)
122+
case <-l.closing:
123+
return
124+
}
125+
}
126+
}
127+
128+
// collectEntries gathers expired entries and removes them from the limiter.
129+
func (l *Limiter[K]) collectEntries(onlyExpired bool) []*limiterEntry[K] {
130+
now := time.Now()
131+
expiredEntries := make([]*limiterEntry[K], 0, l.expireBatchSize)
132+
133+
l.mu.Lock()
134+
defer l.mu.Unlock()
135+
136+
for range l.expireBatchSize {
137+
elem := l.evictionList.Back()
138+
if elem == nil {
139+
break
140+
}
141+
142+
entry := elem.Value.(*limiterEntry[K])
143+
if onlyExpired && now.Before(entry.expireTime) {
144+
break
145+
}
146+
147+
if entry.debouncingCallback != nil {
148+
expiredEntries = append(expiredEntries, entry)
149+
}
150+
l.evictionList.Remove(elem)
151+
delete(l.entries, entry.key)
152+
}
153+
154+
return expiredEntries
155+
}
156+
157+
// runDebounce runs the debouncing callbacks for expired entries asynchronously.
158+
func (l *Limiter[K]) runDebounce(entries []*limiterEntry[K]) {
159+
l.wg.Add(1)
160+
go func() {
161+
defer l.wg.Done()
162+
for _, entry := range entries {
163+
entry.debouncingCallback()
164+
}
165+
}()
166+
}
167+
168+
// Close terminates the expiration loop and cleans up resources.
169+
func (l *Limiter[K]) Close() {
170+
close(l.closing)
171+
172+
// Wait for all previous expiration job done.
173+
l.wg.Wait()
174+
175+
for l.evictionList.Len() > 0 {
176+
expiredEntries := l.collectEntries(false)
177+
l.runDebounce(expiredEntries)
178+
}
179+
180+
l.wg.Wait()
181+
}

0 commit comments

Comments
 (0)