Skip to content

Commit e7d2a97

Browse files
committed
test(e2e): add full e2e test suite and fix three production bugs found during testing
E2E tests (test/e2e/e2e_test.go): - FullHappyPath: validates end-to-end wiring (PostbackURL, BotConfig propagation) - DebounceAggregation: verifies buffer concatenation through the full HTTP→AI chain - AIInterruption: exercises real HTTP cancellation integrated with pipeline context - ResponseTime: validates NFR-05 (202 < 1s regardless of pipeline duration) - ExactlyOnce: exercises Redlock under real goroutine pressure - PipelineIsolation: verifies one pair's failure does not affect another - DispatchSegmentation: validates BotConfig segmentation wiring to dispatch - DispatchInterruption: verifies mid-dispatch cancellation drops remaining parts - RecoveryAfterRestart: validates NFR-01 — debounce pairs survive a restart - PostbackFailure: verifies state is cleared and no retry on non-2xx postback Production bug fixes surfaced by the tests: - PipelineState now persists BotConfig and PostbackURL at StageDebounce so Start() recovery can reconstruct pipelineEntry correctly after restart (NFR-01) - Removed entries.Delete from ErrDispatchInterrupted handler — it raced with the replacement event's Store and deleted the new entry before launchAIStage loaded it - Shutdown is now idempotent via sync.Once (close on closed channel panicked) Test infrastructure: - E2E suite uses Redis DB 14; unit tests use DB 15 — prevents FlushDB interference when go test ./... runs packages concurrently
1 parent e28c06a commit e7d2a97

3 files changed

Lines changed: 827 additions & 10 deletions

File tree

pkg/pipeline/model/pipeline.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@ type PairID struct {
2424

2525
// PipelineState is what is stored in Redis (JSON-serializable).
2626
// The cancel func is NOT stored here — it lives in PipelineService memory (Story 2.2).
27+
// BotConfig and PostbackURL are persisted for StageDebounce so that the service can
28+
// reconstruct the pipelineEntry correctly after a restart (NFR-01 recovery).
2729
type PipelineState struct {
28-
Stage Stage `json:"stage"`
29-
CreatedAt time.Time `json:"created_at"`
30+
Stage Stage `json:"stage"`
31+
CreatedAt time.Time `json:"created_at"`
32+
BotConfig BotConfig `json:"bot_config,omitempty"`
33+
PostbackURL string `json:"postback_url,omitempty"`
3034
}
3135

3236
// MessageEvent is the inbound payload from evo-ai-crm AgentBotListener.

pkg/pipeline/service/pipeline_service.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type pipelineService struct {
4545
entries sync.Map // string → pipelineEntry
4646
stopCh chan struct{} // closed by Shutdown to stop pollDebounceExpiry
4747
stoppedCh chan struct{} // closed by pollDebounceExpiry when it exits
48+
stopOnce sync.Once // ensures stopCh is closed exactly once
4849
}
4950

5051
// NewPipelineService constructs the service. Returns interface (GEAR R03).
@@ -86,7 +87,12 @@ func (s *pipelineService) Start() error {
8687
key := pairKey(pair.ContactID, pair.ConversationID)
8788
if _, alreadyExists := s.entries.Load(key); !alreadyExists {
8889
pipelineCtx, cancel := context.WithCancel(context.Background())
89-
s.entries.Store(key, pipelineEntry{ctx: pipelineCtx, cancel: cancel})
90+
s.entries.Store(key, pipelineEntry{
91+
ctx: pipelineCtx,
92+
cancel: cancel,
93+
cfg: state.BotConfig,
94+
postbackURL: state.PostbackURL,
95+
})
9096
}
9197

9298
// If timer already expired during the restart window → advance immediately.
@@ -115,6 +121,12 @@ func (s *pipelineService) Process(ctx context.Context, event *model.MessageEvent
115121

116122
switch {
117123
case state == nil || state.Stage == model.StageIncoming:
124+
// The HTTP handler always writes StageIncoming before launching the Process
125+
// goroutine (NFR-01 durability). When two events for the same pair arrive
126+
// rapidly, the second handler call overwrites any active StageAI/StageDispatch
127+
// with StageIncoming before this goroutine runs — bypassing the interrupt
128+
// branch below. cancelPair is a no-op when no entry exists.
129+
s.cancelPair(event.ContactID, event.ConversationID)
118130
if event.BotConfig.DebounceTime == 0 {
119131
return s.skipDebounce(ctx, event)
120132
}
@@ -154,7 +166,14 @@ func (s *pipelineService) startDebounce(ctx context.Context, event *model.Messag
154166
return fmt.Errorf("pipeline.debounce.start: %w", err)
155167
}
156168

157-
newState := &model.PipelineState{Stage: model.StageDebounce, CreatedAt: time.Now()}
169+
// Persist BotConfig and PostbackURL alongside the stage so that Start()
170+
// recovery after a restart can reconstruct the pipelineEntry correctly (NFR-01).
171+
newState := &model.PipelineState{
172+
Stage: model.StageDebounce,
173+
CreatedAt: time.Now(),
174+
BotConfig: event.BotConfig,
175+
PostbackURL: event.PostbackURL,
176+
}
158177
if err := s.repo.SetState(ctx, event.ContactID, event.ConversationID, newState); err != nil {
159178
cancel()
160179
s.entries.Delete(key)
@@ -402,14 +421,13 @@ func (s *pipelineService) runDispatchStage(
402421
case errors.Is(err, brtErrors.ErrDispatchInterrupted):
403422
// New message arrived — Process already set StageDebounce.
404423
// Do NOT call ClearState: would destroy the new active state.
424+
// Do NOT call entries.Delete: cancelPair already did LoadAndDelete
425+
// atomically. A Delete here would race with the new event's Store
426+
// and could delete the replacement entry.
405427
slog.Info("pipeline.dispatch.cancelled",
406428
"contact_id", contactID,
407429
"conversation_id", conversationID,
408430
)
409-
// Entry already removed by cancelPair (LoadAndDelete) — this delete is
410-
// idempotent; it guards against future refactors where cancelPair may not
411-
// delete the entry from s.entries.
412-
s.entries.Delete(pairKey(contactID, conversationID))
413431
default:
414432
slog.Error("pipeline.dispatch.error",
415433
"contact_id", contactID,
@@ -516,6 +534,8 @@ func (s *pipelineService) Cancel(contactID, conversationID int64) error {
516534

517535
// Shutdown stops the polling goroutine and cancels all in-flight pipeline
518536
// contexts. It blocks until the poller exits or ctx is cancelled.
537+
// Safe to call multiple times — subsequent calls are no-ops for the stop signal
538+
// but still wait for the poller to finish if it hasn't already.
519539
func (s *pipelineService) Shutdown(ctx context.Context) {
520540
// Cancel all in-flight pipeline goroutines.
521541
s.entries.Range(func(k, v any) bool {
@@ -525,8 +545,9 @@ func (s *pipelineService) Shutdown(ctx context.Context) {
525545
return true
526546
})
527547

528-
// Signal poller to stop and wait for it to exit.
529-
close(s.stopCh)
548+
// Signal poller to stop exactly once — close on a closed channel panics.
549+
s.stopOnce.Do(func() { close(s.stopCh) })
550+
530551
select {
531552
case <-s.stoppedCh:
532553
slog.Info("pipeline.shutdown.complete")

0 commit comments

Comments
 (0)