Skip to content

Commit ffac11b

Browse files
authored
conn pool: add extra details to ASSERT, clarify comments (#39286)
Also added a test for a case that wasn't covered for negative capacity, and removed a function that wasn't used. Signed-off-by: Greg Greenway <[email protected]>
1 parent 09c3e3f commit ffac11b

File tree

4 files changed

+91
-35
lines changed

4 files changed

+91
-35
lines changed

source/common/conn_pool/conn_pool_base.cc

+41-28
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ int64_t currentUnusedCapacity(const std::list<ActiveClientPtr>& connecting_clien
1919
}
2020
} // namespace
2121

22+
std::string ConnPoolImplBase::dumpState() const { return fmt::format("State: {}", *this); }
23+
2224
void ConnPoolImplBase::assertCapacityCountsAreCorrect() {
2325
SLOW_ASSERT(static_cast<int64_t>(connecting_stream_capacity_) ==
24-
currentUnusedCapacity(connecting_clients_) +
25-
currentUnusedCapacity(early_data_clients_));
26+
currentUnusedCapacity(connecting_clients_) +
27+
currentUnusedCapacity(early_data_clients_),
28+
dumpState());
2629

2730
// Note: must include `busy_clients_` because they can have negative current unused capacity,
2831
// which is included in `connecting_and_connected_stream_capacity_`.
@@ -31,10 +34,18 @@ void ConnPoolImplBase::assertCapacityCountsAreCorrect() {
3134
(static_cast<int64_t>(connecting_stream_capacity_) +
3235
currentUnusedCapacity(ready_clients_) + currentUnusedCapacity(busy_clients_)),
3336
fmt::format(
34-
"connecting_and_connected_stream_capacity_ {}, connecting_stream_capacity_ {}, "
35-
"currentUnusedCapacity(ready_clients_) {}, currentUnusedCapacity(busy_clients_) {}",
36-
connecting_and_connected_stream_capacity_, connecting_stream_capacity_,
37-
currentUnusedCapacity(ready_clients_), currentUnusedCapacity(busy_clients_)));
37+
"{} currentUnusedCapacity(ready_clients_) {}, currentUnusedCapacity(busy_clients_) {}",
38+
*this, currentUnusedCapacity(ready_clients_), currentUnusedCapacity(busy_clients_)));
39+
40+
SLOW_ASSERT(currentUnusedCapacity(busy_clients_) <= 0, dumpState());
41+
42+
if (ready_clients_.empty()) {
43+
ASSERT((connecting_and_connected_stream_capacity_ - connecting_stream_capacity_) <= 0,
44+
dumpState());
45+
} else {
46+
ASSERT((connecting_and_connected_stream_capacity_ - connecting_stream_capacity_) > 0,
47+
dumpState());
48+
}
3849
}
3950

4051
ConnPoolImplBase::ConnPoolImplBase(
@@ -47,16 +58,16 @@ ConnPoolImplBase::ConnPoolImplBase(
4758
upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {}
4859

4960
ConnPoolImplBase::~ConnPoolImplBase() {
50-
ASSERT(isIdleImpl());
51-
ASSERT(connecting_stream_capacity_ == 0);
52-
ASSERT(connecting_and_connected_stream_capacity_ == 0);
61+
ASSERT(isIdleImpl(), dumpState());
62+
ASSERT(connecting_stream_capacity_ == 0, dumpState());
63+
ASSERT(connecting_and_connected_stream_capacity_ == 0, dumpState());
5364
}
5465

5566
void ConnPoolImplBase::deleteIsPendingImpl() {
5667
deferred_deleting_ = true;
57-
ASSERT(isIdleImpl());
58-
ASSERT(connecting_stream_capacity_ == 0);
59-
ASSERT(connecting_and_connected_stream_capacity_ == 0);
68+
ASSERT(isIdleImpl(), dumpState());
69+
ASSERT(connecting_stream_capacity_ == 0, dumpState());
70+
ASSERT(connecting_and_connected_stream_capacity_ == 0, dumpState());
6071
}
6172

6273
void ConnPoolImplBase::destructAllConnections() {
@@ -156,7 +167,8 @@ ConnPoolImplBase::ConnectionResult ConnPoolImplBase::tryCreateNewConnections() {
156167
break;
157168
}
158169
}
159-
ASSERT(!is_draining_for_deletion_ || result != ConnectionResult::CreatedNewConnection);
170+
ASSERT(!is_draining_for_deletion_ || result != ConnectionResult::CreatedNewConnection,
171+
dumpState());
160172
return result;
161173
}
162174

@@ -184,9 +196,10 @@ ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
184196
ENVOY_LOG(trace, "connection creation failed");
185197
return ConnectionResult::FailedToCreateConnection;
186198
}
187-
ASSERT(client->state() == ActiveClient::State::Connecting);
199+
ASSERT(client->state() == ActiveClient::State::Connecting, dumpState());
188200
ASSERT(std::numeric_limits<uint64_t>::max() - connecting_stream_capacity_ >=
189-
static_cast<uint64_t>(client->currentUnusedCapacity()));
201+
static_cast<uint64_t>(client->currentUnusedCapacity()),
202+
dumpState());
190203
ASSERT(client->real_host_description_);
191204
// Increase the connecting capacity to reflect the streams this connection can serve.
192205
incrConnectingAndConnectedStreamCapacity(client->currentUnusedCapacity(), *client);
@@ -202,7 +215,7 @@ ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
202215

203216
void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
204217
AttachContext& context) {
205-
ASSERT(client.readyForStream());
218+
ASSERT(client.readyForStream(), dumpState());
206219

207220
Upstream::ClusterTrafficStats& traffic_stats = *host_->cluster().trafficStats();
208221
if (client.state() == Envoy::ConnectionPool::ActiveClient::State::ReadyForEarlyData) {
@@ -252,7 +265,7 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
252265
ENVOY_CONN_LOG(
253266
debug, "destroying stream: {} active remaining, readyForStream {}, currentUnusedCapacity {}",
254267
client, client.numActiveStreams(), client.readyForStream(), client.currentUnusedCapacity());
255-
ASSERT(num_active_streams_ > 0);
268+
ASSERT(num_active_streams_ > 0, dumpState());
256269
state_.decrActiveStreams(1);
257270
num_active_streams_--;
258271
host_->stats().rq_active_.dec();
@@ -265,9 +278,9 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
265278
bool limited_by_concurrency =
266279
client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1;
267280
// The capacity calculated by concurrency could be negative if a SETTINGS frame lowered the
268-
// number of allowed streams. In this case, effective client capacity was still limited by
269-
// concurrency, compare client.concurrent_stream_limit_ and client.numActiveStreams() directly
270-
// to avoid overflow.
281+
// number of allowed streams. In this case, connecting_and_connected_stream_capacity_ can be
282+
// negative, and effective client capacity was still limited by concurrency. Compare
283+
// client.concurrent_stream_limit_ and client.numActiveStreams() directly to avoid overflow.
271284
bool negative_capacity = client.concurrent_stream_limit_ < client.numActiveStreams() + 1;
272285
if (negative_capacity || limited_by_concurrency) {
273286
incrConnectingAndConnectedStreamCapacity(1, client);
@@ -293,8 +306,8 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
293306

294307
ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context,
295308
bool can_send_early_data) {
296-
ASSERT(!is_draining_for_deletion_);
297-
ASSERT(!deferred_deleting_);
309+
ASSERT(!is_draining_for_deletion_, dumpState());
310+
ASSERT(!deferred_deleting_, dumpState());
298311
assertCapacityCountsAreCorrect();
299312

300313
if (!ready_clients_.empty()) {
@@ -351,7 +364,7 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& cont
351364
}
352365

353366
bool ConnPoolImplBase::maybePreconnectImpl(float global_preconnect_ratio) {
354-
ASSERT(!deferred_deleting_);
367+
ASSERT(!deferred_deleting_, dumpState());
355368
return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
356369
}
357370

@@ -440,7 +453,7 @@ void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
440453

441454
void ConnPoolImplBase::drainClients(std::list<ActiveClientPtr>& clients) {
442455
while (!clients.empty()) {
443-
ASSERT(clients.front()->numActiveStreams() > 0u);
456+
ASSERT(clients.front()->numActiveStreams() > 0u, dumpState());
444457
ENVOY_LOG_EVENT(
445458
debug, "draining_non_idle_client", "draining {} client {} for cluster {}",
446459
(clients.front()->state() == ActiveClient::State::Ready ? "ready" : "early data"),
@@ -469,7 +482,7 @@ void ConnPoolImplBase::drainConnectionsImpl(DrainBehavior drain_behavior) {
469482

470483
// Changing busy_clients_ to Draining does not move them between lists,
471484
// so use a for-loop since the list is not mutated.
472-
ASSERT(&owningList(ActiveClient::State::Draining) == &busy_clients_);
485+
ASSERT(&owningList(ActiveClient::State::Draining) == &busy_clients_, dumpState());
473486
for (auto& busy_client : busy_clients_) {
474487
if (busy_client->state() == ActiveClient::State::Draining) {
475488
continue;
@@ -609,7 +622,7 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
609622
client.connect_timer_->disableTimer();
610623
client.connect_timer_.reset();
611624

612-
ASSERT(connecting_stream_capacity_ >= client.currentUnusedCapacity());
625+
ASSERT(connecting_stream_capacity_ >= client.currentUnusedCapacity(), dumpState());
613626
connecting_stream_capacity_ -= client.currentUnusedCapacity();
614627
client.has_handshake_completed_ = true;
615628
client.conn_connect_ms_->complete();
@@ -690,7 +703,7 @@ void ConnPoolImplBase::purgePendingStreams(
690703

691704
bool ConnPoolImplBase::connectingConnectionIsExcess(const ActiveClient& client) const {
692705
ASSERT(!client.hasHandshakeCompleted());
693-
ASSERT(connecting_stream_capacity_ >= client.currentUnusedCapacity());
706+
ASSERT(connecting_stream_capacity_ >= client.currentUnusedCapacity(), dumpState());
694707
// If perUpstreamPreconnectRatio is one, this simplifies to checking if there would still be
695708
// sufficient connecting stream capacity to serve all pending streams if the most recent client
696709
// were removed from the picture.
@@ -747,7 +760,7 @@ void ConnPoolImplBase::decrConnectingAndConnectedStreamCapacity(uint32_t delta,
747760
if (!client.hasHandshakeCompleted()) {
748761
// If still doing handshake, it is contributing to the local connecting stream capacity. Update
749762
// the capacity as well.
750-
ASSERT(connecting_stream_capacity_ >= delta);
763+
ASSERT(connecting_stream_capacity_ >= delta, dumpState());
751764
connecting_stream_capacity_ -= delta;
752765
}
753766
}

source/common/conn_pool/conn_pool_base.h

+6-7
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,6 @@ class ActiveClient : public LinkedObject<ActiveClient>,
8585
return state_ == State::Ready;
8686
}
8787

88-
// This function is called onStreamClosed to see if there was a negative delta
89-
// and (if necessary) update associated bookkeeping.
90-
// HTTP/1 and TCP pools can not have negative delta so the default implementation simply returns
91-
// false. The HTTP/2 connection pool can have this state, so overrides this function.
92-
virtual bool hadNegativeDeltaOnStreamClosed() { return false; }
93-
9488
enum class State {
9589
Connecting, // Connection is not yet established.
9690
ReadyForEarlyData, // Any additional early data stream can be immediately dispatched to this
@@ -296,7 +290,8 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
296290
const char* spaces = spacesForLevel(indent_level);
297291
os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size())
298292
<< DUMP_MEMBER(busy_clients_.size()) << DUMP_MEMBER(connecting_clients_.size())
299-
<< DUMP_MEMBER(connecting_stream_capacity_) << DUMP_MEMBER(num_active_streams_)
293+
<< DUMP_MEMBER(connecting_stream_capacity_)
294+
<< DUMP_MEMBER(connecting_and_connected_stream_capacity_) << DUMP_MEMBER(num_active_streams_)
300295
<< DUMP_MEMBER(pending_streams_.size())
301296
<< " per upstream preconnect ratio: " << perUpstreamPreconnectRatio();
302297
}
@@ -305,6 +300,10 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
305300
s.dumpState(os);
306301
return os;
307302
}
303+
304+
// Helper for use as the 2nd argument to ASSERT.
305+
std::string dumpState() const;
306+
308307
Upstream::ClusterConnectivityState& state() { return state_; }
309308

310309
void decrConnectingAndConnectedStreamCapacity(uint32_t delta, ActiveClient& client);

test/common/conn_pool/conn_pool_base_test.cc

+1
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ TEST_F(ConnPoolImplBaseTest, DumpState) {
250250
std::string state = out.str();
251251
EXPECT_THAT(state, HasSubstr("ready_clients_.size(): 0, busy_clients_.size(): 0, "
252252
"connecting_clients_.size(): 0, connecting_stream_capacity_: 0, "
253+
"connecting_and_connected_stream_capacity_: 0, "
253254
"num_active_streams_: 0"));
254255
}
255256

test/common/http/http2/conn_pool_test.cc

+43
Original file line numberDiff line numberDiff line change
@@ -1805,6 +1805,49 @@ TEST_F(Http2ConnPoolImplTest, TestUnusedCapacity) {
18051805
CHECK_STATE(0 /*active*/, 0 /*pending*/, 0 /*capacity*/);
18061806
}
18071807

1808+
TEST_F(Http2ConnPoolImplTest, TestNegativeUnusedCapacity) {
1809+
cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(4);
1810+
1811+
expectClientsCreate(2);
1812+
ActiveTestRequest r1(*this, 0, false);
1813+
CHECK_STATE(0 /*active*/, 1 /*pending*/, 4 /*capacity*/);
1814+
expectClientConnect(0, r1);
1815+
CHECK_STATE(1 /*active*/, 0 /*pending*/, 3 /*capacity*/);
1816+
1817+
ActiveTestRequest r2(*this, 0, true);
1818+
ActiveTestRequest r3(*this, 0, true);
1819+
CHECK_STATE(3 /*active*/, 0 /*pending*/, 1 /*capacity*/);
1820+
1821+
// Settings frame results in negative unused capacity.
1822+
NiceMock<MockReceivedSettings> settings;
1823+
settings.max_concurrent_streams_ = 1;
1824+
test_clients_[0].codec_client_->onSettings(settings);
1825+
CHECK_STATE(3 /*active*/, 0 /*pending*/, -2 /*capacity*/);
1826+
1827+
completeRequest(r1);
1828+
CHECK_STATE(2 /*active*/, 0 /*pending*/, -1 /*capacity*/);
1829+
1830+
// With negative capacity, verify that a new request creates a new connection.
1831+
ActiveTestRequest r4(*this, 1, false);
1832+
CHECK_STATE(2 /*active*/, 1 /*pending*/, 3 /*capacity*/);
1833+
expectClientConnect(1, r4);
1834+
CHECK_STATE(3 /*active*/, 0 /*pending*/, 2 /*capacity*/);
1835+
1836+
completeRequest(r2);
1837+
CHECK_STATE(2 /*active*/, 0 /*pending*/, 3 /*capacity*/);
1838+
1839+
completeRequest(r3);
1840+
CHECK_STATE(1 /*active*/, 0 /*pending*/, 4 /*capacity*/);
1841+
1842+
completeRequest(r4);
1843+
CHECK_STATE(0 /*active*/, 0 /*pending*/, 5 /*capacity*/);
1844+
1845+
// Clean up with an outstanding stream.
1846+
pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections);
1847+
closeAllClients();
1848+
CHECK_STATE(0 /*active*/, 0 /*pending*/, 0 /*capacity*/);
1849+
}
1850+
18081851
TEST_F(Http2ConnPoolImplTest, TestStateWithMultiplexing) {
18091852
cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(2);
18101853
cluster_->max_requests_per_connection_ = 4;

0 commit comments

Comments
 (0)