Skip to content

Commit ced3722

Browse files
Robert Roeserfacebook-github-bot
Robert Roeser
authored andcommitted
refactor to use variant
Summary: Replaces the unique pointer with a std::variant and wires up the AlignedParser. Makes some changes to run the AlignedParser in the stress test tool. Reviewed By: praihan Differential Revision: D70032449 fbshipit-source-id: 426ddc69118e3ff32c2a55cdb912ead5db558109
1 parent e9782d7 commit ced3722

File tree

5 files changed

+74
-42
lines changed

5 files changed

+74
-42
lines changed

thrift/conformance/stresstest/example/ExampleScenarios.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,21 @@ THRIFT_STRESS_TEST(Echo512k) {
5555
co_await client->co_echo(s);
5656
}
5757

58+
THRIFT_STRESS_TEST(Echo512k_eb) {
59+
static std::string const s(1 << 19, '?');
60+
co_await client->co_echoEb(s);
61+
}
62+
5863
THRIFT_STRESS_TEST(Echo4M) {
5964
static std::string const s(4096000, '?');
6065
co_await client->co_echo(s);
6166
}
6267

68+
THRIFT_STRESS_TEST(Echo4M_eb) {
69+
static std::string const s(4096000, '?');
70+
co_await client->co_echoEb(s);
71+
}
72+
6373
/**
6474
* Send a request with a small payload, have the server "process" it for 50ms on
6575
* the CPU threadpool via sleeping instead of the default busy wait, and return

thrift/conformance/stresstest/server/StressTestServer.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424

2525
#include <scripts/rroeser/src/executor/WorkStealingExecutor.h>
2626
#include <folly/executors/CPUThreadPoolExecutor.h>
27+
#include <thrift/lib/cpp2/Flags.h>
2728
#include <thrift/lib/cpp2/server/ParallelConcurrencyController.h>
2829
#include <thrift/lib/cpp2/server/ThriftServer.h>
30+
#include <thrift/lib/cpp2/transport/rocket/framing/Parser.h>
2931

3032
DEFINE_int32(port, 5000, "Server port");
3133
DEFINE_int32(io_threads, 0, "Number of IO threads (0 == number of cores)");
@@ -65,6 +67,7 @@ DEFINE_bool(enable_resource_pools, false, "Enable resource pools");
6567
DEFINE_bool(
6668
disable_active_request_tracking, false, "Disabled Active Request Tracking");
6769
DEFINE_bool(enable_checksum, false, "Enable Server Side Checksum support");
70+
DEFINE_bool(aligned_parser, false, "Enable AlignedParser");
6871

6972
namespace apache {
7073
namespace thrift {
@@ -165,6 +168,11 @@ std::shared_ptr<ThriftServer> createStressTestServer(
165168
if (!handler) {
166169
handler = createStressTestHandler();
167170
}
171+
172+
if (FLAGS_aligned_parser) {
173+
THRIFT_FLAG_SET_MOCK(rocket_frame_parser, "aligned");
174+
}
175+
168176
auto server = std::make_shared<ThriftServer>();
169177
server->setInterface(std::move(handler));
170178
server->setPort(FLAGS_port);

thrift/lib/cpp2/transport/rocket/framing/Parser.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ ParserMode stringToMode(const std::string& modeStr) noexcept {
3838
return ParserMode::STRATEGY;
3939
} else if (modeStr == "allocating") {
4040
return ParserMode::ALLOCATING;
41+
} else if (modeStr == "aligned") {
42+
return ParserMode::ALIGNED;
4143
}
4244

4345
LOG(WARNING) << "Invalid parser mode: '" << modeStr

thrift/lib/cpp2/transport/rocket/framing/Parser.h

+53-42
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#include <thrift/lib/cpp2/Flags.h>
2828
#include <thrift/lib/cpp2/async/RpcOptions.h>
29+
#include <thrift/lib/cpp2/transport/rocket/framing/parser/AlignedParserStrategy.h>
2930
#include <thrift/lib/cpp2/transport/rocket/framing/parser/AllocatingParserStrategy.h>
3031
#include <thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy.h>
3132
#include <thrift/lib/cpp2/transport/rocket/framing/parser/ParserStrategy.h>
@@ -35,33 +36,42 @@ THRIFT_FLAG_DECLARE_string(rocket_frame_parser);
3536
namespace apache::thrift::rocket {
3637

3738
namespace detail {
38-
enum class ParserMode { STRATEGY, ALLOCATING };
39+
enum class ParserMode { STRATEGY, ALLOCATING, ALIGNED };
3940
ParserMode stringToMode(const std::string& modeStr) noexcept;
4041
ParserAllocatorType& getDefaultAllocator();
4142
} // namespace detail
4243

43-
// TODO (T160861572): deprecate most of logic in this class and replace with
44-
// either AllocatingParserStrategy or FrameLengthParserStrategy
4544
template <class T>
4645
class Parser final : public folly::AsyncTransport::ReadCallback {
46+
using FrameLengthParser = ParserStrategy<T, FrameLengthParserStrategy>;
47+
using AllocatingParser =
48+
ParserStrategy<T, AllocatingParserStrategy, ParserAllocatorType>;
49+
using AlignedParser = ParserStrategy<T, AlignedParserStrategy>;
50+
using ParserVariant = std::variant<
51+
std::monostate,
52+
FrameLengthParser,
53+
AllocatingParser,
54+
AlignedParser>;
55+
4756
public:
4857
explicit Parser(
4958
T& owner, std::shared_ptr<ParserAllocatorType> alloc = nullptr)
5059
: owner_(owner),
5160
mode_(detail::stringToMode(THRIFT_FLAG(rocket_frame_parser))) {
52-
if (mode_ == detail::ParserMode::STRATEGY) {
53-
frameLengthParser_ =
54-
std::make_unique<ParserStrategy<T, FrameLengthParserStrategy>>(
55-
owner_);
56-
}
57-
if (mode_ == detail::ParserMode::ALLOCATING) {
58-
allocatingParser_ = std::make_unique<
59-
ParserStrategy<T, AllocatingParserStrategy, ParserAllocatorType>>(
60-
owner_, alloc ? *alloc : detail::getDefaultAllocator());
61+
switch (mode_) {
62+
case detail::ParserMode::STRATEGY: {
63+
parser_.template emplace<FrameLengthParser>(owner_);
64+
} break;
65+
case detail::ParserMode::ALLOCATING: {
66+
parser_.template emplace<AllocatingParser>(
67+
owner_, alloc ? *alloc : detail::getDefaultAllocator());
68+
} break;
69+
case detail::ParserMode::ALIGNED:
70+
parser_.template emplace<AlignedParser>(owner_);
71+
break;
6172
}
6273
}
6374

64-
// AsyncTransport::ReadCallback implementation
6575
FOLLY_NOINLINE void getReadBuffer(void** bufout, size_t* lenout) override;
6676
FOLLY_NOINLINE void readDataAvailable(size_t nbytes) noexcept override;
6777
FOLLY_NOINLINE void readEOF() noexcept override;
@@ -80,37 +90,46 @@ class Parser final : public folly::AsyncTransport::ReadCallback {
8090
T& owner_;
8191

8292
detail::ParserMode mode_;
83-
std::unique_ptr<ParserStrategy<T, FrameLengthParserStrategy>>
84-
frameLengthParser_;
85-
std::unique_ptr<
86-
ParserStrategy<T, AllocatingParserStrategy, ParserAllocatorType>>
87-
allocatingParser_;
93+
ParserVariant parser_;
94+
95+
template <typename DelegateFunc>
96+
FOLLY_ALWAYS_INLINE decltype(auto) visit(DelegateFunc&& delegate);
8897
};
8998

9099
template <class T>
91-
void Parser<T>::getReadBuffer(void** bufout, size_t* lenout) {
100+
template <typename DelegateFunc>
101+
FOLLY_ALWAYS_INLINE decltype(auto) Parser<T>::visit(DelegateFunc&& delegate) {
102+
FOLLY_SAFE_DCHECK(
103+
std::holds_alternative<std::monostate>(parser_) == false,
104+
"parser variant must be set");
92105
switch (mode_) {
93-
case (detail::ParserMode::STRATEGY):
94-
frameLengthParser_->getReadBuffer(bufout, lenout);
95-
break;
96-
case (detail::ParserMode::ALLOCATING):
97-
allocatingParser_->getReadBuffer(bufout, lenout);
98-
break;
106+
case detail::ParserMode::STRATEGY:
107+
return std::invoke(
108+
std::forward<DelegateFunc>(delegate),
109+
std::get<FrameLengthParser>(parser_));
110+
case detail::ParserMode::ALLOCATING:
111+
return std::invoke(
112+
std::forward<DelegateFunc>(delegate),
113+
std::get<AllocatingParser>(parser_));
114+
case detail::ParserMode::ALIGNED:
115+
return std::invoke(
116+
std::forward<DelegateFunc>(delegate),
117+
std::get<AlignedParser>(parser_));
118+
default:
119+
LOG(FATAL) << "Unknown parser type";
99120
}
100121
}
101122

123+
template <class T>
124+
void Parser<T>::getReadBuffer(void** bufout, size_t* lenout) {
125+
visit([&](auto& parser) { parser.getReadBuffer(bufout, lenout); });
126+
}
127+
102128
template <class T>
103129
void Parser<T>::readDataAvailable(size_t nbytes) noexcept {
104130
folly::DelayedDestruction::DestructorGuard dg(&this->owner_);
105131
try {
106-
switch (mode_) {
107-
case (detail::ParserMode::STRATEGY):
108-
frameLengthParser_->readDataAvailable(nbytes);
109-
break;
110-
case (detail::ParserMode::ALLOCATING):
111-
allocatingParser_->readDataAvailable(nbytes);
112-
break;
113-
}
132+
visit([&](auto& parser) { parser.readDataAvailable(nbytes); });
114133
} catch (...) {
115134
auto exceptionStr =
116135
folly::exceptionStr(folly::current_exception()).toStdString();
@@ -140,15 +159,7 @@ void Parser<T>::readBufferAvailable(
140159
std::unique_ptr<folly::IOBuf> buf) noexcept {
141160
folly::DelayedDestruction::DestructorGuard dg(&this->owner_);
142161
try {
143-
switch (mode_) {
144-
case (detail::ParserMode::STRATEGY):
145-
frameLengthParser_->readBufferAvailable(std::move(buf));
146-
break;
147-
case (detail::ParserMode::ALLOCATING):
148-
// Will throw not implemented runtime exception
149-
allocatingParser_->readBufferAvailable(std::move(buf));
150-
break;
151-
}
162+
visit([&](auto& parser) { parser.readBufferAvailable(std::move(buf)); });
152163
} catch (...) {
153164
auto exceptionStr =
154165
folly::exceptionStr(folly::current_exception()).toStdString();

thrift/lib/cpp2/transport/rocket/framing/parser/test/AlignedParserStrategyTest.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <gtest/gtest.h>
1818
#include <folly/String.h>
1919
#include <folly/io/IOBuf.h>
20+
#include <thrift/lib/cpp2/server/ThriftServer.h>
2021
#include <thrift/lib/cpp2/transport/rocket/framing/Frames.h>
2122
#include <thrift/lib/cpp2/transport/rocket/framing/parser/AlignedParserStrategy.h>
2223
#include <thrift/lib/cpp2/transport/rocket/framing/parser/test/TestUtil.h>

0 commit comments

Comments
 (0)