Open
Description
Is your feature request related to a problem?
Option No
Describe the solution you'd like
The current implementation of the function Close
is not graceful: it only sets a closed flag and returns, while the goroutine jobs keep running in the background. We need to wait for all goroutines, or at least for the currently running jobs, to finish before the process exits when it receives a shutdown signal.
Describe alternatives you've considered
Features we considered:
1. Configurable Shutdown Options
Introduce shutdown options that allow users to:
- Wait for currently executing tasks to complete before shutdown
- Wait for all queued tasks to complete before shutdown
- Implement a blocking shutdown mechanism to ensure proper cleanup
2. Signal Handling
Add built-in signal handling capabilities:
- Automatic monitoring of system shutdown signals
- Graceful shutdown initiation upon signal reception
- Signal handling configuration available during pool creation
Additional
Here is the code that an AI assistant suggested:
// ShutdownOption defines the configuration for graceful shutdown.
// With the zero value, Shutdown returns without waiting for any job and
// New does not start a signal handler.
type ShutdownOption struct {
// WaitForRunningJobs makes Shutdown block until the running-job counter
// drops to zero, i.e. until jobs that are currently executing have returned.
WaitForRunningJobs bool
// WaitForQueuedJobs makes Shutdown block until the job list is empty and no
// job is running; workers keep draining the list after shutdown when it is set.
WaitForQueuedJobs bool
// HandleSignals makes New start a goroutine that calls Shutdown when the
// process receives SIGTERM or SIGINT.
HandleSignals bool
}
// PoolOption defines the configuration for pool creation.
// It is passed to New; a nil option leaves every default in place.
type PoolOption struct {
// Limit sets the maximum number of goroutines.
// New only applies values greater than zero; otherwise the pool keeps its
// default of -1, which the worker-forking code treats as "no limit".
Limit int
// Shutdown configures the graceful shutdown behavior.
// When nil, New falls back to an empty ShutdownOption.
Shutdown *ShutdownOption
}
{{ ... }}
// ShutdownOption defines the configuration for graceful shutdown.
// With the zero value, Shutdown returns without waiting for any job and
// New does not start a signal handler.
type ShutdownOption struct {
// WaitForRunningJobs makes Shutdown block until the running-job counter
// drops to zero, i.e. until jobs that are currently executing have returned.
WaitForRunningJobs bool
// WaitForQueuedJobs makes Shutdown block until the job list is empty and no
// job is running; workers keep draining the list after shutdown when it is set.
WaitForQueuedJobs bool
// HandleSignals makes New start a goroutine that calls Shutdown when the
// process receives SIGTERM or SIGINT.
HandleSignals bool
}
// PoolOption defines the configuration for pool creation.
// It is passed to New; a nil option leaves every default in place.
type PoolOption struct {
// Limit sets the maximum number of goroutines.
// New only applies values greater than zero; otherwise the pool keeps its
// default of -1, which the worker-forking code treats as "no limit".
Limit int
// Shutdown configures the graceful shutdown behavior.
// When nil, New falls back to an empty ShutdownOption.
Shutdown *ShutdownOption
}
// Pool manages the goroutines using pool.
// Jobs are queued in list and executed by worker goroutines that are forked
// on demand, up to limit. The shutdown fields support a graceful stop:
// Shutdown closes shutdownChan to notify workers and closes shutdownDone
// once it has finished waiting.
type Pool struct {
limit int // Max goroutine count limit; -1 means no limit.
count *gtype.Int // Current running goroutine count (worker goroutines, not jobs).
list *glist.List // List for asynchronous job adding purpose.
closed *gtype.Bool // Is pool closed or not; set by Shutdown, checked by Add.
shutdownOpt *ShutdownOption // Shutdown options; never nil after New.
runningJobs *gtype.Int // Current running job count, adjusted by workers around each job.
shutdownChan chan struct{} // Closed by Shutdown to signal workers that shutdown has begun.
shutdownDone chan struct{} // Closed by Shutdown on completion; WaitForShutdown blocks on it.
}
{{ ... }}
{{ ... }}
// New builds a goroutine pool and starts its background supervisor.
//
// Only the first element of `option` is consulted, and a nil element is
// ignored. Without an option the pool has no goroutine limit (-1) and uses an
// empty ShutdownOption. A Limit that is not positive leaves the pool
// unlimited. When the effective ShutdownOption has HandleSignals set, a
// goroutine running handleSignals is started before New returns.
func New(option ...*PoolOption) *Pool {
	pool := &Pool{
		limit:        -1,
		count:        gtype.NewInt(),
		list:         glist.New(true),
		closed:       gtype.NewBool(),
		runningJobs:  gtype.NewInt(),
		shutdownChan: make(chan struct{}),
		shutdownDone: make(chan struct{}),
		shutdownOpt:  &ShutdownOption{},
	}
	// The supervisor interval is picked at random between the two bounds.
	timerDuration := grand.D(minSupervisorTimerDuration, maxSupervisorTimerDuration)

	// Overlay the caller's settings on top of the defaults.
	if len(option) > 0 {
		if opt := option[0]; opt != nil {
			if opt.Limit > 0 {
				pool.limit = opt.Limit
			}
			if opt.Shutdown != nil {
				pool.shutdownOpt = opt.Shutdown
			}
		}
	}

	// Register the periodic supervisor job.
	gtimer.Add(context.Background(), timerDuration, pool.supervisor)

	// Optionally react to termination signals.
	if pool.shutdownOpt.HandleSignals {
		go pool.handleSignals()
	}
	return pool
}
{{ ... }}
{{ ... }}
import (
"os"
"os/signal"
"syscall"
"time"
)
// handleSignals waits for SIGTERM or SIGINT and then starts a graceful
// shutdown of the pool. It is meant to run in its own goroutine.
//
// The goroutine also exits when the pool is shut down by a direct call to
// Shutdown (which closes shutdownChan), so it does not stay blocked forever.
// The signal registration is undone before Shutdown is called, so that while
// Shutdown is waiting for jobs a further SIGINT/SIGTERM is no longer captured
// by this channel and can be handled the default way instead of being ignored.
func (p *Pool) handleSignals() {
	// Buffer of one: signal delivery does not block, so an unbuffered channel
	// could miss a signal that arrives before the receive below is ready.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)

	select {
	case <-sigChan:
		signal.Stop(sigChan)
		// The only error Shutdown returns is "already shutting down", which
		// means someone else got there first; nothing to handle here.
		_ = p.Shutdown()
	case <-p.shutdownChan:
		// Shutdown was triggered elsewhere; just release the registration.
		signal.Stop(sigChan)
	}
}
// Shutdown closes the pool and, depending on the configured ShutdownOption,
// blocks until outstanding work has finished.
//
// The first call flips the closed flag, closes shutdownChan to notify the
// workers, performs the configured waits and finally closes shutdownDone.
// Any later call returns a CodeInvalidOperation error immediately, without
// waiting. Waiting is done by polling the counters every 100ms.
func (p *Pool) Shutdown() error {
	const pollInterval = 100 * time.Millisecond

	// Only the caller that wins the compare-and-swap proceeds.
	if won := p.closed.Cas(false, true); !won {
		return gerror.NewCode(gcode.CodeInvalidOperation, "pool is already shutting down")
	}

	// Tell every worker that shutdown has started.
	close(p.shutdownChan)

	// With neither wait flag set both blocks below are skipped, so the
	// function falls straight through to closing shutdownDone.
	if p.shutdownOpt.WaitForRunningJobs {
		for p.runningJobs.Val() > 0 {
			time.Sleep(pollInterval)
		}
	}
	if p.shutdownOpt.WaitForQueuedJobs {
		// Done only once the list is drained and no job is executing.
		for !(p.list.Size() <= 0 && p.runningJobs.Val() <= 0) {
			time.Sleep(pollInterval)
		}
	}

	close(p.shutdownDone)
	return nil
}
// WaitForShutdown parks the caller until the shutdownDone channel is closed,
// which Shutdown does once it has finished its configured waits.
// It returns immediately if that has already happened.
func (p *Pool) WaitForShutdown() {
	done := p.shutdownDone
	<-done
}
{{ ... }}
{{ ... }}
// Add enqueues the job `f`, to be called with `ctx`, and makes sure a worker
// is available to run it.
//
// It returns a CodeInvalidOperation error, without queuing anything, when the
// pool has already been closed; otherwise it returns nil.
func (p *Pool) Add(ctx context.Context, f Func) error {
	if closed := p.closed.Val(); closed {
		return gerror.NewCode(gcode.CodeInvalidOperation, "goroutine pool is closed")
	}

	// New jobs enter at the front; workers take from the back.
	job := &localPoolItem{Ctx: ctx, Func: f}
	p.list.PushFront(job)

	// Fork another worker if the limit allows it.
	p.checkAndForkNewGoroutineWorker()
	return nil
}
// checkAndForkNewGoroutineWorker reserves a worker slot and, if one is
// available, starts a goroutine that runs queued jobs until there is nothing
// left for it to do.
//
// It returns without forking when the pool has a limit (limit != -1) and the
// worker count has already reached it.
func (p *Pool) checkAndForkNewGoroutineWorker() {
	// Reserve the slot with a compare-and-swap loop so that concurrent
	// callers cannot push the worker count past the limit.
	for {
		n := p.count.Val()
		if p.limit != -1 && n >= p.limit {
			return
		}
		if p.count.Cas(n, n+1) {
			break
		}
	}
	go func() {
		defer p.count.Add(-1)
		for p.runNextJob() {
		}
	}()
}

// runNextJob takes one job from the back of the list and executes it.
// It reports whether the calling worker should keep looping; false means the
// worker must exit (shutdown in progress, or no job left).
func (p *Pool) runNextJob() bool {
	// Count this worker as busy BEFORE looking at the shutdown signal and the
	// list. A job is then always visible to Shutdown either as a list entry
	// or through runningJobs, with no gap between popping and counting.
	// The deferred decrement also runs if the job terminates the goroutine
	// via runtime.Goexit, so the counter cannot get stuck above zero.
	p.runningJobs.Add(1)
	defer p.runningJobs.Add(-1)

	// Check for shutdown before taking a job, so that a worker forked while
	// shutdown is already under way does not start queued work that the
	// configuration says should be abandoned.
	select {
	case <-p.shutdownChan:
		if !p.shutdownOpt.WaitForQueuedJobs {
			return false
		}
	default:
	}

	if p.closed.Val() && p.list.Size() == 0 {
		return false
	}
	v := p.list.PopBack()
	if v == nil {
		// Nothing queued: let the worker exit.
		return false
	}
	item := v.(*localPoolItem)
	item.Func(item.Ctx)
	return true
}
{{ ... }}