@@ -30,6 +30,10 @@ object SiriusPersistenceActor {
3030 * that is, the state of persisted data
3131 */
3232 sealed trait LogQuery
33+ sealed trait LogQuerySubrange extends LogQuery {
34+ def begin : Long
35+ def end : Long
36+ }
3337
3438 case object GetLogSize extends LogQuery
3539 /**
@@ -42,7 +46,7 @@ object SiriusPersistenceActor {
4246 * @param begin first sequence number of the range
4347 * @param end last sequence number of the range, inclusive
4448 */
45- case class GetLogSubrange (begin : Long , end : Long ) extends LogQuery
49+ case class GetLogSubrange (begin : Long , end : Long ) extends LogQuerySubrange
4650 /**
4751 * Message for directly requesting a chunk of the log from a node.
4852 *
@@ -54,7 +58,7 @@ object SiriusPersistenceActor {
5458 * @param end last sequence number of the range, inclusive
5559 * @param limit the maximum number of events
5660 */
57- case class GetLogSubrangeWithLimit (begin : Long , end : Option [ Long ] , limit : Long ) extends LogQuery
61+ case class GetLogSubrangeWithLimit (begin : Long , end : Long , limit : Long ) extends LogQuerySubrange
5862
5963 trait LogSubrange
6064 trait PopulatedSubrange extends LogSubrange {
@@ -143,14 +147,11 @@ class SiriusPersistenceActor(stateActor: ActorRef,
143147
144148 lastWriteTime = thisWriteTime
145149
146- case GetLogSubrangeWithLimit (start, Some (end), limit) =>
147- sender ! queryLimitedSubrange(start, end, limit)
148-
149- case GetLogSubrangeWithLimit (start, None , limit) =>
150- sender ! queryLimited(start, limit)
151-
152150 case GetLogSubrange (start, end) =>
153- sender ! querySubrange(start, end)
151+ sender ! querySubrange(start, end, Long .MaxValue )
152+
153+ case GetLogSubrangeWithLimit (start, end, limit) =>
154+ sender ! querySubrange(start, end, limit)
154155
155156 case GetNextLogSeq =>
156157 sender ! siriusLog.getNextSeq
@@ -163,99 +164,46 @@ class SiriusPersistenceActor(stateActor: ActorRef,
163164 case _ : SiriusResult =>
164165 }
165166
166- private def queryLimitedSubrange (rangeStart : Long , rangeEnd : Long , limit : Long ): LogSubrange =
167- if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0 ) {
168- // invalid query subrange or limit, we can't send anything useful back
167+ private def querySubrange (rangeStart : Long , rangeEnd : Long , limit : Long ): LogSubrange = {
168+ val nextSeq = siriusLog.getNextSeq
169+ val lastSeq = nextSeq - 1
170+ if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0 || rangeStart > lastSeq) {
171+ // parameters are out of range, can't return anything useful
169172 EmptySubrange
170- } else if (limit > (rangeEnd - rangeStart)) {
171- // limit is larger than the subrange window, no need to enforce limit
172- querySubrange(rangeStart, rangeEnd)
173173 } else {
174- val nextSeq = siriusLog.getNextSeq
175- if (rangeStart >= nextSeq) {
176- // query is out of range, we can't send anything useful back
177- EmptySubrange
178- } else if (rangeEnd >= nextSeq) {
179- // we can only answer partially
180- val lastSeq = nextSeq - 1
181- val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer .empty[OrderedEvent ])(
182- // continue folding events as long as the buffer is smaller than the limit
183- buffer => buffer.size < limit
184- )(
185- (acc, event) => acc += event
186- )
187- if (buffer.size < limit) {
188- PartialSubrange (rangeStart, lastSeq, buffer.toList)
189- } else {
190- PartialSubrange (rangeStart, buffer.last.sequence, buffer.toList)
191- }
192- } else {
193- val buffer = siriusLog.foldLeftRangeWhile(rangeStart, rangeEnd)(ListBuffer .empty[OrderedEvent ])(
194- // continue folding events as long as the buffer is smaller than the limit
195- buffer => buffer.size < limit
196- )(
197- (acc, event) => acc += event
198- )
199- if (buffer.size < limit) {
200- CompleteSubrange (rangeStart, rangeEnd, buffer.toList)
174+ val endSeq = if (rangeEnd > lastSeq) lastSeq else rangeEnd
175+ if (limit > (endSeq - rangeStart)) {
176+ // the limit is larger than the subrange window, so do not enforce
177+ val events = siriusLog.foldLeftRange(rangeStart, endSeq)(ListBuffer .empty[OrderedEvent ])(
178+ (acc, evt) => acc += evt
179+ ).toList
180+ if (endSeq < rangeEnd) {
181+ // the end of the range extends beyond the end of the log, so can only partially answer
182+ PartialSubrange (rangeStart, endSeq, events)
201183 } else {
202- PartialSubrange (rangeStart, buffer.last.sequence, buffer.toList)
184+ // the range is entirely within the log, so can fully answer
185+ CompleteSubrange (rangeStart, endSeq, events)
203186 }
204- }
205- }
206-
207- private def queryLimited (rangeStart : Long , limit : Long ): LogSubrange = {
208- if (limit <= 0 ) {
209- // invalid query subrange or limit, we can't send anything useful back
210- EmptySubrange
211- } else {
212- val nextSeq = siriusLog.getNextSeq
213- if (rangeStart >= nextSeq) {
214- // query is out of range, we can't send anything useful back
215- EmptySubrange
216187 } else {
217- val nextSeq = siriusLog.getNextSeq
218- val lastSeq = nextSeq - 1
219- val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer .empty[OrderedEvent ])(
220- // continue folding events as long as the buffer is smaller than the limit
188+ // the limit is smaller than the subrange window
189+ val buffer = siriusLog.foldLeftRangeWhile(rangeStart, endSeq)(ListBuffer .empty[OrderedEvent ])(
221190 buffer => buffer.size < limit
222191 )(
223- (acc, event ) => acc += event
192+ (acc, evt ) => acc += evt
224193 )
225- if (buffer.size < limit) {
226- CompleteSubrange (rangeStart, lastSeq, buffer.toList)
194+ if (buffer.size < limit && endSeq < rangeEnd) {
195+ // the end of the subrange extended part the end of the log
196+ // and the buffer was not filled to the limit, so we can only partially respond
197+ PartialSubrange (rangeStart, endSeq, buffer.toList)
227198 } else {
228- PartialSubrange (rangeStart, buffer.last.sequence, buffer.toList)
199+ // the buffer was filled to the limit, so completely respond using the sequence of the
200+ // last event as the end of the range
201+ CompleteSubrange (rangeStart, buffer.last.sequence, buffer.toList)
229202 }
230203 }
231204 }
232205 }
233206
234- private def querySubrange (rangeStart : Long , rangeEnd : Long ): LogSubrange =
235- if (rangeEnd < rangeStart || rangeEnd <= 0 ) {
236- // invalid query subrange or limit, we can't send anything useful back
237- EmptySubrange
238- } else {
239- val nextSeq = siriusLog.getNextSeq
240- if (rangeStart >= nextSeq) {
241- // query is out of range, we can't send anything useful back
242- EmptySubrange
243- } else if (rangeEnd >= nextSeq) {
244- // we can answer partially
245- val lastSeq = nextSeq - 1
246- val buffer = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer .empty[OrderedEvent ])(
247- (acc, event) => acc += event
248- )
249- PartialSubrange (rangeStart, lastSeq, buffer.toList)
250- } else {
251- // we can answer fully
252- val buffer = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer .empty[OrderedEvent ])(
253- (acc, event) => acc += event
254- )
255- CompleteSubrange (rangeStart, rangeEnd, buffer.toList)
256- }
257- }
258-
259207 /**
260208 * Monitoring hooks
261209 */
0 commit comments