
Conversation

@huerni (Collaborator) commented Sep 16, 2025

Summary by CodeRabbit

  • New Features

    • Added a streaming task metadata request/reply, enabling clients to fetch task details and related process info in real time.
  • Improvements

    • Increased stability with reconnection-aware streaming between components, reducing disruptions during transient network issues or restarts.
    • Enforced output queue limits (1MB) to prevent excessive memory use; oversized messages are safely discarded.
    • More reliable message delivery through safer writer management and lifecycle handling.
    • Minor logging enhancements for clearer reconnection lifecycle visibility.

coderabbitai bot (Contributor) commented Sep 16, 2025

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 8.33%, which is below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title "feat: cattach" is concise and directly references the new "cattach" feature introduced across the changeset (TaskMeta request/reply additions, cattach_pid wiring, scheduler query, and reconnection handling), so it relates to the main change; it is terse, though, and could be a bit more descriptive for changelog clarity.

@huerni huerni linked an issue Sep 16, 2025 that may be closed by this pull request

coderabbitai bot (Contributor) left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/Craned/Supervisor/CforedClient.h (1)

91-108: Consider potential data races with m_wait_reconn_ shared state.

The addition of m_wait_reconn_ for reconnection control is good, but I have concerns about the concurrent access patterns:

  1. m_wait_reconn_ is checked in CleanOutputQueueAndWriteToStreamThread_ (Line 371) and set in AsyncSendRecvThread_ (Lines 507, 548)
  2. Both threads run concurrently and the atomicity alone might not prevent race conditions during state transitions

Consider:

  • Using a mutex or condition variable to coordinate the reconnection state more safely (a sketch follows after this list)
  • Ensuring proper ordering between setting m_wait_reconn_ and joining threads
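
A minimal sketch of the mutex/condition-variable coordination mentioned above, using hypothetical names (ReconnCoordinator, RequestReconnect, etc.); the real CforedClient members and thread structure differ:

#include <condition_variable>
#include <mutex>

// Hypothetical coordination object; not part of the existing CforedClient.
class ReconnCoordinator {
 public:
  // Called by the send/recv thread when the stream breaks.
  void RequestReconnect() {
    std::lock_guard lk(mu_);
    wait_reconn_ = true;
  }

  // Called by the send/recv thread once the new stream has registered.
  void ReconnectDone() {
    {
      std::lock_guard lk(mu_);
      wait_reconn_ = false;
    }
    cv_.notify_all();
  }

  // Called by the output-forwarding thread before each write; blocks while a
  // reconnection is in progress, so writes never race with the state flip.
  void WaitUntilConnected() {
    std::unique_lock lk(mu_);
    cv_.wait(lk, [this] { return !wait_reconn_; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool wait_reconn_ = false;
};

With something like this, the output thread would call WaitUntilConnected() at the top of each write iteration instead of polling m_wait_reconn_ directly, and the ordering between the state flip and thread joins becomes explicit.
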
🧹 Nitpick comments (7)
src/CraneCtld/TaskScheduler.h (1)

514-514: Consider renaming to align with existing naming convention.

The method name QueryTaskUseId doesn't follow the naming pattern of other query methods in this class (e.g., QueryTasksInRam, QueryRnJobOnCtldForNodeConfig). Consider renaming to QueryTaskById or QueryTaskUsingId for consistency.

-  bool QueryTaskUseId(task_id_t task_id, crane::grpc::TaskToCtld* task);
+  bool QueryTaskById(task_id_t task_id, crane::grpc::TaskToCtld* task);
src/Utilities/PublicHeader/include/crane/PublicHeader.h (1)

71-71: Consider making the output queue size configurable.

The 1MB hardcoded limit might be too restrictive for some workloads with high output volume. Consider making this configurable through the configuration system.

Would you like me to help create a configuration parameter for this limit in the appropriate configuration files?
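
For illustration only, a configurable limit might look roughly like this; the SupervisorConfig struct and its field are assumptions for the sketch, not existing CraneSched config keys:

#include <cstddef>

// Hypothetical config struct; the real CraneSched configuration layout differs.
struct SupervisorConfig {
  std::size_t max_output_queue_bytes = 0;  // 0 means "not set in config"
};

// Fall back to the current 1MB default when the option is absent.
inline std::size_t EffectiveOutputQueueLimit(const SupervisorConfig& cfg) {
  constexpr std::size_t kDefaultMaxOutputQueueBytes = 1024 * 1024;
  return cfg.max_output_queue_bytes > 0 ? cfg.max_output_queue_bytes
                                        : kDefaultMaxOutputQueueBytes;
}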

src/Craned/Supervisor/CforedClient.cpp (2)

33-40: Validate tight reconnection loop behavior.

The reconnection logic uses a tight 100ms sleep loop while attempting reconnection. This could:

  1. Cause excessive CPU usage if reconnection fails repeatedly
  2. Flood logs with connection attempt messages

Consider implementing exponential backoff or a maximum retry limit.

   m_reconnect_async_->on<uvw::async_event>(
       [this](const uvw::async_event&, uvw::async_handle&) {
+        int retry_count = 0;
+        int backoff_ms = 100;
         while (m_wait_reconn_ && !m_stopped_) {
           InitChannelAndStub(m_cfored_name_);
-          std::this_thread::sleep_for(std::chrono::milliseconds(100));
+          if (!m_wait_reconn_) break;  // Successful connection
+          
+          retry_count++;
+          if (retry_count > 10) backoff_ms = std::min(backoff_ms * 2, 5000);
+          std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
         }
       });

674-689: Message size validation and queue management look good.

The implementation correctly:

  1. Rejects individual messages > 1MB
  2. Prevents queue from exceeding 1MB total
  3. Updates the queue byte counter atomically

The warning messages provide good visibility into dropped messages.

Consider adding metrics/counters for dropped messages to help with monitoring and debugging in production:

 void CforedClient::TaskOutPutForward(const std::string& msg) {
   CRANE_TRACE("Receive TaskOutputForward len: {}.", msg.size());
+  static std::atomic<uint64_t> dropped_msg_count{0};
 
   if (msg.size() > kMaxOutputQueueBytes) {
     CRANE_WARN("Task output message size {} exceeds 1MB, discard!", msg.size());
+    dropped_msg_count++;
     return;
   }
src/CraneCtld/RpcService/CtldGrpcServer.h (1)

184-205: Consider adding documentation for StreamWriterProxy.

The StreamWriterProxy class serves as a crucial abstraction for managing writer lifecycles during reconnections. Consider adding documentation to clarify its role in the reconnection architecture.

Add documentation to explain the proxy's purpose:

+/**
+ * StreamWriterProxy provides thread-safe access to a CforedStreamWriter,
+ * enabling writer replacement during cfored reconnections without breaking
+ * existing callbacks. It acts as a stable reference point while the 
+ * underlying writer can be swapped out atomically.
+ */
 class StreamWriterProxy {
src/CraneCtld/RpcService/CtldGrpcServer.cpp (2)

214-226: Consider extracting proxy management logic.

The proxy management logic (finding/creating proxies) could be extracted into a helper method for better maintainability and potential reuse.

Consider extracting to a helper method:

std::weak_ptr<StreamWriterProxy> GetOrCreateProxy(const std::string& cfored_name, 
                                                   std::shared_ptr<CforedStreamWriter> writer) {
  m_ctld_server_->m_stream_proxy_mtx_.Lock();
  auto iter = m_ctld_server_->m_cfored_stream_proxy_map_.find(cfored_name);
  if (iter != m_ctld_server_->m_cfored_stream_proxy_map_.end()) {
    iter->second->SetWriter(writer);
    auto result = iter->second;
    m_ctld_server_->m_stream_proxy_mtx_.Unlock();
    return result;
  }
  
  auto proxy = std::make_shared<StreamWriterProxy>();
  proxy->SetWriter(writer);
  m_ctld_server_->m_cfored_stream_proxy_map_[cfored_name] = proxy;
  auto result = proxy;
  m_ctld_server_->m_stream_proxy_mtx_.Unlock();
  return result;
}

337-355: Permission check looks good, but consider rate limiting.

The TASK_META_REQUEST handler properly validates permissions by checking if the requester owns the task or is an admin. However, this endpoint could be susceptible to enumeration attacks where an attacker repeatedly queries different task IDs.

Consider adding rate limiting for failed authorization attempts to prevent task ID enumeration:

// Track failed attempts per UID
static absl::flat_hash_map<uint32_t, std::pair<int, absl::Time>> failed_attempts;
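// Note: if these stream handlers can run on multiple threads concurrently,
// this static map needs its own mutex; it is shown unsynchronized only for brevity.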
const int kMaxFailedAttempts = 10;
const absl::Duration kRateLimitWindow = absl::Seconds(60);

// In the permission check block:
if (payload.uid() != task.uid() && 
    !g_account_manager->CheckUidIsAdmin(payload.uid())) {
  auto now = absl::Now();
  auto& [count, last_time] = failed_attempts[payload.uid()];
  if (now - last_time > kRateLimitWindow) {
    count = 1;
    last_time = now;
  } else if (++count > kMaxFailedAttempts) {
    // Log potential enumeration attempt
    CRANE_WARN("Potential task enumeration from UID {}", payload.uid());
  }
  ok = false;
  failure_reason = "permission denied";
}
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a3916e6 and 59633c3.

📒 Files selected for processing (9)
  • protos/Crane.proto (5 hunks)
  • src/CraneCtld/CtldPublicDefs.h (1 hunks)
  • src/CraneCtld/RpcService/CtldGrpcServer.cpp (8 hunks)
  • src/CraneCtld/RpcService/CtldGrpcServer.h (3 hunks)
  • src/CraneCtld/TaskScheduler.cpp (1 hunks)
  • src/CraneCtld/TaskScheduler.h (1 hunks)
  • src/Craned/Supervisor/CforedClient.cpp (7 hunks)
  • src/Craned/Supervisor/CforedClient.h (2 hunks)
  • src/Utilities/PublicHeader/include/crane/PublicHeader.h (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-05-09T01:54:21.256Z
Learnt from: L-Xiafeng
PR: PKUHPC/CraneSched#496
File: src/CraneCtld/CranedKeeper.cpp:51-53
Timestamp: 2025-05-09T01:54:21.256Z
Learning: The ConfigureCraned function in src/CraneCtld/CranedKeeper.cpp is called from a thread pool, so there's no need to worry about it blocking the gRPC completion queue thread.

Applied to files:

  • src/CraneCtld/TaskScheduler.h
📚 Learning: 2025-06-30T08:43:44.470Z
Learnt from: huerni
PR: PKUHPC/CraneSched#537
File: src/Craned/TaskManager.cpp:145-147
Timestamp: 2025-06-30T08:43:44.470Z
Learning: In the CraneSched codebase, src/Craned/Craned.cpp guarantees that g_config.CranedRes.contains(g_config.CranedIdOfThisNode) through explicit validation during startup. The code checks if g_config.CranedRes.contains(g_config.Hostname) and exits the process if not found, then sets g_config.CranedIdOfThisNode = g_config.Hostname. TaskManager constructor is called after this validation, so g_config.CranedRes[g_config.CranedIdOfThisNode] is guaranteed to be valid.

Applied to files:

  • src/CraneCtld/TaskScheduler.cpp
📚 Learning: 2025-08-14T02:56:35.503Z
Learnt from: L-Xiafeng
PR: PKUHPC/CraneSched#587
File: src/Craned/Supervisor/CforedClient.cpp:449-454
Timestamp: 2025-08-14T02:56:35.503Z
Learning: In CforedClient::AsyncSendRecvThread_(), the guard `if (state <= State::Registering) { continue; }` in the TIMEOUT branch only prevents premature cleanup when stopping before registration completes, but it doesn't block normal gRPC event processing. The completion queue will still deliver Prepare/Write/Read events that advance the state machine normally.

Applied to files:

  • src/Craned/Supervisor/CforedClient.cpp
  • src/CraneCtld/RpcService/CtldGrpcServer.cpp
🧬 Code graph analysis (2)
src/CraneCtld/TaskScheduler.h (1)
src/CraneCtld/TaskScheduler.cpp (2)
  • QueryTaskUseId (2311-2320)
  • QueryTaskUseId (2311-2312)
src/Craned/Supervisor/CforedClient.cpp (1)
src/Craned/Supervisor/CranedClient.cpp (2)
  • InitChannelAndStub (38-43)
  • InitChannelAndStub (38-38)
🔇 Additional comments (17)
src/CraneCtld/CtldPublicDefs.h (1)

543-545: LGTM! Clean getter addition.

The new GetAllocatedCranedsRegex() getter provides appropriate const access to the allocated_craneds_regex member.

src/CraneCtld/TaskScheduler.cpp (1)

2311-2320: LGTM! Simple and correct implementation.

The implementation correctly:

  1. Locks the running task map before access
  2. Returns false when task not found
  3. Populates the TaskToCtld structure with nodelist from the allocated regex
src/Craned/Supervisor/CforedClient.cpp (3)

356-357: Thread management looks correct.

Good practice joining the existing forward thread before creating a new one. This ensures clean thread lifecycle management during reconnection.


380-381: Ensure atomic correctness for queue size tracking.

Good that you're tracking and subtracting from m_output_queue_bytes_. The use of fetch_sub with relaxed ordering is appropriate here since exact ordering with enqueue operations isn't critical.
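
For illustration, the relaxed-ordering pattern described above boils down to the following (names are generic, not the actual members):

#include <atomic>
#include <cstddef>

// A byte counter used purely for size accounting: no other data is published
// through it, so relaxed ordering on both increment and decrement suffices.
std::atomic<std::size_t> output_queue_bytes{0};

void OnEnqueue(std::size_t n) {
  output_queue_bytes.fetch_add(n, std::memory_order_relaxed);
}

void OnDequeue(std::size_t n) {
  output_queue_bytes.fetch_sub(n, std::memory_order_relaxed);
}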


506-510: Verify FD and gRPC stream cleanup during reconnection.

  • Reconnection path sets m_wait_reconn_ and joins the output thread before triggering reconnect (src/Craned/Supervisor/CforedClient.cpp:504–510). InitChannelAndStub joins previous m_fwd_thread_ and starts a new one (lines 336–357). Task fds are closed in poll_cb and WriteStringToFd_ (lines 165–170, 320–333).
  • No explicit Cancel/WritesDone/Finish calls are present on the failure path; confirm the gRPC stream/context and any CompletionQueue-related events/native sockets are deterministically cleaned up (or add explicit cancellation/Finish) to prevent resource/socket leaks on reconnect.
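
As a sketch of the explicit teardown suggested above (generic async-gRPC client code under assumed parameters, not the actual CforedClient logic):

#include <grpcpp/grpcpp.h>

// Cancel the stream's context, then drain the completion queue until every
// outstanding tag has come back, so no event can fire after teardown.
void CancelAndDrainStream(grpc::ClientContext* context,
                          grpc::CompletionQueue* cq, int outstanding_tags) {
  context->TryCancel();  // pending Read/Write ops now complete with ok == false
  void* tag = nullptr;
  bool ok = false;
  while (outstanding_tags > 0 && cq->Next(&tag, &ok)) {
    --outstanding_tags;  // each posted operation returns exactly once
  }
}
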
protos/Crane.proto (6)

599-599: LGTM!

The addition of TASK_META_REQUEST = 4 enum value is correctly positioned and follows the existing pattern.


624-628: LGTM!

The TaskMetaReq message structure is well-designed with appropriate field types for tracking task metadata requests.


635-635: LGTM!

The payload field addition follows the existing oneof pattern correctly.


647-647: LGTM!

The TASK_META_REPLY enum value is correctly positioned in the reply types.


682-687: LGTM!

The TaskMetaReply message structure properly includes all necessary fields for task metadata responses.


698-698: LGTM!

The payload field addition correctly extends the oneof union with the new reply type.

src/CraneCtld/RpcService/CtldGrpcServer.h (2)

153-167: LGTM!

The WriteTaskMetaReply method implementation is thread-safe and follows the established pattern of other write methods in the class.


434-436: LGTM!

The stream proxy map and its mutex are properly declared with appropriate thread-safety annotations.

src/CraneCtld/RpcService/CtldGrpcServer.cpp (4)

188-189: LGTM!

The addition of kWaitReConnect state enables proper reconnection handling for cfored clients.


388-393: LGTM!

The kWaitReConnect state handler properly invalidates the writer and logs the disconnection event before returning gracefully.


410-412: LGTM!

The cleanup properly removes the proxy from the map when a cfored client disconnects permanently.


257-268: weak_ptr locking verified for all task callbacks: cb_task_res_allocated, cb_task_cancel, and cb_task_completed each lock proxy_weak_ptr before using proxy; other proxy-> calls occur only where a shared_ptr is already held.
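
The lock-before-use pattern those callbacks rely on, shown in isolation (the types below are stand-ins for the real StreamWriterProxy, not its actual interface):

#include <memory>

struct ProxyLike {  // stand-in for StreamWriterProxy
  void WriteReply() {}
};

void OnTaskCompleted(const std::weak_ptr<ProxyLike>& proxy_weak_ptr) {
  if (auto proxy = proxy_weak_ptr.lock()) {
    // The shared_ptr keeps the proxy alive for the duration of this call.
    proxy->WriteReply();
  }
  // If lock() fails, the cfored stream has already gone away; the callback
  // becomes a no-op instead of dereferencing a dangling pointer.
}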

@github-actions github-actions bot added the test-passed Build and test success label Sep 16, 2025
@huerni huerni marked this pull request as draft September 17, 2025 01:27
@huerni huerni marked this pull request as ready for review September 23, 2025 07:26
@github-actions github-actions bot added test-passed Build and test success and removed test-passed Build and test success labels Sep 23, 2025

@L-Xiafeng (Collaborator) left a comment


If the connection can never be re-established here, won't this task just stay stuck in the reconnect loop?

ranges::for_each(filtered_rng, append_fn);
}

bool TaskScheduler::QueryTaskUseId(task_id_t task_id,

Collaborator:

QueryTaskRegex

Collaborator:

QueryTaskNodeRegex

[this](const uvw::async_event&, uvw::async_handle&) {
while (m_wait_reconn_ && !m_stopped_) {
InitChannelAndStub(m_cfored_name_);
std::this_thread::sleep_for(std::chrono::milliseconds(100));

Collaborator:

There is a problem here: for a job with multiple tasks, if the first task has registered and the connection then drops, the second task's registration gets blocked.

Collaborator (PR author):

True, I didn't consider jobs with multiple tasks. I'll take an overall look after rebasing.

void CforedClient::TaskOutPutForward(const std::string& msg) {
CRANE_TRACE("Receive TaskOutputForward len: {}.", msg.size());

if (msg.size() > kMaxOutputQueueBytes) {

Collaborator:

Could we truncate here instead? Always keep the latest output.
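
A rough sketch of that keep-the-latest-output idea, assuming a simple deque-based queue; the actual CforedClient queue type and member names differ:

#include <cstddef>
#include <deque>
#include <string>

// Instead of discarding a new message when the queue is full, evict the
// oldest entries until the new one fits, so the freshest output is retained.
void EnqueueKeepLatest(std::deque<std::string>& queue, std::size_t& queue_bytes,
                       std::string msg, std::size_t max_bytes) {
  if (msg.size() > max_bytes) return;  // a single oversized message can never fit
  while (!queue.empty() && queue_bytes + msg.size() > max_bytes) {
    queue_bytes -= queue.front().size();  // drop the oldest output first
    queue.pop_front();
  }
  queue_bytes += msg.size();
  queue.push_back(std::move(msg));
}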


Labels

test-passed Build and test success

Projects

None yet

Development

Successfully merging this pull request may close these issues.

crun needs to be able to keep running independently when the forwarding link is broken

3 participants