Skip to content

Block Sends when Resend Request is active #715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,17 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
return state.generateSequenceReset(session, beginSeqNo, endSeqNo+1, inReplyTo)
}

session.resendMutex.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please place a comment indicating that this lock must always be locked first before the sendMutex, otherwise there is potential for a deadlock, and that mutex is locked within the session.EnqueueBytesAndSend method below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

session.resendRequestActive = true
session.resendMutex.Unlock()

defer func() {
session.resendMutex.Lock()
session.resendRequestActive = false
session.resendMutex.Unlock()
session.resendCond.Broadcast()
}()

seqNum := beginSeqNo
nextSeqNum := seqNum
msg := NewMessage()
Expand Down
26 changes: 26 additions & 0 deletions in_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/quickfixgo/quickfix/internal"
Expand Down Expand Up @@ -373,6 +374,31 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() {
s.State(inSession{})
}

func (s *InSessionTestSuite) TestFIXMsgInResendRequestBlocksSend() {
s.MockApp.On("ToApp").Return(nil)
s.Require().Nil(s.session.send(s.NewOrderSingle()))
s.LastToAppMessageSent()
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)
s.NextSenderMsgSeqNum(2)

s.MockStore.On("IterateMessages", mock.Anything, mock.Anything, mock.AnythingOfType("func([]byte) error")).
Run(func(_ mock.Arguments) {
s.Require().Nil(s.session.send(s.NewOrderSingle()))
}).
Return(nil)

s.MockApp.On("FromAdmin").Return(nil)
go s.fixMsgIn(s.session, s.ResendRequest(1))

s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)
s.NextSenderMsgSeqNum(2)

s.Require().Nil(s.session.send(s.NewOrderSingle()))
s.LastToAppMessageSent()
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 2)
s.NextSenderMsgSeqNum(3)
}

func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() {
s.IncrNextTargetMsgSeqNum()

Expand Down
2 changes: 2 additions & 0 deletions quickfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package quickfix

import (
"sync"
"time"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -221,6 +222,7 @@ func (s *SessionSuiteRig) Init() {
messageOut: s.Receiver.sendChannel,
sessionEvent: make(chan internal.Event),
}
s.session.resendCond = sync.NewCond(&s.resendMutex)
s.MaxLatency = 120 * time.Second
}

Expand Down
32 changes: 21 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ type session struct {
toSend [][]byte

// Mutex for access to toSend.
sendMutex sync.Mutex
sendMutex sync.RWMutex

resendRequestActive bool
resendMutex sync.Mutex
resendCond *sync.Cond

sessionEvent chan internal.Event
messageEvent chan bool
Expand Down Expand Up @@ -271,8 +275,8 @@ func (s *session) resend(msg *Message) bool {

// queueForSend will validate, persist, and queue the message for send.
func (s *session) queueForSend(msg *Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()
s.sendMutex.RLock()
defer s.sendMutex.RUnlock()

msgBytes, err := s.prepMessageForSend(msg, nil)
if err != nil {
Expand Down Expand Up @@ -302,8 +306,14 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
return s.queueForSend(msg)
}

s.sendMutex.Lock()
defer s.sendMutex.Unlock()
s.resendMutex.Lock()
for s.resendRequestActive {
s.resendCond.Wait()
}
s.resendMutex.Unlock()

s.sendMutex.RLock()
defer s.sendMutex.RUnlock()

msgBytes, err := s.prepMessageForSend(msg, inReplyTo)
if err != nil {
Expand All @@ -318,8 +328,8 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {

// dropAndReset will drop the send queue and reset the message store.
func (s *session) dropAndReset() error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()
s.sendMutex.RLock()
defer s.sendMutex.RUnlock()

s.dropQueued()
return s.store.Reset()
Expand All @@ -330,8 +340,8 @@ func (s *session) dropAndSend(msg *Message) error {
return s.dropAndSendInReplyTo(msg, nil)
}
func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()
s.sendMutex.RLock()
defer s.sendMutex.RUnlock()

msgBytes, err := s.prepMessageForSend(msg, inReplyTo)
if err != nil {
Expand Down Expand Up @@ -413,8 +423,8 @@ func (s *session) dropQueued() {
}

func (s *session) EnqueueBytesAndSend(msg []byte) {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()
s.sendMutex.RLock()
defer s.sendMutex.RUnlock()

s.toSend = append(s.toSend, msg)
s.sendQueued(true)
Expand Down
1 change: 1 addition & 0 deletions session_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (f sessionFactory) newSession(
sessionID: sessionID,
stopOnce: sync.Once{},
}
s.resendCond = sync.NewCond(&s.resendMutex)

var validatorSettings = defaultValidatorSettings
if settings.HasSetting(config.ValidateFieldsOutOfOrder) {
Expand Down
Loading