@@ -80,6 +80,57 @@ JournalFile::~JournalFile()
80
80
// NOTHING
81
81
}
82
82
83
+ JournalFile::RecordBufferType
84
+ JournalFile::makeQueueOpRecord (unsigned int primaryLeaseId,
85
+ bsls::Types::Uint64 sequenceNumber)
86
+ {
87
+ // QueueOpRec
88
+ OffsetPtr<QueueOpRecord> rec (d_block, d_currPos);
89
+ new (rec.get ()) QueueOpRecord ();
90
+ rec->header ()
91
+ .setPrimaryLeaseId (primaryLeaseId)
92
+ .setSequenceNumber (sequenceNumber)
93
+ .setTimestamp (sequenceNumber * d_timestampIncrement);
94
+ rec->setFlags (3 )
95
+ .setQueueKey (mqbu::StorageKey (mqbu::StorageKey::BinaryRepresentation (),
96
+ " abcde" ))
97
+ .setAppKey (mqbu::StorageKey (mqbu::StorageKey::BinaryRepresentation (),
98
+ " appid" ))
99
+ .setType (QueueOpType::e_PURGE)
100
+ .setMagic (RecordHeader::k_MAGIC);
101
+
102
+ RecordBufferType buf;
103
+ bsl::memcpy (buf.buffer (),
104
+ rec.get (),
105
+ FileStoreProtocol::k_JOURNAL_RECORD_SIZE);
106
+ return buf;
107
+ }
108
+
109
+ JournalFile::RecordBufferType
110
+ JournalFile::makeJournalOpRecord (unsigned int primaryLeaseId,
111
+ bsls::Types::Uint64 sequenceNumber)
112
+ {
113
+ OffsetPtr<JournalOpRecord> rec (d_block, d_currPos);
114
+ new (rec.get ()) JournalOpRecord (JournalOpType::e_SYNCPOINT,
115
+ SyncPointType::e_REGULAR,
116
+ 1234567 , // seqNum
117
+ 25 , // leaderTerm
118
+ 121 , // leaderNodeId
119
+ 8800 , // dataFilePosition
120
+ 100 , // qlistFilePosition
121
+ RecordHeader::k_MAGIC);
122
+
123
+ rec->header ()
124
+ .setPrimaryLeaseId (primaryLeaseId)
125
+ .setSequenceNumber (sequenceNumber)
126
+ .setTimestamp (sequenceNumber * d_timestampIncrement);
127
+ RecordBufferType buf;
128
+ bsl::memcpy (buf.buffer (),
129
+ rec.get (),
130
+ FileStoreProtocol::k_JOURNAL_RECORD_SIZE);
131
+ return buf;
132
+ }
133
+
83
134
void JournalFile::addAllTypesRecords (RecordsListType* records)
84
135
{
85
136
// PRECONDITIONS
@@ -166,49 +217,12 @@ void JournalFile::addAllTypesRecords(RecordsListType* records)
166
217
records->push_back (bsl::make_pair (RecordType::e_DELETION, buf));
167
218
}
168
219
else if (3 == remainder ) {
169
- // QueueOpRec
170
- OffsetPtr<QueueOpRecord> rec (d_block, d_currPos);
171
- new (rec.get ()) QueueOpRecord ();
172
- rec->header ()
173
- .setPrimaryLeaseId (100 )
174
- .setSequenceNumber (i)
175
- .setTimestamp (i * d_timestampIncrement);
176
- rec->setFlags (3 )
177
- .setQueueKey (
178
- mqbu::StorageKey (mqbu::StorageKey::BinaryRepresentation (),
179
- " abcde" ))
180
- .setAppKey (
181
- mqbu::StorageKey (mqbu::StorageKey::BinaryRepresentation (),
182
- " appid" ))
183
- .setType (QueueOpType::e_PURGE)
184
- .setMagic (RecordHeader::k_MAGIC);
185
-
186
- RecordBufferType buf;
187
- bsl::memcpy (buf.buffer (),
188
- rec.get (),
189
- FileStoreProtocol::k_JOURNAL_RECORD_SIZE);
190
- records->push_back (bsl::make_pair (RecordType::e_QUEUE_OP, buf));
220
+ records->push_back (bsl::make_pair (RecordType::e_QUEUE_OP,
221
+ makeQueueOpRecord (100 , i)));
191
222
}
192
223
else {
193
- OffsetPtr<JournalOpRecord> rec (d_block, d_currPos);
194
- new (rec.get ()) JournalOpRecord (JournalOpType::e_SYNCPOINT,
195
- SyncPointType::e_REGULAR,
196
- 1234567 , // seqNum
197
- 25 , // leaderTerm
198
- 121 , // leaderNodeId
199
- 8800 , // dataFilePosition
200
- 100 , // qlistFilePosition
201
- RecordHeader::k_MAGIC);
202
-
203
- rec->header ()
204
- .setPrimaryLeaseId (100 )
205
- .setSequenceNumber (i)
206
- .setTimestamp (i * d_timestampIncrement);
207
- RecordBufferType buf;
208
- bsl::memcpy (buf.buffer (),
209
- rec.get (),
210
- FileStoreProtocol::k_JOURNAL_RECORD_SIZE);
211
- records->push_back (bsl::make_pair (RecordType::e_JOURNAL_OP, buf));
224
+ records->push_back (bsl::make_pair (RecordType::e_JOURNAL_OP,
225
+ makeJournalOpRecord (100 , i)));
212
226
}
213
227
214
228
d_currPos += FileStoreProtocol::k_JOURNAL_RECORD_SIZE;
@@ -335,7 +349,6 @@ void JournalFile::addJournalRecordsWithPartiallyConfirmedMessages(
335
349
unsigned int remainder = i % 3 ;
336
350
337
351
if (1 == remainder ) {
338
- // bmqt::MessageGUID g;
339
352
mqbu::MessageGUIDUtil::generateGUID (&lastMessageGUID);
340
353
OffsetPtr<MessageRecord> rec (d_block, d_currPos);
341
354
new (rec.get ()) MessageRecord ();
@@ -698,27 +711,9 @@ void JournalFile::addMultipleTypesRecordsWithMultipleLeaseId(
698
711
}
699
712
else {
700
713
// QueueOpRec
701
- OffsetPtr<QueueOpRecord> rec (d_block, d_currPos);
702
- new (rec.get ()) QueueOpRecord ();
703
- rec->header ()
704
- .setPrimaryLeaseId (leaseId)
705
- .setSequenceNumber (seqNumber++)
706
- .setTimestamp (i * d_timestampIncrement);
707
- rec->setFlags (3 )
708
- .setQueueKey (
709
- mqbu::StorageKey (mqbu::StorageKey::BinaryRepresentation (),
710
- " abcde" ))
711
- .setAppKey (
712
- mqbu::StorageKey (mqbu::StorageKey::BinaryRepresentation (),
713
- " appid" ))
714
- .setType (QueueOpType::e_PURGE)
715
- .setMagic (RecordHeader::k_MAGIC);
716
-
717
- RecordBufferType buf;
718
- bsl::memcpy (buf.buffer (),
719
- rec.get (),
720
- FileStoreProtocol::k_JOURNAL_RECORD_SIZE);
721
- records->push_back (bsl::make_pair (RecordType::e_QUEUE_OP, buf));
714
+ records->push_back (
715
+ bsl::make_pair (RecordType::e_QUEUE_OP,
716
+ makeQueueOpRecord (leaseId, seqNumber++)));
722
717
}
723
718
724
719
d_currPos += FileStoreProtocol::k_JOURNAL_RECORD_SIZE;
0 commit comments