Skip to content

feat(replication): implement connection health checks and task retry logic#65

Draft
g-husam wants to merge 5 commits intomainfrom
feature/replication-robustness
Draft

feat(replication): implement connection health checks and task retry logic#65
g-husam wants to merge 5 commits intomainfrom
feature/replication-robustness

Conversation

@g-husam
Copy link
Collaborator

@g-husam g-husam commented Mar 2, 2026

This PR enhances the ConnectionPool and TransferService to robustly handle stale TCP connections and transient network failures, ensuring high reliability in scenarios where connections may be closed by peers or intermediate firewalls due to idleness.

  1. Proactive Health Checks in ConnectionPool
  • Zero-Cost Detection: Added IsConnectionAlive() using MSG_PEEK | MSG_DONTWAIT to verify socket health before handing connections to worker threads. This detects remote closures (FIN) or resets (RST) without consuming protocol data.
  • Self-Healing Logic: Updated GetConnection() to automatically discard dead sockets and replenish the pool with fresh connections. The method respects the original user timeout even if multiple stale connections are encountered.
  • Automatic Replenishment: Improved ReleaseConnection() to ensure the pool maintains its configured size by opening new connections when a socket is discarded as unusable.
  1. Resilient Task Execution in TransferService
  • Transparent Retries: Implemented a dual-attempt retry mechanism for PutTask, GetTask, and RespondToGetTask. Tasks now automatically recover from "race condition" connection failures that occur between the health check and the first protocol write.
  • Prevention of Pool Poisoning: Added ScopedConnection::SetUnusable(). The TransferService now explicitly flags connections as bad upon encountering any socket-level error (SendAll/RecvHeader failures). This ensures broken sockets are closed permanently and not returned to the pool for reuse by other tasks.
  • Exhaustive Protocol Handling: Replaced default cases with exhaustive switch statements for all MessageType handling, ensuring strict protocol compliance and easier debugging of unexpected message types.
  1. Verification & Testing
  • Simulated Stale Connections: Added DiscardDeadConnection unit test to verify the pool's ability to recover when the remote side closes a socket while it is idle in the queue.
  • Invalidation Logic: Added SetUnusablePreventsReuse to confirm that connections flagged as unusable are correctly disposed of.
  • Regression Testing: Verified that all 21 peer-to-peer integration tests in TransferServiceP2PTest pass with the new recovery logic.

…logic

This PR enhances the ConnectionPool and TransferService to robustly handle stale TCP connections and transient network failures, ensuring
  high reliability in distributed environments where connections may be closed by peers or intermediate firewalls due to idleness.

  1. Proactive Health Checks in ConnectionPool
   * Zero-Cost Detection: Added IsConnectionAlive() using MSG_PEEK | MSG_DONTWAIT to verify socket health before handing connections to
     worker threads. This detects remote closures (FIN) or resets (RST) without consuming protocol data.
   * Self-Healing Logic: Updated GetConnection() to automatically discard dead sockets and replenish the pool with fresh connections. The
     method respects the original user timeout even if multiple stale connections are encountered.
   * Automatic Replenishment: Improved ReleaseConnection() to ensure the pool maintains its configured size by opening new connections when a
     socket is discarded as unusable.

  2. Resilient Task Execution in TransferService
   * Transparent Retries: Implemented a dual-attempt retry mechanism for PutTask, GetTask, and RespondToGetTask. Tasks now automatically
     recover from "race condition" connection failures that occur between the health check and the first protocol write.
   * Prevention of Pool Poisoning: Added ScopedConnection::SetUnusable(). The TransferService now explicitly flags connections as bad upon
     encountering any socket-level error (SendAll/RecvHeader failures). This ensures broken sockets are closed permanently and not returned
     to the pool for reuse by other tasks.
   * Exhaustive Protocol Handling: Replaced default cases with exhaustive switch statements for all MessageType handling, ensuring strict
     protocol compliance and easier debugging of unexpected message types.

  3. Verification & Testing
   * Simulated Stale Connections: Added DiscardDeadConnection unit test to verify the pool's ability to recover when the remote side closes a
     socket while it is idle in the queue.
   * Invalidation Logic: Added SetUnusablePreventsReuse to confirm that connections flagged as unusable are correctly disposed of.
   * Regression Testing: Verified that all 21 peer-to-peer integration tests in TransferServiceP2PTest pass with the new recovery logic.
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly improves the robustness and reliability of the replication transfer service by addressing issues related to stale TCP connections and transient network failures. It introduces proactive health checks within the connection pool to identify and replace dead connections, implements a retry mechanism for transfer tasks to handle intermittent communication issues, and prevents the reuse of broken sockets, ensuring a more stable and self-healing system.

Highlights

  • Connection Health Checks: Implemented IsConnectionAlive() using MSG_PEEK | MSG_DONTWAIT for zero-cost detection of stale TCP connections in ConnectionPool, identifying remote closures or errors without consuming protocol data.
  • Self-Healing Connection Pool: Enhanced GetConnection() to automatically discard dead sockets, replenish the pool with fresh connections, and respect user timeouts even when encountering multiple stale connections.
  • Automatic Pool Replenishment: Modified ReleaseConnection() to ensure the pool maintains its configured size by creating new connections when an unusable socket is discarded.
  • Resilient Task Execution: Introduced a dual-attempt retry mechanism for PutTask, GetTask, and RespondToGetTask in TransferService to recover transparently from transient connection failures that occur after health checks.
  • Prevention of Pool Poisoning: Added ScopedConnection::SetUnusable() to explicitly mark broken sockets upon encountering socket-level errors, ensuring they are closed permanently and not returned to the pool for reuse.
  • Exhaustive Protocol Handling: Replaced default cases with exhaustive switch statements for MessageType handling in TransferService tasks, ensuring strict protocol compliance and easier debugging of unexpected message types.
  • Comprehensive Testing: Added unit tests (DiscardDeadConnection, SetUnusablePreventsReuse) to verify the new connection pool logic and confirmed all existing peer-to-peer integration tests pass with the recovery logic.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • src/ml_flashpoint/replication/transfer_service/connection_pool.cpp
    • Modified ScopedConnection::Release to use a reuse_ flag for conditional connection release.
    • Added IsConnectionAlive function to perform non-blocking socket health checks.
    • Refactored GetConnection to proactively check connection health, discard dead connections, and attempt to replenish the pool.
    • Updated ReleaseConnection to replenish the pool with a new connection if the released connection was marked as unusable.
  • src/ml_flashpoint/replication/transfer_service/connection_pool.h
    • Added SetUnusable method and reuse_ member to ScopedConnection to control connection reuse.
    • Declared IsConnectionAlive method in ConnectionPool.
  • src/ml_flashpoint/replication/transfer_service/transfer_service.cpp
    • Implemented a 2-attempt retry loop in ExecutePutTask for sending data and receiving acknowledgements, marking connections as unusable on failure.
    • Implemented a 2-attempt retry loop in ExecuteGetTask for sending requests and receiving responses, marking connections as unusable on failure.
    • Implemented a 2-attempt retry loop in ExecuteRespondToGetTask for sending data and receiving acknowledgements, marking connections as unusable on failure.
    • Replaced default cases with exhaustive switch statements for MessageType handling in task execution methods.
  • tests/replication/transfer_service/connection_pool_test.cpp
    • Modified TearDown to properly close all accepted file descriptors.
    • Added DiscardDeadConnection test to verify the pool's ability to handle and replace dead connections.
    • Added SetUnusablePreventsReuse test to confirm that connections marked as unusable are not reused.
Activity
  • No specific activity (comments, reviews, etc.) has been recorded for this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request significantly enhances the robustness of the connection pool and transfer service by adding proactive connection health checks and task retry logic. The changes are well-structured and the added features are crucial for handling transient network failures and stale connections. My review identifies a couple of high-severity issues related to concurrency and data correctness, and a medium-severity style guide violation in the new tests. Addressing these points will further improve the quality and reliability of the implementation.

Comment on lines +239 to 243
if (new_fd >= 0) {
available_connections_.push(new_fd);
// The loop will continue and pick up this or another connection.
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Calling CreateConnection() while holding the pool's mutex (mtx_) can cause significant contention, as it's a blocking network operation. This will block any other thread trying to get or release a connection. Consider releasing the lock before this call and re-acquiring it after to push the new connection to the pool. Remember to re-check stopping_ after re-acquiring the lock.

Comment on lines +284 to +291
int new_fd = CreateConnection();
if (new_fd >= 0) {
available_connections_.push(new_fd);
cv_.notify_one();
} else {
LOG(ERROR) << "ConnectionPool::ReleaseConnection: failed to replenish "
"pool after discarding unusable connection";
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

As in GetConnection, CreateConnection() is called here while holding the pool's mutex. This blocking call can harm concurrency. The lock should be released before creating the connection and re-acquired after. Don't forget to handle the stopping_ case after re-acquiring the lock.

}
ScopedConnection conn = std::move(conn_opt.value());
metric_container.connection_acquired_time = absl::Now();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The header object is not initialized here. This means that any fields not explicitly set will contain garbage values, which are then sent over the network. This is inconsistent with ExecutePutTask and ExecuteRespondToGetTask where the header is zero-initialized. Please initialize the header to prevent sending indeterminate data.

Suggested change
ObjInfoHeader header{};

Comment on lines +74 to +98
TEST_F(ConnectionPoolTest, DiscardDeadConnection) {
ConnectionPool pool("127.0.0.1", port_, 1);
EXPECT_TRUE(pool.Initialize());

// Wait for the connection to be accepted by the server.
std::this_thread::sleep_for(std::chrono::milliseconds(100));

// Close the connection on the server side to make it stale in the pool.
{
std::lock_guard<std::mutex> lock(accepted_fds_mutex_);
ASSERT_EQ(accepted_fds_.size(), 1);
close(accepted_fds_[0]);
accepted_fds_.clear();
}

// GetConnection should detect it's dead, discard it, and create a new one.
auto conn = pool.GetConnection();
EXPECT_TRUE(conn.has_value());
EXPECT_TRUE(conn->IsValid());

// Verify that a new connection was indeed established.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard<std::mutex> lock(accepted_fds_mutex_);
EXPECT_EQ(accepted_fds_.size(), 1);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

According to the repository's testing style guide (rule #1 under 'Testing Rules'), tests should be structured with # Given, # When, and # Then comments to separate setup, action, and assertion phases. This improves readability. This applies to both DiscardDeadConnection and SetUnusablePreventsReuse tests.

TEST_F(ConnectionPoolTest, DiscardDeadConnection) {
  // # Given
  ConnectionPool pool("127.0.0.1", port_, 1);
  EXPECT_TRUE(pool.Initialize());

  // Wait for the connection to be accepted by the server.
  std::this_thread::sleep_for(std::chrono::milliseconds(100));

  // Close the connection on the server side to make it stale in the pool.
  {
    std::lock_guard<std::mutex> lock(accepted_fds_mutex_);
    ASSERT_EQ(accepted_fds_.size(), 1);
    close(accepted_fds_[0]);
    accepted_fds_.clear();
  }

  // # When
  // GetConnection should detect it's dead, discard it, and create a new one.
  auto conn = pool.GetConnection();

  // # Then
  EXPECT_TRUE(conn.has_value());
  EXPECT_TRUE(conn->IsValid());

  // Verify that a new connection was indeed established.
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  std::lock_guard<std::mutex> lock(accepted_fds_mutex_);
  EXPECT_EQ(accepted_fds_.size(), 1);
}
References
  1. Format tests such that the setup, action and assertion parts are separated by an empty line when possible. Add the comments "# Given", "# When" and "# Then" before each section respectively. (link)

@github-actions
Copy link

github-actions bot commented Mar 2, 2026

Python Code Coverage Summary

Code Coverage

Package Line Rate Branch Rate Health
src.ml_flashpoint 100% 100%
src.ml_flashpoint.adapter 100% 100%
src.ml_flashpoint.adapter.megatron 97% 94%
src.ml_flashpoint.adapter.nemo 98% 94%
src.ml_flashpoint.adapter.pytorch 99% 88%
src.ml_flashpoint.checkpoint_object_manager 92% 91%
src.ml_flashpoint.core 96% 92%
src.ml_flashpoint.replication 81% 81%
Summary 95% (2058 / 2170) 91% (471 / 518)

Minimum allowed line rate is 90%

@github-actions
Copy link

github-actions bot commented Mar 2, 2026

C++ Code Coverage Summary

Code Coverage

Package Line Rate Branch Rate Health
src.ml_flashpoint.checkpoint_object_manager.buffer_object 93% 54%
src.ml_flashpoint.checkpoint_object_manager.object_manager 70% 37%
src.ml_flashpoint.replication.transfer_service 78% 39%
Summary 81% (933 / 1157) 42% (703 / 1686)

Minimum allowed line rate is 80%

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant