Skip to content
Draft

initial #1596

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions pkg/capabilities/consensus/requests/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type ConsensusRequest[T any, R ConsensusResponse] interface {
SendTimeout(ctx context.Context)
}

type responseWithRequestStoreRemovalChan[R ConsensusResponse] struct {
response R
requestRemovedFromStore chan struct{}
}

type ConsensusResponse interface {
RequestID() string
}
Expand All @@ -38,7 +43,7 @@ type Handler[T ConsensusRequest[T, R], R ConsensusResponse] struct {
responseCache map[string]*responseCacheEntry[R]
cacheExpiryTime time.Duration

responseCh chan R
responseCh chan responseWithRequestStoreRemovalChan[R]
requestCh chan T

clock clockwork.Clock
Expand All @@ -49,7 +54,7 @@ func NewHandler[T ConsensusRequest[T, R], R ConsensusResponse](lggr logger.Logge
store: s,
pendingRequests: map[string]T{},
responseCache: map[string]*responseCacheEntry[R]{},
responseCh: make(chan R),
responseCh: make(chan responseWithRequestStoreRemovalChan[R]),
requestCh: make(chan T),
clock: clock,
cacheExpiryTime: responseExpiryTime,
Expand All @@ -62,11 +67,23 @@ func NewHandler[T ConsensusRequest[T, R], R ConsensusResponse](lggr logger.Logge
}

func (h *Handler[T, R]) SendResponse(ctx context.Context, resp R) {
respWithRemovalChan := responseWithRequestStoreRemovalChan[R]{
response: resp,
requestRemovedFromStore: make(chan struct{}, 1),
}

select {
case <-ctx.Done():
return
case h.responseCh <- respWithRemovalChan:
}

select {
case <-ctx.Done():
return
case h.responseCh <- resp:
case <-respWithRemovalChan.requestRemovedFromStore:
}

}

func (h *Handler[T, R]) SendRequest(ctx context.Context, r T) {
Expand Down Expand Up @@ -113,8 +130,10 @@ func (h *Handler[T, R]) worker(ctx context.Context) {
h.eng.Errorw("failed to add request to store", "err", err)
}

case resp := <-h.responseCh:
case respWithChannel := <-h.responseCh:
resp := respWithChannel.response
req, wasPresent := h.store.Evict(resp.RequestID())
respWithChannel.requestRemovedFromStore <- struct{}{}
if !wasPresent {
h.responseCache[resp.RequestID()] = &responseCacheEntry[R]{
response: resp,
Expand Down
Loading