Skip to content

Commit b6d591f

Browse files
Alexey Sharp and claude
authored and committed
Pool QueueWithRetry and cache filterDirtyFiles regexp
QueueWithRetry (79.5GB, 18.8% of allocs): pool via sync.Pool with Release() that drains the 100K-element channel without closing it, preserving the 1.6MB buffer across reuses. parallelExecutor.run uses Release() instead of Close(); workers exit via context cancellation when the exec loop goroutine defers execLoopCtxCancel(). Cleanup ordering ensures stopWorkers() completes before Release(). filterDirtyFiles regexp (14.6GB, 3.3% of allocs): cache compiled regexps in sync.Map keyed by pattern string. Each unique (filenameBase, ext) pair compiles once instead of per-call. Combined expected savings: ~94GB (22% of total allocations). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fe17d1f commit b6d591f

File tree

3 files changed

+39
-4
lines changed

3 files changed

+39
-4
lines changed

db/state/dirty_files.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,15 @@ func (i *FilesItem) closeFilesAndRemove() {
258258
}
259259
}
260260

261+
var filterDirtyFilesReCache sync.Map // pattern string → *regexp.Regexp
262+
261263
func filterDirtyFiles(fileNames []string, stepSize, stepsInFrozenFile uint64, filenameBase, ext string, logger log.Logger) (res []*FilesItem) {
262-
re := regexp.MustCompile(`^v(\d+(?:\.\d+)?)-` + filenameBase + `\.(\d+)-(\d+)\.` + ext + `$`)
264+
pattern := `^v(\d+(?:\.\d+)?)-` + filenameBase + `\.(\d+)-(\d+)\.` + ext + `$`
265+
reVal, ok := filterDirtyFilesReCache.Load(pattern)
266+
if !ok {
267+
reVal, _ = filterDirtyFilesReCache.LoadOrStore(pattern, regexp.MustCompile(pattern))
268+
}
269+
re := reVal.(*regexp.Regexp)
263270
var err error
264271

265272
for _, name := range fileNames {

execution/exec/txtask.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,10 +699,36 @@ type QueueWithRetry struct {
699699
capacity int
700700
}
701701

702+
var queuePool sync.Pool
703+
702704
func NewQueueWithRetry(capacity int) *QueueWithRetry {
705+
if v := queuePool.Get(); v != nil {
706+
q := v.(*QueueWithRetry)
707+
if q.capacity == capacity {
708+
return q
709+
}
710+
queuePool.Put(q) // wrong capacity, return to pool for another caller
711+
}
703712
return &QueueWithRetry{newTasks: make(chan Task, capacity), capacity: capacity}
704713
}
705714

715+
// Release drains the queue and returns it to the pool for reuse.
716+
// The channel is preserved (not closed), avoiding reallocation of the
717+
// 100K-element buffer on the next NewQueueWithRetry call.
718+
// Must be called only after all producers and consumers have stopped.
719+
func (q *QueueWithRetry) Release() {
720+
q.lock.Lock()
721+
// Drain channel.
722+
for len(q.newTasks) > 0 {
723+
<-q.newTasks
724+
}
725+
// Clear retry heap, keep backing array.
726+
q.retires = q.retires[:0]
727+
q.closed = false
728+
q.lock.Unlock()
729+
queuePool.Put(q)
730+
}
731+
706732
func (q *QueueWithRetry) NewTasksLen() int {
707733
q.lock.Lock()
708734
defer q.lock.Unlock()

execution/stagedsync/exec3_parallel.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -905,15 +905,17 @@ func (pe *parallelExecutor) run(ctx context.Context) (context.Context, context.C
905905

906906
pe.execLoopGroup.Go(func() error {
907907
defer pe.rws.Close()
908-
defer pe.in.Close()
908+
defer execLoopCtxCancel() // cancel workers' context when exec loop exits
909909
pe.resetWorkers(execLoopCtx, pe.rs, nil)
910910
return pe.execLoop(execLoopCtx)
911911
})
912912

913913
return execLoopCtx, func() {
914914
execLoopCtxCancel()
915-
defer pe.stopWorkers()
916-
defer pe.in.Close()
915+
defer func() {
916+
pe.stopWorkers()
917+
pe.in.Release()
918+
}()
917919

918920
if err := pe.wait(ctx); err != nil {
919921
pe.logger.Debug("exec loop cancel failed", "err", err)

0 commit comments

Comments
 (0)