From 6cd9c557b984615bb5adaba7f414d0294d4a12f6 Mon Sep 17 00:00:00 2001 From: Namit Dhameja Date: Sun, 13 Jul 2025 00:05:56 -0700 Subject: [PATCH 1/3] Added in process wrapper restart latency --- src/nvidia_resiliency_ext/inprocess/wrap.py | 24 +++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/nvidia_resiliency_ext/inprocess/wrap.py b/src/nvidia_resiliency_ext/inprocess/wrap.py index 27d206a2..f70b1dae 100644 --- a/src/nvidia_resiliency_ext/inprocess/wrap.py +++ b/src/nvidia_resiliency_ext/inprocess/wrap.py @@ -401,6 +401,7 @@ def __call__(self, fn, args, kwargs): wrapper = self.wrapper progress_watchdog = self.progress_watchdog + exception_recvd_time = None rank_assignment_ctx = RankAssignmentCtx(state, store, set()) reassigned_ctx = wrapper.rank_assignment(rank_assignment_ctx) self.state = state = reassigned_ctx.state @@ -463,6 +464,27 @@ def __call__(self, fn, args, kwargs): raise HealthCheckError from health_ex if state.mode == Mode.ACTIVE: + if exception_recvd_time is not None: + # Store local trigger time in the distributed store + store.set(f"exception_recvd_time_{state.rank}", str(exception_recvd_time)) + # Barrier to ensure all ranks have set their trigger time + store.completion_barrier( + ranks=[state.rank], + rendezvous_count=state.world_size, + timeout=wrapper.completion_timeout, + timeout_chunk=wrapper.progress_watchdog_interval, + ) + if state.rank == 0: + excp_recvd_times = [] + for r in range(state.world_size): + excp_recvd_times.append(float(store.get(f"exception_recvd_time_{r}"))) + restart_latency_min = int((time.monotonic() - min(excp_recvd_times))*1000) + restart_latency_max = int((time.monotonic() - max(excp_recvd_times))*1000) + log.info(f"In-Process Wrapper restart latency: ({restart_latency_min}, {restart_latency_max}) ms") + # Also log local latency for reference + local_restart_latency = int((time.monotonic() - exception_recvd_time)*1000) + log.debug(f"Local In-Process Wrapper restart latency: {local_restart_latency} ms") + exception_recvd_time = None ret = fn(*args, **kwargs) store.record_completed() elif state.mode == Mode.INACTIVE: @@ -484,6 +506,8 @@ def __call__(self, fn, args, kwargs): ) except Exception as fn_ex: try: + if exception_recvd_time is None: + exception_recvd_time = time.monotonic() log.error(log_exc(state, fn_ex, 'fn_ex')) monitor_process.record_interrupted( [InterruptionRecord(state.rank, Interruption.EXCEPTION)] From e93a506c180ad0ae42a147ba34ae0401f49b3aab Mon Sep 17 00:00:00 2001 From: Namit Dhameja Date: Sun, 13 Jul 2025 03:17:11 -0700 Subject: [PATCH 2/3] fix lint --- src/nvidia_resiliency_ext/inprocess/wrap.py | 29 ++++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/nvidia_resiliency_ext/inprocess/wrap.py b/src/nvidia_resiliency_ext/inprocess/wrap.py index f70b1dae..da47a968 100644 --- a/src/nvidia_resiliency_ext/inprocess/wrap.py +++ b/src/nvidia_resiliency_ext/inprocess/wrap.py @@ -466,7 +466,10 @@ def __call__(self, fn, args, kwargs): if state.mode == Mode.ACTIVE: if exception_recvd_time is not None: # Store local trigger time in the distributed store - store.set(f"exception_recvd_time_{state.rank}", str(exception_recvd_time)) + store.set( + f"exception_recvd_time_{state.rank}", + str(exception_recvd_time), + ) # Barrier to ensure all ranks have set their trigger time store.completion_barrier( ranks=[state.rank], @@ -477,13 +480,25 @@ def __call__(self, fn, args, kwargs): if state.rank == 0: excp_recvd_times = [] for r in range(state.world_size): - excp_recvd_times.append(float(store.get(f"exception_recvd_time_{r}"))) - restart_latency_min = int((time.monotonic() - min(excp_recvd_times))*1000) - restart_latency_max = int((time.monotonic() - max(excp_recvd_times))*1000) - log.info(f"In-Process Wrapper restart latency: ({restart_latency_min}, {restart_latency_max}) ms") + excp_recvd_times.append( + float(store.get(f"exception_recvd_time_{r}")) + ) + restart_latency_min = int( + (time.monotonic() - min(excp_recvd_times)) * 1000 + ) + restart_latency_max = int( + (time.monotonic() - max(excp_recvd_times)) * 1000 + ) + log.info( + f"In-Process Wrapper restart latency: ({restart_latency_min}, {restart_latency_max}) ms" + ) # Also log local latency for reference - local_restart_latency = int((time.monotonic() - exception_recvd_time)*1000) - log.debug(f"Local In-Process Wrapper restart latency: {local_restart_latency} ms") + local_restart_latency = int( + (time.monotonic() - exception_recvd_time) * 1000 + ) + log.debug( + f"Local In-Process Wrapper restart latency: {local_restart_latency} ms" + ) exception_recvd_time = None ret = fn(*args, **kwargs) store.record_completed() From c8f9e1f40ef7ef72b913456947f6bb7010c40940 Mon Sep 17 00:00:00 2001 From: Namit Dhameja Date: Mon, 14 Jul 2025 11:48:18 -0700 Subject: [PATCH 3/3] align on how checkpoint metrics are logged --- src/nvidia_resiliency_ext/inprocess/wrap.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/nvidia_resiliency_ext/inprocess/wrap.py b/src/nvidia_resiliency_ext/inprocess/wrap.py index da47a968..569cd7d1 100644 --- a/src/nvidia_resiliency_ext/inprocess/wrap.py +++ b/src/nvidia_resiliency_ext/inprocess/wrap.py @@ -490,14 +490,14 @@ def __call__(self, fn, args, kwargs): (time.monotonic() - max(excp_recvd_times)) * 1000 ) log.info( - f"In-Process Wrapper restart latency: ({restart_latency_min}, {restart_latency_max}) ms" + f"global-in-process ....: ({restart_latency_min}, {restart_latency_max})" ) # Also log local latency for reference local_restart_latency = int( (time.monotonic() - exception_recvd_time) * 1000 ) log.debug( - f"Local In-Process Wrapper restart latency: {local_restart_latency} ms" + f"local-in-process ....: {local_restart_latency}" ) exception_recvd_time = None ret = fn(*args, **kwargs)