@@ -25,6 +25,9 @@ type PipelineService interface {
2525 Process (ctx context.Context , event * model.MessageEvent ) error
2626 Cancel (contactID , conversationID int64 ) error
2727 Start () error
28+ // Shutdown stops the polling goroutine and cancels all in-flight pipeline
29+ // contexts. It blocks until the poller exits or ctx is cancelled.
30+ Shutdown (ctx context.Context )
2831}
2932
3033type pipelineEntry struct {
@@ -39,7 +42,9 @@ type pipelineService struct {
3942 debounce debounceIface.DebounceEngine
4043 aiAdapter aiIface.AIAdapter
4144 dispatchEng dispatchIface.DispatchEngine
42- entries sync.Map // string → pipelineEntry
45+ entries sync.Map // string → pipelineEntry
46+ stopCh chan struct {} // closed by Shutdown to stop pollDebounceExpiry
47+ stoppedCh chan struct {} // closed by pollDebounceExpiry when it exits
4348}
4449
4550// NewPipelineService constructs the service. Returns interface (GEAR R03).
@@ -49,7 +54,14 @@ func NewPipelineService(
4954 aiAdapter aiIface.AIAdapter ,
5055 dispatchEng dispatchIface.DispatchEngine ,
5156) PipelineService {
52- return & pipelineService {repo : repo , debounce : debounce , aiAdapter : aiAdapter , dispatchEng : dispatchEng }
57+ return & pipelineService {
58+ repo : repo ,
59+ debounce : debounce ,
60+ aiAdapter : aiAdapter ,
61+ dispatchEng : dispatchEng ,
62+ stopCh : make (chan struct {}),
63+ stoppedCh : make (chan struct {}),
64+ }
5365}
5466
5567// Start recovers in-progress debounce pairs from Redis, then launches the single
@@ -433,11 +445,19 @@ func (s *pipelineService) runDispatchStage(
433445
434446// pollDebounceExpiry is the single timer-detection mechanism (AC: #1).
435447// It runs a 100ms ticker and advances expired StageDebounce pairs to StageAI.
448+ // It exits when s.stopCh is closed and signals s.stoppedCh before returning.
436449func (s * pipelineService ) pollDebounceExpiry () {
450+ defer close (s .stoppedCh )
451+
437452 ticker := time .NewTicker (100 * time .Millisecond )
438453 defer ticker .Stop ()
439454
440- for range ticker .C {
455+ for {
456+ select {
457+ case <- s .stopCh :
458+ return
459+ case <- ticker .C :
460+ }
441461 s .entries .Range (func (k , v any ) bool {
442462 entry , ok := v .(pipelineEntry )
443463 if ! ok {
@@ -494,6 +514,27 @@ func (s *pipelineService) Cancel(contactID, conversationID int64) error {
494514 return nil
495515}
496516
517+ // Shutdown stops the polling goroutine and cancels all in-flight pipeline
518+ // contexts. It blocks until the poller exits or ctx is cancelled.
519+ func (s * pipelineService ) Shutdown (ctx context.Context ) {
520+ // Cancel all in-flight pipeline goroutines.
521+ s .entries .Range (func (k , v any ) bool {
522+ if entry , ok := v .(pipelineEntry ); ok {
523+ entry .cancel ()
524+ }
525+ return true
526+ })
527+
528+ // Signal poller to stop and wait for it to exit.
529+ close (s .stopCh )
530+ select {
531+ case <- s .stoppedCh :
532+ slog .Info ("pipeline.shutdown.complete" )
533+ case <- ctx .Done ():
534+ slog .Warn ("pipeline.shutdown.timeout" )
535+ }
536+ }
537+
497538// recoverPipeline is deferred in every pipeline goroutine to handle panics safely.
498539func (s * pipelineService ) recoverPipeline (contactID , conversationID int64 ) {
499540 if r := recover (); r != nil {
0 commit comments