@@ -695,7 +695,8 @@ fn find_end_index_of_next_wft_seq(
695695 return NextWFTSeqEndIndex :: Incomplete ( 0 ) ;
696696 }
697697 let mut last_index = 0 ;
698- let mut saw_any_command_event = false ;
698+ let mut saw_command_or_started = false ;
699+ let mut saw_command = false ;
699700 let mut wft_started_event_id_to_index = vec ! [ ] ;
700701 for ( ix, e) in events. iter ( ) . enumerate ( ) {
701702 last_index = ix;
@@ -706,8 +707,11 @@ fn find_end_index_of_next_wft_seq(
706707 continue ;
707708 }
708709
709- if e. is_command_event ( ) || e. event_type ( ) == EventType :: WorkflowExecutionStarted {
710- saw_any_command_event = true ;
710+ if e. is_command_event ( ) {
711+ saw_command = true ;
712+ if e. event_type ( ) == EventType :: WorkflowExecutionStarted {
713+ saw_command_or_started = true ;
714+ }
711715 }
712716 if e. is_final_wf_execution_event ( ) {
713717 return NextWFTSeqEndIndex :: Complete ( last_index) ;
@@ -728,13 +732,14 @@ fn find_end_index_of_next_wft_seq(
728732 | EventType :: WorkflowExecutionCanceled
729733 ) {
730734 continue ;
731- }
732- // If we've never seen an interesting event and the next two events are a completion
733- // followed immediately again by scheduled, then this is a WFT heartbeat and also
734- // doesn't conclude the sequence.
735- else if next_event_type == EventType :: WorkflowTaskCompleted {
735+ } else if next_event_type == EventType :: WorkflowTaskCompleted {
736736 if let Some ( next_next_event) = events. get ( ix + 2 ) {
737- if next_next_event. event_type ( ) == EventType :: WorkflowTaskScheduled {
737+ if !saw_command
738+ && next_next_event. event_type ( ) == EventType :: WorkflowTaskScheduled
739+ {
740+ // If we've never seen an interesting event and the next two events are
741+ // a completion followed immediately again by scheduled, then this is a
742+ // WFT heartbeat and also doesn't conclude the sequence.
738743 continue ;
739744 } else {
740745 // If we see an update accepted command after WFT completed, we want to
@@ -766,18 +771,18 @@ fn find_end_index_of_next_wft_seq(
766771 }
767772 return NextWFTSeqEndIndex :: Complete ( ix) ;
768773 }
769- } else if !has_last_wft && !saw_any_command_event {
774+ } else if !has_last_wft && !saw_command_or_started {
770775 // Don't have enough events to look ahead of the WorkflowTaskCompleted. Need
771776 // to fetch more.
772777 continue ;
773778 }
774779 }
775- } else if !has_last_wft && !saw_any_command_event {
780+ } else if !has_last_wft && !saw_command_or_started {
776781 // Don't have enough events to look ahead of the WorkflowTaskStarted. Need to fetch
777782 // more.
778783 continue ;
779784 }
780- if saw_any_command_event {
785+ if saw_command_or_started {
781786 return NextWFTSeqEndIndex :: Complete ( ix) ;
782787 }
783788 }
@@ -921,7 +926,9 @@ mod tests {
921926 let seq = next_check_peek ( & mut update, 0 ) ;
922927 assert_eq ! ( seq. len( ) , 6 ) ;
923928 let seq = next_check_peek ( & mut update, 6 ) ;
924- assert_eq ! ( seq. len( ) , 13 ) ;
929+ assert_eq ! ( seq. len( ) , 4 ) ;
930+ let seq = next_check_peek ( & mut update, 10 ) ;
931+ assert_eq ! ( seq. len( ) , 9 ) ;
925932 let seq = next_check_peek ( & mut update, 19 ) ;
926933 assert_eq ! ( seq. len( ) , 4 ) ;
927934 let seq = next_check_peek ( & mut update, 23 ) ;
0 commit comments