Skip to content

Conversation

@L-Xiafeng
Copy link
Collaborator

@L-Xiafeng L-Xiafeng commented Nov 27, 2025

Base on #368

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced X11 session management with improved forwarding and connection state tracking
    • Added task exit status reporting for better job completion visibility
    • Added SSH process migration to cgroup support
  • Improvements

    • Improved error handling during task cancellation and connection failures
    • Better resource management for I/O operations
  • API Changes

    • TaskStatus enum value renamed from Configured to Starting
    • New error message for step resource limit exceeded scenarios

✏️ Tip: You can customize this high-level summary in your review settings.

@L-Xiafeng L-Xiafeng added the enhancement New feature or request label Nov 27, 2025
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 27, 2025

Walkthrough

This PR refactors task IO handling and X11 forwarding by introducing centralized request-to-reply translation, migrating to Step-scoped logging, adding a new X11 session manager component, and extending protobuf definitions to support exit status reporting and X11 connection state propagation across supervisor and crun communication.

Changes

Cohort / File(s) Summary
Centralized request handling
internal/cfored/crun_server.go
Added HandleSupervisorRequest() function to consolidate translation of StreamTaskIORequest types to StreamCrunReply messages; refactored CrunStream state machine to use this handler instead of inline forwarding, including updates to X11 and IO cancellation flows.
Supervisor forwarding and logging
internal/cfored/server.go
Added forwardCrunRequestToSingleSupervisor() method to route crun requests to a specified supervisor; updated logging throughout to use Step-scoped identifiers instead of Job-scoped; extended X11 and exit-status handling with enhanced error paths and message content.
X11 session management
internal/crun/x11.go
New file implementing complete X11 session lifecycle management via X11SessionMgr, X11Session, and state machine (ConnectingLocal, Forwarding, Ended); includes data bridging between local socket and supervisor with EOF propagation and backpressure handling.
Task IO and context refactoring
internal/crun/crun.go
Replaced multiple cancellation/readiness contexts with unified stopStepCtx, stopReadCtx, stopWriteCtx; integrated X11SessionMgr for X11 traffic routing; reworked IO forwarding, TASK_EXIT_STATUS reporting, and WaitAck/Forwarding loops with channel draining on stop events.
Command-line flag parsing
internal/crun/cmd.go
Added RootCmd.Flags().SetInterspersed(false) to disable interspersed flag parsing.
Protocol extensions
protos/Crane.proto
Extended StreamCrunRequest and StreamCrunReply with new X11-related and exit-status message types; added TaskX11ConnReply, TaskX11ForwardReply, TaskX11EofReply, TaskExitStatusReply messages; added corresponding enum values (TASK_EXIT_STATUS, TASK_X11_CONN, TASK_X11_EOF) and payloads; deprecated execution_node field in QuerySshStepEnvVariablesForwardRequest.
Task status and error handling
protos/PublicDefs.proto, internal/util/err.go
Renamed TaskStatus enum value from Configured = 8 to Starting = 8; added error mapping for ERR_STEP_RES_BEYOND.
Supervisor RPC extension
protos/Supervisor.proto
Added MigrateSshProcToCgroup RPC with MigrateSshProcToCgroupRequest and MigrateSshProcToCgroupReply messages.

Sequence Diagram

sequenceDiagram
    participant Client as Task/Client
    participant Crun
    participant X11Mgr as X11SessionMgr
    participant LocalX11 as Local X11<br/>Socket
    participant Supervisor

    rect rgb(200, 220, 255)
    Note over Crun,X11Mgr: X11 Session Lifecycle
    
    Crun->>X11Mgr: NewSession(X11GlobalId)<br/>Status: ConnectingLocal
    X11Mgr->>X11Mgr: Create session object<br/>with channels
    
    rect rgb(220, 240, 220)
    Note over X11Mgr,LocalX11: StatusConnectingLocal Phase
    X11Mgr->>LocalX11: Connect (Unix socket or TCP)
    alt Connection Success
        LocalX11-->>X11Mgr: Connected
        X11Mgr->>X11Mgr: Transition to Forwarding
        X11Mgr->>Supervisor: TASK_X11_CONN reply
    else Connection Failure
        LocalX11-->>X11Mgr: Error
        X11Mgr->>Supervisor: TASK_X11_EOF (EOF signal)
        X11Mgr->>X11Mgr: Mark session Ended
    end
    end
    
    rect rgb(255, 240, 200)
    Note over X11Mgr,Supervisor: StatusForwarding Phase
    par Reader: Local→Supervisor
        X11Mgr->>LocalX11: Read data
        LocalX11-->>X11Mgr: X11 data bytes
        X11Mgr->>Supervisor: TASK_X11_FORWARD<br/>(StreamCrunRequest)
    and Writer: Supervisor→Local
        Supervisor-->>X11Mgr: X11 data reply<br/>(StreamCrunReply)
        X11Mgr->>LocalX11: Write to socket
    end
    
    alt EOF from Local or Supervisor
        X11Mgr->>Supervisor: SendEofToSupervisor<br/>(TASK_X11_EOF)
        X11Mgr->>X11Mgr: Transition to Ended
    end
    end
    
    rect rgb(240, 220, 220)
    Note over X11Mgr: Cleanup & Removal
    X11Mgr->>X11Mgr: Remove session from<br/>x11Sessions map
    end
    end

    rect rgb(200, 200, 220)
    Note over Crun,Supervisor: Task IO Request→Reply Handling
    
    Client->>Crun: TASK_OUTPUT or<br/>TASK_EXIT_STATUS
    Crun->>Crun: HandleSupervisorRequest()<br/>Translate to StreamCrunReply
    Crun->>Supervisor: Forward reply<br/>(with logging)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Areas requiring extra attention:

  • X11SessionMgr thread-safety: Verify that the session map access with mutex is correct and handles concurrent creates/deletes during session lifecycle transitions, especially during the Ended state cleanup.
  • Context and channel draining: Confirm that stopStepCtx, stopReadCtx, stopWriteCtx in crun.go are properly initialized, passed to X11SessionMgr, and that channel draining loops don't deadlock or leak goroutines.
  • EOF propagation semantics: Validate that SendEofToSupervisor guarantees a single EOF per session and that EOF signals in both directions (reader and writer goroutines) correctly terminate the session.
  • HandleSupervisorRequest centralization: Ensure all request types are covered, reply construction is correct, and error paths (fatal case) don't mask legitimate issues.
  • Protocol backwards compatibility: Review whether the deprecated execution_node field and new enum/message additions maintain compatibility with existing deployments; validate that new protobuf enums don't break existing code paths.

Possibly related PRs

Suggested reviewers

  • RileyWen
  • NamelessOIer

🐰 Behold, a session springs to life,
Through X11 channels, free of strife,
Steps now lead the way ahead,
Requests translate, replies spread!
EOF signals seal the dance—
Our IO forwarding takes its chance. 🚀

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: X11 multi connection and multi task for step' directly describes the main changes: introducing X11 multi-connection and multi-task support within step scope, which aligns with the substantial refactoring of X11 handling throughout the codebase.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch dev/x11_task

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

🧹 Nitpick comments (8)
internal/crun/cmd.go (1)

84-84: Based on my verification, I can now provide the rewritten review comment:


Add explanatory comment for SetInterspersed(false) to match the pattern in ccon.

The SetInterspersed(false) configuration is correct for a wrapper command—it prevents crun flags from being parsed after the executable name, avoiding confusion between crun flags and the wrapped executable's arguments (e.g., crun executable -c 'command' where -c belongs to the wrapped command, not crun).

However, internal/ccon/cmd.go:188 includes a helpful explanatory comment for this same pattern that crun lacks. Consider adding a similar comment:

// Disable interspersed flags to avoid confusion with executable and its arguments
// e.g., crun myprogram -c 'echo hello'
// Here -c is part of myprogram's arguments, not a crun flag
RootCmd.Flags().SetInterspersed(false)

This improves code clarity for future maintainers.

internal/cfored/crun_server.go (2)

591-599: Unused error variable from HandleSupervisorRequest.

On line 592, the error from HandleSupervisorRequest is assigned to err but never checked before reusing err on line 594. During cancellation, errors are being ignored intentionally per the comment, but the variable shadowing could be confusing.

 				// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
-				err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+				_ = HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
 
-				if err = toCrunStream.Send(reply); err != nil {
+				if err := toCrunStream.Send(reply); err != nil {

661-669: Same pattern: unused error during waitingAck.

Similar to the cancellation loop, the error from HandleSupervisorRequest on line 662 is ignored but the variable is reused.

 				// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
-				err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+				_ = HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
 
-				if err = toCrunStream.Send(reply); err != nil {
+				if err := toCrunStream.Send(reply); err != nil {
internal/crun/crun.go (1)

650-652: Minor: Unnecessary braces around break statement.

The braces around break CrunStateMachineLoop are unnecessary and inconsistent with other case blocks.

 		case End:
-			{
-				break CrunStateMachineLoop
-			}
+			break CrunStateMachineLoop
internal/crun/x11.go (4)

56-67: Consider using context.Context directly instead of pointer.

Storing finishCtx *context.Context as a pointer is unusual. In Go, context.Context is typically passed by value. The pointer adds an extra dereference (line 296: *sm.finishCtx) and could introduce nil pointer issues.

 type X11SessionMgr struct {
 	sessionMutex sync.Mutex
 	x11Sessions  map[X11GlobalId]*X11Session
 	X11ReplyChan chan *protos.StreamCrunReply
 	X11RequestChan chan *protos.StreamCrunRequest
-	finishCtx      *context.Context
+	finishCtx      context.Context

 	port   uint32
 	target string
 }

And update the constructor and usage accordingly.


112-135: Connection attempt has no timeout.

net.Dial for both unix and tcp sockets has no timeout configured. If the X11 server is unresponsive, this could hang indefinitely.

Consider using net.DialTimeout or a context-based dialer:

-		session.conn, err = net.Dial("unix", session.sessionMgr.target)
+		session.conn, err = net.DialTimeout("unix", session.sessionMgr.target, 10*time.Second)

And similarly for TCP connections.


137-212: Goroutine may leak if reader goroutine exits before writer.

The writer goroutine (lines 183-208) waits on session.X11ToLocal channel. If the reader exits first and the channel is not closed, the writer may block forever. The current design relies on conn.Close() triggering a read error to exit the reader, which then allows wg.Wait() to complete. However, if the reader exits due to an error other than connection close, the writer may still be blocked.

Consider ensuring the channel is closed when the reader exits abnormally:

 		log.Tracef("[X11 %s:%d] X11 session reader ended.", session.X11Id.CranedId, session.X11Id.LocalId)
+		session.StopLocalReadWrite() // Signal writer to exit
 		wg.Done()

239-306: Potential race: goroutine started while holding lock.

At lines 255-258, a goroutine is started (go session.SessionRoutine()) while holding sessionMutex. While not strictly incorrect, starting goroutines while holding locks is generally discouraged as it can lead to subtle issues if the goroutine tries to acquire the same lock.

Consider starting the goroutine outside the critical section:

 			session := sm.NewSession(globalId)
 			sm.sessionMutex.Lock()
-			go session.SessionRoutine()
 			sm.x11Sessions[globalId] = session
 			sm.sessionMutex.Unlock()
+			go session.SessionRoutine()
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5788342 and c9fd6bc.

📒 Files selected for processing (9)
  • Makefile (1 hunks)
  • internal/cfored/crun_server.go (6 hunks)
  • internal/cfored/server.go (7 hunks)
  • internal/crun/cmd.go (1 hunks)
  • internal/crun/crun.go (14 hunks)
  • internal/crun/x11.go (1 hunks)
  • protos/Crane.proto (10 hunks)
  • protos/PublicDefs.proto (2 hunks)
  • protos/Supervisor.proto (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/crun/crun.go (1)
internal/crun/x11.go (2)
  • X11SessionMgr (56-67)
  • NewX11SessionMgr (214-224)
🪛 checkmake (0.2.2)
Makefile

[warning] 106-106: Target body for "build" exceeds allowed length of 5 (12).

(maxbodylength)

🔇 Additional comments (26)
protos/PublicDefs.proto (1)

118-118: Enum rename preserves wire compatibility.

Renaming Configured to Starting while keeping the numeric value 8 maintains backward compatibility with existing serialized messages. The new name better reflects the task lifecycle semantics.

Makefile (1)

106-118: Build dependency ordering is correct.

The build: protos prerequisite ensures protobuf files are generated before compilation, which is essential given the new proto definitions in this PR.

Regarding the static analysis hint about target body length (12 lines vs recommended 5): this is a minor style concern. If desired, the build logic could be extracted to a separate shell script, but the current implementation is acceptable.

protos/Supervisor.proto (2)

122-128: New supervisor messages for SSH process cgroup migration.

The messages are well-structured for the cgroup migration functionality. Note that crane.grpc.MigrateSshProcToCgroupRequest (in Crane.proto) includes an additional task_id field, while this supervisor version only requires pid. This distinction is appropriate since the Supervisor context already has step information.


140-140: New RPC integrates cleanly with existing Supervisor service.

protos/Crane.proto (6)

35-35: Timestamp addition enables accurate event ordering.

Adding timestamp to StepStatusChangeRequest allows proper chronological tracking of step status transitions across distributed nodes.


474-474: Good deprecation practice.

Marking execution_node as deprecated rather than removing it maintains backward compatibility while signaling the intended migration path.


750-755: X11 forwarding request extended for multi-connection support.

The addition of local_id and craned_id enables routing X11 messages to specific sessions across multiple connections, which is essential for the multi-connection feature.


777-779: StreamCrunReply extensions for X11 and exit status.

The new enum values and message types properly extend the reply types to support:

  • Per-session X11 connection tracking (TaskX11ConnReply, TaskX11EofReply)
  • Explicit exit status reporting (TaskExitStatusReply)

Field numbering is sequential and consistent.

Also applies to: 808-828, 840-842


852-854: StreamTaskIORequest extensions align with reply counterparts.

The new supervisor request types (TASK_EXIT_STATUS, TASK_X11_CONN, TASK_X11_EOF) and their corresponding message structures mirror the reply types, providing consistent bidirectional communication for X11 sessions.

Also applies to: 869-900


927-928: TaskX11InputReq extended for session tracking.

Adding eof and local_id to TaskX11InputReq allows proper routing of X11 input to specific sessions and signaling connection termination.

internal/cfored/crun_server.go (5)

47-126: Centralized supervisor request handling improves maintainability.

The HandleSupervisorRequest function consolidates the translation of StreamTaskIORequest types to StreamCrunReply, reducing code duplication across the state machine.


506-511: X11 forward routing to single supervisor.

Using forwardCrunRequestToSingleSupervisor with craned_id correctly routes X11 messages to the specific craned handling that session, rather than broadcasting.


545-555: Unified IO handling via HandleSupervisorRequest.

Delegating to HandleSupervisorRequest simplifies the forwarding loop and ensures consistent behavior across all IO message types.


578-622: Cancellation loop prevents channel blocking.

Processing TaskIoRequestChannel during cancellation prevents the supervisor from blocking when the IO channel fills up. The select-based loop properly handles both IO forwarding and completion requests.


644-706: Refactored CrunWaitCtldAck handles IO during completion.

The new waitingAck loop processes IO messages while waiting for the completion acknowledgment, preventing deadlocks when the supervisor sends data before receiving the ack.

internal/cfored/server.go (6)

101-101: LGTM!

Log message updated to use "Step" terminology for consistency with step-level identifiers.


118-118: LGTM!

Consistent terminology update in the debug log.


133-134: LGTM!

Warning message now includes Step context for better debugging.


145-159: LGTM!

Log messages updated to reflect Step-based context consistently.


338-348: LGTM!

X11 message type handling added with appropriate fallthrough for TASK_X11_CONN, TASK_X11_OUTPUT, and TASK_X11_EOF. The messages are correctly forwarded to the crun via forwardRemoteIoToCrun.


394-420: LGTM!

Consolidated error handling for toSupervisorStream.Send is cleaner. The reply send logic is now unified at lines 416-420 with proper error handling and state transition.

internal/crun/crun.go (2)

101-108: LGTM!

The refactored context and channel structure is cleaner. Using separate stopReadCtx and stopWriteCtx derived from stopStepCtx provides better control over the read/write lifecycle.


717-720: Draining closed channel may block if channel is not closed.

The drain loop for range m.chanOutputFromRemote expects the channel to be closed. If the channel is never closed by the sender, this will block indefinitely. Ensure that chanOutputFromRemote is properly closed when the step ends.

Verify that chanOutputFromRemote is closed when the step finishes. Otherwise, consider using a non-blocking drain pattern or closing the channel in stopStepCb.

internal/crun/x11.go (3)

32-55: LGTM!

Well-structured type definitions for X11 session management. Using sync.Once for eofSent and stopReadWrite ensures one-time signaling is thread-safe.


69-87: LGTM!

Clean state machine implementation for X11 session lifecycle. Properly removes session from manager when ended.


296-304: Sessions may not be fully cleaned up on finish signal.

When the finish signal is received, StopLocalReadWrite() is called for each session, but this only signals the writer to stop. Sessions in X11ConnectingLocal state (blocked on net.Dial) won't be interrupted and may leak.

Consider whether connections in progress should be forcibly closed or use a dial timeout as suggested earlier.

},
},
}
log.Tracef("[Supervisor->Cfored->Crun][Step #%d.%d] fowarding msg size[%d]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Typo: "fowarding" should be "forwarding".

Multiple log messages contain the typo "fowarding" instead of "forwarding".

-		log.Tracef("[Supervisor->Cfored->Crun][Step #%d.%d] fowarding msg size[%d]",
+		log.Tracef("[Supervisor->Cfored->Crun][Step #%d.%d] forwarding msg size[%d]",

Similarly update lines 74, 89, 103, and 118.

Also applies to: 74-74, 89-89, 103-103, 118-118

🤖 Prompt for AI Agents
In internal/cfored/crun_server.go around lines 58, 74, 89, 103 and 118, several
log messages contain the typo "fowarding" — update each occurrence to the
correct spelling "forwarding" so all log statements read "forwarding" instead;
preserve the surrounding formatting, punctuation and log arguments exactly, only
change the misspelled word.

Comment on lines +120 to +124
default:
log.Fatalf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
jobId, stepId)
return errors.New("Unexpected Task IO Message Type")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

log.Fatalf will terminate the process before the error is returned.

log.Fatalf calls os.Exit(1) after logging, so the return errors.New(...) on line 123 is unreachable. If this is an unexpected message type that should crash the server, keep log.Fatalf. If the error should be propagated to the caller instead, use log.Errorf.

If propagation is intended:

 	default:
-		log.Fatalf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
+		log.Errorf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
 			jobId, stepId)
 		return errors.New("Unexpected Task IO Message Type")
🤖 Prompt for AI Agents
In internal/cfored/crun_server.go around lines 120 to 124, the default branch
uses log.Fatalf which exits the process so the subsequent return is never
reached; if you intend to propagate the error instead of terminating, replace
log.Fatalf with log.Errorf (preserving the formatted message and jobId/stepId
args) and then return the errors.New("Unexpected Task IO Message Type") as
written so the caller receives the error; if termination is intended, remove the
return and keep log.Fatalf.

Comment on lines +165 to +189
func (keeper *SupervisorChannelKeeper) forwardCrunRequestToSingleSupervisor(taskId uint32, stepId uint32,
cranedId string, request *protos.StreamCrunRequest) {
stepIdentity := StepIdentifier{JobId: taskId, StepId: stepId}
keeper.toSupervisorChannelMtx.Lock()
defer keeper.toSupervisorChannelMtx.Unlock()
stepChannels, exist := keeper.toSupervisorChannels[stepIdentity]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist step.", taskId, stepId)
return
}
supervisorChannel, exist := stepChannels[cranedId]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
}

select {
case supervisorChannel.requestChannel <- request:
default:
if len(supervisorChannel.requestChannel) == cap(supervisorChannel.requestChannel) {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s is full", taskId, stepId, cranedId)
} else {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s write failed", taskId, stepId, cranedId)
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Potential nil pointer dereference after failed lookup.

When stepChannels[cranedId] doesn't exist (line 176-178), the code logs an error but does not return. Execution continues to lines 180-188 where supervisorChannel is used, causing a nil pointer panic.

Apply this diff to fix the missing return:

 	supervisorChannel, exist := stepChannels[cranedId]
 	if !exist {
 		log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
+		return
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (keeper *SupervisorChannelKeeper) forwardCrunRequestToSingleSupervisor(taskId uint32, stepId uint32,
cranedId string, request *protos.StreamCrunRequest) {
stepIdentity := StepIdentifier{JobId: taskId, StepId: stepId}
keeper.toSupervisorChannelMtx.Lock()
defer keeper.toSupervisorChannelMtx.Unlock()
stepChannels, exist := keeper.toSupervisorChannels[stepIdentity]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist step.", taskId, stepId)
return
}
supervisorChannel, exist := stepChannels[cranedId]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
}
select {
case supervisorChannel.requestChannel <- request:
default:
if len(supervisorChannel.requestChannel) == cap(supervisorChannel.requestChannel) {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s is full", taskId, stepId, cranedId)
} else {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s write failed", taskId, stepId, cranedId)
}
}
}
func (keeper *SupervisorChannelKeeper) forwardCrunRequestToSingleSupervisor(taskId uint32, stepId uint32,
cranedId string, request *protos.StreamCrunRequest) {
stepIdentity := StepIdentifier{JobId: taskId, StepId: stepId}
keeper.toSupervisorChannelMtx.Lock()
defer keeper.toSupervisorChannelMtx.Unlock()
stepChannels, exist := keeper.toSupervisorChannels[stepIdentity]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist step.", taskId, stepId)
return
}
supervisorChannel, exist := stepChannels[cranedId]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
return
}
select {
case supervisorChannel.requestChannel <- request:
default:
if len(supervisorChannel.requestChannel) == cap(supervisorChannel.requestChannel) {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s is full", taskId, stepId, cranedId)
} else {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s write failed", taskId, stepId, cranedId)
}
}
}
🤖 Prompt for AI Agents
internal/cfored/server.go around lines 165 to 189: after the lookup of
supervisorChannel (stepChannels[cranedId]) the code logs an error when the
cranedId is missing but continues execution and dereferences supervisorChannel;
add an immediate return right after the log error (the branch where !exist) so
the function exits without using the nil supervisorChannel, preserving the
lock-unlock behavior via defer and avoiding a nil pointer panic.

Comment on lines +402 to +430
var x11ReqFromLocal chan *protos.StreamCrunRequest
if m.X11SessionMgr != nil {
x11ReqFromLocal = m.X11SessionMgr.X11RequestChan
} else {
x11ReqFromLocal = nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Potential nil pointer dereference when receiving from nil channel.

When m.X11SessionMgr is nil, x11ReqFromLocal is set to nil. In the select statement at line 430, receiving from a nil channel blocks forever, but this is safe in a select with other cases. However, the draining loop at line 443 (for range x11ReqFromLocal) will panic on a nil channel.

Apply this diff to guard the drain loop:

-			for range x11ReqFromLocal {
-				log.Tracef("Drained 1 msg from x11ReqFromLocal after stopReadCtx done")
+			if x11ReqFromLocal != nil {
+				for range x11ReqFromLocal {
+					log.Tracef("Drained 1 msg from x11ReqFromLocal after stopReadCtx done")
+				}
 			}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In internal/crun/crun.go around lines 402-407 (and the drain loop at ~443),
x11ReqFromLocal is set to nil when m.X11SessionMgr is nil which causes a panic
when doing for range x11ReqFromLocal; guard the drain loop by checking the
channel is non-nil before iterating: if x11ReqFromLocal != nil { for range
x11ReqFromLocal { ... } }; leave the existing select usage unchanged.

Comment on lines +492 to +532
case protos.StreamCrunReply_TASK_X11_CONN:
fallthrough
case protos.StreamCrunReply_TASK_X11_FORWARD:
m.chanX11OutputFromRemote <- cforedReply.GetPayloadTaskX11ForwardReply().Msg
fallthrough
case protos.StreamCrunReply_TASK_X11_EOF:
m.X11SessionMgr.X11ReplyChan <- cforedReply

case protos.StreamCrunReply_TASK_EXIT_STATUS:
exitStatus := cforedReply.GetPayloadTaskExitStatusReply()
if exitStatus.ExitCode != 0 {
if exitStatus.Signaled {
fmt.Fprintf(os.Stderr, "error: task %d: Terminated\n", exitStatus.TaskId)
} else {
fmt.Fprintf(os.Stderr, "error: task %d: Exited with exit code %d\n",
exitStatus.TaskId, exitStatus.ExitCode)
}
m.err = int(exitStatus.ExitCode)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential nil pointer dereference when X11SessionMgr is nil.

When X11 forwarding is disabled, m.X11SessionMgr is nil. If a TASK_X11_CONN, TASK_X11_FORWARD, or TASK_X11_EOF message is received (lines 492-497), the code will panic when accessing m.X11SessionMgr.X11ReplyChan.

Add a nil check before forwarding X11 messages:

 			case protos.StreamCrunReply_TASK_X11_CONN:
 				fallthrough
 			case protos.StreamCrunReply_TASK_X11_FORWARD:
 				fallthrough
 			case protos.StreamCrunReply_TASK_X11_EOF:
-				m.X11SessionMgr.X11ReplyChan <- cforedReply
+				if m.X11SessionMgr != nil {
+					m.X11SessionMgr.X11ReplyChan <- cforedReply
+				} else {
+					log.Warnf("Received X11 message but X11SessionMgr is nil")
+				}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
internal/crun/crun.go around lines 492 to 509: the code sends X11-related
replies to m.X11SessionMgr.X11ReplyChan without checking if m.X11SessionMgr is
nil, which panics when X11 forwarding is disabled; add a nil check before
sending (e.g., if m.X11SessionMgr != nil { m.X11SessionMgr.X11ReplyChan <-
cforedReply } else { optionally ignore or log/debug that an X11 reply was
dropped }) so X11 messages are safely skipped when X11SessionMgr is nil and do
not cause a nil pointer dereference.

Comment on lines +895 to 929
m.chanInputFromLocal <- nil
m.stopStepCb()
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Channel not closed after file read error.

When os.Open fails, the code sends nil to chanInputFromLocal but doesn't close the channel. In StdinReaderRoutine, the channel is closed with defer close(m.chanInputFromLocal) (line 750), but FileReaderRoutine doesn't close the channel, leading to inconsistent behavior and potential blocking in consumers.

Consider adding close(m.chanInputFromLocal) after sending nil, or unifying cleanup logic:

 	if err != nil {
 		log.Errorf("Failed to open file %s: %s", parsedFilePath, err)
 		m.chanInputFromLocal <- nil
+		close(m.chanInputFromLocal)
 		m.stopStepCb()
 		return
 	}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In internal/crun/crun.go around lines 895-897, the FileReaderRoutine sends nil
to m.chanInputFromLocal on os.Open failure but does not close the channel,
causing inconsistent lifecycle and potential blocked receivers; modify the
cleanup so the channel is closed after sending nil (or centralize channel
closing using a single owner or sync.Once to avoid double-close), then call
m.stopStepCb() and return — ensure only one goroutine closes
m.chanInputFromLocal to prevent race/close-panics.

Comment on lines +930 to +964
m.stopStepCtx, m.stopStepCb = context.WithCancel(context.Background())
m.stopReadCtx = context.WithoutCancel(m.stopStepCtx)
m.stopWriteCtx = context.WithoutCancel(m.stopStepCtx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

Go context.WithoutCancel behavior documentation semantics cancellation

💡 Result:

Short answer: context.WithoutCancel(parent) returns a new Context that copies parent's values but does not inherit cancellation or deadline information. The returned context's Done channel is nil (so it never signals cancel), Deadline/Err/Cause are not populated, and parent cancellation does not affect it — only the copied Values remain available. Use WithoutCancel when you want a context that carries values from a parent but is detached from that parent's cancellation/deadline. [1][2]

Sources:
[1] context package docs (WithoutCancel behavior).
[2] golang/go issue discussing the addition and semantics of WithoutCancel.


🏁 Script executed:

#!/bin/bash
# Search for usage of stopReadCtx, stopWriteCtx, and stopStepCb
cd /tmp/repo_clone 2>/dev/null || cd . 

echo "=== Searching for stopReadCtx usage ==="
rg -n "stopReadCtx" --type go

echo -e "\n=== Searching for stopWriteCtx usage ==="
rg -n "stopWriteCtx" --type go

echo -e "\n=== Searching for stopStepCb usage ==="
rg -n "stopStepCb" --type go

echo -e "\n=== Searching for stopStepCtx usage ==="
rg -n "stopStepCtx" --type go

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 1830


🏁 Script executed:

#!/bin/bash
# Look at the select statements that use these contexts
echo "=== Context around line 439 (stopReadCtx.Done()) ==="
sed -n '430,450p' internal/crun/crun.go

echo -e "\n=== Context around line 769 (stopWriteCtx.Done()) ==="
sed -n '760,780p' internal/crun/crun.go

echo -e "\n=== Context around line 911 (stopReadCtx.Done()) ==="
sed -n '902,920p' internal/crun/crun.go

echo -e "\n=== Context around line 453 (stopStepCtx.Done()) ==="
sed -n '444,465p' internal/crun/crun.go

echo -e "\n=== Function definition containing lines 930-932 ==="
sed -n '900,940p' internal/crun/crun.go

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 3557


🏁 Script executed:

#!/bin/bash
# Check line 717 context
echo "=== Context around line 717 ==="
sed -n '708,735p' internal/crun/crun.go

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 707


Use context.WithCancel(m.stopStepCtx) instead of context.WithoutCancel(m.stopStepCtx) for stopReadCtx and stopWriteCtx.

The code attempts to cancel stopReadCtx and stopWriteCtx by checking <-m.stopReadCtx.Done() and <-m.stopWriteCtx.Done() in select statements (lines 439, 717, 769, 911), but since these contexts are created with context.WithoutCancel, their Done() channels are nil. This means these select cases will never fire even when stopStepCb() is called, causing the I/O forwarding goroutines to hang indefinitely rather than gracefully shut down.

Change lines 931-932 to:

m.stopReadCtx, _ = context.WithCancel(m.stopStepCtx)
m.stopWriteCtx, _ = context.WithCancel(m.stopStepCtx)

This ensures cancellation propagates from stopStepCtx to both contexts as the code's select statements expect.

🤖 Prompt for AI Agents
In internal/crun/crun.go around lines 930 to 932, the stopReadCtx and
stopWriteCtx are created with context.WithoutCancel causing their Done()
channels to be nil so stop signals never propagate; replace those two contexts
so they are derived with context.WithCancel from m.stopStepCtx (so cancellation
of m.stopStepCtx cancels both stopReadCtx and stopWriteCtx) and discard the
returned cancel functions as appropriate.

Comment on lines +89 to +104
func (session *X11Session) SendEofToSupervisor(data []byte) {
session.eofSent.Do(func() {
log.Debugf("[X11 %s:%d] Sending EOF to supervisor.", session.X11Id.CranedId, session.X11Id.LocalId)
req := &protos.StreamCrunRequest{
Type: protos.StreamCrunRequest_TASK_X11_FORWARD,
Payload: &protos.StreamCrunRequest_PayloadTaskX11ForwardReq{
PayloadTaskX11ForwardReq: &protos.StreamCrunRequest_TaskX11ForwardReq{
Msg: data,
CranedId: session.X11Id.CranedId,
LocalId: session.X11Id.LocalId,
},
},
}
session.X11ToSupervisor <- req
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing EOF flag in the request.

SendEofToSupervisor creates a TASK_X11_FORWARD request but doesn't set the Eof field to indicate this is an EOF message. This may cause the receiver to treat it as regular data.

Based on the protobuf definition used elsewhere (e.g., server.go line 405-406), add the Eof field:

 			PayloadTaskX11ForwardReq: &protos.StreamCrunRequest_TaskX11ForwardReq{
 				Msg:      data,
 				CranedId: session.X11Id.CranedId,
 				LocalId:  session.X11Id.LocalId,
+				Eof:      true,
 			},
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (session *X11Session) SendEofToSupervisor(data []byte) {
session.eofSent.Do(func() {
log.Debugf("[X11 %s:%d] Sending EOF to supervisor.", session.X11Id.CranedId, session.X11Id.LocalId)
req := &protos.StreamCrunRequest{
Type: protos.StreamCrunRequest_TASK_X11_FORWARD,
Payload: &protos.StreamCrunRequest_PayloadTaskX11ForwardReq{
PayloadTaskX11ForwardReq: &protos.StreamCrunRequest_TaskX11ForwardReq{
Msg: data,
CranedId: session.X11Id.CranedId,
LocalId: session.X11Id.LocalId,
},
},
}
session.X11ToSupervisor <- req
})
}
func (session *X11Session) SendEofToSupervisor(data []byte) {
session.eofSent.Do(func() {
log.Debugf("[X11 %s:%d] Sending EOF to supervisor.", session.X11Id.CranedId, session.X11Id.LocalId)
req := &protos.StreamCrunRequest{
Type: protos.StreamCrunRequest_TASK_X11_FORWARD,
Payload: &protos.StreamCrunRequest_PayloadTaskX11ForwardReq{
PayloadTaskX11ForwardReq: &protos.StreamCrunRequest_TaskX11ForwardReq{
Msg: data,
CranedId: session.X11Id.CranedId,
LocalId: session.X11Id.LocalId,
Eof: true,
},
},
}
session.X11ToSupervisor <- req
})
}
🤖 Prompt for AI Agents
In internal/crun/x11.go around lines 89 to 104, the SendEofToSupervisor builds a
TASK_X11_FORWARD request but doesn't mark it as EOF; set the request's
PayloadTaskX11ForwardReq.Eof = true when constructing the payload so the
receiver treats this as an EOF message (i.e. add the Eof: true field to the
&protos.StreamCrunRequest_TaskX11ForwardReq literal alongside Msg, CranedId, and
LocalId).

Comment on lines +170 to +176
select {

case session.X11ToSupervisor <- req:
log.Tracef("[X11 %s:%d] Sent data to supervisor.", session.X11Id.CranedId, session.X11Id.LocalId)
default:
log.Errorf("[X11 %s:%d] X11 to supervisor channel full, dropping data.", session.X11Id.CranedId, session.X11Id.LocalId)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Data dropped silently on channel full.

When X11ToSupervisor channel is full, data is dropped with only an error log. This could cause silent data loss and corruption of the X11 stream.

Consider either:

  1. Using a blocking send (remove default case) if backpressure is acceptable
  2. Closing the session on channel full to signal the error upstream
🤖 Prompt for AI Agents
In internal/crun/x11.go around lines 170 to 176, currently the select uses a
non-blocking send with a default that drops X11 data when
session.X11ToSupervisor is full; change this so data isn't silently lost: either
remove the default to perform a blocking send (so the goroutine backpressures
until the receiver can accept the packet) or, if backpressure is unacceptable,
detect the full channel and close the session to surface the error upstream —
implement one of these two behaviors, update the log to reflect the chosen
action, and ensure any necessary cleanup (closing channels, notifying
goroutines, or setting session state) is performed when you close the session.

@L-Xiafeng L-Xiafeng changed the title feat: X11 multi connection feat: X11 multi connection and multi task for step Dec 5, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (9)
internal/crun/crun.go (4)

401-447: Guard draining of X11 requests when X11SessionMgr is nil.

In the input‑forwarding goroutine:

  • x11ReqFromLocal is set to nil when m.X11SessionMgr is nil.
  • In the stopReadCtx.Done() case you for range x11ReqFromLocal, which blocks forever on a nil channel.

Wrap the drain loop in a nil‑check:

 		case <-m.stopReadCtx.Done():
 			for range m.chanInputFromLocal {
 				log.Tracef("Drained 1 msg from chanInputFromLocal after stopReadCtx done")
 			}
-			for range x11ReqFromLocal {
-				log.Tracef("Drained 1 msg from x11ReqFromLocal after stopReadCtx done")
-			}
+			if x11ReqFromLocal != nil {
+				for range x11ReqFromLocal {
+					log.Tracef("Drained 1 msg from x11ReqFromLocal after stopReadCtx done")
+				}
+			}
 			return

This matches the intended behavior and avoids a permanently blocked goroutine when X11 is disabled.


488-510: Protect X11SessionMgr usage against nil and simplify WaitAck handling.

Two related issues:

  1. Nil X11SessionMgr dereference

In both Forwarding and WaitAck states, X11 replies are sent to m.X11SessionMgr.X11ReplyChan without checking if X11 is enabled:

case protos.StreamCrunReply_TASK_X11_CONN, ...:
    m.X11SessionMgr.X11ReplyChan <- cforedReply

If a misconfigured server sends an X11 message when X11 was not requested, this will panic. Add a nil check and optionally log a warning:

-	case protos.StreamCrunReply_TASK_X11_CONN:
-		fallthrough
-	case protos.StreamCrunReply_TASK_X11_FORWARD:
-		fallthrough
-	case protos.StreamCrunReply_TASK_X11_EOF:
-		m.X11SessionMgr.X11ReplyChan <- cforedReply
+	case protos.StreamCrunReply_TASK_X11_CONN,
+		protos.StreamCrunReply_TASK_X11_FORWARD,
+		protos.StreamCrunReply_TASK_X11_EOF:
+		if m.X11SessionMgr != nil {
+			m.X11SessionMgr.X11ReplyChan <- cforedReply
+		} else {
+			log.Warn("Received X11 reply but X11SessionMgr is nil; dropping message")
+		}

Apply the same pattern in StateWaitAck.

  1. WaitAck double‑handling and fallthrough

StateWaitAck now handles several streaming reply types in the switch and then falls through to legacy code that assumes only TASK_COMPLETION_ACK_REPLY is valid. This is confusing and partially redundant. Consider:

  • Keeping streaming types (TASK_IO_FORWARD, X11*, TASK_EXIT_STATUS) in the switch with early returns (as you already do).
  • Handling TASK_COMPLETION_ACK_REPLY entirely inside the switch (set state, set error on failure, then return).
  • Adding a default case that logs an unexpected type and transitions to End.

That makes the post‑switch if cforedReply.Type != ... block unnecessary and clarifies the control flow.

Also applies to: 574-606


886-927: FileReaderRoutine should close the input channel and align with drain logic.

On file‑open error you send nil to chanInputFromLocal and call stopStepCb(), but do not close the channel; on EOF you send a final chunk and then nil, but also never close it. The draining code in the input‑forward goroutine (for range m.chanInputFromLocal) relies on the channel being closed to exit.

Since only one producer writes to chanInputFromLocal, you can safely close it here:

 func (m *StateMachineOfCrun) FileReaderRoutine(filePattern string) {
@@
 	file, err := os.Open(parsedFilePath)
 	if err != nil {
 		log.Errorf("Failed to open file %s: %s", parsedFilePath, err)
-		m.chanInputFromLocal <- nil
-		m.stopStepCb()
+		m.chanInputFromLocal <- nil
+		close(m.chanInputFromLocal)
+		m.stopStepCb()
 		return
 	}
@@
 			if err != nil {
 				if err == io.EOF {
-					m.chanInputFromLocal <- buffer[:n]
-					m.chanInputFromLocal <- nil
+					if n > 0 {
+						m.chanInputFromLocal <- buffer[:n]
+					}
+					m.chanInputFromLocal <- nil
+					close(m.chanInputFromLocal)
 					break reading

This keeps behavior consistent with StdinReaderRoutine (which already closes the channel) and lets the draining loop finish.


100-109: Fix stopRead/stopWrite cancellation: use context.WithCancel, not WithoutCancel.

stopReadCtx and stopWriteCtx are created with context.WithoutCancel(m.stopStepCtx), so their Done() channels are nil and the select <-m.stopReadCtx.Done() / <-m.stopWriteCtx.Done() branches never trigger. This breaks graceful shutdown of IO/X11 goroutines and the X11SessionMgr (which waits on finishCtx.Done()).

Derive them from stopStepCtx with WithCancel instead:

 func (m *StateMachineOfCrun) StartIOForward() {
-	m.stopStepCtx, m.stopStepCb = context.WithCancel(context.Background())
-	m.stopReadCtx = context.WithoutCancel(m.stopStepCtx)
-	m.stopWriteCtx = context.WithoutCancel(m.stopStepCtx)
+	m.stopStepCtx, m.stopStepCb = context.WithCancel(context.Background())
+	m.stopReadCtx, _ = context.WithCancel(m.stopStepCtx)
+	m.stopWriteCtx, _ = context.WithCancel(m.stopStepCtx)
@@
 	if iaMeta.X11 && iaMeta.GetX11Meta().EnableForwarding {
-		m.X11SessionMgr = NewX11SessionMgr(iaMeta.GetX11Meta(), &m.stopReadCtx)
+		m.X11SessionMgr = NewX11SessionMgr(iaMeta.GetX11Meta(), &m.stopReadCtx)
 		go m.X11SessionMgr.SessionMgrRoutine()
 	}
 }

This keeps stopStepCb as the single cancellation source and allows all dependent contexts to observe cancellation.

Also applies to: 929-960

internal/crun/x11.go (2)

137-177: Consider avoiding silent X11 data loss when supervisor channel is full.

In StatusForwarding’s reader, a full X11ToSupervisor channel causes data to be dropped with only a log message:

case session.X11ToSupervisor <- req:
    ...
default:
    log.Errorf("[X11 ...] X11 to supervisor channel full, dropping data.")

For X11 this is likely to corrupt the stream. Prefer either:

  • A blocking send (remove default) to apply backpressure, or
  • Treat channel‑full as a fatal error for the session: close the local conn, send EOF, and tear down the session so the user at least gets a clear failure instead of silent corruption.

This is more about robustness than correctness, so it can be deferred if you’re confident in the buffering.


89-104: Mark EOF explicitly in SendEofToSupervisor.

SendEofToSupervisor creates a TASK_X11_FORWARD request but doesn’t set the Eof flag, so downstream only sees an empty/last chunk and can’t reliably distinguish EOF from a short write.

Set Eof: true when sending EOF:

 func (session *X11Session) SendEofToSupervisor(data []byte) {
@@
 		req := &protos.StreamCrunRequest{
 			Type: protos.StreamCrunRequest_TASK_X11_FORWARD,
 			Payload: &protos.StreamCrunRequest_PayloadTaskX11ForwardReq{
 				PayloadTaskX11ForwardReq: &protos.StreamCrunRequest_TaskX11ForwardReq{
-					Msg:      data,
-					CranedId: session.X11Id.CranedId,
-					LocalId:  session.X11Id.LocalId,
+					Msg:      data,
+					CranedId: session.X11Id.CranedId,
+					LocalId:  session.X11Id.LocalId,
+					Eof:      true,
 				},
 			},
 		}

This matches the updated proto and the server‑side handling of EOF.

internal/cfored/crun_server.go (2)

47-126: Avoid log.Fatalf in HandleSupervisorRequest; propagate an error instead.

The default case calls log.Fatalf(...) and then returns an error, but log.Fatalf exits the process so the return is never reached. This can crash all interactive sessions on a single unexpected StreamTaskIORequest type, which is too strong for a boundary handler.

Recommend changing to log.Errorf (or Warnf) and returning an error, and having callers decide how to handle it (e.g., cancel the step or ignore the bad message):

-	default:
-		log.Fatalf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
-			jobId, stepId)
-		return errors.New("Unexpected Task IO Message Type")
+	default:
+		log.Errorf("[Supervisor->Cfored->Crun][Step #%d.%d] Unexpected Task IO message type: %s",
+			jobId, stepId, taskMsg.Type.String())
+		return errors.New("unexpected Task IO message type")

Then, in the callers, check err and log/transition state appropriately instead of ignoring it.


58-60: Fix “fowarding” typos in trace logs.

Several trace logs spell “forwarding” as “fowarding”, which makes grepping and log analysis harder. Please fix the spelling in all affected messages.

Also applies to: 74-76, 89-91, 103-105, 118-119

internal/cfored/server.go (1)

165-189: Fix potential nil pointer panic in forwardCrunRequestToSingleSupervisor.

When cranedId is not present in stepChannels, you log an error but still proceed to send on supervisorChannel.requestChannel, which will be nil and panic.

Add an early return in the !exist branch (and optionally mirror the valid check used in forwardCrunRequestToSupervisor):

 	supervisorChannel, exist := stepChannels[cranedId]
 	if !exist {
 		log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
-	}
-
-	select {
+		return
+	}
+
+	if !supervisorChannel.valid.Load() {
+		log.Tracef("[Step #%d.%d] Ignoring crun request to invalid supervisor on Craned %s", taskId, stepId, cranedId)
+		return
+	}
+
+	select {
 	case supervisorChannel.requestChannel <- request:

This avoids a crash and keeps behavior consistent with the multi‑supervisor forwarding path.

🧹 Nitpick comments (5)
protos/Supervisor.proto (1)

122-141: MigrateSshProcToCgroup RPC shape looks consistent.

Request PID and ErrCode‑based reply align with the external CranedForPam.MigrateSshProcToCgroup API. Consider adding a brief comment on expected semantics (e.g., which cgroup hierarchy and failure modes) to keep server and client implementations aligned.

protos/Crane.proto (1)

454-462: SSH cgroup migration API pairing looks coherent.

The public MigrateSshProcToCgroupRequest/Reply in Crane.proto matches the Supervisor‑side proto and adds task_id for attribution. Ensure the CraneCtld and CranedForPam handlers agree on error‑reporting semantics (bool vs ErrCode) and log enough context (pid, task_id, craned) for debugging.

Makefile (1)

89-107: build: protos duplication is harmless but can be simplified.

You now have two build: protos rules (one with no body earlier, one with the recipe here). Make will merge them, so behavior is correct, but you can drop the earlier bare declaration to reduce confusion and satisfy tools like checkmake that inspect body length and target definitions.

internal/cfored/crun_server.go (1)

545-556: Minor: central handler errors currently ignored by callers.

In the TASK_IO forwarding and cancellation/ACK paths you call HandleSupervisorRequest but effectively ignore its returned error (it’s quickly shadowed by the subsequent err = toCrunStream.Send(...)). Once HandleSupervisorRequest is made non‑fatal, consider at least logging and/or transitioning to a safe state on error so unexpected message types don’t silently drop.

Also applies to: 579-621, 648-691, 697-705

internal/crun/crun.go (1)

688-721: StdoutWriterRoutine drain loop may never finish.

On m.stopWriteCtx.Done() you for range m.chanOutputFromRemote, but chanOutputFromRemote is never closed. Once the buffer is empty, this blocks forever.

Given crun is a short‑lived CLI, this is mostly a goroutine leak, but you can make the intent clearer and avoid the block by switching to a non‑blocking drain:

-	case <-m.stopWriteCtx.Done():
-		for range m.chanOutputFromRemote {
-			log.Tracef("Drained 1 msg from chanOutputFromRemote after stopWriteCtx done")
-		}
-		break writing
+	case <-m.stopWriteCtx.Done():
+		for {
+			select {
+			case _, ok := <-m.chanOutputFromRemote:
+				if !ok {
+					return
+				}
+				log.Tracef("Drained 1 msg from chanOutputFromRemote after stopWriteCtx done")
+			default:
+				break writing
+			}
+		}

(Or explicitly close chanOutputFromRemote from the state machine when the step ends.)

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c9fd6bc and bc6314b.

📒 Files selected for processing (9)
  • Makefile (1 hunks)
  • internal/cfored/crun_server.go (6 hunks)
  • internal/cfored/server.go (7 hunks)
  • internal/crun/cmd.go (1 hunks)
  • internal/crun/crun.go (14 hunks)
  • internal/crun/x11.go (1 hunks)
  • protos/Crane.proto (10 hunks)
  • protos/PublicDefs.proto (2 hunks)
  • protos/Supervisor.proto (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/crun/cmd.go
🧰 Additional context used
🪛 checkmake (0.2.2)
Makefile

[warning] 106-106: Target body for "build" exceeds allowed length of 5 (12).

(maxbodylength)

🔇 Additional comments (10)
protos/PublicDefs.proto (2)

109-122: TaskStatus rename keeps wire compatibility; verify downstream expectations.

Renaming the enum entry at value 8 to Starting preserves the numeric code, so protobuf wire compatibility is fine. Please double‑check any consumers that relied on the old name or assumed a strictly terminal/non‑terminal set of states.


817-827: Trailing newline and enum terminator are fine.

The added newline and closing brace on TxnAction do not change semantics and align with typical proto style.

protos/Crane.proto (4)

28-36: StepStatusChangeRequest timestamp is a good addition; ensure producers populate it.

Adding google.protobuf.Timestamp timestamp = 7; makes status changes auditable and orderable. Verify all Craned/Supervisor call sites set this field (or have a deliberate defaulting strategy) so consumers don’t see misleading zero timestamps.


472-481: Deprecating execution_node in QuerySshStepEnvVariablesForwardRequest is safe.

Marking execution_node as deprecated without removing it preserves wire compatibility. Confirm all callers have migrated off this field before eventually cleaning it up.


751-766: X11/exit‑status extensions to StreamCrunReply/Request look internally consistent.

The added X11 and exit‑status messages (TaskX11Conn/Forward/Eof*, TaskExitStatus*) and new CforedCrunReplyType values align with the StreamTaskIO* changes and the new X11SessionMgr in crun. Please verify:

  • Every new enum value is handled in both cfored and crun state machines.
  • local_id/craned_id are always set when required so multi‑connection routing can’t mis‑deliver data.
  • EOF is signaled consistently (bool vs dedicated *_EOF message) across all hops.

Also applies to: 769-845


847-903: StreamTaskIO X11 and exit‑status paths are well‑shaped; double‑check symmetry with Supervisor.*

New SupervisorRequestType values and payloads (TASK_X11_OUTPUT/CONN/EOF, TASK_EXIT_STATUS) plus the updated TaskX11InputReq (eof, local_id) give a complete duplex X11 and status channel. Ensure:

  • Supervisor implementations send TaskX11OutputEofReq or set EOF consistently so crun can terminate sessions cleanly.
  • For each request type, there’s a clear, documented path from Supervisor → cfored → crun and back.

Also applies to: 905-940

internal/cfored/server.go (2)

100-123: Supervisor channel and IO‑forward logging improvements look good.

The step‑scoped logging and centralized SupervisorCrashAndRemoveAllChannel/forwardRemoteIoToCrun wiring make diagnostics around IO forwarding and supervisor crashes clearer without changing behavior.

Also applies to: 139-163, 197-207


338-349: New X11 and exit‑status forwarding branches in TaskIOStream are consistent.

Routing TASK_X11_* and TASK_EXIT_STATUS supervisor messages through forwardRemoteIoToCrun mirrors the generic TASK_OUTPUT path and aligns with the new proto definitions. The bidirectional X11 forwarding from crun to supervisor (including LocalId) is also wired correctly.

Also applies to: 377-421

internal/crun/crun.go (1)

625-655: State machine End transition is clear and correct.

The Run loop now breaks the labeled state‑machine loop explicitly on End, which is straightforward and avoids any hidden fallthrough.

internal/crun/x11.go (1)

214-223: X11SessionMgr design and lifecycle wiring look solid.

The per‑session map keyed by (cranedId, localId), buffered reply/request channels, and the finishCtx‑driven shutdown path form a clear and maintainable X11 forwarding subsystem. The separation of concerns between session and manager, plus the use of sync.Once for EOF and stop, is a good pattern here.

Also applies to: 239-307

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

♻️ Duplicate comments (9)
internal/cfored/crun_server.go (1)

47-60: Good centralization of supervisor request handling.

The HandleSupervisorRequest function consolidates message translation logic. However, there are typos and an unreachable code issue.

The typo "fowarding" should be "forwarding" at lines 58, 74, 89, 103, and 118. Additionally, log.Fatalf at line 121 will terminate the process before the return errors.New(...) on line 123 is reached - use log.Errorf if error propagation is intended.

internal/crun/crun.go (5)

462-468: Potential panic when draining nil channel.

When m.X11SessionMgr is nil, x11ReqFromLocal is set to nil (line 429). The for range x11ReqFromLocal loop at line 466 will panic on a nil channel.

Apply this diff to guard the drain loop:

 			for range m.chanInputFromLocal {
 				log.Tracef("Drained 1 msg from chanInputFromLocal after stopReadCtx done")
 			}
-			for range x11ReqFromLocal {
-				log.Tracef("Drained 1 msg from x11ReqFromLocal after stopReadCtx done")
+			if x11ReqFromLocal != nil {
+				for range x11ReqFromLocal {
+					log.Tracef("Drained 1 msg from x11ReqFromLocal after stopReadCtx done")
+				}
 			}

515-520: Potential nil pointer dereference when X11SessionMgr is nil.

When X11 forwarding is disabled, m.X11SessionMgr is nil. If a TASK_X11_CONN, TASK_X11_FORWARD, or TASK_X11_EOF message is received, the code will panic when accessing m.X11SessionMgr.X11ReplyChan at line 520.

Add a nil check before forwarding X11 messages:

 			case protos.StreamCrunReply_TASK_X11_CONN:
 				fallthrough
 			case protos.StreamCrunReply_TASK_X11_FORWARD:
 				fallthrough
 			case protos.StreamCrunReply_TASK_X11_EOF:
-				m.X11SessionMgr.X11ReplyChan <- cforedReply
+				if m.X11SessionMgr != nil {
+					m.X11SessionMgr.X11ReplyChan <- cforedReply
+				} else {
+					log.Warnf("Received X11 message but X11SessionMgr is nil")
+				}

602-608: Same nil pointer issue for X11SessionMgr in WaitAck state.

The X11 message handling in StateWaitAck has the same nil pointer dereference issue at line 607 when m.X11SessionMgr is nil.

Apply the same nil check fix as suggested for StateForwarding.


962-964: Use context.WithCancel instead of context.WithoutCancel for proper cancellation propagation.

context.WithoutCancel returns a context that copies parent's values but does not inherit cancellation. This means stopReadCtx.Done() and stopWriteCtx.Done() return nil channels, so the select cases at lines 462, 740, 792, and 943 will never fire when stopStepCb() is called. The I/O goroutines will hang indefinitely.

Apply this diff to enable proper cancellation propagation:

 	m.stopStepCtx, m.stopStepCb = context.WithCancel(context.Background())
-	m.stopReadCtx = context.WithoutCancel(m.stopStepCtx)
-	m.stopWriteCtx = context.WithoutCancel(m.stopStepCtx)
+	m.stopReadCtx, _ = context.WithCancel(m.stopStepCtx)
+	m.stopWriteCtx, _ = context.WithCancel(m.stopStepCtx)

This ensures cancellation of stopStepCtx propagates to both child contexts.


631-645: Unreachable code after switch handles TASK_COMPLETION_ACK_REPLY.

The switch statement at lines 597-629 handles TASK_COMPLETION_ACK_REPLY by setting m.state = End but does not return. The code at lines 631-645 is still reachable and redundantly checks the same condition again. The default case is also missing, so unexpected message types fall through to line 631.

Consolidate the logic inside the switch case:

 	case protos.StreamCrunReply_TASK_COMPLETION_ACK_REPLY:
-		log.Debug("Task completed.")
-		m.state = End
-	}
-
-	if cforedReply.Type != protos.StreamCrunReply_TASK_COMPLETION_ACK_REPLY {
-		log.Errorf("Expect TASK_COMPLETION_ACK_REPLY. bug get %s\n", cforedReply.Type.String())
-		m.err = util.ErrorBackend
-		m.state = End
-		return
-	}
-
-	if cforedReply.GetPayloadTaskCompletionAckReply().Ok {
-		log.Debug("Task completed.")
-	} else {
-		log.Errorln("Failed to notify server of job completion")
-		m.err = util.ErrorBackend
+		if cforedReply.GetPayloadTaskCompletionAckReply().Ok {
+			log.Debug("Task completed.")
+		} else {
+			log.Errorln("Failed to notify server of task completion")
+			m.err = util.ErrorBackend
+		}
+		m.state = End
+		return
+
+	default:
+		log.Errorf("Expect TASK_COMPLETION_ACK_REPLY, but got %s\n", cforedReply.Type.String())
+		m.err = util.ErrorBackend
+		m.state = End
 	}
-
-	m.state = End
internal/cfored/server.go (1)

165-189: Missing return after failed lookup causes nil pointer dereference.

When stepChannels[cranedId] doesn't exist (line 176-178), the code logs an error but does not return. Execution continues to lines 180-188 where supervisorChannel is used, causing a nil pointer panic.

Apply this diff to fix the missing return:

 	supervisorChannel, exist := stepChannels[cranedId]
 	if !exist {
 		log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
+		return
 	}
internal/crun/x11.go (2)

89-104: Missing Eof: true flag in EOF request.

The SendEofToSupervisor function creates an EOF message but doesn't set the Eof field to true. Without this flag, the receiver cannot distinguish EOF from regular data messages.

 			PayloadTaskX11ForwardReq: &protos.StreamCrunRequest_TaskX11ForwardReq{
 				Msg:      data,
+				Eof:      true,
 				CranedId: session.X11Id.CranedId,
 				LocalId:  session.X11Id.LocalId,
 			},

170-176: Data dropped silently when channel is full.

When X11ToSupervisor channel is full, data is dropped with only an error log. This could cause X11 stream corruption. Consider blocking send or failing the session.

🧹 Nitpick comments (2)
internal/crun/x11.go (2)

56-67: Consider storing context by value.

Storing context.Context as a pointer (*context.Context) is non-idiomatic in Go. Contexts are typically passed and stored by value since the interface already contains a pointer to the underlying implementation.

 type X11SessionMgr struct {
 	sessionMutex sync.Mutex
 	x11Sessions  map[X11GlobalId]*X11Session
 	//All x11 reply from cfored
 	X11ReplyChan chan *protos.StreamCrunReply
 	//Any msg on chan close all x11 fwd session
 	X11RequestChan chan *protos.StreamCrunRequest
-	finishCtx      *context.Context
+	finishCtx      context.Context

 	port   uint32
 	target string
 }

Then update line 296 to sm.finishCtx.Done() and line 220 to finishCtx: stepFinishCtx.


112-135: Add connection timeout to prevent indefinite blocking.

The net.Dial calls have no timeout, which could cause the session routine to hang indefinitely if the X11 server is unresponsive or the network is slow.

 func (session *X11Session) StatusConnectingLocal() {
 	var err error
+	const dialTimeout = 10 * time.Second
 	if session.sessionMgr.port == 0 { // Unix Socket
-		session.conn, err = net.Dial("unix", session.sessionMgr.target)
+		session.conn, err = net.DialTimeout("unix", session.sessionMgr.target, dialTimeout)
 		if err != nil {
 			// ...
 		}
 	} else { // TCP socket
 		address := net.JoinHostPort(session.sessionMgr.target, fmt.Sprintf("%d", session.sessionMgr.port))
-		session.conn, err = net.Dial("tcp", address)
+		session.conn, err = net.DialTimeout("tcp", address, dialTimeout)
 		if err != nil {
 			// ...
 		}
 	}

Add "time" to the imports.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bc6314b and cf7fdf3.

📒 Files selected for processing (9)
  • Makefile (1 hunks)
  • internal/cfored/crun_server.go (6 hunks)
  • internal/cfored/server.go (6 hunks)
  • internal/crun/cmd.go (1 hunks)
  • internal/crun/crun.go (13 hunks)
  • internal/crun/x11.go (1 hunks)
  • protos/Crane.proto (9 hunks)
  • protos/PublicDefs.proto (2 hunks)
  • protos/Supervisor.proto (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/crun/cmd.go
  • protos/PublicDefs.proto
🧰 Additional context used
🧬 Code graph analysis (2)
internal/crun/crun.go (3)
internal/crun/x11.go (2)
  • X11SessionMgr (56-67)
  • NewX11SessionMgr (214-224)
internal/util/err.go (1)
  • ExitCode (30-30)
internal/crun/cmd.go (1)
  • FlagInput (47-47)
internal/cfored/server.go (1)
internal/ccontrol/ccontrol.go (1)
  • StepIdentifier (50-53)
🪛 checkmake (0.2.2)
Makefile

[warning] 106-106: Target body for "build" exceeds allowed length of 5 (12).

(maxbodylength)

🔇 Additional comments (18)
protos/Supervisor.proto (2)

122-128: LGTM!

The new MigrateSshProcToCgroupRequest and MigrateSshProcToCgroupReply messages follow the existing patterns in the file. Using int32 for pid and ErrCode for error handling is consistent with other definitions.


140-140: LGTM!

The new RPC MigrateSshProcToCgroup is properly added to the Supervisor service and follows the existing request/reply pattern.

Makefile (1)

93-94: LGTM! Proper build dependency on protobuf generation.

Adding protos as a prerequisite for build and plugin targets ensures protobuf files are generated before compilation. The pattern of declaring prerequisites separately (line 93) and the recipe (line 106) is valid Make syntax - Make combines prerequisites from both declarations.

Also applies to: 106-106

internal/cfored/server.go (2)

338-348: LGTM!

The new case handlers for TASK_X11_CONN, TASK_X11_OUTPUT, TASK_X11_EOF, and TASK_EXIT_STATUS properly forward messages to crun using the existing forwardRemoteIoToCrun method. The fallthrough pattern for X11 cases is appropriate as they share the same forwarding logic.


394-420: LGTM!

The X11 forwarding now includes LocalId to support multi-connection X11, and the error handling has been consolidated. Moving the Send call and error handling outside the switch ensures consistent error handling for both IO and X11 forwarding paths.

internal/cfored/crun_server.go (3)

534-539: LGTM!

The X11 forwarding now correctly routes to a single supervisor based on CranedId, which is essential for multi-connection X11 support. Using forwardCrunRequestToSingleSupervisor instead of the broadcast forwardCrunRequestToSupervisor ensures X11 data reaches the correct node.


606-650: LGTM! Good addition of Task IO draining during cancellation.

Processing TaskIoRequestChannel messages during cancellation prevents the channel from blocking the supervisor, avoiding deadlocks. The comment at lines 609-610 clearly explains the rationale for this handling.


675-720: LGTM! Consistent Task IO draining in CrunWaitCtldAck state.

Similar to CrunWaitTaskCancel, this ensures Task IO messages are processed while waiting for acknowledgment, preventing supervisor blocking. The gotReply flag properly tracks whether a completion reply was received.

internal/crun/crun.go (3)

105-112: LGTM on the new field declarations.

The new fields properly support the refactored context-based cancellation and X11 session management. The separation of stopReadCtx and stopWriteCtx allows independent control of input/output lifecycles.


522-532: LGTM! Exit status handling.

The new exit status handling properly reports task termination and exit codes to stderr, and correctly updates m.err with the exit code for propagation.


986-995: LGTM! X11SessionMgr initialization.

The X11 session manager is properly initialized only when X11 forwarding is enabled, and the routine is started appropriately. Passing &m.stopReadCtx allows the session manager to monitor for stop signals.

protos/Crane.proto (4)

478-478: Deprecation handled correctly.

The execution_node field is marked deprecated while preserving the field number for backward compatibility. This is the correct approach for proto evolution.


790-795: X11 forward request properly extended for multi-connection.

The TaskX11ForwardReq message now includes eof, local_id, and craned_id fields to support session identification and EOF signaling across multiple X11 connections.


850-870: New X11 reply messages are well-structured.

The added TaskX11ConnReply, TaskX11ForwardReply, TaskX11EofReply, and TaskExitStatusReply messages properly encapsulate session identity and state information for the multi-connection design.


911-931: Request messages align with reply counterparts.

The new TaskX11FwdConnReq, TaskX11OutputReq, TaskX11OutputEofReq, and TaskExitStatus messages mirror the reply structures and provide consistent session identification.

internal/crun/x11.go (3)

32-55: Well-structured session identity and state management.

The X11GlobalId composite key and three-state lifecycle via X11Status provide clear session identification and state tracking. Using sync.Once for stopReadWrite and eofSent ensures idempotent cleanup operations.


214-237: Factory functions are well-implemented.

NewX11SessionMgr and NewSession properly initialize all fields with appropriate buffer sizes for the channels.


254-258: Race condition between goroutine start and map insertion.

The session goroutine is started (line 256) before being added to the map (line 257). If the session fails immediately in StatusConnectingLocal and transitions to X11Ended, it will try to delete itself from the map before being inserted. The subsequent insertion (line 257) would then add a defunct session that will never be cleaned up.

 			session := sm.NewSession(globalId)
 			sm.sessionMutex.Lock()
+			sm.x11Sessions[globalId] = session
 			go session.SessionRoutine()
-			sm.x11Sessions[globalId] = session
 			sm.sessionMutex.Unlock()

Likely an incorrect or invalid review comment.

Comment on lines +619 to +627
// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)

if err = toCrunStream.Send(reply); err != nil {
log.Debugf("[Cfored->Crun][Step #%d.%d] Failed to send %s to crun: %s. "+
"The connection to crun was broken.", jobId, stepId, taskMsg.Type.String(), err.Error())
state = CancelTaskOfDeadCrun
crunErr = true
break waitingCompleteReq
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Ignored error from HandleSupervisorRequest during cancellation.

At line 620, the error returned from HandleSupervisorRequest is assigned but not checked before attempting to send the reply at line 622. If HandleSupervisorRequest returns an error (e.g., unexpected message type), reply may be nil or undefined.

Consider checking the error:

 				err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+				if err != nil {
+					continue // Skip sending reply for unexpected message types during cancellation
+				}
 
 				if err = toCrunStream.Send(reply); err != nil {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
if err = toCrunStream.Send(reply); err != nil {
log.Debugf("[Cfored->Crun][Step #%d.%d] Failed to send %s to crun: %s. "+
"The connection to crun was broken.", jobId, stepId, taskMsg.Type.String(), err.Error())
state = CancelTaskOfDeadCrun
crunErr = true
break waitingCompleteReq
}
// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
if err != nil {
continue // Skip sending reply for unexpected message types during cancellation
}
if err = toCrunStream.Send(reply); err != nil {
log.Debugf("[Cfored->Crun][Step #%d.%d] Failed to send %s to crun: %s. "+
"The connection to crun was broken.", jobId, stepId, taskMsg.Type.String(), err.Error())
state = CancelTaskOfDeadCrun
break waitingCompleteReq
}
🤖 Prompt for AI Agents
In internal/cfored/crun_server.go around lines 619 to 627, the error returned
from HandleSupervisorRequest is assigned but never checked before using reply;
add a check right after calling HandleSupervisorRequest: if err != nil log the
error (with jobId/stepId and taskMsg.Type), skip sending (continue the loop or
jump to next iteration of waitingCompleteReq) to avoid dereferencing a nil
reply, and only call toCrunStream.Send(reply) when err is nil and reply is
non-nil; keep the existing behavior of treating send failures as broken
connections.

Comment on lines +153 to +156
}
log.Tracef("[X11 %s:%d] X11 fd has been closed and stop reading: %v", session.X11Id.CranedId, session.X11Id.LocalId, err)
break
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing EOF notification on read error.

When a non-EOF read error occurs, the reader exits without sending an EOF to the supervisor. This could leave the remote side waiting indefinitely.

 			if err == io.EOF {
 				log.Tracef("[X11 %s:%d] X11 fd reached EOF, stop reading.", session.X11Id.CranedId, session.X11Id.LocalId)
 				data := make([]byte, n)
 				copy(data, buffer[:n])
 				session.SendEofToSupervisor(data)
 				break
 			}
 			log.Tracef("[X11 %s:%d] X11 fd has been closed and stop reading: %v", session.X11Id.CranedId, session.X11Id.LocalId, err)
+			session.SendEofToSupervisor(make([]byte, 0))
 			break
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
}
log.Tracef("[X11 %s:%d] X11 fd has been closed and stop reading: %v", session.X11Id.CranedId, session.X11Id.LocalId, err)
break
}
if err == io.EOF {
log.Tracef("[X11 %s:%d] X11 fd reached EOF, stop reading.", session.X11Id.CranedId, session.X11Id.LocalId)
data := make([]byte, n)
copy(data, buffer[:n])
session.SendEofToSupervisor(data)
break
}
log.Tracef("[X11 %s:%d] X11 fd has been closed and stop reading: %v", session.X11Id.CranedId, session.X11Id.LocalId, err)
session.SendEofToSupervisor(make([]byte, 0))
break
}
🤖 Prompt for AI Agents
In internal/crun/x11.go around lines 153 to 156, the reader currently logs
non-EOF read errors and breaks without notifying the supervisor; modify the
error branch to send an explicit EOF/stream-closed notification to the
supervisor before breaking. Concretely: when err != io.EOF, construct and send
the supervisor EOF/close message (including minimal context such as X11Id and
the error string), handle/log any send failures, and only then break out of the
loop so the remote side is unblocked.

Comment on lines +199 to +202
if err != nil {
log.Errorf("[X11 %s:%d] Failed to write to x11 fd: %v, stop writing.", session.X11Id.CranedId, session.X11Id.LocalId, err)
break loop
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Writer error doesn't close the connection.

When a write error occurs (line 199-201), the writer breaks the loop without closing the connection. This leaves the reader goroutine potentially stuck in a blocking Read() call until the remote closes.

 			_, err := session.conn.Write(msg)
 			if err != nil {
 				log.Errorf("[X11 %s:%d] Failed to write to x11 fd: %v, stop writing.", session.X11Id.CranedId, session.X11Id.LocalId, err)
+				session.conn.Close()
 				break loop
 			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if err != nil {
log.Errorf("[X11 %s:%d] Failed to write to x11 fd: %v, stop writing.", session.X11Id.CranedId, session.X11Id.LocalId, err)
break loop
}
if err != nil {
log.Errorf("[X11 %s:%d] Failed to write to x11 fd: %v, stop writing.", session.X11Id.CranedId, session.X11Id.LocalId, err)
session.conn.Close()
break loop
}
🤖 Prompt for AI Agents
In internal/crun/x11.go around lines 199-202, the writer goroutine currently
breaks its loop on write errors but does not close the underlying connection,
leaving the reader possibly blocked on Read(); update the error path so that
after logging the write error you call Close() on the connection (or the
session's underlying net.Conn) and then break the loop to ensure the reader
unblocks and all resources are released.

Comment on lines +269 to +276
sm.sessionMutex.Lock()
session, exists := sm.x11Sessions[globalId]
if exists {
session.X11ToLocal <- data
} else {
log.Warnf("[X11 %s:%d] Received X11 forward for non-existing session ", cranedId, localId)
}
sm.sessionMutex.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential deadlock: blocking channel send while holding mutex.

The mutex is held during the blocking send to session.X11ToLocal (line 272). If the session's writer goroutine is slow or blocked, this will block all other session operations including new connections and EOF handling.

Consider releasing the mutex before the channel send:

 			sm.sessionMutex.Lock()
 			session, exists := sm.x11Sessions[globalId]
+			sm.sessionMutex.Unlock()
 			if exists {
 				session.X11ToLocal <- data
 			} else {
 				log.Warnf("[X11 %s:%d] Received X11 forward for non-existing session ", cranedId, localId)
 			}
-			sm.sessionMutex.Unlock()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
sm.sessionMutex.Lock()
session, exists := sm.x11Sessions[globalId]
if exists {
session.X11ToLocal <- data
} else {
log.Warnf("[X11 %s:%d] Received X11 forward for non-existing session ", cranedId, localId)
}
sm.sessionMutex.Unlock()
sm.sessionMutex.Lock()
session, exists := sm.x11Sessions[globalId]
sm.sessionMutex.Unlock()
if exists {
session.X11ToLocal <- data
} else {
log.Warnf("[X11 %s:%d] Received X11 forward for non-existing session ", cranedId, localId)
}
🤖 Prompt for AI Agents
In internal/crun/x11.go around lines 269-276, the code currently holds
sm.sessionMutex while performing a potentially blocking send to
session.X11ToLocal, which can deadlock other session operations; instead, grab
the session reference while holding the mutex (check existence), then release
the mutex before performing the channel send (or use a non-blocking/select with
default or a buffered channel and handle send failure), ensuring you do not
perform the blocking send while sm.sessionMutex is held.

Comment on lines +286 to +294
sm.sessionMutex.Lock()
session, exists := sm.x11Sessions[globalId]
if exists {
session.X11ToLocal <- nil
log.Tracef("[X11 %s:%d] Removed X11 session ", cranedId, localId)
} else {
log.Warnf("[X11 %s:%d] Received X11 EOF for non-existing session", cranedId, localId)
}
sm.sessionMutex.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Same deadlock pattern with EOF handling.

Similar to the forward case, the mutex is held during the blocking send to session.X11ToLocal (line 289). Apply the same fix: release mutex before channel send.

🤖 Prompt for AI Agents
In internal/crun/x11.go around lines 286 to 294, avoid holding sm.sessionMutex
while performing the potentially blocking send to session.X11ToLocal. Under the
lock, look up sm.x11Sessions[globalId] and capture the session (or note it
doesn't exist), then release sm.sessionMutex before sending nil on
session.X11ToLocal; if the session did not exist just log the warning and do not
attempt the send. Ensure any logging that can be done without the channel send
is moved accordingly so the mutex is not held during the send.

L-Xiafeng and others added 2 commits December 20, 2025 18:16
Signed-off-by: lijunlin <[email protected]>

fix: X11 multi conn

Signed-off-by: lijunlin <[email protected]>

feat: X11 multi conn

Signed-off-by: lijunlin <[email protected]>

task exit status

Signed-off-by: lijunlin <[email protected]>

fix: Crun app args

Signed-off-by: lijunlin <[email protected]>

sync protos

Signed-off-by: lijunlin <[email protected]>

feat: Introcduce multi level cgrop for step/task
Signed-off-by: lijunlin <[email protected]>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (7)
internal/cfored/server.go (1)

165-189: Critical: Nil pointer dereference after failed lookup.

When stepChannels[cranedId] doesn't exist (lines 176-178), the code logs an error but does not return. Execution continues to lines 180-188 where supervisorChannel is used, causing a nil pointer panic.

🔎 Proposed fix
 	supervisorChannel, exist := stepChannels[cranedId]
 	if !exist {
 		log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
+		return
 	}
internal/cfored/crun_server.go (3)

47-126: New centralized request handler looks good, but contains typos and unreachable code.

The HandleSupervisorRequest function properly centralizes the translation of StreamTaskIORequest types to StreamCrunReply messages. However:

  1. Typo: "fowarding" should be "forwarding" in log messages at lines 58, 74, 89, 103, and 118.
  2. Unreachable code: log.Fatalf at line 121 will terminate the process before the return at line 123 is reached.
🔎 Proposed fixes

Fix the typos:

-		log.Tracef("[Supervisor->Cfored->Crun][Step #%d.%d] fowarding msg size[%d]",
+		log.Tracef("[Supervisor->Cfored->Crun][Step #%d.%d] forwarding msg size[%d]",

Fix the unreachable code (if error propagation is intended):

 	default:
-		log.Fatalf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
+		log.Errorf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
 			jobId, stepId)
 		return errors.New("Unexpected Task IO Message Type")

619-627: Ignored error from HandleSupervisorRequest during cancellation.

At line 620, the error returned from HandleSupervisorRequest is assigned but not checked before attempting to send the reply at line 622. If HandleSupervisorRequest returns an error (e.g., unexpected message type), reply may be nil or undefined.

🔎 Proposed fix
 				err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+				if err != nil {
+					continue // Skip sending reply for unexpected message types during cancellation
+				}
 
 				if err = toCrunStream.Send(reply); err != nil {

689-697: Same ignored error issue from HandleSupervisorRequest.

At line 690, the error from HandleSupervisorRequest is not checked before sending the reply. This is the same pattern as in CrunWaitTaskCancel.

🔎 Proposed fix
 				err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+				if err != nil {
+					continue
+				}
 
 				if err = toCrunStream.Send(reply); err != nil {
internal/crun/x11.go (3)

89-104: Missing EOF flag in the X11 forward request.

The SendEofToSupervisor creates a TASK_X11_FORWARD request but doesn't set the Eof field to indicate this is an EOF message. This may cause the receiver to treat it as regular data.

🔎 Proposed fix
 			PayloadTaskX11ForwardReq: &protos.StreamCrunRequest_TaskX11ForwardReq{
 				Msg:      data,
 				CranedId: session.X11Id.CranedId,
 				LocalId:  session.X11Id.LocalId,
+				Eof:      true,
 			},

137-212: Multiple resource management issues in StatusForwarding.

Several issues from past reviews remain unaddressed:

  1. Lines 153-156: When a non-EOF read error occurs, the reader exits without sending an EOF to the supervisor, potentially leaving the remote side waiting indefinitely.

  2. Lines 170-176: Data is silently dropped when the channel is full. This could cause corruption of the X11 stream.

  3. Lines 199-202: When a write error occurs, the connection isn't closed, leaving the reader goroutine potentially stuck in a blocking Read() call.

🔎 Proposed fixes

For read error (lines 153-156):

 			log.Tracef("[X11 %s:%d] X11 fd has been closed and stop reading: %v", session.X11Id.CranedId, session.X11Id.LocalId, err)
+			session.SendEofToSupervisor(make([]byte, 0))
 			break

For write error (lines 199-202):

 			_, err := session.conn.Write(msg)
 			if err != nil {
 				log.Errorf("[X11 %s:%d] Failed to write to x11 fd: %v, stop writing.", session.X11Id.CranedId, session.X11Id.LocalId, err)
+				session.conn.Close()
 				break loop
 			}

269-294: Potential deadlock: blocking channel send while holding mutex.

At lines 272 and 289, the mutex is held during blocking sends to session.X11ToLocal. If the session's writer goroutine is slow or blocked, this will block all other session operations including new connections and EOF handling.

🔎 Proposed fix for both locations
 			sm.sessionMutex.Lock()
 			session, exists := sm.x11Sessions[globalId]
+			sm.sessionMutex.Unlock()
 			if exists {
 				session.X11ToLocal <- data
 			} else {
 				log.Warnf("[X11 %s:%d] Received X11 forward for non-existing session ", cranedId, localId)
 			}
-			sm.sessionMutex.Unlock()

Apply the same pattern for the EOF case at lines 286-294.

🧹 Nitpick comments (1)
protos/Crane.proto (1)

478-478: Good backward compatibility practice.

Marking the field as deprecated without removing it maintains wire-format compatibility with existing clients.

Consider adding a reserved 2; directive to QuerySshStepEnvVariablesForwardRequest when this field is eventually removed to prevent accidental reuse of field number 2.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cf7fdf3 and 2b83316.

📒 Files selected for processing (9)
  • internal/cfored/crun_server.go (6 hunks)
  • internal/cfored/server.go (6 hunks)
  • internal/crun/cmd.go (1 hunks)
  • internal/crun/crun.go (13 hunks)
  • internal/crun/x11.go (1 hunks)
  • internal/util/err.go (1 hunks)
  • protos/Crane.proto (9 hunks)
  • protos/PublicDefs.proto (2 hunks)
  • protos/Supervisor.proto (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/crun/cmd.go
  • protos/Supervisor.proto
  • internal/crun/crun.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/cfored/server.go (1)
internal/ccontrol/ccontrol.go (1)
  • StepIdentifier (50-53)
🪛 GitHub Actions: Go Code Quality Check
internal/util/err.go

[error] 1-1: gofmt formatting check failed. Run 'gofmt -w .' to fix code style issues.

🔇 Additional comments (11)
internal/cfored/server.go (2)

338-348: LGTM!

The fallthrough pattern for X11 message types and the new EXIT_STATUS handling are correctly implemented, with appropriate logging that includes the message type.


394-420: LGTM!

The X11 forwarding changes properly include the LocalId for session identification, and the consolidated error handling after the switch block improves code clarity.

internal/cfored/crun_server.go (2)

534-539: LGTM!

The X11 forwarding correctly routes to a single designated supervisor using the CranedId from the payload, replacing the previous broadcast approach.


573-583: LGTM!

The error from HandleSupervisorRequest is properly checked before sending the reply, preventing potential nil pointer issues.

internal/crun/x11.go (2)

69-87: LGTM!

The session state machine is well-structured with clear state transitions and proper cleanup when the session ends.


214-237: LGTM!

The session manager and session initialization properly set up buffered channels and shared state for coordinating X11 sessions.

protos/PublicDefs.proto (1)

118-118: Enum rename is binary-compatible but requires coordinated client updates.

The ConfiguredStarting rename preserves the numeric value (8), maintaining wire-format compatibility. However, any code directly referencing the old name must be updated to avoid compilation errors. Verify that all consuming services have been similarly updated, particularly text-based serialization formats (JSON/text) which will break with this change.

protos/Crane.proto (4)

790-795: LGTM! Multi-connection X11 forwarding support added correctly.

The new fields (eof, local_id, craned_id) enable proper tracking of multiple X11 connections with EOF signaling. Field numbering is unique and follows protobuf best practices.


818-820: LGTM! Exit status and X11 state messages properly defined.

The new reply types and enum values are correctly structured:

  • Enum values (7, 8, 9) are sequential and unique
  • New messages have proper field definitions with unique numbering
  • Payload fields (9, 10, 11) don't conflict with existing oneof members
  • These changes enable explicit exit status reporting and X11 connection lifecycle management

Also applies to: 850-870, 882-884


894-896: LGTM! Supervisor request protocol properly extended.

The StreamTaskIORequest extensions mirror the reply-side changes correctly:

  • Enum values (4, 5, 6) are unique and appropriate
  • New message types support the X11 multi-connection architecture with proper field numbering
  • Payload field numbers (6, 7, 8) continue sequentially from existing fields
  • The bidirectional protocol now supports exit status reporting and X11 connection management

Also applies to: 911-931, 940-942


967-971: LGTM! X11 input message properly extended.

The addition of eof and local_id fields completes the bidirectional X11 forwarding protocol with proper EOF signaling and connection tracking. Field numbering is correct.

protos.ErrCode_ERR_CRI_DISABLED: "CRI support is disabled in the cluster.",
protos.ErrCode_ERR_CRI_CONTAINER_NOT_READY: "Task is pending or container is not ready.",
protos.ErrCode_ERR_INVALID_MEM_FORMAT: "Invalid memory format",
protos.ErrCode_ERR_STEP_RES_BEYOND: "Step resource request exceeds job resource.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix formatting to pass gofmt check.

The pipeline is failing due to gofmt formatting issues. There appears to be inconsistent whitespace (extra tabs) before the error message string.

🔎 Proposed fix
-	protos.ErrCode_ERR_STEP_RES_BEYOND: 		"Step resource request exceeds job resource.",
+	protos.ErrCode_ERR_STEP_RES_BEYOND:         "Step resource request exceeds job resource.",

Run gofmt -w internal/util/err.go to automatically fix the formatting.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protos.ErrCode_ERR_STEP_RES_BEYOND: "Step resource request exceeds job resource.",
protos.ErrCode_ERR_STEP_RES_BEYOND: "Step resource request exceeds job resource.",
🤖 Prompt for AI Agents
In internal/util/err.go around line 203, there is inconsistent whitespace/tabs
before the error message string in the map entry for
protos.ErrCode_ERR_STEP_RES_BEYOND which causes gofmt to fail; fix by
normalizing the spacing (remove extra tabs) so the key/value align with
surrounding entries and then run `gofmt -w internal/util/err.go` (or apply
equivalent formatting) to ensure the file passes gofmt checks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants