Skip to content

Commit f736a43

Browse files
ctillercopybara-github
authored andcommitted
[promises] Fork mpsc
Ahead of landing a new lock free implementation, fork the existing implementation and keep it around for a while for the production version of chaotic-good (we don't want to change that yet) PiperOrigin-RevId: 762585546
1 parent ae601fa commit f736a43

File tree

8 files changed

+729
-19
lines changed

8 files changed

+729
-19
lines changed

build_autogenerated.yaml

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/core/BUILD

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,6 +1346,28 @@ grpc_cc_library(
13461346
],
13471347
)
13481348

1349+
grpc_cc_library(
1350+
name = "lock_based_mpsc",
1351+
hdrs = [
1352+
"lib/promise/lock_based_mpsc.h",
1353+
],
1354+
external_deps = [
1355+
"absl/base:core_headers",
1356+
"absl/log:check",
1357+
],
1358+
deps = [
1359+
"activity",
1360+
"dump_args",
1361+
"poll",
1362+
"ref_counted",
1363+
"status_flag",
1364+
"sync",
1365+
"wait_set",
1366+
"//:gpr",
1367+
"//:ref_counted_ptr",
1368+
],
1369+
)
1370+
13491371
grpc_cc_library(
13501372
name = "observable",
13511373
hdrs = [
@@ -8874,9 +8896,9 @@ grpc_cc_library(
88748896
"event_engine_context",
88758897
"event_engine_tcp_socket_utils",
88768898
"grpc_promise_endpoint",
8899+
"lock_based_mpsc",
88778900
"loop",
88788901
"match_promise",
8879-
"mpsc",
88808902
"seq",
88818903
"try_join",
88828904
"try_seq",
@@ -8919,12 +8941,12 @@ grpc_cc_library(
89198941
"grpc_promise_endpoint",
89208942
"if",
89218943
"inter_activity_pipe",
8944+
"lock_based_mpsc",
89228945
"loop",
89238946
"map",
89248947
"memory_quota",
89258948
"metadata_batch",
89268949
"metrics",
8927-
"mpsc",
89288950
"pipe",
89298951
"poll",
89308952
"resource_quota",
@@ -8980,11 +9002,11 @@ grpc_cc_library(
89809002
"if",
89819003
"inter_activity_latch",
89829004
"inter_activity_pipe",
9005+
"lock_based_mpsc",
89839006
"loop",
89849007
"memory_quota",
89859008
"metadata_batch",
89869009
"metrics",
8987-
"mpsc",
89889010
"pipe",
89899011
"poll",
89909012
"resource_quota",

src/core/ext/transport/chaotic_good_legacy/chaotic_good_transport.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
#include "src/core/lib/debug/trace.h"
3232
#include "src/core/lib/event_engine/event_engine_context.h"
3333
#include "src/core/lib/event_engine/tcp_socket_utils.h"
34+
#include "src/core/lib/promise/lock_based_mpsc.h"
3435
#include "src/core/lib/promise/loop.h"
3536
#include "src/core/lib/promise/match_promise.h"
36-
#include "src/core/lib/promise/mpsc.h"
3737
#include "src/core/lib/promise/seq.h"
3838
#include "src/core/lib/promise/try_join.h"
3939
#include "src/core/lib/promise/try_seq.h"
@@ -154,7 +154,7 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
154154
// Common outbound loop for both client and server (these vary only over the
155155
// frame type).
156156
template <typename Frame>
157-
auto TransportWriteLoop(MpscReceiver<Frame>& outgoing_frames) {
157+
auto TransportWriteLoop(LockBasedMpscReceiver<Frame>& outgoing_frames) {
158158
return Loop([self = Ref(), &outgoing_frames] {
159159
return TrySeq(
160160
// Get next outgoing frame.

src/core/ext/transport/chaotic_good_legacy/client_transport.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@
4848
#include "src/core/lib/promise/for_each.h"
4949
#include "src/core/lib/promise/if.h"
5050
#include "src/core/lib/promise/inter_activity_pipe.h"
51+
#include "src/core/lib/promise/lock_based_mpsc.h"
5152
#include "src/core/lib/promise/loop.h"
52-
#include "src/core/lib/promise/mpsc.h"
5353
#include "src/core/lib/promise/pipe.h"
5454
#include "src/core/lib/promise/poll.h"
5555
#include "src/core/lib/promise/try_join.h"
@@ -114,7 +114,7 @@ class ChaoticGoodClientTransport final : public ClientTransport {
114114
grpc_event_engine::experimental::MemoryAllocator allocator_;
115115
// Max buffer is set to 4, so that for stream writes each time it will queue
116116
// at most 2 frames.
117-
MpscReceiver<ClientFrame> outgoing_frames_;
117+
LockBasedMpscReceiver<ClientFrame> outgoing_frames_;
118118
Mutex mu_;
119119
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
120120
// Map of stream incoming server frames, key is stream_id.

src/core/ext/transport/chaotic_good_legacy/server_transport.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ auto BooleanSuccessToTransportErrorCapturingInitiator(CallInitiator initiator) {
132132
} // namespace
133133

134134
auto ChaoticGoodServerTransport::SendFrame(
135-
ServerFrame frame, MpscSender<ServerFrame> outgoing_frames,
135+
ServerFrame frame, LockBasedMpscSender<ServerFrame> outgoing_frames,
136136
CallInitiator call_initiator) {
137137
// Capture the call_initiator to ensure the underlying call spine is alive
138138
// until the outgoing_frames.Send promise completes.
@@ -142,7 +142,7 @@ auto ChaoticGoodServerTransport::SendFrame(
142142
}
143143

144144
auto ChaoticGoodServerTransport::SendFrameAcked(
145-
ServerFrame frame, MpscSender<ServerFrame> outgoing_frames,
145+
ServerFrame frame, LockBasedMpscSender<ServerFrame> outgoing_frames,
146146
CallInitiator call_initiator) {
147147
// Capture the call_initiator to ensure the underlying call spine is alive
148148
// until the outgoing_frames.Send promise completes.
@@ -152,7 +152,7 @@ auto ChaoticGoodServerTransport::SendFrameAcked(
152152
}
153153

154154
auto ChaoticGoodServerTransport::SendCallBody(
155-
uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
155+
uint32_t stream_id, LockBasedMpscSender<ServerFrame> outgoing_frames,
156156
CallInitiator call_initiator) {
157157
// Continuously send client frame with client to server messages.
158158
return ForEach(MessagesFrom(call_initiator),
@@ -166,7 +166,7 @@ auto ChaoticGoodServerTransport::SendCallBody(
166166
}
167167

168168
auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
169-
uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
169+
uint32_t stream_id, LockBasedMpscSender<ServerFrame> outgoing_frames,
170170
CallInitiator call_initiator) {
171171
return TrySeq(
172172
// Wait for initial metadata then send it out.

src/core/ext/transport/chaotic_good_legacy/server_transport.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
#include "src/core/lib/promise/if.h"
5555
#include "src/core/lib/promise/inter_activity_latch.h"
5656
#include "src/core/lib/promise/inter_activity_pipe.h"
57+
#include "src/core/lib/promise/lock_based_mpsc.h"
5758
#include "src/core/lib/promise/loop.h"
58-
#include "src/core/lib/promise/mpsc.h"
5959
#include "src/core/lib/promise/party.h"
6060
#include "src/core/lib/promise/pipe.h"
6161
#include "src/core/lib/promise/poll.h"
@@ -108,10 +108,11 @@ class ChaoticGoodServerTransport final : public ServerTransport {
108108
absl::Status NewStream(uint32_t stream_id, CallInitiator call_initiator);
109109
RefCountedPtr<Stream> LookupStream(uint32_t stream_id);
110110
RefCountedPtr<Stream> ExtractStream(uint32_t stream_id);
111-
auto SendCallInitialMetadataAndBody(uint32_t stream_id,
112-
MpscSender<ServerFrame> outgoing_frames,
113-
CallInitiator call_initiator);
114-
auto SendCallBody(uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
111+
auto SendCallInitialMetadataAndBody(
112+
uint32_t stream_id, LockBasedMpscSender<ServerFrame> outgoing_frames,
113+
CallInitiator call_initiator);
114+
auto SendCallBody(uint32_t stream_id,
115+
LockBasedMpscSender<ServerFrame> outgoing_frames,
115116
CallInitiator call_initiator);
116117
auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator);
117118
auto OnTransportActivityDone(absl::string_view activity);
@@ -132,18 +133,19 @@ class ChaoticGoodServerTransport final : public ServerTransport {
132133
auto PushFrameIntoCall(RefCountedPtr<Stream> stream, ClientEndOfStream frame);
133134
auto PushFrameIntoCall(RefCountedPtr<Stream> stream, BeginMessageFrame frame);
134135
auto PushFrameIntoCall(RefCountedPtr<Stream> stream, MessageChunkFrame frame);
135-
auto SendFrame(ServerFrame frame, MpscSender<ServerFrame> outgoing_frames,
136+
auto SendFrame(ServerFrame frame,
137+
LockBasedMpscSender<ServerFrame> outgoing_frames,
136138
CallInitiator call_initiator);
137139
auto SendFrameAcked(ServerFrame frame,
138-
MpscSender<ServerFrame> outgoing_frames,
140+
LockBasedMpscSender<ServerFrame> outgoing_frames,
139141
CallInitiator call_initiator);
140142

141143
RefCountedPtr<UnstartedCallDestination> call_destination_;
142144
const RefCountedPtr<CallArenaAllocator> call_arena_allocator_;
143145
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
144146
event_engine_;
145147
InterActivityLatch<void> got_acceptor_;
146-
MpscReceiver<ServerFrame> outgoing_frames_;
148+
LockBasedMpscReceiver<ServerFrame> outgoing_frames_;
147149
Mutex mu_;
148150
// Map of stream incoming server frames, key is stream_id.
149151
StreamMap stream_map_ ABSL_GUARDED_BY(mu_);

0 commit comments

Comments
 (0)