From b8dab91c0b0ac04535cc8bc4d495d426d348d204 Mon Sep 17 00:00:00 2001 From: Matthew Yeung Date: Wed, 11 Mar 2026 22:34:41 +0000 Subject: [PATCH 1/3] Implement scatter-done event handling for RDMA writes Added handling for scatter-done events to prevent RDMA writes from racing with ongoing reads in the same pool. Introduced event recording for synchronization in multi-batch processing. Signed-off-by: Matthew Yeung --- lmcache_ascend/v1/npu_connector.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/lmcache_ascend/v1/npu_connector.py b/lmcache_ascend/v1/npu_connector.py index 885b637a..4213adc3 100755 --- a/lmcache_ascend/v1/npu_connector.py +++ b/lmcache_ascend/v1/npu_connector.py @@ -979,9 +979,31 @@ def _remote_batched_to_gpu(self, memory_objs, starts, ends, **kwargs): prev_read_event = None prev_batch = None + # Per-pool scatter-done events: prevent the next RDMA + # write into a pool from racing with a scatter that is + # still reading from the same pool on load_stream. + # Events are pre-allocated and re-recorded each iteration. + channel = proxy_items[0][0]._transfer_channel + transport_stream = getattr( + channel, "transport_stream", None + ) + pool_scatter_events = [ + torch.npu.Event(), + torch.npu.Event(), + ] + pool_scatter_recorded = [False, False] + for batch_idx, batch in enumerate(micro_batches): pool = pools[current_pool] + # Ensure the previous scatter from this pool has + # finished before RDMA overwrites the pool buffers. + if pool_scatter_recorded[current_pool] \ + and transport_stream is not None: + transport_stream.wait_event( + pool_scatter_events[current_pool] + ) + # Assign backing buffers from current pool to proxies for i, (proxy, _, _) in enumerate(batch): proxy.set_backing_obj(pool[i]) @@ -1000,9 +1022,10 @@ def _remote_batched_to_gpu(self, memory_objs, starts, ends, **kwargs): prev_read_event, **kwargs, ) - # TODO (gingfung): investigate whether - # we need to record scatter-done event on load_stream - # so the next RDMA into the same pool waits for it. + pool_scatter_events[1 - current_pool].record( + self.load_stream + ) + pool_scatter_recorded[1 - current_pool] = True self._clear_proxy_batch(prev_batch) prev_read_event = cur_read_event From 31d3afe7d9445eaedb95fc3d67827df0f739ba60 Mon Sep 17 00:00:00 2001 From: Matthew Yeung Date: Wed, 11 Mar 2026 22:40:06 +0000 Subject: [PATCH 2/3] Fix Lint Signed-off-by: Matthew Yeung --- lmcache_ascend/v1/npu_connector.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/lmcache_ascend/v1/npu_connector.py b/lmcache_ascend/v1/npu_connector.py index 4213adc3..a7321b9e 100755 --- a/lmcache_ascend/v1/npu_connector.py +++ b/lmcache_ascend/v1/npu_connector.py @@ -984,9 +984,7 @@ def _remote_batched_to_gpu(self, memory_objs, starts, ends, **kwargs): # still reading from the same pool on load_stream. # Events are pre-allocated and re-recorded each iteration. channel = proxy_items[0][0]._transfer_channel - transport_stream = getattr( - channel, "transport_stream", None - ) + transport_stream = getattr(channel, "transport_stream", None) pool_scatter_events = [ torch.npu.Event(), torch.npu.Event(), @@ -998,11 +996,11 @@ def _remote_batched_to_gpu(self, memory_objs, starts, ends, **kwargs): # Ensure the previous scatter from this pool has # finished before RDMA overwrites the pool buffers. - if pool_scatter_recorded[current_pool] \ - and transport_stream is not None: - transport_stream.wait_event( - pool_scatter_events[current_pool] - ) + if ( + pool_scatter_recorded[current_pool] + and transport_stream is not None + ): + transport_stream.wait_event(pool_scatter_events[current_pool]) # Assign backing buffers from current pool to proxies for i, (proxy, _, _) in enumerate(batch): @@ -1022,9 +1020,7 @@ def _remote_batched_to_gpu(self, memory_objs, starts, ends, **kwargs): prev_read_event, **kwargs, ) - pool_scatter_events[1 - current_pool].record( - self.load_stream - ) + pool_scatter_events[1 - current_pool].record(self.load_stream) pool_scatter_recorded[1 - current_pool] = True self._clear_proxy_batch(prev_batch) From 76826661c18f19996a6ec7e37938766e443c8dbb Mon Sep 17 00:00:00 2001 From: Matthew Yeung Date: Thu, 12 Mar 2026 11:39:35 +0000 Subject: [PATCH 3/3] Add synchronization between compute and load streams Ensure compute stream waits for load_stream's KV scatter completion before attention reads. Signed-off-by: Matthew Yeung --- lmcache_ascend/v1/npu_connector.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lmcache_ascend/v1/npu_connector.py b/lmcache_ascend/v1/npu_connector.py index a7321b9e..18165e8c 100755 --- a/lmcache_ascend/v1/npu_connector.py +++ b/lmcache_ascend/v1/npu_connector.py @@ -885,6 +885,14 @@ def batched_to_gpu(self, memory_objs, starts, ends, **kwargs): assert not is_310p(), "Batched P2P transfer is not supported on 310P." self._remote_batched_to_gpu(memory_objs, starts, ends, **kwargs) + + # NOTE (gingfung): Ensure the compute stream waits for + # load_stream's KV scatter to complete before attention + # reads the same pages. + # load_stream.synchronize() in _remote_batched_to_gpu is + # host-side only, the compute stream has no knowledge of it + # and can race ahead. + torch.npu.current_stream().wait_stream(self.load_stream) else: with torch.cuda.stream(self.load_stream): for memory_obj, start, end in zip(