Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/callback/callback_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static void handler(CallbackMsg* msg) {
auto data_msg = vt::makeMessage<DataMsg>();
data_msg->vec_ = std::vector<int>{18,45,28,-1,344};
fmt::print("handler: vec.size={}\n", data_msg->vec_.size());
cb.sendMsg(data_msg);
cb.sendMsg<DataMsg>(data_msg);
}

// Some instance of the context
Expand Down
7 changes: 7 additions & 0 deletions src/vt/collective/reduce/operators/default_op.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ template <typename MsgT, typename Op, typename ActOp>
);
while (cur_msg != nullptr) {
ReduceCombine<>::combine<MsgT,Op,ActOp>(fst_msg, cur_msg);
if (!fst_msg->hasValidCallback() && cur_msg->hasValidCallback()) {
if (cur_msg->isParamCallback()) {
fst_msg->setCallback(cur_msg->getParamCallback());
} else {
fst_msg->setCallback(cur_msg->getMsgCallback());
}
}
cur_msg = cur_msg->template getNext<MsgT>();
}
}
Expand Down
63 changes: 2 additions & 61 deletions src/vt/pipe/callback/cb_union/cb_raw_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,75 +229,16 @@ struct CallbackTyped : CallbackRawBaseSingle {
}

template <typename... Params>
void sendTuple(std::tuple<Params...> tup) {
using Trait = CBTraits<Args...>;
using MsgT = messaging::ParamMsg<typename Trait::TupleType>;
auto msg = vt::makeMessage<MsgT>();
msg->setParams(std::move(tup));
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
}
void sendTuple(std::tuple<Params...> tup);

template <typename... Params>
void send(Params&&... params) {
using Trait = CBTraits<Args...>;
if constexpr (std::is_same_v<typename Trait::MsgT, NoMsg>) {
// We have to go through some tricky code to make the MsgProps case work
// If we use the type for Params to send, it's possible that we have a
// type mismatch in the actual handler type. A possible edge case is when
// a char const* is sent, but the handler is a std::string. In this case,
// the ParamMsg will be cast incorrectly during the virual dispatch to a
// collection because callbacks don't have the collection type. Thus, the
// wrong ParamMsg will be cast to which requires serialization, leading to
// a failure.
if constexpr (sizeof...(Params) == sizeof...(Args) + 1) {
using MsgT = messaging::ParamMsg<
std::tuple<
std::decay_t<std::tuple_element_t<0, std::tuple<Params...>>>,
std::decay_t<Args>...
>
>;
auto msg = vt::makeMessage<MsgT>();
msg->setParams(std::forward<Params>(params)...);
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
} else {
using MsgT = messaging::ParamMsg<typename Trait::TupleType>;
auto msg = vt::makeMessage<MsgT>();
msg->setParams(std::forward<Params>(params)...);
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
}
} else {
using MsgT = typename Trait::MsgT;
auto msg = makeMessage<MsgT>(std::forward<Params>(params)...);
sendMsg(msg.get());
}
}

void send(typename CBTraits<Args...>::MsgT* msg) {
using MsgT = typename CBTraits<Args...>::MsgT;
if constexpr (not std::is_same_v<MsgT, NoMsg>) {
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
}
}
void send(Params&&... params);

template <typename MsgT>
void send(messaging::MsgPtrThief<MsgT> msg) {
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
}

void sendMsg(messaging::MsgPtrThief<typename CBTraits<Args...>::MsgT> msg) {
using MsgT = typename CBTraits<Args...>::MsgT;
if constexpr (not std::is_same_v<MsgT, NoMsg>) {
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
}
}

void sendMsg(typename CBTraits<Args...>::MsgT* msg) {
using MsgT = typename CBTraits<Args...>::MsgT;
if constexpr (not std::is_same_v<MsgT, NoMsg>) {
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
}
}

template <typename SerializerT>
void serialize(SerializerT& s) {
CallbackRawBaseSingle::serialize(s);
Expand Down
45 changes: 45 additions & 0 deletions src/vt/pipe/callback/cb_union/cb_raw_base.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,51 @@ void CallbackRawBaseSingle::serialize(SerializerT& s) {
s | cb_ | pipe_;
}

template <typename... Args>
template <typename... Params>
void CallbackTyped<Args...>::sendTuple(std::tuple<Params...> tup) {
using Trait = CBTraits<Args...>;
using MsgT = messaging::ParamMsg<typename Trait::TupleType>;
auto msg = vt::makeMessage<MsgT>();
msg->setParams(std::move(tup));
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
}

template <typename... Args>
template <typename... Params>
void CallbackTyped<Args...>::send(Params&&... params) {
using Trait = CBTraits<Args...>;
if constexpr (std::is_same_v<typename Trait::MsgT, NoMsg>) {
// We have to go through some tricky code to make the MsgProps case work
// If we use the type for Params to send, it's possible that we have a
// type mismatch in the actual handler type. A possible edge case is when
// a char const* is sent, but the handler is a std::string. In this case,
// the ParamMsg will be cast incorrectly during the virual dispatch to a
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'virual' to 'virtual'.

Suggested change
// the ParamMsg will be cast incorrectly during the virual dispatch to a
// the ParamMsg will be cast incorrectly during the virtual dispatch to a

Copilot uses AI. Check for mistakes.
// collection because callbacks don't have the collection type. Thus, the
// wrong ParamMsg will be cast to which requires serialization, leading to
// a failure.
if constexpr (sizeof...(Params) == sizeof...(Args) + 1) {
using MsgT = messaging::ParamMsg<
std::tuple<
std::decay_t<std::tuple_element_t<0, std::tuple<Params...>>>,
std::decay_t<Args>...
>
>;
auto msg = vt::makeMessage<MsgT>();
msg->setParams(std::forward<Params>(params)...);
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
} else {
using MsgT = messaging::ParamMsg<typename Trait::TupleType>;
auto msg = vt::makeMessage<MsgT>();
msg->setParams(std::forward<Params>(params)...);
CallbackRawBaseSingle::sendMsg<MsgT>(msg);
}
} else {
using MsgT = typename Trait::MsgT;
auto msg = makeMessage<MsgT>(std::forward<Params>(params)...);
sendMsg(msg.get());
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing template parameter for sendMsg call. This should be CallbackRawBaseSingle::sendMsg<MsgT>(msg) to match the pattern used elsewhere in the function (lines 144, 149) and maintain consistency with the explicit template parameter requirement throughout this PR.

Suggested change
sendMsg(msg.get());
CallbackRawBaseSingle::sendMsg<MsgT>(msg.get());

Copilot uses AI. Check for mistakes.
}
}

}}}} /* end namespace vt::pipe::callback::cbunion */

Expand Down
2 changes: 1 addition & 1 deletion src/vt/rdma/rdma.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ RDMAManager::RDMAManager()

if (not msg->has_bytes) {
auto cbmsg = makeMessage<GetInfoChannel>(num_bytes);
msg->cb.sendMsg(cbmsg);
msg->cb.sendMsg<GetInfoChannel>(cbmsg);
}

theRDMA()->createDirectChannelInternal(
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/active/test_active_send_large.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ template <typename MsgT>
void myHandler(MsgT* m) {
checkMsg(m);
auto msg = makeMessage<RecvMsg>();
m->cb_.send(msg.get());
m->cb_.template sendMsg<RecvMsg>(msg.get());
}

template <typename T>
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/memory/test_memory_lifetime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ TEST_F(TestMemoryLifetime, test_active_bcast_normal_lifetime_msgptr) {
////////////////////////////////////////////////////////////////////////////////
static void callbackHan(CallbackMsg<NormalTestMsg>* msg) {
auto send_msg = makeMessage<NormalTestMsg>();
msg->cb_.send(send_msg.get());
msg->cb_.template sendMsg<NormalTestMsg>(send_msg.get());

theTerm()->addAction([send_msg]{
// Call event cleanup all pending MPI requests to clear
Expand Down Expand Up @@ -244,7 +244,7 @@ TEST_F(TestMemoryLifetime, test_active_send_callback_lifetime_1) {
////////////////////////////////////////////////////////////////////////////////
static void callbackHan(CallbackMsg<SerialTestMsg>* msg) {
auto send_msg = makeMessage<SerialTestMsg>();
msg->cb_.send(send_msg.get());
msg->cb_.template sendMsg<SerialTestMsg>(send_msg.get());

theTerm()->addAction([send_msg]{
// Call event cleanup all pending MPI requests to clear
Expand Down
Loading