Skip to content

Commit 140e510

Browse files
committed
chore: improve buffer functionality
1 parent 0364154 commit 140e510

File tree

3 files changed

+368
-64
lines changed

3 files changed

+368
-64
lines changed

router/batchrouter/handle_lifecycle.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ func (brt *Handle) Setup(
166166
brt.jobBuffer = &JobBuffer{
167167
sourceDestMap: make(map[string]chan *jobsdb.JobT),
168168
uploadTimers: make(map[string]*time.Timer),
169+
batchWorkers: make(map[string]*batchWorker),
169170
brt: brt,
170171
}
171172

@@ -273,14 +274,12 @@ func (brt *Handle) Shutdown() {
273274
brt.logger.Info("Initiating batch router shutdown")
274275
brt.backgroundCancel()
275276

276-
// Stop all job buffer timers
277+
// Stop all job buffer timers and workerPool
277278
if brt.jobBuffer != nil {
278-
brt.jobBuffer.mu.Lock()
279-
for key, timer := range brt.jobBuffer.uploadTimers {
280-
timer.Stop()
281-
brt.logger.Debugf("Stopped timer for source-destination: %s", key)
282-
}
283-
brt.jobBuffer.mu.Unlock()
279+
brt.logger.Debug("Stopping JobBuffer and its resources")
280+
281+
// Stop the job buffer (which will also stop the worker pool)
282+
brt.jobBuffer.Stop()
284283
}
285284

286285
// Wait for all background goroutines to complete

0 commit comments

Comments
 (0)