Skip to content

Commit 52e44f2

Browse files
authored
[0.77] Implement WebSocket resource task sequencer (#14580)
* Implement WebSocket resource task sequencer (#14577) * Define TaskSequencer * Explicitly capture suspend * Temporarily commit packages locks * Move TaskSequencer into header * Declare m_sequencer - Make PerformWrite IAsyncAction * Temporarily add packages lock * Define EnqueueWrite,DequeueWrite * Temporarily hard-code V2 resource * Use EnqueueWrite * Enqueue write within PerformWrite * Enqueue connection * Add test thread IDs * Enqueue close op * Connect in background * Enqueue Connect, Write, Close after resume_background * Sequence sequencer's sequenced sequences using DispatchQueue * Drop m_connectPerformed * Delete commented code * Remove thread ID traces * Use explicit capture in calls to sequencer * Make EnqueueWrite call PerformWrite - Drop DequeueWrite. - Make EnqueueWrite the fire-and-forget method. - Make PerformWrite the actual message-sending implementation. * Drop SendPendingMessages * Drop m_outgoingMessages * Drop m_outgoingMessages, SendPendingMessages - Avoid awaiting in catch clauses * Drop isBinaryLocal * Drop unused length in PerformWrite * Drop unused messageLocal in PerformWrite * clang format * Revert packages lock files * Revert WebSocketResourceFactory * Change files * Add missing captured resource reference in PerformWrite * Remove change file * Change files
1 parent ef89f59 commit 52e44f2

File tree

3 files changed

+180
-108
lines changed

3 files changed

+180
-108
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"type": "patch",
3+
"comment": "Implement WebSocket resource task sequencer (#14577)",
4+
"packageName": "react-native-windows",
5+
"email": "[email protected]",
6+
"dependentChangeType": "patch"
7+
}

vnext/Shared/Networking/WinRTWebSocketResource.cpp

+82-101
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ DispatchQueue GetCurrentOrSerialQueue() noexcept {
8989

9090
return queue;
9191
}
92-
9392
} // namespace
9493

9594
namespace Microsoft::React::Networking {
@@ -104,7 +103,6 @@ WinRTWebSocketResource2::WinRTWebSocketResource2(
104103
: m_socket{std::move(socket)},
105104
m_writer(std::move(writer)),
106105
m_readyState{ReadyState::Connecting},
107-
m_connectPerformed{CreateEvent(/*attributes*/ nullptr, /*manual reset*/ true, /*state*/ false, /*name*/ nullptr)},
108106
m_callingQueue{callingQueue} {
109107
for (const auto &certException : certExceptions) {
110108
m_socket.Control().IgnorableServerCertificateErrors().Append(certException);
@@ -221,125 +219,112 @@ void WinRTWebSocketResource2::OnClosed(IWebSocket const &sender, IWebSocketClose
221219

222220
fire_and_forget WinRTWebSocketResource2::PerformConnect(Uri &&uri) noexcept {
223221
auto self = shared_from_this();
224-
auto coUri = std::move(uri);
222+
auto movedUri = std::move(uri);
225223

226224
co_await resume_in_queue(self->m_backgroundQueue);
227225

228-
auto async = self->m_socket.ConnectAsync(coUri);
229-
co_await lessthrow_await_adapter<IAsyncAction>{async};
230-
231-
co_await resume_in_queue(self->m_callingQueue);
232-
233-
auto result = async.ErrorCode();
234-
235-
try {
236-
if (result >= 0) { // Non-failing HRESULT
237-
co_await resume_in_queue(self->m_backgroundQueue);
238-
self->m_readyState = ReadyState::Open;
239-
240-
co_await resume_in_queue(self->m_callingQueue);
241-
if (self->m_connectHandler) {
242-
self->m_connectHandler();
243-
}
244-
} else {
245-
self->Fail(std::move(result), ErrorType::Connection);
246-
}
247-
} catch (hresult_error const &e) {
248-
self->Fail(e, ErrorType::Connection);
249-
} catch (std::exception const &e) {
250-
self->Fail(e.what(), ErrorType::Connection);
251-
}
252-
253-
SetEvent(self->m_connectPerformed.get());
226+
co_await self->m_sequencer.QueueTaskAsync(
227+
[self = self->shared_from_this(), coUri = std::move(movedUri)]() -> IAsyncAction {
228+
auto coSelf = self->shared_from_this();
229+
230+
auto async = coSelf->m_socket.ConnectAsync(coUri);
231+
co_await lessthrow_await_adapter<IAsyncAction>{async};
232+
233+
auto result = async.ErrorCode();
234+
try {
235+
if (result >= 0) { // Non-failing HRESULT
236+
coSelf->m_readyState = ReadyState::Open;
237+
238+
co_await resume_in_queue(coSelf->m_callingQueue);
239+
if (coSelf->m_connectHandler) {
240+
coSelf->m_connectHandler();
241+
}
242+
} else {
243+
coSelf->Fail(std::move(result), ErrorType::Connection);
244+
}
245+
} catch (hresult_error const &e) {
246+
coSelf->Fail(e, ErrorType::Connection);
247+
} catch (std::exception const &e) {
248+
coSelf->Fail(e.what(), ErrorType::Connection);
249+
}
250+
});
254251
}
255252

256253
fire_and_forget WinRTWebSocketResource2::PerformClose() noexcept {
257254
auto self = shared_from_this();
258255

259-
co_await resume_on_signal(self->m_connectPerformed.get());
260-
261256
co_await resume_in_queue(self->m_backgroundQueue);
262257

263-
// See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close
264-
co_await self->SendPendingMessages();
258+
co_await self->m_sequencer.QueueTaskAsync([self = self->shared_from_this()]() -> IAsyncAction {
259+
auto coSelf = self->shared_from_this();
265260

266-
try {
267-
self->m_socket.Close(static_cast<uint16_t>(m_closeCode), winrt::to_hstring(m_closeReason));
268-
self->m_readyState = ReadyState::Closing;
269-
} catch (winrt::hresult_invalid_argument const &e) {
270-
Fail(e, ErrorType::Close);
271-
} catch (hresult_error const &e) {
272-
Fail(e, ErrorType::Close);
273-
} catch (const std::exception &e) {
274-
Fail(e.what(), ErrorType::Close);
275-
}
261+
try {
262+
coSelf->m_socket.Close(static_cast<uint16_t>(coSelf->m_closeCode), winrt::to_hstring(coSelf->m_closeReason));
263+
coSelf->m_readyState = ReadyState::Closing;
264+
} catch (winrt::hresult_invalid_argument const &e) {
265+
coSelf->Fail(e, ErrorType::Close);
266+
} catch (hresult_error const &e) {
267+
coSelf->Fail(e, ErrorType::Close);
268+
} catch (const std::exception &e) {
269+
coSelf->Fail(e.what(), ErrorType::Close);
270+
}
271+
272+
co_return;
273+
});
276274
}
277275

278-
fire_and_forget WinRTWebSocketResource2::PerformWrite(string &&message, bool isBinary) noexcept {
276+
fire_and_forget WinRTWebSocketResource2::EnqueueWrite(string &&message, bool isBinary) noexcept {
279277
auto self = shared_from_this();
280278
string coMessage = std::move(message);
281279

282-
co_await resume_in_queue(self->m_backgroundQueue); // Ensure writes happen sequentially
283-
self->m_outgoingMessages.emplace(std::move(coMessage), isBinary);
284-
285-
co_await resume_on_signal(self->m_connectPerformed.get());
286-
287280
co_await resume_in_queue(self->m_backgroundQueue);
288281

289-
co_await self->SendPendingMessages();
290-
}
282+
co_await self->m_sequencer.QueueTaskAsync(
283+
[self = self->shared_from_this(), message = std::move(coMessage), isBinary]() -> IAsyncAction {
284+
auto coSelf = self->shared_from_this();
285+
auto coMessage = std::move(message);
291286

292-
IAsyncAction WinRTWebSocketResource2::SendPendingMessages() noexcept {
293-
// Enforcing execution in the background queue.
294-
// Awaiting of this coroutine will schedule its execution in the thread pool, ignoring the intended dispatch queue.
295-
co_await resume_in_queue(m_backgroundQueue);
287+
co_await coSelf->PerformWrite(std::move(coMessage), isBinary);
288+
});
289+
}
296290

291+
IAsyncAction WinRTWebSocketResource2::PerformWrite(string &&message, bool isBinary) noexcept {
297292
auto self = shared_from_this();
298293

299-
while (!self->m_outgoingMessages.empty()) {
300-
if (self->m_readyState != ReadyState::Open) {
301-
co_return;
302-
}
303-
304-
size_t length = 0;
305-
string messageLocal;
306-
bool isBinaryLocal;
307-
try {
308-
std::tie(messageLocal, isBinaryLocal) = self->m_outgoingMessages.front();
309-
self->m_outgoingMessages.pop();
310-
if (isBinaryLocal) {
311-
self->m_socket.Control().MessageType(SocketMessageType::Binary);
312-
313-
auto buffer = CryptographicBuffer::DecodeFromBase64String(winrt::to_hstring(messageLocal));
314-
if (buffer) {
315-
length = buffer.Length();
316-
self->m_writer.WriteBuffer(buffer);
317-
}
318-
} else {
319-
self->m_socket.Control().MessageType(SocketMessageType::Utf8);
294+
try {
295+
if (isBinary) {
296+
self->m_socket.Control().MessageType(SocketMessageType::Binary);
320297

321-
length = messageLocal.size();
322-
winrt::array_view<const uint8_t> view(
323-
CheckedReinterpretCast<const uint8_t *>(messageLocal.c_str()),
324-
CheckedReinterpretCast<const uint8_t *>(messageLocal.c_str()) + messageLocal.length());
325-
self->m_writer.WriteBytes(view);
298+
auto buffer = CryptographicBuffer::DecodeFromBase64String(winrt::to_hstring(message));
299+
if (buffer) {
300+
self->m_writer.WriteBuffer(buffer);
326301
}
327-
} catch (hresult_error const &e) { // TODO: Remove after fixing unit tests exceptions.
328-
self->Fail(e, ErrorType::Send);
329-
co_return;
330-
} catch (const std::exception &e) {
331-
self->Fail(e.what(), ErrorType::Send);
332-
co_return;
302+
} else {
303+
self->m_socket.Control().MessageType(SocketMessageType::Utf8);
304+
305+
winrt::array_view<const uint8_t> view(
306+
CheckedReinterpretCast<const uint8_t *>(message.c_str()),
307+
CheckedReinterpretCast<const uint8_t *>(message.c_str()) + message.length());
308+
self->m_writer.WriteBytes(view);
333309
}
310+
} catch (hresult_error const &e) { // TODO: Remove after fixing unit tests exceptions.
311+
self->Fail(e, ErrorType::Send);
312+
} catch (const std::exception &e) {
313+
self->Fail(e.what(), ErrorType::Send);
314+
}
334315

335-
auto async = self->m_writer.StoreAsync();
336-
co_await lessthrow_await_adapter<DataWriterStoreOperation>{async};
316+
co_await resume_in_queue(self->m_backgroundQueue);
317+
// If an exception occurred, abort write process.
318+
if (self->m_readyState != ReadyState::Open) {
319+
co_return;
320+
}
337321

338-
auto result = async.ErrorCode();
339-
if (result < 0) {
340-
Fail(std::move(result), ErrorType::Send);
341-
co_return;
342-
}
322+
auto async = self->m_writer.StoreAsync();
323+
co_await lessthrow_await_adapter<DataWriterStoreOperation>{async};
324+
325+
auto result = async.ErrorCode();
326+
if (result < 0) {
327+
self->Fail(std::move(result), ErrorType::Send);
343328
}
344329
}
345330

@@ -393,11 +378,7 @@ void WinRTWebSocketResource2::Connect(string &&url, const Protocols &protocols,
393378
m_socket.SetRequestHeader(L"Origin", std::move(origin));
394379
}
395380
} catch (hresult_error const &e) {
396-
Fail(e, ErrorType::Connection);
397-
398-
SetEvent(m_connectPerformed.get());
399-
400-
return;
381+
return Fail(e, ErrorType::Connection);
401382
}
402383

403384
PerformConnect(std::move(uri));
@@ -406,11 +387,11 @@ void WinRTWebSocketResource2::Connect(string &&url, const Protocols &protocols,
406387
void WinRTWebSocketResource2::Ping() noexcept {}
407388

408389
void WinRTWebSocketResource2::Send(string &&message) noexcept {
409-
PerformWrite(std::move(message), false);
390+
EnqueueWrite(std::move(message), false);
410391
}
411392

412393
void WinRTWebSocketResource2::SendBinary(string &&base64String) noexcept {
413-
PerformWrite(std::move(base64String), true);
394+
EnqueueWrite(std::move(base64String), true);
414395
}
415396

416397
void WinRTWebSocketResource2::Close(CloseCode code, const string &reason) noexcept {

vnext/Shared/Networking/WinRTWebSocketResource.h

+91-7
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,101 @@ namespace Microsoft::React::Networking {
1818

1919
class WinRTWebSocketResource2 : public IWebSocketResource,
2020
public std::enable_shared_from_this<WinRTWebSocketResource2> {
21-
winrt::Windows::Networking::Sockets::IMessageWebSocket m_socket;
22-
2321
///
24-
// Connection attempt performed, either succeeding or failing
22+
// See https://devblogs.microsoft.com/oldnewthing/20250328-00/?p=111016
2523
///
26-
winrt::handle m_connectPerformed;
24+
struct TaskSequencer {
25+
TaskSequencer() = default;
26+
TaskSequencer(const TaskSequencer &) = delete;
27+
void operator=(const TaskSequencer &) = delete;
28+
29+
private:
30+
using CoroHandle = std::experimental::coroutine_handle<>;
31+
32+
struct Suspender {
33+
CoroHandle m_handle;
34+
35+
bool await_ready() const noexcept {
36+
return false;
37+
}
38+
39+
void await_suspend(CoroHandle h) noexcept {
40+
m_handle = h;
41+
}
42+
43+
void await_resume() const noexcept {}
44+
};
45+
46+
static void *Completed() {
47+
return reinterpret_cast<void *>(1);
48+
}
49+
50+
struct ChainedTask {
51+
private:
52+
std::atomic<void *> m_next;
53+
54+
public:
55+
ChainedTask(void *state = nullptr) : m_next(state) {}
56+
57+
void ContinueWith(CoroHandle h) {
58+
if (m_next.exchange(h.address(), std::memory_order_acquire) != nullptr) {
59+
h();
60+
}
61+
}
62+
63+
void Complete() {
64+
auto resumeAddress = m_next.exchange(Completed());
65+
if (resumeAddress) {
66+
CoroHandle::from_address(resumeAddress).resume();
67+
}
68+
}
69+
};
2770

71+
struct Completer {
72+
std::shared_ptr<ChainedTask> m_chain;
73+
74+
~Completer() {
75+
m_chain->Complete();
76+
}
77+
};
78+
79+
winrt::slim_mutex m_mutex;
80+
std::shared_ptr<ChainedTask> m_latest = std::make_shared<ChainedTask>(Completed());
81+
82+
public:
83+
template <typename Maker>
84+
auto QueueTaskAsync(Maker &&maker) -> decltype(maker()) {
85+
auto node = std::make_shared<ChainedTask>();
86+
Suspender suspend;
87+
88+
using Async = decltype(maker());
89+
auto task = [&node, &suspend, &maker]() -> Async {
90+
Completer completer{node};
91+
auto localMaker = std::forward<Maker>(maker);
92+
auto context = winrt::apartment_context();
93+
94+
co_await suspend;
95+
co_await context;
96+
97+
co_return co_await localMaker();
98+
}();
99+
100+
{
101+
winrt::slim_lock_guard guard(m_mutex);
102+
m_latest.swap(node);
103+
}
104+
105+
node->ContinueWith(suspend.m_handle);
106+
107+
return task;
108+
}
109+
};
110+
111+
winrt::Windows::Networking::Sockets::IMessageWebSocket m_socket;
28112
ReadyState m_readyState;
113+
TaskSequencer m_sequencer;
29114
Mso::DispatchQueue m_callingQueue;
30115
Mso::DispatchQueue m_backgroundQueue;
31-
std::queue<std::pair<std::string, bool>> m_outgoingMessages;
32116
CloseCode m_closeCode{CloseCode::Normal};
33117
std::string m_closeReason;
34118

@@ -52,9 +136,9 @@ class WinRTWebSocketResource2 : public IWebSocketResource,
52136
winrt::Windows::Networking::Sockets::IWebSocketClosedEventArgs const &args);
53137

54138
winrt::fire_and_forget PerformConnect(winrt::Windows::Foundation::Uri &&uri) noexcept;
55-
winrt::fire_and_forget PerformWrite(std::string &&message, bool isBinary) noexcept;
139+
winrt::fire_and_forget EnqueueWrite(std::string &&message, bool isBinary) noexcept;
140+
winrt::Windows::Foundation::IAsyncAction PerformWrite(std::string &&message, bool isBinary) noexcept;
56141
winrt::fire_and_forget PerformClose() noexcept;
57-
winrt::Windows::Foundation::IAsyncAction SendPendingMessages() noexcept;
58142

59143
WinRTWebSocketResource2(
60144
winrt::Windows::Networking::Sockets::IMessageWebSocket &&socket,

0 commit comments

Comments
 (0)