Skip to content

Commit 4fab488

Browse files
committed
feat(queue): implement global rate limiting and robust state management
- Replace per-worker rate limiters with a single global rate limiter - Add atomic file operations for state persistence - Implement backup and recovery mechanisms for queue state - Add temporary file handling during state saves - Improve error handling and logging for state operations BREAKING: none
1 parent 1beddbc commit 4fab488

File tree

2 files changed

+41
-16
lines changed

2 files changed

+41
-16
lines changed

pkg/x/queue.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ type Worker struct {
8989
id int // Unique identifier for the worker
9090
jobChannel chan RequestData // Channel for receiving jobs
9191
quit chan bool // Channel for shutdown signaling
92-
rateLimit *time.Ticker // Rate limiter for API requests
9392
queue *RequestQueue // Reference to parent queue
9493
}
9594

@@ -103,6 +102,7 @@ type RequestQueue struct {
103102
mu sync.Mutex // Mutex for thread-safe operations
104103
paused bool // Pause state for queue processing
105104
pauseMux sync.RWMutex // Mutex for pause state access
105+
globalRateLimit *time.Ticker // Global rate limiter shared across workers
106106
}
107107

108108
// NewRequestQueue creates and initializes a new RequestQueue instance
@@ -122,6 +122,7 @@ func NewRequestQueue(maxWorkers int) *RequestQueue {
122122
maxWorkers: maxWorkers,
123123
jobChannel: make(chan RequestData, maxWorkers*2),
124124
requestsPerSecond: DefaultAPIRequestsPerSecond,
125+
globalRateLimit: time.NewTicker(time.Second / DefaultAPIRequestsPerSecond),
125126
}
126127

127128
// Initialize queues
@@ -231,7 +232,6 @@ func (rq *RequestQueue) newWorker(id int) *Worker {
231232
id: id,
232233
jobChannel: rq.jobChannel,
233234
quit: make(chan bool),
234-
rateLimit: time.NewTicker(time.Second / time.Duration(rq.requestsPerSecond)),
235235
queue: rq,
236236
}
237237
}
@@ -331,7 +331,7 @@ func (w *Worker) start() {
331331
for {
332332
select {
333333
case job := <-w.jobChannel:
334-
<-w.rateLimit.C // Rate limiting
334+
<-w.queue.globalRateLimit.C // Global rate limiting
335335
w.processRequest(job)
336336
case <-w.quit:
337337
return
@@ -459,7 +459,7 @@ func (rq *RequestQueue) Stop() {
459459

460460
// Wait for workers to finish
461461
for _, worker := range rq.workers {
462-
worker.rateLimit.Stop()
462+
worker.queue.globalRateLimit.Stop()
463463
}
464464

465465
// Close job channel after all workers have stopped
@@ -487,7 +487,7 @@ func (rq *RequestQueue) StopWithContext(ctx context.Context) error {
487487
// Force close channels
488488
close(rq.jobChannel)
489489
for _, worker := range rq.workers {
490-
worker.rateLimit.Stop()
490+
worker.queue.globalRateLimit.Stop()
491491
close(worker.quit)
492492
}
493493
return ctx.Err()
@@ -557,10 +557,6 @@ func (rq *RequestQueue) SetRequestsPerSecond(rps float64) {
557557
rq.requestsPerSecond = rps
558558
logger.Infof("Queue rate limit set to %.2f requests per second", rps)
559559

560-
// Update existing workers' rate limiters
561-
if rq.workers != nil {
562-
for _, worker := range rq.workers {
563-
worker.rateLimit.Reset(time.Second / time.Duration(rps))
564-
}
565-
}
560+
// Update the global rate limiter
561+
rq.globalRateLimit.Reset(time.Second / time.Duration(rps))
566562
}

pkg/x/queue_state.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,30 @@ func (rq *RequestQueue) SaveState() error {
7676
return err
7777
}
7878

79-
// Write state to file
79+
// Write state to temporary file first
80+
statePath := filepath.Join(stateDir, queueStateFile)
81+
tempPath := statePath + ".tmp"
82+
8083
data, err := json.MarshalIndent(state, "", " ")
8184
if err != nil {
8285
return err
8386
}
8487

85-
statePath := filepath.Join(stateDir, queueStateFile)
86-
if err := os.WriteFile(statePath, data, 0644); err != nil {
88+
// Write to temp file first
89+
if err := os.WriteFile(tempPath, data, 0644); err != nil {
90+
return err
91+
}
92+
93+
// Backup existing state file if it exists
94+
if _, err := os.Stat(statePath); err == nil {
95+
backupPath := statePath + ".bak"
96+
if err := os.Rename(statePath, backupPath); err != nil {
97+
logger.Warnf("Failed to create backup file: %v", err)
98+
}
99+
}
100+
101+
// Atomically rename temp file to actual state file
102+
if err := os.Rename(tempPath, statePath); err != nil {
87103
return err
88104
}
89105

@@ -94,13 +110,26 @@ func (rq *RequestQueue) SaveState() error {
94110
// LoadState loads the queue state from disk
95111
func (rq *RequestQueue) LoadState() error {
96112
statePath := filepath.Join(getStateDir(), queueStateFile)
113+
114+
// Try loading the main state file
97115
data, err := os.ReadFile(statePath)
98116
if err != nil {
99-
if os.IsNotExist(err) {
117+
if !os.IsNotExist(err) {
118+
// Try loading from backup if main file is corrupted
119+
backupPath := statePath + ".bak"
120+
data, err = os.ReadFile(backupPath)
121+
if err != nil {
122+
if os.IsNotExist(err) {
123+
logger.Debugf("No existing queue state found at %s", statePath)
124+
return nil
125+
}
126+
return err
127+
}
128+
logger.Warnf("Loaded queue state from backup file")
129+
} else {
100130
logger.Debugf("No existing queue state found at %s", statePath)
101131
return nil
102132
}
103-
return err
104133
}
105134

106135
var state QueueState

0 commit comments

Comments
 (0)