diff --git a/_sql/mssql/quickfix_database.sql b/_sql/mssql/quickfix_database.sql index 10456d5af..e5d4f9c16 100644 --- a/_sql/mssql/quickfix_database.sql +++ b/_sql/mssql/quickfix_database.sql @@ -12,8 +12,8 @@ CREATE TABLE sessions ( targetlocid VARCHAR(64) NOT NULL, session_qualifier VARCHAR(64) NOT NULL, creation_time DATETIME NOT NULL, - incoming_seqnum INT NOT NULL, - outgoing_seqnum INT NOT NULL, + incoming_seqnum BIGINT NOT NULL, + outgoing_seqnum BIGINT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier) ); @@ -27,7 +27,7 @@ CREATE TABLE messages ( targetsubid VARCHAR(64) NOT NULL, targetlocid VARCHAR(64) NOT NULL, session_qualifier VARCHAR(64) NOT NULL, - msgseqnum INT NOT NULL, + msgseqnum BIGINT NOT NULL, message TEXT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier, diff --git a/_sql/mysql/messages_table.sql b/_sql/mysql/messages_table.sql index fd8f66c9b..3687b31ae 100644 --- a/_sql/mysql/messages_table.sql +++ b/_sql/mysql/messages_table.sql @@ -11,7 +11,7 @@ CREATE TABLE messages ( targetsubid VARCHAR(64) NOT NULL, targetlocid VARCHAR(64) NOT NULL, session_qualifier VARCHAR(64) NOT NULL, - msgseqnum INT NOT NULL, + msgseqnum BIGINT NOT NULL, message TEXT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier, diff --git a/_sql/mysql/sessions_table.sql b/_sql/mysql/sessions_table.sql index 2f27d25d8..53f63289c 100644 --- a/_sql/mysql/sessions_table.sql +++ b/_sql/mysql/sessions_table.sql @@ -12,8 +12,8 @@ CREATE TABLE sessions ( targetlocid VARCHAR(64) NOT NULL, session_qualifier VARCHAR(64) NOT NULL, creation_time DATETIME NOT NULL, - incoming_seqnum INT NOT NULL, - outgoing_seqnum INT NOT NULL, + incoming_seqnum BIGINT NOT NULL, + outgoing_seqnum BIGINT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier) ); \ No newline at end of file diff --git a/_sql/oracle/messages_table.sql b/_sql/oracle/messages_table.sql index ddfb3991f..8648529cc 100644 --- a/_sql/oracle/messages_table.sql +++ b/_sql/oracle/messages_table.sql @@ -7,7 +7,7 @@ CREATE TABLE messages ( targetsubid VARCHAR2(64) NOT NULL, targetlocid VARCHAR2(64) NOT NULL, session_qualifier VARCHAR2(64) NOT NULL, - msgseqnum INTEGER NOT NULL, + msgseqnum BIGINT NOT NULL, message VARCHAR2(4000) NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier, msgseqnum) diff --git a/_sql/oracle/sessions_table.sql b/_sql/oracle/sessions_table.sql index ebf2d76a6..00ae4c5a8 100644 --- a/_sql/oracle/sessions_table.sql +++ b/_sql/oracle/sessions_table.sql @@ -8,8 +8,8 @@ CREATE TABLE sessions ( targetlocid VARCHAR2(64) NOT NULL, session_qualifier VARCHAR2(64) NOT NULL, creation_time TIMESTAMP NOT NULL, - incoming_seqnum INTEGER NOT NULL, - outgoing_seqnum INTEGER NOT NULL, + incoming_seqnum BIGINT NOT NULL, + outgoing_seqnum BIGINT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier) ); \ No newline at end of file diff --git a/_sql/postgresql/messages_table.sql b/_sql/postgresql/messages_table.sql index 38507c091..7e7169527 100644 --- a/_sql/postgresql/messages_table.sql +++ b/_sql/postgresql/messages_table.sql @@ -7,7 +7,7 @@ CREATE TABLE messages ( targetsubid VARCHAR(64) NOT NULL, targetlocid VARCHAR(64) NOT NULL, session_qualifier VARCHAR(64) NOT NULL, - msgseqnum INTEGER NOT NULL, + msgseqnum BIGINT NOT NULL, message TEXT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier, diff --git a/_sql/postgresql/sessions_table.sql b/_sql/postgresql/sessions_table.sql index 4abbe165b..db52e4d65 100644 --- a/_sql/postgresql/sessions_table.sql +++ b/_sql/postgresql/sessions_table.sql @@ -8,8 +8,8 @@ CREATE TABLE sessions ( targetlocid VARCHAR(64) NOT NULL, session_qualifier VARCHAR(64) NOT NULL, creation_time TIMESTAMP WITH TIME ZONE NOT NULL, - incoming_seqnum INTEGER NOT NULL, - outgoing_seqnum INTEGER NOT NULL, + incoming_seqnum BIGINT NOT NULL, + outgoing_seqnum BIGINT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier) ); \ No newline at end of file diff --git a/_sql/sqlite3/messages_table.sql b/_sql/sqlite3/messages_table.sql index 77515f4d7..0b446df6c 100644 --- a/_sql/sqlite3/messages_table.sql +++ b/_sql/sqlite3/messages_table.sql @@ -9,7 +9,7 @@ CREATE TABLE messages ( targetsubid VARCHAR(64) NOT NULL, targetlocid VARCHAR(64) NOT NULL, session_qualifier VARCHAR(64) NOT NULL, - msgseqnum INT NOT NULL, + msgseqnum BIGINT NOT NULL, message TEXT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier, diff --git a/_sql/sqlite3/sessions_table.sql b/_sql/sqlite3/sessions_table.sql index 173989691..100d76360 100644 --- a/_sql/sqlite3/sessions_table.sql +++ b/_sql/sqlite3/sessions_table.sql @@ -10,8 +10,8 @@ CREATE TABLE sessions ( targetlocid VARCHAR(64) NOT NULL, session_qualifier VARCHAR(64) NOT NULL, creation_time DATETIME NOT NULL, - incoming_seqnum INT NOT NULL, - outgoing_seqnum INT NOT NULL, + incoming_seqnum BIGINT NOT NULL, + outgoing_seqnum BIGINT NOT NULL, PRIMARY KEY (beginstring, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid, session_qualifier) ); diff --git a/_test/test-server/main.go b/_test/test-server/main.go index d3e680d9b..5a68ab404 100644 --- a/_test/test-server/main.go +++ b/_test/test-server/main.go @@ -200,7 +200,7 @@ func main() { } num, ok := queryParams["NEXTTARGETSEQNUM"] if ok { - seqnumInt, cErr := strconv.Atoi(num[0]) + seqnumInt, cErr := strconv.ParseUint(num[0], 10, 64) if cErr != nil { fmt.Println("cannot find seqnum") os.Exit(1) @@ -216,7 +216,7 @@ func main() { num, ok = queryParams["NEXTSENDERSEQNUM"] if ok { - seqnumInt, cErr := strconv.Atoi(num[0]) + seqnumInt, cErr := strconv.ParseUint(num[0], 10, 64) if cErr != nil { fmt.Println("cannot find seqnum") os.Exit(1) diff --git a/field_map.go b/field_map.go index 4aac64b1d..5c9515963 100644 --- a/field_map.go +++ b/field_map.go @@ -176,6 +176,21 @@ func (m FieldMap) GetInt(tag Tag) (int, MessageRejectError) { return int(val), err } +// GetInt is a GetField wrapper for int fields. +func (m FieldMap) GetUint64(tag Tag) (uint64, MessageRejectError) { + bytes, err := m.GetBytes(tag) + if err != nil { + return 0, err + } + + var val FIXUint64 + if val.Read(bytes) != nil { + err = IncorrectDataFormatForValue(tag) + } + + return uint64(val), err +} + // GetInt is a lock free GetField wrapper for int fields. func (m FieldMap) getIntNoLock(tag Tag) (int, MessageRejectError) { bytes, err := m.getBytesNoLock(tag) @@ -270,6 +285,12 @@ func (m *FieldMap) SetInt(tag Tag, value int) *FieldMap { return m.SetBytes(tag, v.Write()) } +// SetUint64 is a SetField wrapper for int fields. +func (m *FieldMap) SetUint64(tag Tag, value uint64) *FieldMap { + v := FIXUint64(value) + return m.SetBytes(tag, v.Write()) +} + // SetString is a SetField wrapper for string fields. func (m *FieldMap) SetString(tag Tag, value string) *FieldMap { return m.SetBytes(tag, []byte(value)) diff --git a/fix_uint.go b/fix_uint.go new file mode 100644 index 000000000..633e8af15 --- /dev/null +++ b/fix_uint.go @@ -0,0 +1,59 @@ +// Copyright (c) quickfixengine.org All rights reserved. +// +// This file may be distributed under the terms of the quickfixengine.org +// license as defined by quickfixengine.org and appearing in the file +// LICENSE included in the packaging of this file. +// +// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING +// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE. +// +// See http://www.quickfixengine.org/LICENSE for licensing information. +// +// Contact ask@quickfixengine.org if any conditions of this licensing +// are not clear to you. + +package quickfix + +import ( + "errors" + "strconv" +) + +// parseUInt is similar to the function in strconv, but is tuned for uint64s appearing in FIX field types. +func parseUInt64(d []byte) (n uint64, err error) { + if len(d) == 0 { + err = errors.New("empty bytes") + return + } + + for _, dec := range d { + if dec < ascii0 || dec > ascii9 { + err = errors.New("invalid format") + return + } + + n = n*10 + (uint64(dec) - ascii0) + } + + return +} + +// FIXUInt64 is a FIX Uint64 Value, implements FieldValue. +type FIXUint64 uint64 + +// Uint64 converts the FIXUint64 value to uint64. +func (f FIXUint64) Uint64() uint64 { return uint64(f) } + +func (f *FIXUint64) Read(bytes []byte) error { + i, err := parseUInt64(bytes) + if err != nil { + return err + } + *f = FIXUint64(i) + return nil +} + +func (f FIXUint64) Write() []byte { + return strconv.AppendInt(nil, int64(f), 10) +} diff --git a/fix_uint_test.go b/fix_uint_test.go new file mode 100644 index 000000000..38ac7b66e --- /dev/null +++ b/fix_uint_test.go @@ -0,0 +1,52 @@ +// Copyright (c) quickfixengine.org All rights reserved. +// +// This file may be distributed under the terms of the quickfixengine.org +// license as defined by quickfixengine.org and appearing in the file +// LICENSE included in the packaging of this file. +// +// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING +// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE. +// +// See http://www.quickfixengine.org/LICENSE for licensing information. +// +// Contact ask@quickfixengine.org if any conditions of this licensing +// are not clear to you. + +package quickfix + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFIXUInt_Write(t *testing.T) { + field := FIXUint64(5) + + assert.Equal(t, "5", string(field.Write())) +} + +func TestFIXUInt_Read(t *testing.T) { + var field FIXUint64 + err := field.Read([]byte("15")) + assert.Nil(t, err, "Unexpected error") + assert.Equal(t, uint64(15), uint64(field)) + + err = field.Read([]byte("blah")) + assert.NotNil(t, err, "Unexpected error") +} + +func TestFIXUInt_UInt(t *testing.T) { + f := FIXUint64(4) + assert.Equal(t, uint64(4), f.Uint64()) +} + +func BenchmarkFIXUInt_Read(b *testing.B) { + intBytes := []byte("1500") + var field FIXUint64 + + for i := 0; i < b.N; i++ { + _ = field.Read(intBytes) + } +} diff --git a/in_session.go b/in_session.go index 5445967cd..7c8b87db7 100644 --- a/in_session.go +++ b/in_session.go @@ -165,7 +165,7 @@ func (state inSession) handleSequenceReset(session *session, msg *Message) (next switch { case newSeqNo > expectedSeqNum: - if err := session.store.SetNextTargetMsgSeqNum(int(newSeqNo)); err != nil { + if err := session.store.SetNextTargetMsgSeqNum(uint64(newSeqNo)); err != nil { return handleStateError(session, err) } case newSeqNo < expectedSeqNum: @@ -196,7 +196,7 @@ func (state inSession) handleResendRequest(session *session, msg *Message) (next return state.processReject(session, msg, RequiredTagMissing(tagEndSeqNo)) } - endSeqNo := int(endSeqNoField) + endSeqNo := uint64(endSeqNoField) session.log.OnEventf("Received ResendRequest FROM: %d TO: %d", beginSeqNo, endSeqNo) expectedSeqNum := session.store.NextSenderMsgSeqNum() @@ -207,7 +207,7 @@ func (state inSession) handleResendRequest(session *session, msg *Message) (next endSeqNo = expectedSeqNum - 1 } - if err := state.resendMessages(session, int(beginSeqNo), endSeqNo, *msg); err != nil { + if err := state.resendMessages(session, uint64(beginSeqNo), endSeqNo, *msg); err != nil { return handleStateError(session, err) } @@ -225,7 +225,7 @@ func (state inSession) handleResendRequest(session *session, msg *Message) (next return state } -func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int, inReplyTo Message) error { +func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo uint64, inReplyTo Message) error { if session.DisableMessagePersist { return state.generateSequenceReset(session, beginSeqNo, endSeqNo+1, inReplyTo) } @@ -240,7 +240,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int return err // We cant continue with a message that cant be parsed correctly. } msgType, _ := msg.Header.GetBytes(tagMsgType) - sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum) + sentMessageSeqNum, _ := msg.Header.GetUint64(tagMsgSeqNum) if isAdminMessageType(msgType) { nextSeqNum = sentMessageSeqNum + 1 @@ -297,7 +297,7 @@ func (state inSession) processReject(session *session, msg *Message, rej Message } if nextState.messageStash == nil { - nextState.messageStash = make(map[int]*Message) + nextState.messageStash = make(map[uint64]*Message) } nextState.messageStash[TypedError.ReceivedTarget] = msg @@ -387,7 +387,7 @@ func (state inSession) doTargetTooLow(session *session, msg *Message, rej target return state } -func (state *inSession) generateSequenceReset(session *session, beginSeqNo int, endSeqNo int, inReplyTo Message) (err error) { +func (state *inSession) generateSequenceReset(session *session, beginSeqNo uint64, endSeqNo uint64, inReplyTo Message) (err error) { sequenceReset := NewMessage() session.fillDefaultHeader(sequenceReset, &inReplyTo) diff --git a/in_session_test.go b/in_session_test.go index 8973e4ef0..42a3ac163 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -198,8 +198,8 @@ func (s *InSessionTestSuite) TestFIXMsgInTargetTooHigh() { } func (s *InSessionTestSuite) TestFIXMsgInTargetTooHighResendRequestChunkSize() { var tests = []struct { - chunkSize int - expectedEndSeqNo int + chunkSize uint64 + expectedEndSeqNo uint64 }{ {0, 0}, {10, 0}, diff --git a/internal/session_settings.go b/internal/session_settings.go index f679473e6..5b1708ac1 100644 --- a/internal/session_settings.go +++ b/internal/session_settings.go @@ -12,7 +12,7 @@ type SessionSettings struct { HeartBtIntOverride bool SessionTime *TimeRange InitiateLogon bool - ResendRequestChunkSize int + ResendRequestChunkSize uint64 EnableLastMsgSeqNumProcessed bool EnableNextExpectedMsgSeqNum bool SkipCheckLatency bool diff --git a/internal/testsuite/store_suite.go b/internal/testsuite/store_suite.go index da7e62b91..7a9038880 100644 --- a/internal/testsuite/store_suite.go +++ b/internal/testsuite/store_suite.go @@ -39,23 +39,23 @@ func (s *StoreTestSuite) TestMessageStoreSetNextMsgSeqNumRefreshIncrNextMsgSeqNu s.Require().Nil(s.MsgStore.Refresh()) // Then the sender and target seqnums should still be - s.Equal(867, s.MsgStore.NextSenderMsgSeqNum()) - s.Equal(5309, s.MsgStore.NextTargetMsgSeqNum()) + s.Equal(uint64(867), s.MsgStore.NextSenderMsgSeqNum()) + s.Equal(uint64(5309), s.MsgStore.NextTargetMsgSeqNum()) // When the sender and target seqnums are incremented s.Require().Nil(s.MsgStore.IncrNextSenderMsgSeqNum()) s.Require().Nil(s.MsgStore.IncrNextTargetMsgSeqNum()) // Then the sender and target seqnums should be - s.Equal(868, s.MsgStore.NextSenderMsgSeqNum()) - s.Equal(5310, s.MsgStore.NextTargetMsgSeqNum()) + s.Equal(uint64(868), s.MsgStore.NextSenderMsgSeqNum()) + s.Equal(uint64(5310), s.MsgStore.NextTargetMsgSeqNum()) // When the store is refreshed from its backing store s.Require().Nil(s.MsgStore.Refresh()) // Then the sender and target seqnums should still be - s.Equal(868, s.MsgStore.NextSenderMsgSeqNum()) - s.Equal(5310, s.MsgStore.NextTargetMsgSeqNum()) + s.Equal(uint64(868), s.MsgStore.NextSenderMsgSeqNum()) + s.Equal(uint64(5310), s.MsgStore.NextTargetMsgSeqNum()) } func (s *StoreTestSuite) TestMessageStoreReset() { @@ -67,18 +67,18 @@ func (s *StoreTestSuite) TestMessageStoreReset() { s.Require().Nil(s.MsgStore.Reset()) // Then the sender and target seqnums should be - s.Equal(1, s.MsgStore.NextSenderMsgSeqNum()) - s.Equal(1, s.MsgStore.NextTargetMsgSeqNum()) + s.Equal(uint64(1), s.MsgStore.NextSenderMsgSeqNum()) + s.Equal(uint64(1), s.MsgStore.NextTargetMsgSeqNum()) // When the store is refreshed from its backing store s.Require().Nil(s.MsgStore.Refresh()) // Then the sender and target seqnums should still be - s.Equal(1, s.MsgStore.NextSenderMsgSeqNum()) - s.Equal(1, s.MsgStore.NextTargetMsgSeqNum()) + s.Equal(uint64(1), s.MsgStore.NextSenderMsgSeqNum()) + s.Equal(uint64(1), s.MsgStore.NextTargetMsgSeqNum()) } -func (s *StoreTestSuite) fetchMessages(beginSeqNum, endSeqNum int) (msgs [][]byte) { +func (s *StoreTestSuite) fetchMessages(beginSeqNum, endSeqNum uint64) (msgs [][]byte) { s.T().Helper() // Fetch messages from the new iterator @@ -102,16 +102,16 @@ func (s *StoreTestSuite) fetchMessages(beginSeqNum, endSeqNum int) (msgs [][]byt func (s *StoreTestSuite) TestMessageStoreSaveMessageGetMessage() { // Given the following saved messages - expectedMsgsBySeqNum := map[int]string{ + expectedMsgsBySeqNum := map[uint64]string{ 1: "In the frozen land of Nador", 2: "they were forced to eat Robin's minstrels", 3: "and there was much rejoicing", } - var seqNums []int + var seqNums []uint64 for seqNum := range expectedMsgsBySeqNum { seqNums = append(seqNums, seqNum) } - sort.Ints(seqNums) + sort.Slice(seqNums, func(i, j int) bool { return seqNums[i] < seqNums[j] }) for _, seqNum := range seqNums { s.Require().Nil(s.MsgStore.SaveMessage(seqNum, []byte(expectedMsgsBySeqNum[seqNum]))) } @@ -142,20 +142,20 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageAndIncrementGetMessage() { s.Require().Nil(s.MsgStore.SetNextSenderMsgSeqNum(420)) // Given the following saved messages - expectedMsgsBySeqNum := map[int]string{ + expectedMsgsBySeqNum := map[uint64]string{ 1: "In the frozen land of Nador", 2: "they were forced to eat Robin's minstrels", 3: "and there was much rejoicing", } - var seqNums []int + var seqNums []uint64 for seqNum := range expectedMsgsBySeqNum { seqNums = append(seqNums, seqNum) } - sort.Ints(seqNums) + sort.Slice(seqNums, func(i, j int) bool { return seqNums[i] < seqNums[j] }) for _, seqNum := range seqNums { s.Require().Nil(s.MsgStore.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, []byte(expectedMsgsBySeqNum[seqNum]))) } - s.Equal(423, s.MsgStore.NextSenderMsgSeqNum()) + s.Equal(uint64(423), s.MsgStore.NextSenderMsgSeqNum()) // When the messages are retrieved from the MessageStore actualMsgs := s.fetchMessages(1, 3) @@ -172,7 +172,7 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageAndIncrementGetMessage() { // And the messages are retrieved from the MessageStore actualMsgs = s.fetchMessages(1, 3) - s.Equal(423, s.MsgStore.NextSenderMsgSeqNum()) + s.Equal(uint64(423), s.MsgStore.NextSenderMsgSeqNum()) // Then the messages should still be s.Require().Len(actualMsgs, 3) @@ -199,7 +199,7 @@ func (s *StoreTestSuite) TestMessageStoreGetMessagesVariousRanges() { // When the following requests are made to the store var testCases = []struct { - beginSeqNo, endSeqNo int + beginSeqNo, endSeqNo uint64 expectedBytes [][]byte }{ {beginSeqNo: 1, endSeqNo: 1, expectedBytes: [][]byte{[]byte("hello")}}, diff --git a/logon_state_test.go b/logon_state_test.go index da7fb644f..386451e74 100644 --- a/logon_state_test.go +++ b/logon_state_test.go @@ -430,3 +430,34 @@ func (s *LogonStateTestSuite) TestStayLoggedInOnReset() { s.NextTargetMsgSeqNum(2) s.NextSenderMsgSeqNum(2) } + +func (s *LogonStateTestSuite) TestLargeSeqNum() { + + logon := s.Logon() + logon.Body.SetField(tagResetSeqNumFlag, FIXBoolean(true)) + + s.MockApp.On("FromAdmin").Return(nil) + s.MockApp.On("OnLogon") + s.MockApp.On("ToAdmin") + s.fixMsgIn(s.session, logon) + + s.MockApp.AssertExpectations(s.T()) + + s.State(inSession{}) + + s.session.store.SetNextSenderMsgSeqNum(16000000000000000000) + s.session.store.SetNextTargetMsgSeqNum(16000000000000000000) + s.IncrNextTargetMsgSeqNum() + s.IncrNextSenderMsgSeqNum() + + s.NextTargetMsgSeqNum(16000000000000000001) + s.NextSenderMsgSeqNum(16000000000000000001) + + s.fixMsgIn(s.session, logon) + + s.True(s.session.IsConnected()) + s.True(s.session.IsLoggedOn()) + + s.NextTargetMsgSeqNum(2) + s.NextSenderMsgSeqNum(2) +} diff --git a/memory_store.go b/memory_store.go index 7c57ca783..66b75c5eb 100644 --- a/memory_store.go +++ b/memory_store.go @@ -22,16 +22,16 @@ import ( ) type memoryStore struct { - senderMsgSeqNum, targetMsgSeqNum int + senderMsgSeqNum, targetMsgSeqNum uint64 creationTime time.Time - messageMap map[int][]byte + messageMap map[uint64][]byte } -func (store *memoryStore) NextSenderMsgSeqNum() int { +func (store *memoryStore) NextSenderMsgSeqNum() uint64 { return store.senderMsgSeqNum + 1 } -func (store *memoryStore) NextTargetMsgSeqNum() int { +func (store *memoryStore) NextTargetMsgSeqNum() uint64 { return store.targetMsgSeqNum + 1 } @@ -45,11 +45,11 @@ func (store *memoryStore) IncrNextTargetMsgSeqNum() error { return nil } -func (store *memoryStore) SetNextSenderMsgSeqNum(nextSeqNum int) error { +func (store *memoryStore) SetNextSenderMsgSeqNum(nextSeqNum uint64) error { store.senderMsgSeqNum = nextSeqNum - 1 return nil } -func (store *memoryStore) SetNextTargetMsgSeqNum(nextSeqNum int) error { +func (store *memoryStore) SetNextTargetMsgSeqNum(nextSeqNum uint64) error { store.targetMsgSeqNum = nextSeqNum - 1 return nil } @@ -80,16 +80,16 @@ func (store *memoryStore) Close() error { return nil } -func (store *memoryStore) SaveMessage(seqNum int, msg []byte) error { +func (store *memoryStore) SaveMessage(seqNum uint64, msg []byte) error { if store.messageMap == nil { - store.messageMap = make(map[int][]byte) + store.messageMap = make(map[uint64][]byte) } store.messageMap[seqNum] = msg return nil } -func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { +func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum uint64, msg []byte) error { err := store.SaveMessage(seqNum, msg) if err != nil { return err @@ -97,7 +97,7 @@ func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg return store.IncrNextSenderMsgSeqNum() } -func (store *memoryStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { +func (store *memoryStore) IterateMessages(beginSeqNum, endSeqNum uint64, cb func([]byte) error) error { for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ { if m, ok := store.messageMap[seqNum]; ok { if err := cb(m); err != nil { @@ -108,7 +108,7 @@ func (store *memoryStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([] return nil } -func (store *memoryStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { +func (store *memoryStore) GetMessages(beginSeqNum, endSeqNum uint64) ([][]byte, error) { var msgs [][]byte err := store.IterateMessages(beginSeqNum, endSeqNum, func(m []byte) error { msgs = append(msgs, m) diff --git a/quickfix_test.go b/quickfix_test.go index db308b48e..763603804 100644 --- a/quickfix_test.go +++ b/quickfix_test.go @@ -33,6 +33,7 @@ type KnowsFieldMap interface { Has(Tag) bool GetString(Tag) (string, MessageRejectError) GetInt(Tag) (int, MessageRejectError) + GetUint64(Tag) (uint64, MessageRejectError) GetField(Tag, FieldValueReader) MessageRejectError } @@ -52,6 +53,10 @@ func (s *QuickFIXSuite) FieldEquals(tag Tag, expectedValue interface{}, fieldMap val, err := fieldMap.GetInt(tag) s.Nil(err) s.Equal(expected, val) + case uint64: + val, err := fieldMap.GetUint64(tag) + s.Nil(err) + s.Equal(expected, val) case bool: var val FIXBoolean err := fieldMap.GetField(tag, &val) @@ -128,10 +133,10 @@ func (e *MockApp) FromApp(_ *Message, _ SessionID) (reject MessageRejectError) { } type MessageFactory struct { - seqNum int + seqNum uint64 } -func (m *MessageFactory) SetNextSeqNum(next int) { +func (m *MessageFactory) SetNextSeqNum(next uint64) { m.seqNum = next - 1 } @@ -143,7 +148,7 @@ func (m *MessageFactory) buildMessage(msgType string) *Message { SetField(tagSenderCompID, FIXString("TW")). SetField(tagTargetCompID, FIXString("ISLD")). SetField(tagSendingTime, FIXUTCTimestamp{Time: time.Now()}). - SetField(tagMsgSeqNum, FIXInt(m.seqNum)). + SetField(tagMsgSeqNum, FIXUint64(m.seqNum)). SetField(tagMsgType, FIXString(msgType)) return msg } @@ -164,17 +169,17 @@ func (m *MessageFactory) Logon() *Message { return m.buildMessage(string(msgTypeLogon)) } -func (m *MessageFactory) ResendRequest(beginSeqNo int) *Message { +func (m *MessageFactory) ResendRequest(beginSeqNo uint64) *Message { msg := m.buildMessage(string(msgTypeResendRequest)) - msg.Body.SetField(tagBeginSeqNo, FIXInt(beginSeqNo)) - msg.Body.SetField(tagEndSeqNo, FIXInt(0)) + msg.Body.SetField(tagBeginSeqNo, FIXUint64(beginSeqNo)) + msg.Body.SetField(tagEndSeqNo, FIXUint64(0)) return msg } -func (m *MessageFactory) SequenceReset(seqNo int) *Message { +func (m *MessageFactory) SequenceReset(seqNo uint64) *Message { msg := m.buildMessage(string(msgTypeSequenceReset)) - msg.Body.SetField(tagNewSeqNo, FIXInt(seqNo)) + msg.Body.SetField(tagNewSeqNo, FIXUint64(seqNo)) return msg } @@ -272,11 +277,11 @@ func (s *SessionSuiteRig) ExpectStoreReset() { s.NextTargetMsgSeqNum(1) } -func (s *SessionSuiteRig) NextTargetMsgSeqNum(expected int) { +func (s *SessionSuiteRig) NextTargetMsgSeqNum(expected uint64) { s.Equal(expected, s.session.store.NextTargetMsgSeqNum(), "NextTargetMsgSeqNum should be %v ", expected) } -func (s *SessionSuiteRig) NextSenderMsgSeqNum(expected int) { +func (s *SessionSuiteRig) NextSenderMsgSeqNum(expected uint64) { s.Equal(expected, s.session.store.NextSenderMsgSeqNum(), "NextSenderMsgSeqNum should be %v", expected) } @@ -288,7 +293,7 @@ func (s *SessionSuiteRig) IncrNextTargetMsgSeqNum() { s.Require().Nil(s.session.store.IncrNextTargetMsgSeqNum()) } -func (s *SessionSuiteRig) NoMessagePersisted(seqNum int) { +func (s *SessionSuiteRig) NoMessagePersisted(seqNum uint64) { persistedMessages, err := s.session.store.GetMessages(seqNum, seqNum) s.Nil(err) s.Empty(persistedMessages, "The message should not be persisted") @@ -296,7 +301,7 @@ func (s *SessionSuiteRig) NoMessagePersisted(seqNum int) { func (s *SessionSuiteRig) MessagePersisted(msg *Message) { var err error - seqNum, err := msg.Header.GetInt(tagMsgSeqNum) + seqNum, err := msg.Header.GetUint64(tagMsgSeqNum) s.Nil(err, "message should have seq num") persistedMessages, err := s.session.store.GetMessages(seqNum, seqNum) diff --git a/registry.go b/registry.go index 5f8e69fc2..287ebf842 100644 --- a/registry.go +++ b/registry.go @@ -94,7 +94,7 @@ func UnregisterSession(sessionID SessionID) error { } // SetNextTargetMsgSeqNum set the next expected target message sequence number for the session matching the session id. -func SetNextTargetMsgSeqNum(sessionID SessionID, seqNum int) error { +func SetNextTargetMsgSeqNum(sessionID SessionID, seqNum uint64) error { session, ok := lookupSession(sessionID) if !ok { return errUnknownSession @@ -103,7 +103,7 @@ func SetNextTargetMsgSeqNum(sessionID SessionID, seqNum int) error { } // SetNextSenderMsgSeqNum sets the next outgoing message sequence number for the session matching the session id. -func SetNextSenderMsgSeqNum(sessionID SessionID, seqNum int) error { +func SetNextSenderMsgSeqNum(sessionID SessionID, seqNum uint64) error { session, ok := lookupSession(sessionID) if !ok { return errUnknownSession @@ -112,7 +112,7 @@ func SetNextSenderMsgSeqNum(sessionID SessionID, seqNum int) error { } // GetExpectedSenderNum retrieves the expected sender sequence number for the session matching the session id. -func GetExpectedSenderNum(sessionID SessionID) (int, error) { +func GetExpectedSenderNum(sessionID SessionID) (uint64, error) { session, ok := lookupSession(sessionID) if !ok { return 0, errUnknownSession @@ -121,7 +121,7 @@ func GetExpectedSenderNum(sessionID SessionID) (int, error) { } // GetExpectedTargetNum retrieves the next target sequence number for the session matching the session id. -func GetExpectedTargetNum(sessionID SessionID) (int, error) { +func GetExpectedTargetNum(sessionID SessionID) (uint64, error) { session, ok := lookupSession(sessionID) if !ok { return 0, errUnknownSession diff --git a/resend_state.go b/resend_state.go index f3f601e9c..71a2d9b46 100644 --- a/resend_state.go +++ b/resend_state.go @@ -19,9 +19,9 @@ import "github.com/quickfixgo/quickfix/internal" type resendState struct { loggedOn - messageStash map[int]*Message - currentResendRangeEnd int - resendRangeEnd int + messageStash map[uint64]*Message + currentResendRangeEnd uint64 + resendRangeEnd uint64 } func (s resendState) String() string { return "Resend" } diff --git a/session.go b/session.go index 8213f8486..10895e8d7 100644 --- a/session.go +++ b/session.go @@ -146,7 +146,7 @@ func (s *session) fillDefaultHeader(msg *Message, inReplyTo *Message) { msg.Header.SetInt(tagLastMsgSeqNumProcessed, lastSeqNum) } } else { - msg.Header.SetInt(tagLastMsgSeqNumProcessed, s.store.NextTargetMsgSeqNum()-1) + msg.Header.SetUint64(tagLastMsgSeqNumProcessed, s.store.NextTargetMsgSeqNum()-1) } } } @@ -184,7 +184,7 @@ func (s *session) sendLogonInReplyTo(setResetSeqNum bool, inReplyTo *Message) er // Evaluate tag 789. if s.EnableNextExpectedMsgSeqNum { if inReplyTo != nil { - targetWantsNextSeqNumToBe, getErr := inReplyTo.Body.GetInt(tagNextExpectedMsgSeqNum) + targetWantsNextSeqNumToBe, getErr := inReplyTo.Body.GetUint64(tagNextExpectedMsgSeqNum) if getErr == nil { actualNextNum := s.store.NextSenderMsgSeqNum() // // Is the 789 we received too high ?? @@ -209,7 +209,7 @@ func (s *session) sendLogonInReplyTo(setResetSeqNum bool, inReplyTo *Message) er return nil } -func (s *session) generateSequenceReset(beginSeqNo int, endSeqNo int, inReplyTo Message) (err error) { +func (s *session) generateSequenceReset(beginSeqNo uint64, endSeqNo uint64, inReplyTo Message) (err error) { sequenceReset := NewMessage() s.fillDefaultHeader(sequenceReset, &inReplyTo) @@ -372,7 +372,7 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes s.sentReset = true seqNum = s.store.NextSenderMsgSeqNum() - msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum)) + msg.Header.SetField(tagMsgSeqNum, FIXUint64(seqNum)) } } } else { @@ -388,7 +388,7 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes return } -func (s *session) persist(seqNum int, msgBytes []byte) error { +func (s *session) persist(seqNum uint64, msgBytes []byte) error { if !s.DisableMessagePersist { return s.store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, msgBytes) } @@ -448,14 +448,14 @@ func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, return s.sendResendRequest(reject.ExpectedTarget, reject.ReceivedTarget-1) } -func (s *session) sendResendRequest(beginSeq, endSeq int) (nextState resendState, err error) { +func (s *session) sendResendRequest(beginSeq, endSeq uint64) (nextState resendState, err error) { nextState.resendRangeEnd = endSeq resend := NewMessage() resend.Header.SetBytes(tagMsgType, msgTypeResendRequest) - resend.Body.SetField(tagBeginSeqNo, FIXInt(beginSeq)) + resend.Body.SetField(tagBeginSeqNo, FIXUint64(beginSeq)) - var endSeqNo int + var endSeqNo uint64 if s.ResendRequestChunkSize != 0 { endSeqNo = beginSeq + s.ResendRequestChunkSize - 1 } else { @@ -556,7 +556,7 @@ func (s *session) handleLogon(msg *Message) error { // Evaluate tag 789 to see if we end up with an implied gapfill/resend. if s.EnableNextExpectedMsgSeqNum && !msg.Body.Has(tagResetSeqNumFlag) { - targetWantsNextSeqNumToBe, getErr := msg.Body.GetInt(tagNextExpectedMsgSeqNum) + targetWantsNextSeqNumToBe, getErr := msg.Body.GetUint64(tagNextExpectedMsgSeqNum) if getErr == nil { if targetWantsNextSeqNumToBe != nextSenderMsgNumAtLogonReceived { if !s.DisableMessagePersist { @@ -668,7 +668,7 @@ func (s *session) checkTargetTooLow(msg *Message) MessageRejectError { return RequiredTagMissing(tagMsgSeqNum) } - seqNum, err := msg.Header.GetInt(tagMsgSeqNum) + seqNum, err := msg.Header.GetUint64(tagMsgSeqNum) if err != nil { return err } @@ -685,7 +685,7 @@ func (s *session) checkTargetTooHigh(msg *Message) MessageRejectError { return RequiredTagMissing(tagMsgSeqNum) } - seqNum, err := msg.Header.GetInt(tagMsgSeqNum) + seqNum, err := msg.Header.GetUint64(tagMsgSeqNum) if err != nil { return err } diff --git a/session_factory.go b/session_factory.go index dbb8e0a87..df8e4b4d9 100644 --- a/session_factory.go +++ b/session_factory.go @@ -232,7 +232,7 @@ func (f sessionFactory) newSession( } if settings.HasSetting(config.ResendRequestChunkSize) { - if s.ResendRequestChunkSize, err = settings.IntSetting(config.ResendRequestChunkSize); err != nil { + if s.ResendRequestChunkSize, err = settings.Uint64Setting(config.ResendRequestChunkSize); err != nil { return } } diff --git a/session_factory_test.go b/session_factory_test.go index 349e7580c..91bcca304 100644 --- a/session_factory_test.go +++ b/session_factory_test.go @@ -61,7 +61,7 @@ func (s *SessionFactorySuite) TestDefaults() { s.Nil(session.SessionTime, "By default, start and end time unset") s.Equal("", session.DefaultApplVerID) s.False(session.InitiateLogon) - s.Equal(0, session.ResendRequestChunkSize) + s.Equal(uint64(0), session.ResendRequestChunkSize) s.False(session.EnableLastMsgSeqNumProcessed) s.False(session.SkipCheckLatency) s.Equal(Millis, session.timestampPrecision) @@ -143,7 +143,7 @@ func (s *SessionFactorySuite) TestResendRequestChunkSize() { session, err := s.newSession(s.SessionID, s.MessageStoreFactory, s.SessionSettings, s.LogFactory, s.App) s.Nil(err) s.NotNil(session) - s.Equal(2500, session.ResendRequestChunkSize) + s.Equal(uint64(2500), session.ResendRequestChunkSize) s.SessionSettings.Set(config.ResendRequestChunkSize, "notanint") _, err = s.newSession(s.SessionID, s.MessageStoreFactory, s.SessionSettings, s.LogFactory, s.App) diff --git a/session_rejects.go b/session_rejects.go index 4ea16b61e..2fbc850ca 100644 --- a/session_rejects.go +++ b/session_rejects.go @@ -27,8 +27,8 @@ func (e incorrectBeginString) Error() string { return "Incorrect BeginString" } // targetTooHigh is a MessageReject where the sequence number is larger than expected. type targetTooHigh struct { messageRejectError - ReceivedTarget int - ExpectedTarget int + ReceivedTarget uint64 + ExpectedTarget uint64 } func (e targetTooHigh) Error() string { @@ -38,8 +38,8 @@ func (e targetTooHigh) Error() string { // targetTooLow is a MessageReject where the sequence number is less than expected. type targetTooLow struct { messageRejectError - ReceivedTarget int - ExpectedTarget int + ReceivedTarget uint64 + ExpectedTarget uint64 } func (e targetTooLow) Error() string { diff --git a/session_settings.go b/session_settings.go index 74d6cdd27..5b659c1a0 100644 --- a/session_settings.go +++ b/session_settings.go @@ -120,6 +120,19 @@ func (s *SessionSettings) IntSetting(setting string) (int, error) { return 0, IncorrectFormatForSetting{Setting: setting, Value: rawVal, Err: err} } +func (s *SessionSettings) Uint64Setting(setting string) (uint64, error) { + rawVal, err := s.RawSetting(setting) + if err != nil { + return 0, err + } + + if val, err := strconv.ParseUint(string(rawVal), 10, 64); err == nil { + return val, nil + } + + return 0, IncorrectFormatForSetting{Setting: setting, Value: rawVal, Err: err} +} + // DurationSetting returns the requested setting parsed as a time.Duration. // Returns an error if the setting is not set or cannot be parsed as a time.Duration. func (s *SessionSettings) DurationSetting(setting string) (time.Duration, error) { diff --git a/session_test.go b/session_test.go index 83226fef7..2b177e685 100644 --- a/session_test.go +++ b/session_test.go @@ -249,8 +249,8 @@ func (s *SessionSuite) TestShouldSendReset() { ResetOnLogon bool ResetOnDisconnect bool ResetOnLogout bool - NextSenderMsgSeqNum int - NextTargetMsgSeqNum int + NextSenderMsgSeqNum uint64 + NextTargetMsgSeqNum uint64 Expected bool }{ {BeginStringFIX40, true, false, false, 1, 1, false}, // ResetSeqNumFlag not available < fix41. diff --git a/store.go b/store.go index 6689297ba..98e9b0259 100644 --- a/store.go +++ b/store.go @@ -21,22 +21,22 @@ import ( // The MessageStore interface provides methods to record and retrieve messages for resend purposes. type MessageStore interface { - NextSenderMsgSeqNum() int - NextTargetMsgSeqNum() int + NextSenderMsgSeqNum() uint64 + NextTargetMsgSeqNum() uint64 IncrNextSenderMsgSeqNum() error IncrNextTargetMsgSeqNum() error - SetNextSenderMsgSeqNum(next int) error - SetNextTargetMsgSeqNum(next int) error + SetNextSenderMsgSeqNum(next uint64) error + SetNextTargetMsgSeqNum(next uint64) error CreationTime() time.Time SetCreationTime(time.Time) - SaveMessage(seqNum int, msg []byte) error - SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error - GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) - IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error + SaveMessage(seqNum uint64, msg []byte) error + SaveMessageAndIncrNextSenderMsgSeqNum(seqNum uint64, msg []byte) error + GetMessages(beginSeqNum, endSeqNum uint64) ([][]byte, error) + IterateMessages(beginSeqNum, endSeqNum uint64, cb func([]byte) error) error Refresh() error Reset() error diff --git a/store/file/file_store.go b/store/file/file_store.go index 04bced51a..16686c909 100644 --- a/store/file/file_store.go +++ b/store/file/file_store.go @@ -202,7 +202,7 @@ func (store *fileStore) populateCache() (creationTimePopulated bool, err error) } if senderSeqNumBytes, err := os.ReadFile(store.senderSeqNumsFname); err == nil { - if senderSeqNum, err := strconv.Atoi(strings.Trim(string(senderSeqNumBytes), "\r\n")); err == nil { + if senderSeqNum, err := strconv.ParseUint(strings.Trim(string(senderSeqNumBytes), "\r\n"), 10, 64); err == nil { if err = store.cache.SetNextSenderMsgSeqNum(senderSeqNum); err != nil { return creationTimePopulated, errors.Wrap(err, "cache set next sender") } @@ -210,7 +210,7 @@ func (store *fileStore) populateCache() (creationTimePopulated bool, err error) } if targetSeqNumBytes, err := os.ReadFile(store.targetSeqNumsFname); err == nil { - if targetSeqNum, err := strconv.Atoi(strings.Trim(string(targetSeqNumBytes), "\r\n")); err == nil { + if targetSeqNum, err := strconv.ParseUint(strings.Trim(string(targetSeqNumBytes), "\r\n"), 10, 64); err == nil { if err = store.cache.SetNextTargetMsgSeqNum(targetSeqNum); err != nil { return creationTimePopulated, errors.Wrap(err, "cache set next target") } @@ -243,7 +243,7 @@ func (store *fileStore) setSession() error { return nil } -func (store *fileStore) setSeqNum(f *os.File, seqNum int) error { +func (store *fileStore) setSeqNum(f *os.File, seqNum uint64) error { store.fileMu.Lock() defer store.fileMu.Unlock() if _, err := f.Seek(0, io.SeekStart); err != nil { @@ -261,17 +261,17 @@ func (store *fileStore) setSeqNum(f *os.File, seqNum int) error { } // NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent. -func (store *fileStore) NextSenderMsgSeqNum() int { +func (store *fileStore) NextSenderMsgSeqNum() uint64 { return store.cache.NextSenderMsgSeqNum() } // NextTargetMsgSeqNum returns the next MsgSeqNum that should be received. -func (store *fileStore) NextTargetMsgSeqNum() int { +func (store *fileStore) NextTargetMsgSeqNum() uint64 { return store.cache.NextTargetMsgSeqNum() } // SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent. -func (store *fileStore) SetNextSenderMsgSeqNum(next int) error { +func (store *fileStore) SetNextSenderMsgSeqNum(next uint64) error { if err := store.setSeqNum(store.senderSeqNumsFile, next); err != nil { return errors.Wrap(err, "file") } @@ -279,7 +279,7 @@ func (store *fileStore) SetNextSenderMsgSeqNum(next int) error { } // SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received. -func (store *fileStore) SetNextTargetMsgSeqNum(next int) error { +func (store *fileStore) SetNextTargetMsgSeqNum(next uint64) error { if err := store.setSeqNum(store.targetSeqNumsFile, next); err != nil { return errors.Wrap(err, "file") } @@ -311,7 +311,7 @@ func (store *fileStore) CreationTime() time.Time { func (store *fileStore) SetCreationTime(_ time.Time) { } -func (store *fileStore) SaveMessage(seqNum int, msg []byte) error { +func (store *fileStore) SaveMessage(seqNum uint64, msg []byte) error { store.fileMu.Lock() defer store.fileMu.Unlock() offset, err := store.bodyFile.Seek(0, io.SeekEnd) @@ -334,7 +334,7 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error { return nil } -func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { +func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum uint64, msg []byte) error { err := store.SaveMessage(seqNum, msg) if err != nil { return err @@ -351,7 +351,7 @@ func (store *fileStore) syncBodyAndHeaderFilesLocked() error { return nil } -func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { +func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum uint64, cb func([]byte) error) error { // Sync files store.fileMu.Lock() err := store.syncBodyAndHeaderFilesLocked() @@ -377,7 +377,7 @@ func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]by // Iterate over the header file for { - var seqNum, size int + var seqNum, size uint64 var offset int64 if cnt, err := fmt.Fscanf(headerFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil { if errors.Is(err, io.EOF) { @@ -402,7 +402,7 @@ func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]by return nil } -func (store *fileStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { +func (store *fileStore) GetMessages(beginSeqNum, endSeqNum uint64) ([][]byte, error) { var msgs [][]byte err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error { msgs = append(msgs, msg) diff --git a/store/mongo/mongo_store.go b/store/mongo/mongo_store.go index 42696388e..7b78f9e14 100644 --- a/store/mongo/mongo_store.go +++ b/store/mongo/mongo_store.go @@ -139,12 +139,12 @@ func generateMessageFilter(s *quickfix.SessionID) (messageFilter *mongoQuickFixE type mongoQuickFixEntryData struct { // Message specific data. - Msgseq int `bson:"msgseq,omitempty"` + Msgseq uint64 `bson:"msgseq,omitempty"` Message []byte `bson:"message,omitempty"` // Session specific data. CreationTime time.Time `bson:"creation_time,omitempty"` - IncomingSeqNum int `bson:"incoming_seq_num,omitempty"` - OutgoingSeqNum int `bson:"outgoing_seq_num,omitempty"` + IncomingSeqNum uint64 `bson:"incoming_seq_num,omitempty"` + OutgoingSeqNum uint64 `bson:"outgoing_seq_num,omitempty"` // Indexed data. BeginString string `bson:"begin_string"` SessionQualifier string `bson:"session_qualifier"` @@ -224,17 +224,17 @@ func (store *mongoStore) populateCache() error { } // NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent. -func (store *mongoStore) NextSenderMsgSeqNum() int { +func (store *mongoStore) NextSenderMsgSeqNum() uint64 { return store.cache.NextSenderMsgSeqNum() } // NextTargetMsgSeqNum returns the next MsgSeqNum that should be received. -func (store *mongoStore) NextTargetMsgSeqNum() int { +func (store *mongoStore) NextTargetMsgSeqNum() uint64 { return store.cache.NextTargetMsgSeqNum() } // SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent. -func (store *mongoStore) SetNextSenderMsgSeqNum(next int) error { +func (store *mongoStore) SetNextSenderMsgSeqNum(next uint64) error { msgFilter := generateMessageFilter(&store.sessionID) sessionUpdate := generateMessageFilter(&store.sessionID) sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() @@ -247,7 +247,7 @@ func (store *mongoStore) SetNextSenderMsgSeqNum(next int) error { } // SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received. -func (store *mongoStore) SetNextTargetMsgSeqNum(next int) error { +func (store *mongoStore) SetNextTargetMsgSeqNum(next uint64) error { msgFilter := generateMessageFilter(&store.sessionID) sessionUpdate := generateMessageFilter(&store.sessionID) sessionUpdate.IncomingSeqNum = next @@ -284,7 +284,7 @@ func (store *mongoStore) CreationTime() time.Time { func (store *mongoStore) SetCreationTime(_ time.Time) { } -func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) { +func (store *mongoStore) SaveMessage(seqNum uint64, msg []byte) (err error) { msgFilter := generateMessageFilter(&store.sessionID) msgFilter.Msgseq = seqNum msgFilter.Message = msg @@ -292,7 +292,7 @@ func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) { return } -func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { +func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum uint64, msg []byte) error { if !store.allowTransactions { err := store.SaveMessage(seqNum, msg) @@ -303,7 +303,7 @@ func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg [ } // If the mongodb supports replicasets, perform this operation as a transaction instead- - var next int + var next uint64 err := store.db.UseSession(context.Background(), func(sessionCtx mongo.SessionContext) error { if err := sessionCtx.StartTransaction(); err != nil { return err @@ -338,7 +338,7 @@ func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg [ return store.cache.SetNextSenderMsgSeqNum(next) } -func (store *mongoStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { +func (store *mongoStore) IterateMessages(beginSeqNum, endSeqNum uint64, cb func([]byte) error) error { msgFilter := generateMessageFilter(&store.sessionID) // Marshal into database form. msgFilterBytes, err := bson.Marshal(msgFilter) @@ -371,7 +371,7 @@ func (store *mongoStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]b return nil } -func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { +func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum uint64) ([][]byte, error) { var msgs [][]byte err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error { msgs = append(msgs, msg) diff --git a/store/sql/sql_store.go b/store/sql/sql_store.go index c1162464a..2bb9cb9fe 100644 --- a/store/sql/sql_store.go +++ b/store/sql/sql_store.go @@ -245,7 +245,7 @@ func (store *sqlStore) Refresh() error { func (store *sqlStore) populateCache() error { s := store.sessionID var creationTime time.Time - var incomingSeqNum, outgoingSeqNum int + var incomingSeqNum, outgoingSeqNum uint64 row := store.db.QueryRow(sqlString(store.sqlGetSeqNums, store.placeholder), s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, @@ -283,17 +283,17 @@ func (store *sqlStore) populateCache() error { } // NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent. -func (store *sqlStore) NextSenderMsgSeqNum() int { +func (store *sqlStore) NextSenderMsgSeqNum() uint64 { return store.cache.NextSenderMsgSeqNum() } // NextTargetMsgSeqNum returns the next MsgSeqNum that should be received. -func (store *sqlStore) NextTargetMsgSeqNum() int { +func (store *sqlStore) NextTargetMsgSeqNum() uint64 { return store.cache.NextTargetMsgSeqNum() } // SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent. -func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error { +func (store *sqlStore) SetNextSenderMsgSeqNum(next uint64) error { s := store.sessionID _, err := store.db.Exec(sqlString(store.sqlUpdateSenderSeqNum, store.placeholder), next, s.BeginString, s.Qualifier, @@ -306,7 +306,7 @@ func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error { } // SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received. -func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error { +func (store *sqlStore) SetNextTargetMsgSeqNum(next uint64) error { s := store.sessionID _, err := store.db.Exec(sqlString(store.sqlUpdateTargetSeqNum, store.placeholder), next, s.BeginString, s.Qualifier, @@ -343,7 +343,7 @@ func (store *sqlStore) CreationTime() time.Time { func (store *sqlStore) SetCreationTime(_ time.Time) { } -func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error { +func (store *sqlStore) SaveMessage(seqNum uint64, msg []byte) error { s := store.sessionID _, err := store.db.Exec(sqlString(store.sqlInsertMessage, store.placeholder), @@ -355,7 +355,7 @@ func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error { return err } -func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { +func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum uint64, msg []byte) error { s := store.sessionID tx, err := store.db.Begin() @@ -390,7 +390,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b return store.cache.SetNextSenderMsgSeqNum(next) } -func (store *sqlStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { +func (store *sqlStore) IterateMessages(beginSeqNum, endSeqNum uint64, cb func([]byte) error) error { s := store.sessionID rows, err := store.db.Query(sqlString(store.sqlGetMessages, store.placeholder), s.BeginString, s.Qualifier, @@ -414,7 +414,7 @@ func (store *sqlStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byt return rows.Err() } -func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { +func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum uint64) ([][]byte, error) { var msgs [][]byte err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error { msgs = append(msgs, msg) diff --git a/store/sql/sql_store_test.go b/store/sql/sql_store_test.go index e988e807e..3c5195f52 100644 --- a/store/sql/sql_store_test.go +++ b/store/sql/sql_store_test.go @@ -136,9 +136,9 @@ TargetCompID=%s // Get and check sequence numbers nextSender := store.NextSenderMsgSeqNum() - suite.Equal(3, nextSender) + suite.Equal(uint64(3), nextSender) nextTarget := store.NextTargetMsgSeqNum() - suite.Equal(2, nextTarget) + suite.Equal(uint64(2), nextTarget) // IterateMessages count := 0 @@ -154,9 +154,9 @@ TargetCompID=%s // After reset, sequence numbers should be 1 nextSender = store.NextSenderMsgSeqNum() - suite.Equal(1, nextSender) + suite.Equal(uint64(1), nextSender) nextTarget = store.NextTargetMsgSeqNum() - suite.Equal(1, nextTarget) + suite.Equal(uint64(1), nextTarget) } func (suite *SQLStoreTestSuite) TearDownTest() {