Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 227 additions & 108 deletions paddle/phi/api/include/compat/c10/core/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,147 +16,266 @@

#include <c10/core/Device.h>
#include <c10/core/DeviceType.h>
#include <c10/cuda/CUDAStream.h>
#include <queue>
namespace c10 {
#include <c10/core/Stream.h>

/**
* A backend-generic movable, not copyable, not thread-safe event.
*
* The design of this event follows that of CUDA and HIP events. These events
* are recorded and waited on by streams and can be rerecorded to,
* each rerecording essentially creating a new version of the event.
* For example, if (in CPU time), stream X is asked to record E,
* stream Y waits on E, and stream X is asked to record E again, then Y will
* wait for X to finish the first call to record and not the second, because
* it's waiting on the first version of event E, not the second.
* Querying an event only returns the status of its most recent version.
*
* Backend-generic events are implemented by this class and
* impl::InlineEvent. In addition to these events there are also
* some backend-specific events, like ATen's CUDAEvent. Each of these
* classes has its own use.
*
* impl::InlineEvent<...> or a backend-specific event should be
* preferred when the backend is known at compile time and known to
* be compiled. Backend-specific events may have additional functionality.
*
* This Event should be used if a particular backend may not be available,
* or the backend required is not known at compile time.
*
* These generic events are built on top of DeviceGuardImpls, analogous
* to DeviceGuard and InlineDeviceGuard. The name "DeviceGuardImpls,"
* is no longer entirely accurate, as these classes implement the
* backend-specific logic for a generic backend interface.
*
* See DeviceGuardImplInterface.h for a list of all supported flags.
*/
#include <utility>

#ifdef PADDLE_WITH_CUDA
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header uses std::mutex/std::unique_lock in EventPool, but doesn't include <mutex> directly (it currently relies on transitive includes from other headers). Adding an explicit #include <mutex> under the PADDLE_WITH_CUDA guard would make this file self-contained and less fragile to include-order changes.

Suggested change
#include <c10/cuda/CUDAStream.h>
#include <c10/cuda/CUDAStream.h>
#include <mutex>

Copilot uses AI. Check for mistakes.
#endif

class EventPool {
public:
EventPool();
EventPool(const EventPool &) = delete;
EventPool(EventPool &&) = delete;
~EventPool();
namespace c10 {

cudaEvent_t CreateCudaEventFromPool();
enum class EventFlag { PYTORCH_DEFAULT, BACKEND_DEFAULT, INVALID };

static EventPool &Instance();
struct Event final {
public:
Event() = delete;
Event(const DeviceType device_type,
const EventFlag flag = EventFlag::PYTORCH_DEFAULT)
: device_type_(device_type), flag_(flag) {}

private:
std::queue<cudaEvent_t> incomplished_events_;
std::mutex mtx_;
};
Event(const Event&) = delete;
Event& operator=(const Event&) = delete;

EventPool &EventPool::Instance() {
static EventPool pool;
return pool;
}
Event(Event&& other) noexcept { MoveFrom(std::move(other)); }
Event& operator=(Event&& other) noexcept {
if (this != &other) {
DestroyCudaEvent();
MoveFrom(std::move(other));
}
return *this;
}

EventPool::EventPool() {
for (size_t i = 0; i < 1000; ++i) {
cudaEvent_t new_event;
C10_CUDA_CHECK(cudaEventCreate(&new_event));
~Event() { DestroyCudaEvent(); }

cudaEventRecord(new_event, 0);
incomplished_events_.push(new_event);
Device device() const noexcept { return Device(device_type_, device_index_); }
DeviceType device_type() const noexcept { return device_type_; }
DeviceIndex device_index() const noexcept { return device_index_; }
EventFlag flag() const noexcept { return flag_; }
bool was_marked_for_recording() const noexcept {
return was_marked_for_recording_;
}
}

EventPool::~EventPool() {
std::unique_lock<std::mutex> lock(mtx_);
while (!incomplished_events_.empty()) {
cudaEvent_t event = incomplished_events_.front();
incomplished_events_.pop();
if (cudaEventQuery(event) == cudaSuccess) {
C10_CUDA_CHECK(cudaEventDestroy(event));

void recordOnce(const Stream& stream) {
if (!was_marked_for_recording_) {
record(stream);
}
}
}

cudaEvent_t EventPool::CreateCudaEventFromPool() {
std::unique_lock<std::mutex> lock(mtx_);

const auto &CreateNewEvent = [&]() -> cudaEvent_t {
cudaEvent_t new_event;
C10_CUDA_CHECK(cudaEventCreate(&new_event));
incomplished_events_.push(new_event);
return new_event;
};

const auto &CreateNewOrReuseEvent = [&]() -> cudaEvent_t {
cudaEvent_t front_event = incomplished_events_.front();
incomplished_events_.pop();
incomplished_events_.push(front_event);
if (cudaEventQuery(front_event) == cudaSuccess) {
return front_event;

void record(const Stream& stream) {
TORCH_CHECK(stream.device_type() == device_type_,
"Event device type ",
device_type_,
" does not match recording stream's device type ",
stream.device_type(),
".");
#ifdef PADDLE_WITH_CUDA
if (device_type_ == DeviceType::CUDA) {
RecordCudaEvent(static_cast<cudaStream_t>(stream.native_handle()),
stream.device_index());
return;
}
return CreateNewEvent();
};
#endif
TORCH_CHECK(false, "Backend doesn't support events.");
}

if (incomplished_events_.empty()) {
return CreateNewEvent();
#ifdef PADDLE_WITH_CUDA
void record(const c10::cuda::CUDAStream& stream) { record(stream.unwrap()); }

// TODO(youge325): Remove after DeepEP paddle branch is updated to use
// c10::Stream
void record(const cudaStream_t& stream) {
TORCH_CHECK(
device_type_ == DeviceType::CUDA,
"Raw cudaStream_t recording is only supported for CUDA events.");
RecordCudaEvent(stream, phi::backends::gpu::GetCurrentDeviceId());
}
return CreateNewOrReuseEvent();
}
#endif

struct Event final {
public:
Event(const DeviceType &type) {
// device_type is useless, only for compatibility
cuda_event_ = EventPool::Instance().CreateCudaEventFromPool();
void block(const Stream& stream) const {
if (!was_marked_for_recording_) {
return;
}
TORCH_CHECK(stream.device_type() == device_type_,
"Event device type ",
device_type_,
" does not match blocking stream's device type ",
stream.device_type(),
".");
#ifdef PADDLE_WITH_CUDA
if (device_type_ == DeviceType::CUDA && cuda_event_) {
TORCH_CHECK(device_index_ == stream.device_index(),
"Event device index ",
static_cast<int>(device_index_),
" does not match blocking stream's device index ",
static_cast<int>(stream.device_index()),
".");
c10::cuda::CUDAGuard guard(device_index_);
C10_CUDA_CHECK(cudaStreamWaitEvent(
static_cast<cudaStream_t>(stream.native_handle()), cuda_event_, 0));
return;
}
#endif
TORCH_CHECK(false, "Backend doesn't support events.");
}

void record(const Stream &stream) {
C10_CUDA_CHECK(cudaEventRecord(
cuda_event_, static_cast<cudaStream_t>(stream.native_handle())));
bool query() const {
if (!was_marked_for_recording_) {
return true;
}
#ifdef PADDLE_WITH_CUDA
if (device_type_ == DeviceType::CUDA && cuda_event_) {
const auto err = cudaEventQuery(cuda_event_);
if (err == cudaSuccess) {
return true;
}
if (err != cudaErrorNotReady) {
C10_CUDA_CHECK(err);
} else {
(void)cudaGetLastError();
}
return false;
}
#endif
TORCH_CHECK(false, "Backend doesn't support events.");
return true;
}

void record(const c10::cuda::CUDAStream &stream) { record(stream.unwrap()); }
double elapsedTime(const Event& event) const {
TORCH_CHECK(event.device_type() == device_type_,
"Event device type ",
device_type_,
" does not match other's device type ",
event.device_type(),
".");
TORCH_CHECK(
flag_ == EventFlag::BACKEND_DEFAULT &&
event.flag_ == EventFlag::BACKEND_DEFAULT,
"Both events must be created with argument 'enable_timing=True'.");
TORCH_CHECK(
was_marked_for_recording_ && event.was_marked_for_recording_,
"Both events must be recorded before calculating elapsed time.");
TORCH_CHECK(
query() && event.query(),
"Both events must be completed before calculating elapsed time.");
#ifdef PADDLE_WITH_CUDA
if (device_type_ == DeviceType::CUDA && cuda_event_ && event.cuda_event_) {
TORCH_CHECK(device_index_ == event.device_index_,
"Event device index ",
static_cast<int>(device_index_),
" does not match other's device index ",
static_cast<int>(event.device_index_),
".");
c10::cuda::CUDAGuard guard(device_index_);
float time_ms = 0.0f;
C10_CUDA_CHECK(
cudaEventElapsedTime(&time_ms, cuda_event_, event.cuda_event_));
return static_cast<double>(time_ms);
}
#endif
TORCH_CHECK(false, "Backend doesn't support event elapsedTime.");
return 0.0;
}

// TODO(youge325): Remove after DeepEP paddle branch is updated to use
// c10::Stream
void record(const cudaStream_t &stream) {
C10_CUDA_CHECK(cudaEventRecord(cuda_event_, stream));
void* eventId() const {
#ifdef PADDLE_WITH_CUDA
return cuda_event_;
#else
return nullptr;
#endif
}

void block(const Stream &stream) const {
C10_CUDA_CHECK(cudaStreamWaitEvent(
static_cast<cudaStream_t>(stream.native_handle()), cuda_event_, 0));
void synchronize() const {
if (!was_marked_for_recording_) {
return;
}
#ifdef PADDLE_WITH_CUDA
if (device_type_ == DeviceType::CUDA && cuda_event_) {
C10_CUDA_CHECK(cudaEventSynchronize(cuda_event_));
return;
}
#endif
TORCH_CHECK(false, "Backend doesn't support events.");
}

#ifdef PADDLE_WITH_CUDA
cudaEvent_t cuda_event() const { return cuda_event_; }
#endif

private:
cudaEvent_t cuda_event_;
DeviceType device_type_;
DeviceIndex device_index_ = -1;
EventFlag flag_ = EventFlag::PYTORCH_DEFAULT;
bool was_marked_for_recording_ = false;
#ifdef PADDLE_WITH_CUDA
cudaEvent_t cuda_event_ = nullptr;

static unsigned int CudaEventCreateFlags(EventFlag flag) {
switch (flag) {
case EventFlag::PYTORCH_DEFAULT:
return cudaEventDisableTiming;
case EventFlag::BACKEND_DEFAULT:
return cudaEventDefault;
default:
TORCH_CHECK(false, "CUDA event received unknown flag");
}
}

void EnsureCudaEventCreated(DeviceIndex stream_device_index) {
if (cuda_event_) {
return;
}
c10::cuda::CUDAGuard guard(stream_device_index);
C10_CUDA_CHECK(
cudaEventCreateWithFlags(&cuda_event_, CudaEventCreateFlags(flag_)));
}

void RecordCudaEvent(cudaStream_t stream, DeviceIndex stream_device_index) {
TORCH_CHECK(device_index_ == -1 || device_index_ == stream_device_index,
"Event device index ",
static_cast<int>(device_index_),
" does not match recording stream's device index ",
static_cast<int>(stream_device_index),
".");
EnsureCudaEventCreated(stream_device_index);
c10::cuda::CUDAGuard guard(stream_device_index);
C10_CUDA_CHECK(cudaEventRecord(cuda_event_, stream));
device_index_ = stream_device_index;
was_marked_for_recording_ = true;
}

void DestroyCudaEvent() noexcept {
if (!cuda_event_) {
return;
}
try {
c10::cuda::CUDAGuard guard(device_index_);
C10_CUDA_CHECK(cudaEventDestroy(cuda_event_));
} catch (...) {
}
cuda_event_ = nullptr;
}
#else
void DestroyCudaEvent() noexcept {}
#endif

void MoveFrom(Event&& other) noexcept {
device_type_ = other.device_type_;
device_index_ = other.device_index_;
flag_ = other.flag_;
was_marked_for_recording_ = other.was_marked_for_recording_;
#ifdef PADDLE_WITH_CUDA
cuda_event_ = std::exchange(other.cuda_event_, nullptr);
#endif
other.device_index_ = -1;
other.was_marked_for_recording_ = false;
}
};

} // namespace c10

namespace torch {
using c10::Event;
} // namespace torch

#endif
Loading
Loading