diff --git a/pkg/capabilities/consensus/requests/handler.go b/pkg/capabilities/consensus/requests/handler.go index 3186be5563..181564b2a9 100644 --- a/pkg/capabilities/consensus/requests/handler.go +++ b/pkg/capabilities/consensus/requests/handler.go @@ -23,6 +23,11 @@ type ConsensusRequest[T any, R ConsensusResponse] interface { SendTimeout(ctx context.Context) } +type responseWithRequestStoreRemovalChan[R ConsensusResponse] struct { + response R + requestRemovedFromStore chan struct{} +} + type ConsensusResponse interface { RequestID() string } @@ -38,7 +43,7 @@ type Handler[T ConsensusRequest[T, R], R ConsensusResponse] struct { responseCache map[string]*responseCacheEntry[R] cacheExpiryTime time.Duration - responseCh chan R + responseCh chan responseWithRequestStoreRemovalChan[R] requestCh chan T clock clockwork.Clock @@ -49,7 +54,7 @@ func NewHandler[T ConsensusRequest[T, R], R ConsensusResponse](lggr logger.Logge store: s, pendingRequests: map[string]T{}, responseCache: map[string]*responseCacheEntry[R]{}, - responseCh: make(chan R), + responseCh: make(chan responseWithRequestStoreRemovalChan[R]), requestCh: make(chan T), clock: clock, cacheExpiryTime: responseExpiryTime, @@ -62,11 +67,23 @@ func NewHandler[T ConsensusRequest[T, R], R ConsensusResponse](lggr logger.Logge } func (h *Handler[T, R]) SendResponse(ctx context.Context, resp R) { + respWithRemovalChan := responseWithRequestStoreRemovalChan[R]{ + response: resp, + requestRemovedFromStore: make(chan struct{}, 1), + } + + select { + case <-ctx.Done(): + return + case h.responseCh <- respWithRemovalChan: + } + select { case <-ctx.Done(): return - case h.responseCh <- resp: + case <-respWithRemovalChan.requestRemovedFromStore: } + } func (h *Handler[T, R]) SendRequest(ctx context.Context, r T) { @@ -113,8 +130,10 @@ func (h *Handler[T, R]) worker(ctx context.Context) { h.eng.Errorw("failed to add request to store", "err", err) } - case resp := <-h.responseCh: + case respWithChannel := <-h.responseCh: + resp := respWithChannel.response req, wasPresent := h.store.Evict(resp.RequestID()) + respWithChannel.requestRemovedFromStore <- struct{}{} if !wasPresent { h.responseCache[resp.RequestID()] = &responseCacheEntry[R]{ response: resp,