11use crate :: integ_tests:: activity_functions:: echo;
22use anyhow:: anyhow;
33use futures_util:: future:: join_all;
4+ use rstest:: Context ;
45use std:: {
56 sync:: {
67 Arc ,
@@ -11,7 +12,8 @@ use std::{
1112use temporal_client:: { WfClientExt , WorkflowClientTrait , WorkflowOptions } ;
1213use temporal_sdk:: {
1314 ActContext , ActivityError , ActivityOptions , CancellableFuture , LocalActivityOptions ,
14- UpdateContext , WfContext , WorkflowResult , interceptors:: WorkerInterceptor ,
15+ UpdateContext , WfContext , WorkflowResult ,
16+ interceptors:: { FailOnNondeterminismInterceptor , WorkerInterceptor } ,
1517} ;
1618use temporal_sdk_core:: replay:: HistoryForReplay ;
1719use temporal_sdk_core_protos:: {
@@ -29,8 +31,8 @@ use temporal_sdk_core_protos::{
2931 } ,
3032} ;
3133use temporal_sdk_core_test_utils:: {
32- CoreWfStarter , WorkflowHandleExt , history_from_proto_binary, replay_sdk_worker ,
33- workflows:: la_problem_workflow,
34+ CoreWfStarter , WorkflowHandleExt , history_from_proto_binary, init_core_replay_preloaded ,
35+ replay_sdk_worker , workflows:: la_problem_workflow,
3436} ;
3537use tokio_util:: sync:: CancellationToken ;
3638
@@ -757,13 +759,18 @@ async fn la_resolve_same_time_as_other_cancel() {
757759}
758760
759761#[ rstest:: rstest]
762+ #[ case( 200 , 0 ) ]
763+ #[ case( 200 , 2000 ) ]
764+ #[ case( 2000 , 0 ) ]
765+ #[ case( 2000 , 2000 ) ]
760766#[ tokio:: test]
761767async fn long_local_activity_with_update (
762- #[ values( 200 , 2000 ) ] update_interval_ms : u64 ,
763- #[ values( 0 , 2000 ) ] update_inner_timer : u64 ,
768+ #[ context] ctx : Context ,
769+ #[ case] update_interval_ms : u64 ,
770+ #[ case] update_inner_timer : u64 ,
764771) {
765- let wf_name = "long_local_activity_with_update" ;
766- let mut starter = CoreWfStarter :: new ( wf_name) ;
772+ let wf_name = format ! ( "{}-{}" , ctx . name , ctx . case . unwrap ( ) ) ;
773+ let mut starter = CoreWfStarter :: new ( & wf_name) ;
767774 starter. workflow_options . task_timeout = Some ( Duration :: from_secs ( 1 ) ) ;
768775 let mut worker = starter. worker ( ) . await ;
769776 let client = starter. get_client ( ) . await ;
@@ -793,15 +800,17 @@ async fn long_local_activity_with_update(
793800 ..Default :: default ( )
794801 } )
795802 . await ;
796- dbg ! ( update_counter. load( Ordering :: Relaxed ) ) ;
803+ update_counter. load ( Ordering :: Relaxed ) ;
797804 Ok ( ( ) . into ( ) )
798805 } ) ;
799806 worker. register_activity ( "delay" , |_: ActContext , _: String | async {
800807 tokio:: time:: sleep ( Duration :: from_secs ( 6 ) ) . await ;
801808 Ok ( ( ) )
802809 } ) ;
803810
804- let handle = starter. start_with_worker ( wf_name, & mut worker) . await ;
811+ let handle = starter
812+ . start_with_worker ( wf_name. clone ( ) , & mut worker)
813+ . await ;
805814
806815 let wf_id = starter. get_task_queue ( ) . to_string ( ) ;
807816 let update = async {
@@ -834,4 +843,19 @@ async fn long_local_activity_with_update(
834843 . await
835844 . unwrap ( ) ;
836845 assert_eq ! ( res[ 0 ] , replay_res. unwrap( ) ) ;
846+
847+ // Load histories from pre-fix version and ensure compat
848+ let replay_worker = init_core_replay_preloaded (
849+ starter. get_task_queue ( ) ,
850+ [ HistoryForReplay :: new (
851+ history_from_proto_binary ( & format ! ( "histories/{}_history.bin" , wf_name) )
852+ . await
853+ . unwrap ( ) ,
854+ "fake" . to_owned ( ) ,
855+ ) ] ,
856+ ) ;
857+ let inner_worker = worker. inner_mut ( ) ;
858+ inner_worker. with_new_core_worker ( replay_worker) ;
859+ inner_worker. set_worker_interceptor ( FailOnNondeterminismInterceptor { } ) ;
860+ inner_worker. run ( ) . await . unwrap ( ) ;
837861}
0 commit comments