Skip to content

Commit fd2b85b

Browse files
authored
Merge branch 'develop' into fixKeyspaceComments
2 parents f3b2103 + dc5f8b9 commit fd2b85b

File tree

6 files changed

+94
-8
lines changed

6 files changed

+94
-8
lines changed

src/etl/impl/GrpcSource.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
#include <boost/asio/io_context.hpp>
2828
#include <boost/asio/ip/tcp.hpp>
2929
#include <fmt/format.h>
30+
#include <grpc/grpc.h>
3031
#include <grpcpp/client_context.h>
3132
#include <grpcpp/security/credentials.h>
3233
#include <grpcpp/support/channel_arguments.h>
3334
#include <grpcpp/support/status.h>
3435
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
3536
#include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
3637

38+
#include <chrono>
3739
#include <cstddef>
3840
#include <cstdint>
3941
#include <exception>
@@ -52,17 +54,25 @@ GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::
5254
try {
5355
boost::asio::io_context ctx;
5456
boost::asio::ip::tcp::resolver resolver{ctx};
57+
5558
auto const resolverResult = resolver.resolve(ip, grpcPort);
56-
if (resolverResult.empty()) {
59+
if (resolverResult.empty())
5760
throw std::runtime_error("Failed to resolve " + ip + ":" + grpcPort);
58-
}
61+
5962
std::stringstream ss;
6063
ss << resolverResult.begin()->endpoint();
64+
6165
grpc::ChannelArguments chArgs;
6266
chArgs.SetMaxReceiveMessageSize(-1);
67+
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKEEPALIVE_PING_INTERVAL_MS);
68+
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKEEPALIVE_TIMEOUT_MS);
69+
chArgs.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, static_cast<int>(kKEEPALIVE_PERMIT_WITHOUT_CALLS));
70+
chArgs.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, kMAX_PINGS_WITHOUT_DATA);
71+
6372
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
6473
grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), chArgs)
6574
);
75+
6676
LOG(log_.debug()) << "Made stub for remote.";
6777
} catch (std::exception const& e) {
6878
LOG(log_.warn()) << "Exception while creating stub: " << e.what() << ".";
@@ -76,10 +86,11 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb
7686
if (!stub_)
7787
return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
7888

79-
// Ledger header with txns and metadata
8089
org::xrpl::rpc::v1::GetLedgerRequest request;
8190
grpc::ClientContext context;
8291

92+
context.set_deadline(std::chrono::system_clock::now() + kDEADLINE); // Prevent indefinite blocking
93+
8394
request.mutable_ledger()->set_sequence(sequence);
8495
request.set_transactions(true);
8596
request.set_expand(true);

src/etl/impl/GrpcSource.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
2727
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
2828

29+
#include <chrono>
2930
#include <cstdint>
3031
#include <memory>
3132
#include <string>
@@ -38,6 +39,12 @@ class GrpcSource {
3839
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
3940
std::shared_ptr<BackendInterface> backend_;
4041

42+
static constexpr auto kKEEPALIVE_PING_INTERVAL_MS = 10000;
43+
static constexpr auto kKEEPALIVE_TIMEOUT_MS = 5000;
44+
static constexpr auto kKEEPALIVE_PERMIT_WITHOUT_CALLS = true; // Allow keepalive pings when no calls
45+
static constexpr auto kMAX_PINGS_WITHOUT_DATA = 0; // No limit
46+
static constexpr auto kDEADLINE = std::chrono::seconds(30);
47+
4148
public:
4249
GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend);
4350

src/etlng/impl/GrpcSource.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include <boost/asio/spawn.hpp>
3030
#include <fmt/format.h>
31+
#include <grpc/grpc.h>
3132
#include <grpcpp/client_context.h>
3233
#include <grpcpp/security/credentials.h>
3334
#include <grpcpp/support/channel_arguments.h>
@@ -36,6 +37,7 @@
3637
#include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
3738

3839
#include <atomic>
40+
#include <chrono>
3941
#include <cstddef>
4042
#include <cstdint>
4143
#include <exception>
@@ -63,13 +65,18 @@ resolve(std::string const& ip, std::string const& port)
6365

6466
namespace etlng::impl {
6567

66-
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort)
68+
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::chrono::system_clock::duration deadline)
6769
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort))
6870
, initialLoadShouldStop_(std::make_unique<std::atomic_bool>(false))
71+
, deadline_{deadline}
6972
{
7073
try {
7174
grpc::ChannelArguments chArgs;
7275
chArgs.SetMaxReceiveMessageSize(-1);
76+
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKEEPALIVE_PING_INTERVAL_MS);
77+
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKEEPALIVE_TIMEOUT_MS);
78+
chArgs.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, static_cast<int>(kKEEPALIVE_PERMIT_WITHOUT_CALLS));
79+
chArgs.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, kMAX_PINGS_WITHOUT_DATA);
7380

7481
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
7582
grpc::CreateCustomChannel(resolve(ip, grpcPort), grpc::InsecureChannelCredentials(), chArgs)
@@ -88,10 +95,11 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb
8895
if (!stub_)
8996
return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
9097

91-
// Ledger header with txns and metadata
9298
org::xrpl::rpc::v1::GetLedgerRequest request;
9399
grpc::ClientContext context;
94100

101+
context.set_deadline(std::chrono::system_clock::now() + deadline_); // Prevent indefinite blocking
102+
95103
request.mutable_ledger()->set_sequence(sequence);
96104
request.set_transactions(true);
97105
request.set_expand(true);

src/etlng/impl/GrpcSource.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
3030

3131
#include <atomic>
32+
#include <chrono>
3233
#include <cstdint>
3334
#include <memory>
3435
#include <string>
@@ -40,9 +41,20 @@ class GrpcSource {
4041
util::Logger log_;
4142
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
4243
std::unique_ptr<std::atomic_bool> initialLoadShouldStop_;
44+
std::chrono::system_clock::duration deadline_;
45+
46+
static constexpr auto kKEEPALIVE_PING_INTERVAL_MS = 10000;
47+
static constexpr auto kKEEPALIVE_TIMEOUT_MS = 5000;
48+
static constexpr auto kKEEPALIVE_PERMIT_WITHOUT_CALLS = true; // Allow keepalive pings when no calls
49+
static constexpr auto kMAX_PINGS_WITHOUT_DATA = 0; // No limit
50+
static constexpr auto kDEADLINE = std::chrono::seconds(30);
4351

4452
public:
45-
GrpcSource(std::string const& ip, std::string const& grpcPort);
53+
GrpcSource(
54+
std::string const& ip,
55+
std::string const& grpcPort,
56+
std::chrono::system_clock::duration deadline = kDEADLINE
57+
);
4658

4759
/**
4860
* @brief Fetch data for a specific ledger.

tests/common/util/MockXrpLedgerAPIService.hpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
#include <org/xrpl/rpc/v1/get_ledger_entry.pb.h>
3333
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
3434

35+
#include <chrono>
3536
#include <memory>
37+
#include <optional>
3638
#include <string>
3739
#include <thread>
3840

@@ -90,15 +92,27 @@ struct WithMockXrpLedgerAPIService : virtual ::testing::Test {
9092

9193
~WithMockXrpLedgerAPIService() override
9294
{
93-
server_->Shutdown();
94-
serverThread_.join();
95+
shutdown();
9596
}
9697

9798
int
9899
getXRPLMockPort() const
99100
{
100101
return port_;
101102
}
103+
104+
void
105+
shutdown(std::optional<std::chrono::system_clock::duration> deadline = std::nullopt)
106+
{
107+
if (deadline.has_value()) {
108+
server_->Shutdown(std::chrono::system_clock::now() + *deadline);
109+
} else {
110+
server_->Shutdown();
111+
}
112+
if (serverThread_.joinable())
113+
serverThread_.join();
114+
}
115+
102116
MockXrpLedgerAPIService mockXrpLedgerAPIService;
103117

104118
private:

tests/unit/etlng/GrpcSourceTests.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,18 @@
4141
#include <xrpl/basics/strHex.h>
4242

4343
#include <atomic>
44+
#include <chrono>
4445
#include <condition_variable>
4546
#include <cstddef>
4647
#include <cstdint>
4748
#include <functional>
4849
#include <future>
4950
#include <map>
51+
#include <memory>
5052
#include <mutex>
5153
#include <optional>
5254
#include <queue>
55+
#include <semaphore>
5356
#include <string>
5457
#include <vector>
5558

@@ -357,3 +360,34 @@ TEST_F(GrpcSourceStopTests, LoadInitialLedgerStopsWhenRequested)
357360
ASSERT_FALSE(res.has_value());
358361
EXPECT_EQ(res.error(), etlng::InitialLedgerLoadError::Cancelled);
359362
}
363+
364+
TEST_F(GrpcSourceNgTests, DeadlineIsHandledCorrectly)
365+
{
366+
static constexpr auto kDEADLINE = std::chrono::milliseconds{5};
367+
368+
uint32_t const sequence = 123u;
369+
bool const getObjects = true;
370+
bool const getObjectNeighbors = false;
371+
372+
std::binary_semaphore sem(0);
373+
374+
auto grpcSource =
375+
std::make_unique<etlng::impl::GrpcSource>("localhost", std::to_string(getXRPLMockPort()), kDEADLINE);
376+
377+
EXPECT_CALL(mockXrpLedgerAPIService, GetLedger)
378+
.WillOnce([&](grpc::ServerContext*,
379+
org::xrpl::rpc::v1::GetLedgerRequest const*,
380+
org::xrpl::rpc::v1::GetLedgerResponse*) {
381+
// wait for main thread to discard us and fail the test if unsuccessful within expected timeframe
382+
[&] { ASSERT_TRUE(sem.try_acquire_for(std::chrono::milliseconds{50})); }();
383+
return grpc::Status{};
384+
});
385+
386+
auto const [status, response] = grpcSource->fetchLedger(sequence, getObjects, getObjectNeighbors);
387+
ASSERT_FALSE(status.ok()); // timed out after kDEADLINE
388+
389+
sem.release(); // we don't need to hold GetLedger thread any longer
390+
grpcSource.reset();
391+
392+
shutdown(std::chrono::milliseconds{10});
393+
}

0 commit comments

Comments
 (0)