Skip to content

Commit 76daf1b

Browse files
authored
fix: lastSequenceNumber for initial stashed command (#32657)
* place currentSequenceNumber in mutable field in BehaviorSetup * similar for currentMetadata
1 parent fa68698 commit 76daf1b

File tree

6 files changed

+65
-45
lines changed

6 files changed

+65
-45
lines changed

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedMetadataSpec.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,19 @@ class EventSourcedMetadataSpec
109109

110110
// and during replay
111111
val ref2 = spawn(behavior(PersistenceId.ofUniqueId("ess-1"), probe.ref))
112+
113+
// This command will be handled after the replay.
114+
// It will most likely be stashed during the replay and it should see seqNr 5 when handled.
115+
// Reproducer of issue #32651
116+
ref2 ! "cmd"
117+
112118
probe.expectMessage("Some(meta) eventHandler evt")
113119
probe.expectMessage("Some(meta) eventHandler evt")
114120
probe.expectMessage("Some(meta-1) eventHandler evt1")
115121
probe.expectMessage("Some(meta-2) eventHandler evt2")
116122
probe.expectMessage("Some(meta-3) eventHandler evt3")
117123
probe.expectMessage("Some(meta-3) RecoveryCompleted")
118124

119-
ref2 ! "cmd"
120125
probe.expectMessage("None onCommand")
121126
probe.expectMessage("Some(meta) eventHandler evt")
122127
probe.expectMessage("None thenRun")

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,19 @@ class EventSourcedSequenceNumberSpec
102102

103103
// and during replay
104104
val ref2 = spawn(behavior(PersistenceId.ofUniqueId("ess-1"), probe.ref))
105+
106+
// This command will be handled after the replay.
107+
// It will most likely be stashed during the replay and it should see seqNr 5 when handled.
108+
// Reproducer of issue #32651
109+
ref2 ! "cmd"
110+
105111
probe.expectMessage("1 eventHandler evt")
106112
probe.expectMessage("2 eventHandler evt")
107113
probe.expectMessage("3 eventHandler evt1")
108114
probe.expectMessage("4 eventHandler evt2")
109115
probe.expectMessage("5 eventHandler evt3")
110116
probe.expectMessage("5 onRecoveryComplete")
111117

112-
ref2 ! "cmd"
113118
probe.expectMessage("5 onCommand")
114119
probe.expectMessage("6 eventHandler evt")
115120
probe.expectMessage("6 thenRun")

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ private[akka] final class BehaviorSetup[C, E, S](
102102

103103
private var mdcPhase = PersistenceMdc.Initializing
104104

105+
// Needed for WithSeqNrAccessible
106+
var currentSequenceNumber = 0L
107+
108+
// Needed for WithMetadataAccessible
109+
var currentMetadata: Option[Any] = None
110+
105111
if (isOnlyOneSnapshot) {
106112
retention match {
107113
case SnapshotCountRetentionCriteriaImpl(_, keepNSnapshots, _) if keepNSnapshots > 1 =>

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ private[akka] object ReplayingEvents {
6565
recoveryStartTime: Long,
6666
version: VersionVector,
6767
seenSeqNrPerReplica: Map[ReplicaId, Long],
68-
eventsReplayed: Long,
69-
metadata: Option[Any])
68+
eventsReplayed: Long)
7069

7170
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] =
7271
Behaviors.setup { _ =>
@@ -142,8 +141,9 @@ private[akka] final class ReplayingEvents[C, E, S](
142141

143142
def handleEvent(event: E): Unit = {
144143
eventForErrorReporting = OptionVal.Some(event)
145-
state =
146-
state.copy(seqNr = repr.sequenceNr, eventsReplayed = state.eventsReplayed + 1, metadata = repr.metadata)
144+
state = state.copy(seqNr = repr.sequenceNr, eventsReplayed = state.eventsReplayed + 1)
145+
setup.currentSequenceNumber = repr.sequenceNr
146+
setup.currentMetadata = repr.metadata
147147

148148
val replicatedMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] =
149149
setup.replication match {
@@ -196,14 +196,17 @@ private[akka] final class ReplayingEvents[C, E, S](
196196
this
197197
} catch {
198198
case NonFatal(ex) =>
199-
state = state.copy(seqNr = repr.sequenceNr, metadata = repr.metadata)
199+
state = state.copy(seqNr = repr.sequenceNr)
200+
setup.currentSequenceNumber = repr.sequenceNr
201+
setup.currentMetadata = repr.metadata
200202
onRecoveryFailure(ex, eventForErrorReporting.toOption, "replaying-event")
201203
}
202204

203205
case RecoverySuccess(highestJournalSeqNr) =>
204206
try {
205207
val highestSeqNr = Math.max(highestJournalSeqNr, state.seqNr)
206208
state = state.copy(seqNr = highestSeqNr)
209+
setup.currentSequenceNumber = highestSeqNr
207210
setup.internalLogger.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
208211
onRecoveryCompleted(state)
209212
} catch {
@@ -330,7 +333,7 @@ private[akka] final class ReplayingEvents[C, E, S](
330333
}
331334
setup.retention match {
332335
case criteria: SnapshotCountRetentionCriteriaImpl if criteria.snapshotEveryNEvents <= state.eventsReplayed =>
333-
internalSaveSnapshot(initialRunningState, state.metadata)
336+
internalSaveSnapshot(initialRunningState, setup.currentMetadata)
334337
new running.StoringSnapshot(initialRunningState, immutable.Seq.empty, SnapshotWithoutRetention)
335338
case _ =>
336339
tryUnstashOne(new running.HandlingCommands(initialRunningState))
@@ -342,10 +345,10 @@ private[akka] final class ReplayingEvents[C, E, S](
342345

343346
// WithSeqNrAccessible
344347
override def currentSequenceNumber: Long =
345-
state.seqNr
348+
setup.currentSequenceNumber
346349

347350
// WithMetadataAccessible
348351
override def metadata[M: ClassTag]: Option[M] =
349-
CompositeMetadata.extract[M](state.metadata)
352+
CompositeMetadata.extract[M](setup.currentMetadata)
350353

351354
}

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
171171

172172
setup.cancelRecoveryTimer()
173173

174+
setup.currentSequenceNumber = seqNr
175+
setup.currentMetadata = metadata
176+
174177
ReplayingEvents[C, E, S](
175178
setup,
176179
ReplayingEvents.ReplayingState(
@@ -182,8 +185,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
182185
System.nanoTime(),
183186
version,
184187
seenPerReplica,
185-
eventsReplayed = 0,
186-
metadata = metadata))
188+
eventsReplayed = 0))
187189
}
188190

189191
response match {

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -261,30 +261,24 @@ private[akka] object Running {
261261
import Running.RunningState
262262
import Running.formatTimestamp
263263

264-
// Needed for WithSeqNrAccessible, when unstashing
265-
private var _currentSequenceNumber = 0L
266-
267-
// Needed for WithMetadataAccessible
268-
private var _currentMetadata: Option[Any] = None
269-
270264
private var recursiveUnstashOne = 0
271265

272266
private def updateMetadata(metadataEntries: Seq[Any]): Unit = {
273267
if (metadataEntries.isEmpty)
274-
_currentMetadata = None
268+
setup.currentMetadata = None
275269
else if (metadataEntries.size == 1)
276-
_currentMetadata = Some(metadataEntries.head)
270+
setup.currentMetadata = Some(metadataEntries.head)
277271
else
278-
_currentMetadata = Some(CompositeMetadata(metadataEntries))
272+
setup.currentMetadata = Some(CompositeMetadata(metadataEntries))
279273
}
280274

281275
final class HandlingCommands(state: RunningState[S])
282276
extends AbstractBehavior[InternalProtocol](setup.context)
283277
with WithSeqNrAccessible
284278
with WithMetadataAccessible {
285279

286-
_currentSequenceNumber = state.seqNr
287-
_currentMetadata = None
280+
setup.currentSequenceNumber = state.seqNr
281+
setup.currentMetadata = None
288282

289283
private def alreadySeen(e: ReplicatedEvent[_]): Boolean = {
290284
e.originSequenceNr <= state.seenPerReplica.getOrElse(e.originReplica, 0L)
@@ -502,8 +496,8 @@ private[akka] object Running {
502496
replication: ReplicationSetup,
503497
event: ReplicatedEvent[E],
504498
ackToOnPersisted: Option[ActorRef[Done]]): Behavior[InternalProtocol] = {
505-
_currentSequenceNumber = state.seqNr + 1
506-
_currentMetadata = event.metadata
499+
setup.currentSequenceNumber = state.seqNr + 1
500+
setup.currentMetadata = event.metadata
507501
val isConcurrent: Boolean = event.originVersion <> state.version
508502
val updatedVersion = event.originVersion.merge(state.version)
509503

@@ -569,7 +563,7 @@ private[akka] object Running {
569563
// apply the event before persist so that validation exception is handled before persisting
570564
// the invalid event, in case such validation is implemented in the event handler.
571565
// also, ensure that there is an event handler for each single event
572-
_currentSequenceNumber = state.seqNr + 1
566+
setup.currentSequenceNumber = state.seqNr + 1
573567
updateMetadata(metadataEntries)
574568

575569
setup.replication.foreach(r => r.setContext(recoveryRunning = false, r.replicaId, concurrent = false))
@@ -580,9 +574,13 @@ private[akka] object Running {
580574

581575
val newState2 = setup.replication match {
582576
case Some(replication) =>
583-
val updatedVersion = stateAfterApply.version.updated(replication.replicaId.id, _currentSequenceNumber)
577+
val updatedVersion = stateAfterApply.version.updated(replication.replicaId.id, setup.currentSequenceNumber)
584578
val replicatedEventMetadata =
585-
ReplicatedEventMetadata(replication.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false)
579+
ReplicatedEventMetadata(
580+
replication.replicaId,
581+
setup.currentSequenceNumber,
582+
updatedVersion,
583+
concurrent = false)
586584
val r = internalPersist(
587585
OptionVal.Some(cmd),
588586
stateAfterApply,
@@ -626,7 +624,7 @@ private[akka] object Running {
626624
// apply the event before persist so that validation exception is handled before persisting
627625
// the invalid event, in case such validation is implemented in the event handler.
628626
// also, ensure that there is an event handler for each single event
629-
_currentSequenceNumber = state.seqNr
627+
setup.currentSequenceNumber = state.seqNr
630628

631629
val replicatedEventMetadataTemplate: Option[ReplicatedEventMetadata] = setup.replication match {
632630
case Some(replication) =>
@@ -641,25 +639,26 @@ private[akka] object Running {
641639

642640
eventsWithMetadata.foreach { evtWithMeta =>
643641
val event = evtWithMeta.event
644-
_currentSequenceNumber += 1
642+
setup.currentSequenceNumber += 1
645643
updateMetadata(evtWithMeta.metadataEntries)
646644
val evtManifest = setup.eventAdapter.manifest(event)
647645
val eventMetadata = replicatedEventMetadataTemplate match {
648646
case Some(template) =>
649-
val updatedVersion = currentState.version.updated(template.originReplica.id, _currentSequenceNumber)
647+
val updatedVersion =
648+
currentState.version.updated(template.originReplica.id, setup.currentSequenceNumber)
650649
if (setup.internalLogger.isDebugEnabled)
651650
setup.internalLogger.trace(
652651
"Processing event [{}] with version vector [{}]",
653652
Logging.simpleName(event.getClass),
654653
updatedVersion)
655654
currentState = currentState.copy(version = updatedVersion)
656-
template.copy(originSequenceNr = _currentSequenceNumber, version = updatedVersion) +: evtWithMeta.metadataEntries
655+
template.copy(originSequenceNr = setup.currentSequenceNumber, version = updatedVersion) +: evtWithMeta.metadataEntries
657656
case None => evtWithMeta.metadataEntries
658657
}
659658

660659
currentState = currentState.applyEvent(setup, event)
661660
if (shouldSnapshotAfterPersist == NoSnapshot)
662-
shouldSnapshotAfterPersist = setup.shouldSnapshot(currentState.state, event, _currentSequenceNumber)
661+
shouldSnapshotAfterPersist = setup.shouldSnapshot(currentState.state, event, setup.currentSequenceNumber)
663662

664663
val adaptedEvent = adaptEvent(currentState.state, event)
665664

@@ -684,7 +683,7 @@ private[akka] object Running {
684683
}
685684
} else {
686685
// run side-effects even when no events are emitted
687-
_currentMetadata = None
686+
setup.currentMetadata = None
688687
(applySideEffects(sideEffects, state), true)
689688
}
690689
}
@@ -761,11 +760,11 @@ private[akka] object Running {
761760

762761
// WithSeqNrAccessible
763762
override def currentSequenceNumber: Long =
764-
_currentSequenceNumber
763+
setup.currentSequenceNumber
765764

766765
// WithMetadataAccessible
767766
override def metadata[M: ClassTag]: Option[M] =
768-
CompositeMetadata.extract[M](_currentMetadata)
767+
CompositeMetadata.extract[M](setup.currentMetadata)
769768

770769
}
771770

@@ -910,7 +909,7 @@ private[akka] object Running {
910909
onWriteDone(setup.context, p)
911910
if (shouldSnapshotAfterPersist == SnapshotWithRetention)
912911
setup.retentionProgressSaveSnapshotStarted(state2.seqNr)
913-
internalSaveSnapshot(state2, _currentMetadata)
912+
internalSaveSnapshot(state2, setup.currentMetadata)
914913
new StoringSnapshot(state2.clearInstrumentationContext, sideEffects, shouldSnapshotAfterPersist)
915914
}
916915
}
@@ -979,11 +978,11 @@ private[akka] object Running {
979978

980979
// WithSeqNrAccessible
981980
override def currentSequenceNumber: Long =
982-
_currentSequenceNumber
981+
setup.currentSequenceNumber
983982

984983
// WithMetadataAccessible
985984
override def metadata[M: ClassTag]: Option[M] =
986-
CompositeMetadata.extract[M](_currentMetadata)
985+
CompositeMetadata.extract[M](setup.currentMetadata)
987986
}
988987

989988
// ===============================================
@@ -1072,11 +1071,11 @@ private[akka] object Running {
10721071

10731072
// WithSeqNrAccessible
10741073
override def currentSequenceNumber: Long =
1075-
_currentSequenceNumber
1074+
setup.currentSequenceNumber
10761075

10771076
// WithMetadataAccessible
10781077
override def metadata[M: ClassTag]: Option[M] =
1079-
CompositeMetadata.extract[M](_currentMetadata)
1078+
CompositeMetadata.extract[M](setup.currentMetadata)
10801079
}
10811080

10821081
// ===============================================
@@ -1240,11 +1239,11 @@ private[akka] object Running {
12401239

12411240
// WithSeqNrAccessible
12421241
override def currentSequenceNumber: Long =
1243-
_currentSequenceNumber
1242+
setup.currentSequenceNumber
12441243

12451244
// WithMetadataAccessible
12461245
override def metadata[M: ClassTag]: Option[M] =
1247-
CompositeMetadata.extract[M](_currentMetadata)
1246+
CompositeMetadata.extract[M](setup.currentMetadata)
12481247
}
12491248

12501249
// ===============================================
@@ -1291,11 +1290,11 @@ private[akka] object Running {
12911290

12921291
// WithSeqNrAccessible
12931292
override def currentSequenceNumber: Long =
1294-
_currentSequenceNumber
1293+
setup.currentSequenceNumber
12951294

12961295
// WithMetadataAccessible
12971296
override def metadata[M: ClassTag]: Option[M] =
1298-
CompositeMetadata.extract[M](_currentMetadata)
1297+
CompositeMetadata.extract[M](setup.currentMetadata)
12991298
}
13001299

13011300
// --------------------------

0 commit comments

Comments
 (0)