1919
2020package io .temporal .internal .replay ;
2121
22+ import io .temporal .common .converter .DataConverter ;
2223import io .temporal .internal .common .OptionsUtils ;
2324import io .temporal .internal .common .WorkflowExecutionUtils ;
2425import io .temporal .internal .replay .HistoryHelper .DecisionEvents ;
5253import io .temporal .proto .event .EventType ;
5354import io .temporal .proto .event .ExternalWorkflowExecutionCancelRequestedEventAttributes ;
5455import io .temporal .proto .event .HistoryEvent ;
55- import io .temporal .proto .event .MarkerRecordedEventAttributes ;
5656import io .temporal .proto .event .RequestCancelActivityTaskFailedEventAttributes ;
5757import io .temporal .proto .event .RequestCancelExternalWorkflowExecutionFailedEventAttributes ;
5858import io .temporal .proto .event .StartChildWorkflowExecutionFailedEventAttributes ;
7070import java .util .Map ;
7171import java .util .Objects ;
7272import java .util .Optional ;
73- import java .util .function .Predicate ;
7473
7574class DecisionsHelper {
7675
@@ -117,7 +116,7 @@ long getNextDecisionEventId() {
117116 }
118117
119118 long scheduleActivityTask (ScheduleActivityTaskDecisionAttributes schedule ) {
120- addAllMissingVersionMarker (false , Optional . empty () );
119+ addAllMissingVersionMarker ();
121120
122121 long nextDecisionEventId = getNextDecisionEventId ();
123122 DecisionId decisionId = new DecisionId (DecisionTarget .ACTIVITY , nextDecisionEventId );
@@ -198,7 +197,7 @@ boolean handleRequestCancelActivityTaskFailed(HistoryEvent event) {
198197 }
199198
200199 long startChildWorkflowExecution (StartChildWorkflowExecutionDecisionAttributes childWorkflow ) {
201- addAllMissingVersionMarker (false , Optional . empty () );
200+ addAllMissingVersionMarker ();
202201
203202 long nextDecisionEventId = getNextDecisionEventId ();
204203 DecisionId decisionId = new DecisionId (DecisionTarget .CHILD_WORKFLOW , nextDecisionEventId );
@@ -269,7 +268,7 @@ boolean handleStartChildWorkflowExecutionFailed(HistoryEvent event) {
269268 */
270269 long requestCancelExternalWorkflowExecution (
271270 RequestCancelExternalWorkflowExecutionDecisionAttributes schedule ) {
272- addAllMissingVersionMarker (false , Optional . empty () );
271+ addAllMissingVersionMarker ();
273272
274273 long nextDecisionEventId = getNextDecisionEventId ();
275274 DecisionId decisionId =
@@ -306,7 +305,7 @@ void handleRequestCancelExternalWorkflowExecutionFailed(HistoryEvent event) {
306305 }
307306
308307 long signalExternalWorkflowExecution (SignalExternalWorkflowExecutionDecisionAttributes signal ) {
309- addAllMissingVersionMarker (false , Optional . empty () );
308+ addAllMissingVersionMarker ();
310309
311310 long nextDecisionEventId = getNextDecisionEventId ();
312311 DecisionId decisionId =
@@ -339,7 +338,7 @@ boolean handleExternalWorkflowExecutionSignaled(long initiatedEventId) {
339338 }
340339
341340 long startTimer (StartTimerDecisionAttributes request ) {
342- addAllMissingVersionMarker (false , Optional . empty () );
341+ addAllMissingVersionMarker ();
343342
344343 long startEventId = getNextDecisionEventId ();
345344 DecisionId decisionId = new DecisionId (DecisionTarget .TIMER , startEventId );
@@ -461,7 +460,7 @@ public void handleWorkflowExecutionCompleted(HistoryEvent event) {
461460 }
462461
463462 void completeWorkflowExecution (byte [] output ) {
464- addAllMissingVersionMarker (false , Optional . empty () );
463+ addAllMissingVersionMarker ();
465464
466465 Decision decision =
467466 Decision .newBuilder ()
@@ -475,7 +474,7 @@ void completeWorkflowExecution(byte[] output) {
475474 }
476475
477476 void continueAsNewWorkflowExecution (ContinueAsNewWorkflowExecutionParameters continueParameters ) {
478- addAllMissingVersionMarker (false , Optional . empty () );
477+ addAllMissingVersionMarker ();
479478
480479 HistoryEvent firstEvent = task .getHistory ().getEvents (0 );
481480 if (!firstEvent .hasWorkflowExecutionStartedEventAttributes ()) {
@@ -522,7 +521,7 @@ void continueAsNewWorkflowExecution(ContinueAsNewWorkflowExecutionParameters con
522521 }
523522
524523 void failWorkflowExecution (WorkflowExecutionException failure ) {
525- addAllMissingVersionMarker (false , Optional . empty () );
524+ addAllMissingVersionMarker ();
526525
527526 Decision decision =
528527 Decision .newBuilder ()
@@ -541,7 +540,7 @@ void failWorkflowExecution(WorkflowExecutionException failure) {
541540 * CancelWorkflowExecution was created.
542541 */
543542 void cancelWorkflowExecution () {
544- addAllMissingVersionMarker (false , Optional . empty () );
543+ addAllMissingVersionMarker ();
545544
546545 Decision decision =
547546 Decision .newBuilder ()
@@ -685,60 +684,90 @@ private void addDecision(DecisionId decisionId, DecisionStateMachine decision) {
685684 nextDecisionEventId ++;
686685 }
687686
688- // This is to support the case where a getVersion call presents during workflow execution but
689- // is removed in replay.
690- void addAllMissingVersionMarker (
691- boolean isNextDecisionVersionMarker ,
692- Optional <Predicate <MarkerRecordedEventAttributes >> isDifferentChange ) {
693- boolean added ;
694- do {
695- added = addMissingVersionMarker (isNextDecisionVersionMarker , isDifferentChange );
696- } while (added );
687+ void addAllMissingVersionMarker () {
688+ addAllMissingVersionMarker (Optional .empty (), Optional .empty ());
697689 }
698690
699- private boolean addMissingVersionMarker (
700- boolean isNextDecisionVersionMarker ,
701- Optional <Predicate <MarkerRecordedEventAttributes >> changeIdEquals ) {
702- Optional <HistoryEvent > optionalEvent = getOptionalDecisionEvent (nextDecisionEventId );
691+ Optional <HistoryEvent > getVersionMakerEvent (long eventId ) {
692+ Optional <HistoryEvent > optionalEvent = getOptionalDecisionEvent (eventId );
703693 if (!optionalEvent .isPresent ()) {
704- return false ;
694+ return Optional . empty () ;
705695 }
706696
707697 HistoryEvent event = optionalEvent .get ();
708698 if (event .getEventType () != EventType .MarkerRecorded ) {
709- return false ;
699+ return Optional . empty () ;
710700 }
711701
712702 if (!event
713703 .getMarkerRecordedEventAttributes ()
714704 .getMarkerName ()
715705 .equals (ClockDecisionContext .VERSION_MARKER_NAME )) {
716- return false ;
706+ return Optional . empty () ;
717707 }
708+ return Optional .of (event );
709+ }
718710
719- // Next decision is for version marker and the event is for the same.
720- if (isNextDecisionVersionMarker
721- && (!changeIdEquals .isPresent ()
722- || changeIdEquals .get ().test (event .getMarkerRecordedEventAttributes ()))) {
723- return false ;
711+ /**
712+ * As getVersion calls can be added and removed any time this method inserts missing decision
713+ * events that correspond to removed getVersion calls.
714+ *
715+ * @param changeId optional getVersion change id to compare
716+ * @param converter must be present if changeId is present
717+ */
718+ void addAllMissingVersionMarker (Optional <String > changeId , Optional <DataConverter > converter ) {
719+ Optional <HistoryEvent > markerEvent = getVersionMakerEvent (nextDecisionEventId );
720+
721+ if (!markerEvent .isPresent ()) {
722+ return ;
724723 }
725724
726- // If we have a version marker in history event but not in decisions, let's add one.
727- RecordMarkerDecisionAttributes .Builder marker =
728- RecordMarkerDecisionAttributes .newBuilder ()
729- .setMarkerName (ClockDecisionContext .VERSION_MARKER_NAME )
730- .setHeader (event .getMarkerRecordedEventAttributes ().getHeader ())
731- .setDetails (event .getMarkerRecordedEventAttributes ().getDetails ());
732- Decision markerDecision =
733- Decision .newBuilder ()
734- .setDecisionType (DecisionType .RecordMarker )
735- .setRecordMarkerDecisionAttributes (marker )
736- .build ();
737- DecisionId markerDecisionId = new DecisionId (DecisionTarget .MARKER , nextDecisionEventId );
738- decisions .put (
739- markerDecisionId , new MarkerDecisionStateMachine (markerDecisionId , markerDecision ));
740- nextDecisionEventId ++;
741- return true ;
725+ // Look ahead to see if there is a marker with changeId following current version marker
726+ // If it is the case then all the markers that precede it should be added as decisions
727+ // as their correspondent getVersion calls were removed.
728+ long changeIdMarkerEventId = -1 ;
729+ if (changeId .isPresent ()) {
730+ String id = changeId .get ();
731+ long eventId = nextDecisionEventId ;
732+ while (true ) {
733+ MarkerHandler .MarkerInterface markerData =
734+ MarkerHandler .MarkerInterface .fromEventAttributes (
735+ markerEvent .get ().getMarkerRecordedEventAttributes (), converter .get ());
736+
737+ if (id .equals (markerData .getId ())) {
738+ changeIdMarkerEventId = eventId ;
739+ break ;
740+ }
741+ eventId ++;
742+ markerEvent = getVersionMakerEvent (eventId );
743+ if (!markerEvent .isPresent ()) {
744+ break ;
745+ }
746+ }
747+ // There are no version markers preceding a marker with the changeId
748+ if (changeIdMarkerEventId < 0 || changeIdMarkerEventId == nextDecisionEventId ) {
749+ return ;
750+ }
751+ }
752+ do {
753+ // If we have a version marker in history event but not in decisions, let's add one.
754+ RecordMarkerDecisionAttributes .Builder attributes =
755+ RecordMarkerDecisionAttributes .newBuilder ()
756+ .setMarkerName (ClockDecisionContext .VERSION_MARKER_NAME )
757+ .setHeader (markerEvent .get ().getMarkerRecordedEventAttributes ().getHeader ())
758+ .setDetails (markerEvent .get ().getMarkerRecordedEventAttributes ().getDetails ());
759+ Decision markerDecision =
760+ Decision .newBuilder ()
761+ .setDecisionType (DecisionType .RecordMarker )
762+ .setRecordMarkerDecisionAttributes (attributes )
763+ .build ();
764+ DecisionId markerDecisionId = new DecisionId (DecisionTarget .MARKER , nextDecisionEventId );
765+ decisions .put (
766+ markerDecisionId , new MarkerDecisionStateMachine (markerDecisionId , markerDecision ));
767+ nextDecisionEventId ++;
768+ markerEvent = getVersionMakerEvent (nextDecisionEventId );
769+ } while (markerEvent .isPresent ()
770+ && (changeIdMarkerEventId < 0 || nextDecisionEventId < changeIdMarkerEventId ));
742771 }
743772
744773 private DecisionStateMachine getDecision (DecisionId decisionId ) {
0 commit comments