Skip to content

Commit 112137e

Browse files
committed
use stack instead of queue
1 parent 51a274a commit 112137e

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

src/ml_flashpoint/replication/transfer_service/connection_pool.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ ConnectionPool::~ConnectionPool() {
8181
cv_.notify_all();
8282
std::unique_lock<std::mutex> lock(mtx_);
8383
while (!available_connections_.empty()) {
84-
close(available_connections_.front());
84+
close(available_connections_.top());
8585
available_connections_.pop();
8686
}
8787
}
@@ -96,7 +96,7 @@ bool ConnectionPool::Initialize() {
9696
int fd = CreateConnection();
9797
if (fd < 0) {
9898
while (!available_connections_.empty()) {
99-
close(available_connections_.front());
99+
close(available_connections_.top());
100100
available_connections_.pop();
101101
}
102102
return false;
@@ -215,8 +215,12 @@ std::optional<ScopedConnection> ConnectionPool::GetConnection(int timeout_ms) {
215215
return std::nullopt;
216216
}
217217

218-
// Pop the oldest connection from the FIFO queue.
219-
int fd = available_connections_.front();
218+
// Pop the most recently used connection from the LIFO stack.
219+
// LIFO (Last-In, First-Out) is preferred for connection pools as it
220+
// increases the likelihood of reusing "hot" connections that still have
221+
// active TCP state (e.g., large congestion windows) and are still cached
222+
// in the kernel/CPU.
223+
int fd = available_connections_.top();
220224
available_connections_.pop();
221225

222226
// Verify the connection's health before handing it to the caller.
@@ -227,7 +231,7 @@ std::optional<ScopedConnection> ConnectionPool::GetConnection(int timeout_ms) {
227231
}
228232

229233
// The connection is dead. We close it and attempt to retrieve another
230-
// one from the queue.
234+
// one from the stack.
231235
LOG(INFO) << "ConnectionPool::GetConnection: discarded dead connection; "
232236
"retrying with next available connection";
233237
close(fd);
@@ -246,7 +250,7 @@ std::optional<ScopedConnection> ConnectionPool::GetConnection(int timeout_ms) {
246250
// Returns a connection to the pool, allowing it to be reused.
247251
//
248252
// If `reuse` is true and the pool is not full, the connection is added back to
249-
// the queue of available connections. Otherwise, the connection is closed.
253+
// the stack of available connections. Otherwise, the connection is closed.
250254
void ConnectionPool::ReleaseConnection(int sockfd, bool reuse) {
251255
if (sockfd < 0) {
252256
LOG(WARNING) << "ConnectionPool::ReleaseConnection: invalid sockfd";
@@ -264,6 +268,8 @@ void ConnectionPool::ReleaseConnection(int sockfd, bool reuse) {
264268
if (reuse) {
265269
if (available_connections_.size() < max_size_) {
266270
LOG(INFO) << "ConnectionPool::ReleaseConnection: reuse connection";
271+
// We push to the stack to ensure this connection is the first to be
272+
// reused by the next caller (LIFO).
267273
available_connections_.push(sockfd);
268274
cv_.notify_one();
269275
} else {

src/ml_flashpoint/replication/transfer_service/connection_pool.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
#include <memory>
3636
#include <mutex>
3737
#include <optional>
38-
#include <queue>
38+
#include <stack>
3939
#include <string>
4040

4141
namespace ml_flashpoint::replication::transfer_service {
@@ -125,7 +125,7 @@ class ConnectionPool {
125125
std::string peer_host_;
126126
int peer_port_;
127127
size_t max_size_;
128-
std::queue<int> available_connections_; // Guarded by mtx_.
128+
std::stack<int> available_connections_; // Guarded by mtx_.
129129
std::mutex mtx_; // Protects available_connections_ and stopping_.
130130
std::condition_variable
131131
cv_; // Signaled when a connection is released or the pool is stopping.

0 commit comments

Comments
 (0)