Skip to content

Commit 0c6cf56

Browse files
channel fixes.
1 parent 61cf72a commit 0c6cf56

File tree

3 files changed

+47
-11
lines changed

3 files changed

+47
-11
lines changed

include/boost/cobalt/impl/channel.hpp

+17-7
Original file line numberDiff line numberDiff line change
@@ -165,17 +165,21 @@ system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &
165165
if (cancelled)
166166
return {system::in_place_error, asio::error::operation_aborted};
167167

168-
T value = direct ? std::move(*direct) : std::move(chn->buffer_.front());
169-
if (!direct)
168+
T value = chn->buffer_.empty() ? std::move(*direct) : std::move(chn->buffer_.front());
169+
if (!chn->buffer_.empty())
170+
{
170171
chn->buffer_.pop_front();
172+
if (direct)
173+
chn->buffer_.push_back(std::move(*direct));
174+
}
171175

172176
if (!chn->write_queue_.empty())
173177
{
174178
auto &op = chn->write_queue_.front();
175179
BOOST_ASSERT(chn->read_queue_.empty());
176180
if (op.await_ready())
177181
{
178-
op.transactional_unlink();
182+
op.unlink();
179183
BOOST_ASSERT(op.awaited_from);
180184
asio::post(chn->executor_, std::move(op.awaited_from));
181185
}
@@ -211,6 +215,7 @@ std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_h
211215
if constexpr (requires (Promise p) {p.begin_transaction();})
212216
begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
213217

218+
BOOST_ASSERT(this->chn->buffer_.full());
214219
if (chn->read_queue_.empty())
215220
{
216221
chn->write_queue_.push_back(*this);
@@ -259,7 +264,7 @@ system::result<void> channel<T>::write_op::await_resume(const struct as_result_
259264
if (cancel_slot.is_connected())
260265
cancel_slot.clear();
261266
if (cancelled)
262-
boost::throw_exception(system::system_error(asio::error::operation_aborted), loc);
267+
return {system::in_place_error, asio::error::operation_aborted};
263268

264269
if (!direct)
265270
{
@@ -281,7 +286,8 @@ system::result<void> channel<T>::write_op::await_resume(const struct as_result_
281286
BOOST_ASSERT(chn->write_queue_.empty());
282287
if (op.await_ready())
283288
{
284-
op.transactional_unlink();
289+
// unlink?
290+
op.unlink();
285291
BOOST_ASSERT(op.awaited_from);
286292
asio::post(chn->executor_, std::move(op.awaited_from));
287293
}
@@ -297,7 +303,8 @@ struct channel<void>::read_op::cancel_impl
297303
{
298304
op->cancelled = true;
299305
op->unlink();
300-
asio::post(op->chn->executor_, std::move(op->awaited_from));
306+
if (op->awaited_from)
307+
asio::post(op->chn->executor_, std::move(op->awaited_from));
301308
op->cancel_slot.clear();
302309
}
303310
};
@@ -310,7 +317,8 @@ struct channel<void>::write_op::cancel_impl
310317
{
311318
op->cancelled = true;
312319
op->unlink();
313-
asio::post(op->chn->executor_, std::move(op->awaited_from));
320+
if (op->awaited_from)
321+
asio::post(op->chn->executor_, std::move(op->awaited_from));
314322
op->cancel_slot.clear();
315323
}
316324
};
@@ -322,6 +330,8 @@ std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine
322330
if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
323331
cancel_slot.emplace<cancel_impl>(this);
324332

333+
if (awaited_from)
334+
boost::throw_exception(std::runtime_error("already-awaited"), loc);
325335
awaited_from.reset(h.address());
326336

327337
if constexpr (requires (Promise p) {p.begin_transaction();})

src/channel.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ system::result<void> channel<void>::read_op::await_resume(const struct as_resul
6262
{
6363
op.unlink();
6464
BOOST_ASSERT(op.awaited_from);
65-
asio::post(
66-
chn->executor_, std::move(op.awaited_from));
65+
asio::post(chn->executor_, std::move(op.awaited_from));
6766
}
6867
}
6968
return {system::in_place_value};
@@ -97,8 +96,7 @@ system::result<void> channel<void>::write_op::await_resume(const struct as_resul
9796
{
9897
op.unlink();
9998
BOOST_ASSERT(op.awaited_from);
100-
asio::post(
101-
chn->executor_, std::move(op.awaited_from));
99+
asio::post(chn->executor_, std::move(op.awaited_from));
102100
}
103101
}
104102
return {system::in_place_value};

test/channel.cpp

+28
Original file line numberDiff line numberDiff line change
@@ -363,4 +363,32 @@ CO_TEST_CASE(data_loss)
363363
}
364364
}
365365

366+
367+
368+
CO_TEST_CASE(interrupt_1)
369+
{
370+
cobalt::channel<int> c{1u};
371+
372+
auto lr = co_await cobalt::left_race(c.write(42), c.read());
373+
BOOST_CHECK(lr.index() == 0);
374+
BOOST_CHECK(c.read().await_ready());
375+
BOOST_CHECK(!c.write(12).await_ready());
376+
lr = co_await cobalt::left_race(c.write(43), c.read());
377+
BOOST_CHECK(lr.index() == 1);
378+
BOOST_CHECK(get<1u>(lr) == 42);
379+
auto rl = co_await cobalt::left_race(c.read(), c.write(42));
380+
BOOST_CHECK(rl.index() == 1);
381+
}
382+
383+
CO_TEST_CASE(interrupt_void_1)
384+
{
385+
cobalt::channel<void> c{1};
386+
auto lr = co_await cobalt::left_race(c.write(), c.read());
387+
BOOST_CHECK(lr == 0);
388+
lr = co_await cobalt::left_race(c.write(), c.read());
389+
BOOST_CHECK(lr == 1);
390+
auto rl = co_await cobalt::left_race(c.read(), c.write());
391+
BOOST_CHECK(rl == 1);
392+
}
393+
366394
}

0 commit comments

Comments
 (0)