Conversation

@L-Xiafeng
Collaborator

@L-Xiafeng L-Xiafeng commented Dec 11, 2025

Summary by CodeRabbit

  • New Features

    • X11 forwarding/session support with connection, data forward, EOF and exit-status signaling.
    • CLI IO flags added: --input (-i), --output (-o), --err (-e); per-task targeting and file-pattern redirection.
    • Environment-variable driven job/step configuration and expanded per-task IO/metadata in payloads.
  • Bug Fixes

    • Stricter GRES/GPU parsing with clear errors.
    • Improved validation and error messages for IO file patterns, forwarding, cancellation, and shutdown flows.
  • Refactor

    • IO metadata and script payloads reorganized into dedicated IO/meta fields.


@L-Xiafeng L-Xiafeng added the enhancement New feature or request label Dec 11, 2025
@coderabbitai
Contributor

coderabbitai bot commented Dec 11, 2025

📝 Walkthrough

Adds IoMeta and sh_script to Task/Step payloads, moves I/O metadata out of BatchMeta, changes ParseGres to return errors and updates callers, adds --input/--output/--err flags, implements X11 forwarding subsystem, and expands protobuf/forwarding messages and supervisor routing.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Dependencies | go.mod | Promoted github.com/gogo/protobuf v1.3.2 to a direct require. |
| Protobufs | protos/PublicDefs.proto, protos/Crane.proto, protos/Supervisor.proto | Added IoMeta message; added io_meta and sh_script to TaskToCtld/StepToCtld/StepToD; expanded StreamTask/StreamCrun/StreamCtld messages and enums for per-task identifiers, X11 conn/EOF/exit-status/err-output signaling; added MigrateSshProcToCgroup RPC. |
| I/O metadata & CLI | internal/cbatch/..., internal/cbatch/cbatch.go, internal/cbatch/cmd.go, internal/cbatch/line.go | Migrated input/output/error patterns and open-mode to IoMeta; added --input/-i flag; Slurm line parser recognizes -i/--input; validations updated to use IoMeta. |
| GRES parsing & callers | internal/util/string.go, internal/calloc/calloc.go, internal/ccon/run.go, internal/crun/crun.go | ParseGres signature changed to return (*protos.DeviceMap, error); callers now validate/propagate parse errors and return CraneErr on invalid input. |
| Crun I/O, file patterns & env | internal/crun/crun.go, internal/crun/env.go, internal/crun/cmd.go, internal/util/string.go | ParseFilePattern/pattern parsing updated to indicate local vs remote and return errors; IO routines reworked for per-task/file-pattern routing; added SetFieldsFromEnv to populate Task/Step from env vars; added --output/--err flags and disabled interspersed flag parsing. |
| X11 subsystem | internal/crun/x11.go, protos/Crane.proto | New X11Session and X11SessionMgr: per-session state machine, local connect/read/write, supervisor forwarding (TASK_X11_FORWARD/CONN/EOF), session lifecycle and manager loop; protobuf messages extended for X11 flow. |
| Supervisor / forwarding | internal/cfored/crun_server.go, internal/cfored/server.go | Added HandleSupervisorRequest helper and SupervisorChannelKeeper.forwardCrunRequestToSingleSupervisor; centralized translation of StreamTaskIORequest → StreamCrunReply; added per-task/craned routing for X11, err-output and exit-status; refined cancellation/ack flows. |
| Command wiring & minor | internal/crun/cmd.go, internal/cbatch/cmd.go, internal/util/err.go | Added --output/--err and --input flags; set interspersed=false for crun flags; added mapping for ERR_STEP_RES_BEYOND. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Crun as Crun (task)
  participant X11Mgr as X11SessionMgr
  participant LocalX11 as Local X11 (unix/tcp)
  participant Supervisor as Supervisor

  Note over Crun,X11Mgr: Request X11 forwarding for a task
  Crun->>X11Mgr: NewSession(local_id, craned_id)
  X11Mgr->>LocalX11: connect (unix socket / tcp)
  alt connect success
    LocalX11-->>X11Mgr: local X11 data
    X11Mgr->>Supervisor: TASK_X11_FORWARD (data, local_id, craned_id)
    Supervisor-->>X11Mgr: TASK_X11_CONN / replies
    X11Mgr->>LocalX11: forward supervisor->local data
  else connect failure
    X11Mgr->>Supervisor: TASK_X11_EOF (local_id, craned_id, eof)
  end
  Note over Supervisor,Crun: Exit-status and ERR_OUTPUT messages forwarded per-task via StreamTaskIO⇄StreamCrun

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Possibly related PRs

Suggested reviewers

  • Nativu5
  • NamelessOIer
  • RileyWen

Poem

🐰 I tunneled through code with a twitch and a hop,
IoMeta now cradles stdin, out, and prop,
GRES raises errors that no longer drop,
X11 finds tunnels — forward, EOF, and stop,
The rabbit hops off: "jobs routed — carrot crop!"

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 18.75% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat: Batch input/output/error option with file pattern' accurately reflects the primary change: adding batch file pattern support for input/output/error redirection across multiple modified files (cbatch, crun, and proto definitions). |

@L-Xiafeng L-Xiafeng marked this pull request as ready for review December 16, 2025 06:53
L-Xiafeng and others added 2 commits December 20, 2025 18:16
Signed-off-by: lijunlin <[email protected]>

fix: X11 multi conn

Signed-off-by: lijunlin <[email protected]>

feat: X11 multi conn

Signed-off-by: lijunlin <[email protected]>

task exit status

Signed-off-by: lijunlin <[email protected]>

fix: Crun app args

Signed-off-by: lijunlin <[email protected]>

sync protos

Signed-off-by: lijunlin <[email protected]>

feat: Introcduce multi level cgrop for step/task
Signed-off-by: lijunlin <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (4)
internal/util/string.go (1)

1052-1096: LGTM with minor formatting inconsistencies.

The refactoring of ParseGres to return explicit errors instead of logging is a good improvement that enables proper error handling at call sites.

Consider standardizing error message formatting for consistency:

🔎 Suggested consistency improvements
 		gresNameCount, err := strconv.ParseUint(parts[1], 10, 64)
 		if err != nil {
-			return nil, fmt.Errorf("error parsing gres count: %s", g)
+			return nil, fmt.Errorf("invalid gres count in '%s'", g)
 		}
 		count, err := strconv.ParseUint(parts[2], 10, 64)
 		if err != nil {
-			return nil, fmt.Errorf("Error parsing count for %s: %v\n", name, err)
+			return nil, fmt.Errorf("invalid count for gres '%s': %v", name, err)
 		}
 	} else {
-		return nil, fmt.Errorf("Error parsing gres: %s\n", g)
+		return nil, fmt.Errorf("invalid gres format: %s", g)
 	}

Note: Remove the trailing \n from the error messages (lines 1078, 1091), since Go error strings conventionally do not end with a newline or punctuation, and start them in lowercase (the messages that carry the \n also begin with a capital "Error").
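
To illustrate the suggested error style, here is a minimal sketch of a GRES entry parser. It is not the project's ParseGres: the function name parseGresEntry, its return shape, and the accepted "name:count" / "name:type:count" forms are assumptions made for this example only.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseGresEntry is a hypothetical, simplified stand-in for ParseGres.
// It accepts "name:count" or "name:type:count" and returns lowercase,
// newline-free errors that wrap the underlying cause with %w.
func parseGresEntry(g string) (name, typ string, count uint64, err error) {
	parts := strings.Split(g, ":")
	switch len(parts) {
	case 2:
		name = parts[0]
		count, err = strconv.ParseUint(parts[1], 10, 64)
	case 3:
		name, typ = parts[0], parts[1]
		count, err = strconv.ParseUint(parts[2], 10, 64)
	default:
		return "", "", 0, fmt.Errorf("invalid gres format: %q", g)
	}
	if err != nil {
		return "", "", 0, fmt.Errorf("invalid count for gres %q: %w", name, err)
	}
	return name, typ, count, nil
}

func main() {
	for _, g := range []string{"gpu:2", "gpu:a100:4", "gpu:x", "gpu"} {
		name, typ, count, err := parseGresEntry(g)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("name=%s type=%q count=%d\n", name, typ, count)
	}
}
```

Wrapping with %w keeps the strconv error available to callers through errors.Is and errors.As, which the %s and %v forms do not.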

internal/crun/env.go (1)

131-133: Inconsistent error type: Use util.NewCraneErr for consistency.

All other error cases in this function return util.NewCraneErr(util.ErrorInvalidFormat, ...), but this case uses fmt.Errorf. This inconsistency could affect error handling by callers.

🔎 Proposed fix
 			default:
-				return fmt.Errorf("invalid CRANE_OPEN_MODE from env: must be either '%s' or '%s'", util.OpenModeAppend, util.OpenModeTruncate)
+				return util.NewCraneErr(util.ErrorInvalidFormat, fmt.Sprintf("invalid CRANE_OPEN_MODE from env: must be either '%s' or '%s'", util.OpenModeAppend, util.OpenModeTruncate))
internal/crun/crun.go (2)

826-829: Clarify backslash escape handling behavior.

The current logic removes all backslashes from the pattern and returns early. This means \%j becomes %j (literal), but \\ becomes empty and foo\bar becomes foobar. Consider whether the intent is to handle only \% sequences or all backslash escapes.
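
The difference between the two interpretations can be sketched as follows. Both helpers are hypothetical and are not taken from crun.go: stripAllBackslashes mimics the behavior described above, while unescapePercent handles only the \% and \\ sequences and leaves other backslashes alone.

```go
package main

import (
	"fmt"
	"strings"
)

// unescapePercent is a hypothetical helper: it rewrites only `\%` to `%`
// and `\\` to `\`, leaving every other backslash untouched.
func unescapePercent(pattern string) string {
	var b strings.Builder
	for i := 0; i < len(pattern); i++ {
		if pattern[i] == '\\' && i+1 < len(pattern) &&
			(pattern[i+1] == '%' || pattern[i+1] == '\\') {
			b.WriteByte(pattern[i+1])
			i++ // skip the escaped character
			continue
		}
		b.WriteByte(pattern[i])
	}
	return b.String()
}

// stripAllBackslashes mimics the behavior described in the comment above:
// every backslash is dropped, whatever follows it.
func stripAllBackslashes(pattern string) string {
	return strings.ReplaceAll(pattern, `\`, "")
}

func main() {
	for _, p := range []string{`out-\%j.log`, `a\\b`, `foo\bar`} {
		fmt.Printf("%-12s strip-all=%-10s escape-aware=%s\n",
			p, stripAllBackslashes(p), unescapePercent(p))
	}
}
```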


1033-1039: Note: FIXME for numeric input handling.

The FIXME comment indicates that numeric FlagInput values should forward input to a specific task, but the current implementation treats them as file patterns. Consider tracking this as a follow-up issue.

Would you like me to open an issue to track implementing task-specific input forwarding?

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b8baadd and 15b1454.

📒 Files selected for processing (11)
  • go.mod
  • internal/calloc/calloc.go
  • internal/cbatch/cbatch.go
  • internal/cbatch/cmd.go
  • internal/cbatch/line.go
  • internal/ccon/run.go
  • internal/crun/crun.go
  • internal/crun/env.go
  • internal/util/string.go
  • protos/Crane.proto
  • protos/PublicDefs.proto
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-04-24T16:05:47.156Z
Learnt from: NamelessOIer
Repo: PKUHPC/CraneSched-FrontEnd PR: 256
File: internal/cqueue/cqueue.go:350-356
Timestamp: 2025-04-24T16:05:47.156Z
Learning: In cqueue.go, the function ProcessAllocMemPerNode should use task.AllocatedResView.AllocatableRes.MemoryLimitBytes (allocated resources) rather than task.ReqResView.AllocatableRes.MemoryLimitBytes (requested resources) to correctly display allocated memory per node.

Applied to files:

  • internal/cbatch/cbatch.go
🧬 Code graph analysis (6)
internal/calloc/calloc.go (5)
internal/util/string.go (1)
  • ParseGres (1052-1096)
internal/cbatch/cmd.go (1)
  • FlagGres (46-46)
internal/crun/cmd.go (1)
  • FlagGres (45-45)
internal/calloc/cmd.go (1)
  • FlagGres (38-38)
internal/util/err.go (2)
  • NewCraneErr (60-65)
  • ErrorCmdArg (36-36)
internal/cbatch/cmd.go (1)
internal/ccon/flags.go (1)
  • Flags (109-121)
internal/ccon/run.go (1)
internal/util/string.go (1)
  • ParseGres (1052-1096)
internal/crun/env.go (3)
internal/util/err.go (2)
  • NewCraneErr (60-65)
  • ErrorInvalidFormat (39-39)
internal/util/util.go (3)
  • DefaultConfigPath (78-78)
  • OpenModeAppend (106-106)
  • OpenModeTruncate (107-107)
internal/util/string.go (3)
  • ParseGres (1052-1096)
  • ParseMemStringAsByte (82-104)
  • ParseDurationStrToSeconds (140-173)
internal/crun/crun.go (2)
internal/util/string.go (1)
  • ParseGres (1052-1096)
internal/util/err.go (2)
  • NewCraneErr (60-65)
  • ErrorCmdArg (36-36)
internal/cbatch/cbatch.go (3)
internal/util/string.go (2)
  • ParseGres (1052-1096)
  • CheckFileLength (577-592)
internal/util/util.go (1)
  • OpenModeAppend (106-106)
internal/cbatch/cmd.go (4)
  • FlagGres (46-46)
  • FlagStdinPath (49-49)
  • FlagStdoutPath (50-50)
  • FlagStderrPath (51-51)
🔇 Additional comments (24)
internal/calloc/calloc.go (1)

398-404: LGTM!

The error handling for ParseGres is correctly implemented and follows the existing error handling pattern in the codebase.

protos/PublicDefs.proto (3)

162-163: LGTM!

The addition of io_meta and sh_script fields to TaskToCtld consolidates I/O and script handling metadata into dedicated fields, improving organization.


223-230: Clarify the AI summary regarding optional/required change.

The AI summary mentions "changed node_num and ntasks_per_node from optional uint32 to required uint32," but proto3 has no required keyword. The optional keyword (available since protobuf 3.15) only adds explicit presence tracking to a scalar field; without it, an unset scalar simply reads as its default value. Neither form makes a field mandatory.

The actual change appears to be just the addition of the comment on line 223. If there's a semantic requirement that these fields must be set, that should be enforced at the application level with validation logic, not at the protobuf schema level.
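
A minimal sketch of such application-level validation is shown below. The stepSpec struct and validateStepSpec function are hypothetical stand-ins, not the generated protobuf types; the sketch assumes that a zero value means "not set", which is how a proto3 scalar without explicit presence reads when it was never assigned.

```go
package main

import (
	"errors"
	"fmt"
)

// stepSpec is a hypothetical stand-in for the generated step message.
type stepSpec struct {
	NodeNum       uint32
	NtasksPerNode uint32
}

// validateStepSpec rejects specs whose "required" fields were left at zero.
func validateStepSpec(s *stepSpec) error {
	if s == nil {
		return errors.New("step spec is nil")
	}
	if s.NodeNum == 0 {
		return errors.New("node_num must be greater than zero")
	}
	if s.NtasksPerNode == 0 {
		return errors.New("ntasks_per_node must be greater than zero")
	}
	return nil
}

func main() {
	fmt.Println(validateStepSpec(&stepSpec{NodeNum: 2, NtasksPerNode: 4}))
	fmt.Println(validateStepSpec(&stepSpec{NodeNum: 2}))
}
```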


361-367: LGTM!

The new IoMeta message effectively consolidates I/O-related metadata (input/output/error file patterns and open mode) that was previously scattered across other messages. This improves maintainability and reusability.

protos/Crane.proto (1)

553-554: LGTM!

These are cosmetic comment formatting changes only; no functional modifications to the protobuf schema.

internal/cbatch/line.go (1)

64-64: LGTM!

The addition of -i and --input to the supported Slurm options map correctly enables recognition of the new stdin redirection flag introduced in this PR.

internal/ccon/run.go (1)

216-220: LGTM!

The error handling for ParseGres is correctly implemented and provides clear error messages that include both the invalid specification and the underlying parsing error.

internal/cbatch/cmd.go (1)

49-49: FlagStdinPath is properly wired to IoMeta. The variable is correctly declared, registered as the --input/-i flag with matching parameters to --output and --error, and properly assigned to IoMeta.InputFilePattern at cbatch.go:284 with the same conditional pattern as FlagStdoutPath. Validation is also in place at cbatch.go:349.

internal/crun/env.go (2)

49-52: Verify the condition logic for CRANE_CONF.

The condition FlagConfigFilePath != util.DefaultConfigPath only triggers when a non-default path is already set. Typically, environment variables should be used when no CLI override is provided (i.e., when the flag is still at default). Is the intent to allow env var override only when CLI has already changed it, or should the condition be inverted?
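
One common precedence order (explicit CLI flag, then environment variable, then built-in default) can be sketched like this. The helper resolveConfigPath and the default path are assumptions for illustration; whether this ordering matches the author's intent is exactly the open question above.

```go
package main

import (
	"fmt"
	"os"
)

// defaultConfigPath is a hypothetical default used only in this sketch.
const defaultConfigPath = "/etc/crane/config.yaml"

// resolveConfigPath prefers a non-default flag value, then a non-empty
// environment value, and finally falls back to the default.
func resolveConfigPath(flagValue, envValue string) string {
	if flagValue != defaultConfigPath {
		return flagValue // the user overrode the flag on the command line
	}
	if envValue != "" {
		return envValue
	}
	return defaultConfigPath
}

func main() {
	fmt.Println(resolveConfigPath(defaultConfigPath, os.Getenv("CRANE_CONF")))
}
```

Comparing against the default cannot detect a user who explicitly passes the default path on the command line; a flag library that records whether a flag was set would close that gap.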


58-62: Potential nil pointer dereference on nested step fields.

Accessing step.ReqResourcesPerTask.AllocatableRes.CpuCoreLimit assumes both ReqResourcesPerTask and AllocatableRes are initialized. This pattern repeats at lines 86, 100, 111-112, and 122. If step is passed without initialized resource fields, this will panic. Ensure callers initialize these structures or add nil guards.
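
A nil guard for this kind of nested access could look like the sketch below. The three struct types are simplified, hypothetical stand-ins for the generated messages, and ensureAllocatableRes is an invented helper name.

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for the generated protobuf messages.
type allocatableRes struct{ CpuCoreLimit float64 }
type resourceView struct{ AllocatableRes *allocatableRes }
type step struct{ ReqResourcesPerTask *resourceView }

// ensureAllocatableRes lazily creates the nested messages so that callers
// can assign to the leaf fields without risking a nil dereference.
func ensureAllocatableRes(s *step) *allocatableRes {
	if s.ReqResourcesPerTask == nil {
		s.ReqResourcesPerTask = &resourceView{}
	}
	if s.ReqResourcesPerTask.AllocatableRes == nil {
		s.ReqResourcesPerTask.AllocatableRes = &allocatableRes{}
	}
	return s.ReqResourcesPerTask.AllocatableRes
}

func main() {
	s := &step{} // nested fields are nil here
	ensureAllocatableRes(s).CpuCoreLimit = 2.5
	fmt.Println(s.ReqResourcesPerTask.AllocatableRes.CpuCoreLimit)
}
```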

internal/cbatch/cbatch.go (8)

64-69: LGTM!

The restructuring properly initializes IoMeta as a separate entity and moves ShScript to the top-level field. This ensures GetIoMeta() won't return nil when accessed later in the function.


104-109: LGTM!

The GRES parsing now properly checks for and propagates errors with a descriptive message, consistent with other argument parsing in this switch statement.


168-173: LGTM!

The addition of --input flag handling and migration of all I/O file patterns to IoMeta is properly implemented and consistent with the existing argument handling patterns.


184-192: LGTM!

The OpenModeAppend field is correctly migrated from BatchMeta to IoMeta.


224-230: LGTM!

CLI GRES parsing now properly validates and propagates errors with an appropriate error message for the CLI context.


283-291: LGTM!

The CLI flag handling for stdin, stdout, and stderr paths is correctly implemented using IoMeta, maintaining consistency with the script-based handling.


311-320: LGTM!

CLI --open-mode flag handling is correctly migrated to use IoMeta.


348-357: LGTM!

Validation for all I/O file patterns is now properly implemented using IoMeta, with clear error messages for each path type.

internal/crun/crun.go (6)

831-859: LGTM!

The separation of remote tokens (%N, %n, %t) from local replacements is a clean design. Remote tokens that can only be resolved on the target node correctly trigger isLocalFile = false.


863-901: LGTM!

The token handling correctly differentiates between remote and local replacements. Remote tokens mark the file as non-local (to be resolved remotely) while local tokens are substituted immediately.
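
The split between remote and local tokens can be sketched as follows. This is not the ParseFilePattern implementation: expandPattern is a hypothetical helper, and %u is an assumed local token added for illustration alongside %j.

```go
package main

import (
	"fmt"
	"strings"
)

// expandPattern substitutes tokens that are known locally and reports
// whether the result can be opened locally. Patterns containing a remote
// token (%N, %n, %t) are left for the target node to resolve.
func expandPattern(pattern, jobID, user string) (string, bool) {
	isLocal := true
	for _, tok := range []string{"%N", "%n", "%t"} {
		if strings.Contains(pattern, tok) {
			isLocal = false
			break
		}
	}
	// %u is an assumed token for this sketch; remote tokens are untouched.
	r := strings.NewReplacer("%j", jobID, "%u", user)
	return r.Replace(pattern), isLocal
}

func main() {
	for _, p := range []string{"out-%j.log", "out-%j-%t.log"} {
		expanded, local := expandPattern(p, "42", "alice")
		fmt.Printf("%s -> %s (local=%v)\n", p, expanded, local)
	}
}
```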


1119-1148: LGTM!

The addition of ShScript alongside CmdLine for both job and step payloads properly propagates the command to the new proto field. Setting both to the same value for interactive tasks is appropriate.


1161-1172: LGTM!

The transition from pointer to direct value assignments for step configuration fields is cleaner and aligns with the proto structure changes.


1205-1214: LGTM!

GRES parsing now properly handles and propagates errors with a descriptive message, consistent with the changes in cbatch.go.


818-825: The review concern is unfounded. The state machine ensures ParseFilePattern is only called after node allocation completes. The execution order is sequential: StateWaitRes populates m.cranedId (line 324) and transitions to WaitForward, then StateWaitForward transitions to Forwarding, and finally StateForwarding calls StartIOForward which spawns FileReaderRoutine and invokes ParseFilePattern. The architecture guarantees m.cranedId is populated before ParseFilePattern accesses it at line 822.

Likely an incorrect or invalid review comment.

github.com/alecthomas/participle/v2 v2.1.4
github.com/antonfisher/nested-logrus-formatter v1.3.1
github.com/containerd/cgroups v1.1.0
github.com/gogo/protobuf v1.3.2

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check which protobuf library is being used in imports
echo "=== Checking gogo/protobuf usage ==="
rg -n --type=go 'github\.com/gogo/protobuf' -C2

echo -e "\n=== Checking google.golang.org/protobuf usage ==="
rg -n --type=go 'google\.golang\.org/protobuf' -C2

echo -e "\n=== Checking proto file generation ==="
fd -e proto --exec cat {}

echo -e "\n=== Checking go.mod for both dependencies ==="
grep -n "gogo/protobuf\|google\.golang\.org/protobuf" go.mod

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 50382


🏁 Script executed:

cat -n internal/crun/env.go | head -50

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 1964


🏁 Script executed:

rg -n "proto\." internal/crun/env.go -A 2 -B 2

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 287


🏁 Script executed:

grep -n "proto.Bool" internal/crun/env.go

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 206


🏁 Script executed:

cat -n internal/crun/env.go | sed -n '130,140p'

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 525


Remove gogo/protobuf dependency and migrate to google.golang.org/protobuf.

The PR makes github.com/gogo/protobuf a direct dependency, but this library has been archived since 2021 and is no longer maintained. The codebase already uses google.golang.org/protobuf v1.36.5 throughout and in generated code.

The gogo/protobuf import in internal/crun/env.go:29 is only used for the proto.Bool() helper (lines 135, 137). Replace it with proto.Bool from google.golang.org/protobuf/proto, which the codebase already depends on, or take the address of a local variable (Go does not allow &true on a literal), then remove the gogo/protobuf dependency.
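
If no library helper is wanted at all, a small generic function covers the same need. The name ptr is an assumption for this sketch; it requires Go 1.18 or later for generics.

```go
package main

import "fmt"

// ptr returns a pointer to a copy of v. It replaces helpers such as
// proto.Bool without pulling in any protobuf runtime.
func ptr[T any](v T) *T {
	return &v
}

func main() {
	appendMode := ptr(true)
	truncateMode := ptr(false)
	fmt.Println(*appendMode, *truncateMode)
}
```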

Comment on lines 909 to 950
if isLocalFile {
log.Debugf("Input file is not a local file: %s", filePattern)
return
}

⚠️ Potential issue | 🟡 Minor

Misleading log message: Condition and message are inverted.

When isLocalFile is true, the log says "Input file is not a local file", which contradicts the variable value. The skip logic appears intentional (local files aren't read here because they'll be handled differently), but the message should reflect the actual condition.

🔎 Proposed fix
 	if isLocalFile {
-		log.Debugf("Input file is not a local file: %s", filePattern)
+		log.Debugf("Input file is a local file, skipping remote read: %s", filePattern)
 		return
 	}
🤖 Prompt for AI Agents
In internal/crun/crun.go around lines 909 to 912, the debug log message is
inverted: when isLocalFile is true it currently logs "Input file is not a local
file". Update the message to accurately reflect the condition (e.g., "Input file
is a local file; skipping: %s") while keeping the same log level, format and the
filePattern argument, then return as before.

Comment on lines +34 to +37
for _, e := range env {
pair := strings.SplitN(e, "=", 2)
key := pair[0]
value := pair[1]

⚠️ Potential issue | 🔴 Critical

Potential panic: Index out of bounds when accessing pair[1].

If an environment variable lacks an = sign (e.g., exported variables with no value in some edge cases), strings.SplitN(e, "=", 2) returns a slice with only one element. Accessing pair[1] will cause a panic.

🔎 Proposed fix
 	for _, e := range env {
 		pair := strings.SplitN(e, "=", 2)
+		if len(pair) != 2 {
+			continue
+		}
 		key := pair[0]
 		value := pair[1]
🤖 Prompt for AI Agents
In internal/crun/env.go around lines 34 to 37, the loop blindly accesses pair[1]
after strings.SplitN(e, "=", 2) which can panic if the input has no '='; update
the loop to check the length of pair before indexing and treat a missing
right-hand side as an empty string (e.g., if len(pair) == 1 then key = pair[0],
value = "" else key = pair[0], value = pair[1]); ensure you still handle empty
keys appropriately and continue or skip invalid entries as the function's
semantics require.
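
As an alternative to indexing the SplitN result, strings.Cut (Go 1.18+) never panics on a missing separator. The sketch below follows the second option described in the prompt above, treating a missing "=" as an empty value and skipping entries with an empty key; splitEnv is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"strings"
)

// splitEnv splits "KEY=VALUE" at the first "=". An entry without "=" yields
// an empty value; ok is false only when the key itself is empty.
func splitEnv(entry string) (key, value string, ok bool) {
	key, value, _ = strings.Cut(entry, "=")
	return key, value, key != ""
}

func main() {
	for _, e := range []string{"A=b=c", "NOEQ", "=orphan"} {
		key, value, ok := splitEnv(e)
		fmt.Printf("%q -> key=%q value=%q ok=%v\n", e, key, value, ok)
	}
}
```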

Comment on lines +72 to +77
case "CRANE_EXPORT_ENV":
if job != nil {
job.Env["CRANE_EXPORT_ENV"] = FlagExport
} else {
step.Env["CRANE_EXPORT_ENV"] = FlagExport
}

⚠️ Potential issue | 🟠 Major

Bug: CRANE_EXPORT_ENV handler uses FlagExport instead of value.

The matched environment variable value is stored in value, but the code assigns FlagExport (presumably a CLI flag) instead. This ignores the actual environment variable content.

🔎 Proposed fix
 		case "CRANE_EXPORT_ENV":
 			if job != nil {
-				job.Env["CRANE_EXPORT_ENV"] = FlagExport
+				job.Env["CRANE_EXPORT_ENV"] = value
 			} else {
-				step.Env["CRANE_EXPORT_ENV"] = FlagExport
+				step.Env["CRANE_EXPORT_ENV"] = value
 			}
🤖 Prompt for AI Agents
In internal/crun/env.go around lines 72 to 77, the CRANE_EXPORT_ENV branch
incorrectly assigns the CLI flag variable FlagExport instead of the matched
environment variable value; change the assignment so job.Env["CRANE_EXPORT_ENV"]
or step.Env["CRANE_EXPORT_ENV"] is set to the local variable value (the captured
env value) while preserving the existing nil check for job vs step.

Comment on lines +147 to +156
case "CRANE_TIME_LIMIT":
seconds, err := util.ParseDurationStrToSeconds(FlagTime)
if err != nil {
return util.NewCraneErr(util.ErrorInvalidFormat, "invalid CRANE_TIME_LIMIT from env")
}
if job != nil {
job.TimeLimit.Seconds = seconds
} else {
step.TimeLimit.Seconds = seconds
}

⚠️ Potential issue | 🟠 Major

Bug: CRANE_TIME_LIMIT handler uses FlagTime instead of value.

Similar to the CRANE_EXPORT_ENV issue, this handler parses FlagTime (a CLI flag) instead of the environment variable's actual value.

🔎 Proposed fix
 		case "CRANE_TIME_LIMIT":
-			seconds, err := util.ParseDurationStrToSeconds(FlagTime)
+			seconds, err := util.ParseDurationStrToSeconds(value)
 			if err != nil {
 				return util.NewCraneErr(util.ErrorInvalidFormat, "invalid CRANE_TIME_LIMIT from env")
 			}
🤖 Prompt for AI Agents
In internal/crun/env.go around lines 147 to 156, the CRANE_TIME_LIMIT branch
incorrectly calls util.ParseDurationStrToSeconds(FlagTime) (a CLI flag) instead
of parsing the environment variable value; change the call to
util.ParseDurationStrToSeconds(value), keep the same error handling (return
util.NewCraneErr(...)) and assignment to job.TimeLimit.Seconds or
step.TimeLimit.Seconds as currently written so the time limit comes from the env
var.
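
The pattern behind both the CRANE_EXPORT_ENV and CRANE_TIME_LIMIT fixes is that each handler should consume the value parsed from the environment entry, never a CLI flag variable. A minimal sketch, assuming a simplified jobSpec type and using time.ParseDuration as a stand-in for util.ParseDurationStrToSeconds (the real accepted time format may differ):

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// jobSpec is a hypothetical, simplified stand-in for the job payload.
type jobSpec struct {
	Env              map[string]string
	TimeLimitSeconds int64
}

// applyEnv copies recognized variables into the job, always using the
// value taken from the environment entry itself.
func applyEnv(environ []string, job *jobSpec) error {
	for _, e := range environ {
		key, value, found := strings.Cut(e, "=")
		if !found {
			continue
		}
		switch key {
		case "CRANE_EXPORT_ENV":
			job.Env[key] = value
		case "CRANE_TIME_LIMIT":
			d, err := time.ParseDuration(value) // stand-in parser
			if err != nil {
				return fmt.Errorf("invalid CRANE_TIME_LIMIT from env: %w", err)
			}
			job.TimeLimitSeconds = int64(d / time.Second)
		}
	}
	return nil
}

func main() {
	job := &jobSpec{Env: map[string]string{}}
	err := applyEnv([]string{"CRANE_EXPORT_ENV=ALL", "CRANE_TIME_LIMIT=90m"}, job)
	fmt.Println(job.Env["CRANE_EXPORT_ENV"], job.TimeLimitSeconds, err)
}
```

A test that sets the environment entry and the corresponding flag to different values would have caught both bugs.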

Signed-off-by: lijunlin <[email protected]>

fix: crun input

Signed-off-by: lijunlin <[email protected]>

fix: Format

Signed-off-by: lijunlin <[email protected]>

chore: Log format

Signed-off-by: lijunlin <[email protected]>

fix: crun output

Signed-off-by: lijunlin <[email protected]>

style: format

Signed-off-by: lijunlin <[email protected]>

refactor: crun step res field, inherit job when not specified

Signed-off-by: lijunlin <[email protected]>

refactor: Update terminology from task to job and pass jobId to StartTerminal

refactor: Remove pending reason for step

Signed-off-by: lijunlin <[email protected]>

fix: cacct elapsed time

Signed-off-by: lijunlin <[email protected]>

fix: Auth uid

Signed-off-by: lijunlin <[email protected]>

refactor step query

Signed-off-by: lijunlin <[email protected]>

docs: Add error string

Signed-off-by: lijunlin <[email protected]>

chore: Sync protos

Signed-off-by: lijunlin <[email protected]>

fix

Signed-off-by: lijunlin <[email protected]>

refactor

feat: Crun step submit

feat: Enhance job and step processing with unified data structure and parsing functions

Signed-off-by: lijunlin <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/crun/crun.go (1)

599-638: Unreachable code after switch statement in StateWaitAck.

The code after the switch (lines 633-647) appears to be dead or redundant. The switch on lines 599-631 handles every case, including TASK_COMPLETION_ACK_REPLY, which sets the state to End before control falls through past the switch. The check on line 633, if cforedReply.Type != protos.StreamCrunReply_TASK_COMPLETION_ACK_REPLY, can therefore never be true, because:

  1. If type was TASK_COMPLETION_ACK_REPLY, line 630 already sets m.state = End
  2. All other handled cases return early (lines 602, 610, 623) or call log.Fatalf (line 626)

This appears to be dead code from before the switch was added.

🔎 Proposed fix - remove dead code
 	case protos.StreamCrunReply_TASK_COMPLETION_ACK_REPLY:
 		log.Debug("Task completed.")
 		m.state = End
 	}
-
-	if cforedReply.Type != protos.StreamCrunReply_TASK_COMPLETION_ACK_REPLY {
-		log.Errorf("Expect TASK_COMPLETION_ACK_REPLY. bug get %s\n", cforedReply.Type.String())
-		m.err = util.ErrorBackend
-		m.state = End
-		return
-	}
-
-	if cforedReply.GetPayloadTaskCompletionAckReply().Ok {
-		log.Debug("Task completed.")
-	} else {
-		log.Errorln("Failed to notify server of job completion")
-		m.err = util.ErrorBackend
-	}
-
-	m.state = End
 }
♻️ Duplicate comments (5)
internal/crun/env.go (4)

34-37: Potential panic: Index out of bounds when accessing pair[1].

This issue was previously flagged. If an environment variable lacks an = sign, strings.SplitN(e, "=", 2) returns a slice with only one element. Accessing pair[1] will cause a panic.


72-77: Bug: CRANE_EXPORT_ENV handler uses FlagExport instead of value.

This issue was previously flagged. The matched environment variable value is stored in value, but the code assigns FlagExport (a CLI flag) instead, ignoring the actual environment variable content.


134-138: Initialize IoMeta before calling GetIoMeta().

This issue was previously flagged. The code calls job.GetIoMeta().OpenModeAppend without ensuring IoMeta has been initialized. Add explicit initialization like job.IoMeta = &protos.IoMeta{} before these assignments.


147-156: Bug: CRANE_TIME_LIMIT handler uses FlagTime instead of value.

This issue was previously flagged. The handler parses FlagTime (a CLI flag) instead of the environment variable's actual value.

internal/crun/crun.go (1)

947-950: Misleading log message: Condition and message are inverted.

This issue was previously flagged. When isLocalFile is true, the log says "Input file is not a local file", which contradicts the variable value.

🧹 Nitpick comments (2)
internal/cfored/crun_server.go (1)

58-59: Typo: "fowarding" should be "forwarding" in log messages.

Multiple trace log messages contain the typo "fowarding" instead of "forwarding".

Also applies to: 74-75, 89-90, 103-104, 118-119

internal/crun/x11.go (1)

254-258: Potential issue: Goroutine started while holding mutex.

Starting a goroutine (go session.SessionRoutine()) while holding sessionMutex is generally safe here since the goroutine doesn't immediately try to acquire the same lock. However, consider moving the goroutine start after releasing the lock for clearer code.

🔎 Suggested improvement
 				session := sm.NewSession(globalId)
 				sm.sessionMutex.Lock()
-				go session.SessionRoutine()
 				sm.x11Sessions[globalId] = session
 				sm.sessionMutex.Unlock()
+				go session.SessionRoutine()
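
The same ordering, register under the lock and start the goroutine after unlocking, is sketched below with hypothetical types. The names sessionMgr, session, add, and run are assumptions and do not reflect the actual X11SessionMgr API.

```go
package main

import (
	"fmt"
	"sync"
)

type session struct {
	id   uint32
	done chan struct{}
}

// run stands in for the session routine; here it only signals completion.
func (s *session) run() { close(s.done) }

type sessionMgr struct {
	mu       sync.Mutex
	sessions map[uint32]*session
}

// add registers the session while holding the lock and starts its
// goroutine only after the lock has been released.
func (m *sessionMgr) add(id uint32) *session {
	s := &session{id: id, done: make(chan struct{})}
	m.mu.Lock()
	m.sessions[id] = s
	m.mu.Unlock()
	go s.run()
	return s
}

func main() {
	m := &sessionMgr{sessions: map[uint32]*session{}}
	s := m.add(7)
	<-s.done // wait for the routine to finish
	fmt.Println("registered sessions:", len(m.sessions))
}
```

Keeping the critical section limited to the map write makes it obvious that the session routine can take the same lock without deadlocking.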
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 15b1454 and b095f5e.

📒 Files selected for processing (17)
  • go.mod
  • internal/calloc/calloc.go
  • internal/cbatch/cbatch.go
  • internal/cbatch/cmd.go
  • internal/cbatch/line.go
  • internal/ccon/run.go
  • internal/cfored/crun_server.go
  • internal/cfored/server.go
  • internal/crun/cmd.go
  • internal/crun/crun.go
  • internal/crun/env.go
  • internal/crun/x11.go
  • internal/util/err.go
  • internal/util/string.go
  • protos/Crane.proto
  • protos/PublicDefs.proto
  • protos/Supervisor.proto
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/cbatch/line.go
  • go.mod
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-04-24T16:05:47.156Z
Learnt from: NamelessOIer
Repo: PKUHPC/CraneSched-FrontEnd PR: 256
File: internal/cqueue/cqueue.go:350-356
Timestamp: 2025-04-24T16:05:47.156Z
Learning: In cqueue.go, the function ProcessAllocMemPerNode should use task.AllocatedResView.AllocatableRes.MemoryLimitBytes (allocated resources) rather than task.ReqResView.AllocatableRes.MemoryLimitBytes (requested resources) to correctly display allocated memory per node.

Applied to files:

  • internal/cbatch/cbatch.go
🧬 Code graph analysis (6)
internal/ccon/run.go (1)
internal/util/string.go (1)
  • ParseGres (1052-1096)
internal/crun/crun.go (3)
internal/crun/x11.go (2)
  • X11SessionMgr (56-67)
  • NewX11SessionMgr (214-224)
internal/util/err.go (3)
  • ExitCode (30-30)
  • NewCraneErr (60-65)
  • ErrorCmdArg (36-36)
internal/util/string.go (2)
  • InvalidDuration (872-877)
  • ParseGres (1052-1096)
internal/calloc/calloc.go (5)
internal/util/string.go (1)
  • ParseGres (1052-1096)
internal/cbatch/cmd.go (1)
  • FlagGres (46-46)
internal/crun/cmd.go (1)
  • FlagGres (45-45)
internal/calloc/cmd.go (1)
  • FlagGres (38-38)
internal/util/err.go (2)
  • NewCraneErr (60-65)
  • ErrorCmdArg (36-36)
internal/cfored/server.go (1)
internal/ccontrol/ccontrol.go (1)
  • StepIdentifier (50-53)
internal/cbatch/cbatch.go (2)
internal/util/string.go (2)
  • ParseGres (1052-1096)
  • CheckFileLength (577-592)
internal/cbatch/cmd.go (4)
  • FlagGres (46-46)
  • FlagStdinPath (49-49)
  • FlagStdoutPath (50-50)
  • FlagStderrPath (51-51)
internal/cbatch/cmd.go (1)
internal/ccon/flags.go (1)
  • Flags (109-121)
🔇 Additional comments (29)
protos/Supervisor.proto (1)

122-140: LGTM!

The new RPC method and message definitions for migrating SSH processes to cgroups are well-structured and follow protobuf best practices.

protos/Crane.proto (2)

790-943: LGTM! X11 forwarding support well-integrated.

The X11 forwarding message additions are comprehensive and well-structured:

  • Connection management with local_id and craned_id for routing
  • EOF signaling for stream closure
  • Exit status reporting
  • Consistent message patterns across Crun and TaskIO streams

478-478: The deprecated execution_node field is correctly marked in QuerySshStepEnvVariablesForwardRequest.

Code usages of GetExecutionNode() found in the codebase (plugin/powerControl/power_control.go and internal/ccontrol/output.go) reference non-deprecated execution_node fields from other proto message definitions (specifically in PublicDefs.proto), not from the deprecated field at line 478. No code changes are required.

internal/crun/cmd.go (1)

87-87: Verify the flag parsing behavior change.

Setting SetInterspersed(false) changes how flags are parsed: flags must now appear before positional arguments, and anything after positional arguments is treated as an argument rather than a flag.

For crun, this likely allows passing flags to the executable being run (e.g., crun --partition=gpu myapp --app-flag=value), which is a good improvement. However, this is a breaking change for any existing usage where flags appeared after the executable name.

Please verify:

  1. This change is intentional and documented
  2. Existing scripts or documentation are updated to reflect the new flag ordering requirement
  3. The change aligns with the PR's objective (based on the AI summary mentioning batch input/output options)

Example of the behavioral difference:

# Before: Both work the same
crun --partition=gpu executable
crun executable --partition=gpu

# After SetInterspersed(false):
crun --partition=gpu executable          # ✓ Works
crun executable --partition=gpu          # ✗ --partition is passed to executable, not crun
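
The same stop-at-first-positional behaviour can be reproduced with Go's standard `flag` package, which always stops parsing at the first non-flag argument. This is only an illustrative sketch: `parse` and the `partition` flag defined here are assumptions for the example and are not crun's actual flag setup.

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

// parse returns the parsed partition value and the remaining arguments.
// The standard flag package stops at the first non-flag argument, which
// mirrors the non-interspersed mode discussed above.
func parse(args []string) (string, []string, error) {
	fs := flag.NewFlagSet("crun-sketch", flag.ContinueOnError)
	fs.SetOutput(io.Discard)
	partition := fs.String("partition", "", "partition name")
	if err := fs.Parse(args); err != nil {
		return "", nil, err
	}
	return *partition, fs.Args(), nil
}

func main() {
	// Flag before the executable: consumed by the parser.
	p, rest, _ := parse([]string{"--partition=gpu", "myapp", "--app-flag=value"})
	fmt.Printf("partition=%q rest=%q\n", p, rest)

	// Flag after the executable: left untouched for the executable.
	p, rest, _ = parse([]string{"myapp", "--partition=gpu"})
	fmt.Printf("partition=%q rest=%q\n", p, rest)
}
```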
internal/ccon/run.go (1)

216-221: LGTM! Proper error handling for ParseGres.

The code correctly handles the updated ParseGres signature by checking and propagating errors appropriately.

internal/util/err.go (1)

203-203: LGTM! New error message added.

The new error code mapping provides a clear message for step resource validation failures.

internal/calloc/calloc.go (1)

399-403: LGTM! Proper error handling for ParseGres.

The code correctly handles the updated ParseGres signature by checking and propagating errors with appropriate error codes.

internal/util/string.go (1)

1052-1096: LGTM! Improved error handling pattern.

The refactoring from logging errors to returning them is a good improvement that allows callers to handle errors appropriately. All 6 call sites across the codebase have been correctly updated to handle the new error return: internal/crun/env.go, internal/crun/crun.go, internal/ccon/run.go, internal/calloc/calloc.go, and two in internal/cbatch/cbatch.go. Each properly unpacks and checks the error, with contextual error handling.
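
As a rough illustration of the return-an-error pattern, the sketch below parses a simplified `name:count` list. The grammar and the name `parseGresSketch` are assumptions made for this example; they are not the format or signature of util.ParseGres.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseGresSketch parses a comma-separated list of "name:count" entries.
// The format is a simplification assumed for this example; it is not the
// grammar accepted by util.ParseGres.
func parseGresSketch(s string) (map[string]uint64, error) {
	result := make(map[string]uint64)
	for _, item := range strings.Split(s, ",") {
		name, countStr, found := strings.Cut(item, ":")
		if !found || name == "" {
			return nil, fmt.Errorf("invalid gres entry %q: want name:count", item)
		}
		count, err := strconv.ParseUint(countStr, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid count in gres entry %q: %w", item, err)
		}
		result[name] += count
	}
	return result, nil
}

func main() {
	// The caller, not the parser, decides how to report the failure.
	if _, err := parseGresSketch("gpu:two"); err != nil {
		fmt.Println("rejected:", err)
	}
	gres, _ := parseGresSketch("gpu:2,fpga:1")
	fmt.Println(gres["gpu"], gres["fpga"])
}
```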

internal/cbatch/cmd.go (1)

49-145: No action needed. The --input flag is properly wired to task submission. The flag value is consumed in BuildCbatchJob() (internal/cbatch/cbatch.go, lines 283-284) where it is assigned to task.GetIoMeta().InputFilePattern.

Likely an incorrect or invalid review comment.

internal/crun/env.go (1)

78-87: LGTM: GRES parsing now handles errors properly.

The updated util.ParseGres return signature with error handling is correctly integrated here. Invalid GRES values from environment variables now yield explicit errors.

internal/cfored/crun_server.go (1)

47-126: Consolidating supervisor request handling - good refactoring.

The HandleSupervisorRequest helper centralizes the translation of StreamTaskIORequest types into StreamCrunReply messages. This reduces duplication across the state machine.

internal/cbatch/cbatch.go (4)

65-69: LGTM: IoMeta and ShScript initialization.

IoMeta is properly initialized before any fields are accessed via GetIoMeta(). The ShScript field is correctly assigned at the task level.


104-109: LGTM: GRES parsing with proper error handling.

The updated util.ParseGres signature returning (map, error) is correctly integrated with appropriate error wrapping.


168-173: LGTM: Input file pattern now using IoMeta.

The new --input flag handling correctly stores the pattern in IoMeta.InputFilePattern.


349-357: LGTM: Input file path length validation added.

Validation for the input file pattern length is now included alongside output and error file validation.

internal/cfored/server.go (2)

338-348: LGTM: X11 forwarding cases added to supervisor IO forwarding.

X11 connection, output, and EOF message types are now properly forwarded to crun using the same path as regular task output.


416-420: LGTM: Error handling added after sending reply to supervisor.

Connection breakage during reply send now properly triggers transition to SupervisorUnReg state.

internal/crun/crun.go (3)

838-940: LGTM: ParseFilePattern enhanced for local/remote file detection.

The function now properly distinguishes between local and remote file patterns using hostname lookup and remote token detection (%N, %n, %t). The boolean return value allows callers to handle local vs remote files differently.


1014-1023: LGTM: X11 session manager integration.

X11 forwarding is properly initialized when enabled, using the new X11SessionMgr to manage sessions.


1089-1091: LGTM: ShScript field added to job/step payloads.

Both TaskToCtld and StepToCtld now include the ShScript field alongside CmdLine, aligning with the proto changes.

Also applies to: 1116-1118

internal/crun/x11.go (4)

45-55: LGTM: X11Session structure with proper synchronization primitives.

The session uses sync.Once for eofSent and stopReadWrite to ensure these actions happen exactly once, preventing duplicate EOF messages or connection closures.
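
A minimal sketch of the exactly-once guarantee, assuming a hypothetical `eofNotifier` type in place of the real session; the counter only exists to make the effect observable.

```go
package main

import (
	"fmt"
	"sync"
)

// eofNotifier is a hypothetical stand-in for the part of X11Session
// that reports EOF.
type eofNotifier struct {
	once  sync.Once
	count int
}

// sendEOF may be called from several goroutines; the body runs once.
func (n *eofNotifier) sendEOF() {
	n.once.Do(func() { n.count++ })
}

func main() {
	var n eofNotifier
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.sendEOF()
		}()
	}
	wg.Wait()
	fmt.Println("EOF messages sent:", n.count)
}
```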


170-176: Data silently dropped when X11 channel is full.

When X11ToSupervisor channel is full, data is dropped with only an error log. Consider whether this should trigger session termination or backpressure.

Is dropping X11 data acceptable behavior, or should this trigger session termination to prevent a corrupted X11 stream?
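
One way to make the choice explicit is to have the non-blocking send report whether the data was queued, so the caller can drop, wait, or terminate the session. The helper name `trySend` is hypothetical and not part of the PR.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the data was
// queued, so the caller can decide between dropping, waiting, or closing.
func trySend(ch chan<- []byte, data []byte) bool {
	select {
	case ch <- data:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan []byte, 1)
	fmt.Println(trySend(ch, []byte("first")))  // buffer has room
	fmt.Println(trySend(ch, []byte("second"))) // buffer is full
}
```

Because an X11 byte stream is order- and content-sensitive, a caller receiving `false` would probably want to stop the session rather than continue with a gap, but that policy is a design decision for the PR author.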


214-224: LGTM: X11SessionMgr constructor.

Properly initializes all fields including buffered channels for X11 reply and request handling.


296-304: LGTM: Graceful shutdown of X11 sessions on finish signal.

When the finish context is done, all active sessions are properly stopped using StopLocalReadWrite().

protos/PublicDefs.proto (5)

361-367: LGTM: New IoMeta message consolidates I/O metadata.

The IoMeta message properly encapsulates open mode and file pattern fields. Using optional bool for open_mode_append allows distinguishing between unset and false values.
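
In Go, an optional scalar is typically surfaced as a pointer, which is what makes the three states distinguishable. The struct below is a hand-written stand-in, not the generated protos.IoMeta, and the "truncate" label for false is an assumption used only for illustration.

```go
package main

import "fmt"

// ioMetaSketch mimics how an optional bool can be modeled with a pointer.
// It is a hand-written stand-in, not generated protobuf code.
type ioMetaSketch struct {
	OpenModeAppend *bool
}

// describe reports which of the three states the field is in.
func describe(m ioMetaSketch) string {
	switch {
	case m.OpenModeAppend == nil:
		return "unset"
	case *m.OpenModeAppend:
		return "append"
	default:
		return "truncate"
	}
}

func main() {
	yes, no := true, false
	fmt.Println(describe(ioMetaSketch{}))
	fmt.Println(describe(ioMetaSketch{OpenModeAppend: &yes}))
	fmt.Println(describe(ioMetaSketch{OpenModeAppend: &no}))
}
```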


162-163: LGTM: IoMeta and sh_script fields added consistently.

The new fields are added at consistent field numbers across TaskToCtld, StepToCtld, and StepToD messages, with sh_script at the task/step level rather than within BatchTaskAdditionalMeta.

Also applies to: 243-244, 325-326


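228-230: Potential breaking change: node_num and ntasks_per_node are no longer declared optional.

In proto3, removing the optional keyword leaves the wire encoding unchanged but switches the fields from explicit to implicit presence: a zero value is no longer serialized, and a receiver can no longer distinguish an unset field from 0. Ensure all existing clients that send StepToCtld messages are updated.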


369-371: BatchTaskAdditionalMeta simplified.

The message now only contains interpreter, with I/O fields moved to IoMeta. This is a clean separation of concerns.


118-118: All consumers have been updated to use the new enum value name. A comprehensive search found no remaining references to the old Configured enum value name anywhere in the codebase. The TaskStatus enum and its usages across proto files (Supervisor.proto, PublicDefs.proto) are consistent with the rename.

Comment on lines +120 to +124
default:
log.Fatalf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
jobId, stepId)
return errors.New("Unexpected Task IO Message Type")
}

⚠️ Potential issue | 🟠 Major

Unreachable code: log.Fatalf terminates before error return.

log.Fatalf calls os.Exit(1) after logging, so the return errors.New(...) on line 123 is never reached. Either use log.Errorf to allow the error to be returned, or remove the return statement.

🔎 Proposed fix
 	default:
-		log.Fatalf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
+		log.Errorf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
 			jobId, stepId)
 		return errors.New("Unexpected Task IO Message Type")
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
default:
log.Fatalf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
jobId, stepId)
return errors.New("Unexpected Task IO Message Type")
}
default:
log.Errorf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
jobId, stepId)
return errors.New("Unexpected Task IO Message Type")
}
🤖 Prompt for AI Agents
In internal/cfored/crun_server.go around lines 120 to 124, the default branch
uses log.Fatalf which calls os.Exit and makes the subsequent return
errors.New(...) unreachable; change the fatal call to a non-terminating log
(e.g., log.Errorf or log.Printf) and keep the return of errors.New("Unexpected
Task IO Message Type") so the caller can handle the error, or alternatively
remove the return and retain log.Fatalf if process termination is intended—pick
the first option (use log.Errorf and return the error).

Comment on lines +619 to +627
// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)

if err = toCrunStream.Send(reply); err != nil {
log.Debugf("[Cfored->Crun][Step #%d.%d] Failed to send %s to crun: %s. "+
"The connection to crun was broken.", jobId, stepId, taskMsg.Type.String(), err.Error())
state = CancelTaskOfDeadCrun
crunErr = true
break waitingCompleteReq
}

⚠️ Potential issue | 🟡 Minor

Ignored error from HandleSupervisorRequest followed by overwrite.

On line 620, the error from HandleSupervisorRequest is assigned to err, but line 622 immediately overwrites err with the result of toCrunStream.Send(reply). If HandleSupervisorRequest fails, the error is silently discarded.

🔎 Proposed fix
 				// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
 				err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+				if err != nil {
+					log.Debugf("[Cfored->Crun][Step #%d.%d] HandleSupervisorRequest failed: %s", jobId, stepId, err.Error())
+					continue
+				}
 
-				if err = toCrunStream.Send(reply); err != nil {
+				if err := toCrunStream.Send(reply); err != nil {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
if err = toCrunStream.Send(reply); err != nil {
log.Debugf("[Cfored->Crun][Step #%d.%d] Failed to send %s to crun: %s. "+
"The connection to crun was broken.", jobId, stepId, taskMsg.Type.String(), err.Error())
state = CancelTaskOfDeadCrun
crunErr = true
break waitingCompleteReq
}
// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
if err != nil {
log.Debugf("[Cfored->Crun][Step #%d.%d] HandleSupervisorRequest failed: %s", jobId, stepId, err.Error())
continue
}
if err := toCrunStream.Send(reply); err != nil {
log.Debugf("[Cfored->Crun][Step #%d.%d] Failed to send %s to crun: %s. "+
"The connection to crun was broken.", jobId, stepId, taskMsg.Type.String(), err.Error())
state = CancelTaskOfDeadCrun
crunErr = true
break waitingCompleteReq
}
🤖 Prompt for AI Agents
internal/cfored/crun_server.go around lines 619 to 627: the error returned by
HandleSupervisorRequest is currently assigned to err and then immediately
overwritten by the result of toCrunStream.Send(reply), silently discarding any
HandleSupervisorRequest failure; change the flow to capture
HandleSupervisorRequest error into a separate variable (e.g., handleErr), check
it right away and handle/log/act on it (return, set state, or break the waiting
loop as appropriate), then call toCrunStream.Send and handle its error
separately so neither error is lost or overwritten.

Comment on lines +689 to +697
// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)

if err = toCrunStream.Send(reply); err != nil {
log.Debugf("[Cfored->Crun][Step #%d.%d] Failed to send %s to crun: %s. "+
"The connection to crun was broken.", jobId, stepId, taskMsg.Type.String(), err.Error())
gotReply = false
break waitingAck
}

⚠️ Potential issue | 🟡 Minor

Same error handling issue during CrunWaitCtldAck state.

The error from HandleSupervisorRequest on line 690 is ignored and overwritten on line 692.

🤖 Prompt for AI Agents
In internal/cfored/crun_server.go around lines 689 to 697, the error returned by
HandleSupervisorRequest is being ignored/overwritten by the subsequent send
error handling; capture and check the HandleSupervisorRequest error immediately
(e.g., errHandle := HandleSupervisorRequest(...)); if errHandle != nil, log it
and set gotReply = false and break waitingAck; then perform the
toCrunStream.Send(reply) into a separate variable (e.g., sendErr) and handle/log
sendErr without reusing/overwriting the earlier error variable.

Comment on lines +165 to +189
func (keeper *SupervisorChannelKeeper) forwardCrunRequestToSingleSupervisor(taskId uint32, stepId uint32,
cranedId string, request *protos.StreamCrunRequest) {
stepIdentity := StepIdentifier{JobId: taskId, StepId: stepId}
keeper.toSupervisorChannelMtx.Lock()
defer keeper.toSupervisorChannelMtx.Unlock()
stepChannels, exist := keeper.toSupervisorChannels[stepIdentity]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist step.", taskId, stepId)
return
}
supervisorChannel, exist := stepChannels[cranedId]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
}

select {
case supervisorChannel.requestChannel <- request:
default:
if len(supervisorChannel.requestChannel) == cap(supervisorChannel.requestChannel) {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s is full", taskId, stepId, cranedId)
} else {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s write failed", taskId, stepId, cranedId)
}
}
}

⚠️ Potential issue | 🔴 Critical

Critical: Missing return after supervisorChannel not found causes nil pointer dereference.

When supervisorChannel doesn't exist (line 176), the code logs an error but continues to line 181 where it tries to send to supervisorChannel.requestChannel, causing a nil pointer dereference.

🔎 Proposed fix
 	supervisorChannel, exist := stepChannels[cranedId]
 	if !exist {
 		log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
+		return
 	}
 
 	select {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (keeper *SupervisorChannelKeeper) forwardCrunRequestToSingleSupervisor(taskId uint32, stepId uint32,
cranedId string, request *protos.StreamCrunRequest) {
stepIdentity := StepIdentifier{JobId: taskId, StepId: stepId}
keeper.toSupervisorChannelMtx.Lock()
defer keeper.toSupervisorChannelMtx.Unlock()
stepChannels, exist := keeper.toSupervisorChannels[stepIdentity]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist step.", taskId, stepId)
return
}
supervisorChannel, exist := stepChannels[cranedId]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
}
select {
case supervisorChannel.requestChannel <- request:
default:
if len(supervisorChannel.requestChannel) == cap(supervisorChannel.requestChannel) {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s is full", taskId, stepId, cranedId)
} else {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s write failed", taskId, stepId, cranedId)
}
}
}
func (keeper *SupervisorChannelKeeper) forwardCrunRequestToSingleSupervisor(taskId uint32, stepId uint32,
cranedId string, request *protos.StreamCrunRequest) {
stepIdentity := StepIdentifier{JobId: taskId, StepId: stepId}
keeper.toSupervisorChannelMtx.Lock()
defer keeper.toSupervisorChannelMtx.Unlock()
stepChannels, exist := keeper.toSupervisorChannels[stepIdentity]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist step.", taskId, stepId)
return
}
supervisorChannel, exist := stepChannels[cranedId]
if !exist {
log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
return
}
select {
case supervisorChannel.requestChannel <- request:
default:
if len(supervisorChannel.requestChannel) == cap(supervisorChannel.requestChannel) {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s is full", taskId, stepId, cranedId)
} else {
log.Errorf("[Step #%d.%d] toSupervisorChannel to supervisor on%s write failed", taskId, stepId, cranedId)
}
}
}
🤖 Prompt for AI Agents
In internal/cfored/server.go around lines 165 to 189, the code logs an error
when supervisorChannel is not found but doesn’t return, causing a nil pointer
dereference when accessing supervisorChannel.requestChannel; update the function
to immediately return after logging the missing supervisorChannel error (i.e.,
after the log.Errorf on line ~176) so that the send/select never attempts to use
a nil supervisorChannel, preserving the existing lock/unlock behavior and
leaving the rest of the method unchanged.
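
The guard can be seen in a stripped-down form below. This is a sketch under the assumption that the map stores pointers, so a missing key yields nil; `channelSketch` and `forwardTo` are hypothetical names, and returning an error instead of logging is a choice made only for this example.

```go
package main

import "fmt"

// channelSketch is a hypothetical stand-in for the per-craned channel holder.
type channelSketch struct {
	requests chan string
}

// forwardTo looks up the target and returns early when it is missing,
// so the send below never touches a nil pointer.
func forwardTo(channels map[string]*channelSketch, cranedId, req string) error {
	ch, ok := channels[cranedId]
	if !ok {
		return fmt.Errorf("no channel for craned %q", cranedId)
	}
	select {
	case ch.requests <- req:
		return nil
	default:
		return fmt.Errorf("channel for craned %q is full", cranedId)
	}
}

func main() {
	chans := map[string]*channelSketch{"cn01": {requests: make(chan string, 1)}}
	fmt.Println(forwardTo(chans, "cn01", "input"))
	fmt.Println(forwardTo(chans, "cn02", "input"))
}
```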

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (4)
internal/cfored/crun_server.go (2)

47-137: Good centralization of supervisor request translation.

The HandleSupervisorRequest function cleanly maps various StreamTaskIORequest types to corresponding StreamCrunReply messages. This reduces duplication across the state machine.

However, note that log.Fatalf on line 132 terminates the process, making the return errors.New(...) on line 134 unreachable.

🔎 Proposed fix
 	default:
-		log.Fatalf("[Supervisor->Cfored->Crun][Step #%d.%d] Expect Type TASK_OUTPUT or TASK_X11_OUTPUT or TASK_EXIT_STATUS.",
+		log.Errorf("[Supervisor->Cfored->Crun][Step #%d.%d] Unexpected message type: %s",
-			jobId, stepId)
+			jobId, stepId, taskMsg.Type.String())
 		return errors.New("Unexpected Task IO Message Type")
 	}

649-657: Error from HandleSupervisorRequest is silently discarded.

On line 650, the error from HandleSupervisorRequest is assigned to err, but line 652 immediately overwrites err with the result of toCrunStream.Send(reply). If HandleSupervisorRequest fails, the error is silently discarded.

🔎 Proposed fix
 				// During cancellation, just forward the IO messages from supervisor to crun, ignore errors
-				err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+				if err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply); err != nil {
+					log.Debugf("[Cfored->Crun][Step #%d.%d] HandleSupervisorRequest failed: %s", jobId, stepId, err.Error())
+					continue
+				}
 
-				if err = toCrunStream.Send(reply); err != nil {
+				if err := toCrunStream.Send(reply); err != nil {
internal/cfored/server.go (1)

165-189: Critical: Missing return after supervisorChannel not found causes nil pointer dereference.

When supervisorChannel doesn't exist (line 176), the code logs an error but continues to line 181 where it tries to send to supervisorChannel.requestChannel, causing a nil pointer dereference.

🔎 Proposed fix
 	supervisorChannel, exist := stepChannels[cranedId]
 	if !exist {
 		log.Errorf("[Step #%d.%d] Trying to forward crun request to non-exist craned %s.", taskId, stepId, cranedId)
+		return
 	}
 
 	select {
internal/crun/crun.go (1)

1099-1103: Misleading log and potentially incorrect behavior for remote file patterns.

When isLocalFile is false, the log says "Input file is not a local file" but then calls m.stopStepCb() which cancels the step. If the file pattern contains remote tokens (%N, %n, %t), shouldn't the input handling be deferred to the remote side rather than canceling the step?

🔎 Proposed fix - if remote files should skip local reading without canceling
 	if !isLocalFile {
-		log.Debugf("Input file is not a local file: %s", filePattern)
-		m.chanInputFromLocal <- nil
-		m.stopStepCb()
+		log.Debugf("Input file pattern contains remote tokens, skipping local read: %s", filePattern)
+		// Don't send nil or cancel - let remote side handle input
 		return
 	}
🧹 Nitpick comments (1)
internal/crun/crun.go (1)

632-673: StateWaitAck has duplicate handling after switch statement.

Lines 675-689 check TASK_COMPLETION_ACK_REPLY again after the switch statement already handles it at line 670. The early return statements in the switch cases (lines 638, 644, 652, 665) mean flow won't reach line 675 for those cases, but the TASK_COMPLETION_ACK_REPLY case at line 670 sets m.state = End and then control continues past the switch to line 675, causing redundant checks.

🔎 Proposed fix - add return after TASK_COMPLETION_ACK_REPLY
 	case protos.StreamCrunReply_TASK_COMPLETION_ACK_REPLY:
 		log.Debug("Task completed.")
 		m.state = End
+		return
 	}
-
-	if cforedReply.Type != protos.StreamCrunReply_TASK_COMPLETION_ACK_REPLY {
-		log.Errorf("Expect TASK_COMPLETION_ACK_REPLY. bug get %s\n", cforedReply.Type.String())
-		m.err = util.ErrorBackend
-		m.state = End
-		return
-	}
-
-	if cforedReply.GetPayloadTaskCompletionAckReply().Ok {
-		log.Debug("Task completed.")
-	} else {
-		log.Errorln("Failed to notify server of job completion")
-		m.err = util.ErrorBackend
-	}
-
-	m.state = End
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b095f5e and 1674042.

📒 Files selected for processing (7)
  • internal/cfored/crun_server.go
  • internal/cfored/server.go
  • internal/crun/cmd.go
  • internal/crun/crun.go
  • internal/util/err.go
  • protos/Crane.proto
  • protos/PublicDefs.proto
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/util/err.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/cfored/server.go (1)
internal/ccontrol/ccontrol.go (1)
  • StepIdentifier (50-53)
🔇 Additional comments (20)
protos/PublicDefs.proto (4)

361-371: LGTM on the new IoMeta message.

The IoMeta message cleanly consolidates I/O-related metadata (append mode, file patterns, and crun-specific task IDs) into a reusable structure. The use of optional for open_mode_append, input_task_id, and output_task_id is appropriate since they may not always be set.


373-375: LGTM on BatchTaskAdditionalMeta simplification.

The refactoring to keep only interpreter in BatchTaskAdditionalMeta (with I/O metadata moved to IoMeta) improves separation of concerns.


118-118: Incorrect enum name in review comment.

The review comment references Configured = 8 being renamed to Starting = 8, but the actual enum value is Configuring = 7. The TaskStatus enum currently contains both Configuring = 7 and Starting = 8 with no usages of either in the codebase. There is no enum value named Configured in the proto definition.

Likely an incorrect or invalid review comment.


229-230: No action needed—code does not rely on presence tracking for these fields.

The codebase uses proto3 and does not call HasNodeNum() or HasNtasksPerNode() methods anywhere. Instead, code explicitly validates that both fields must be > 0 (internal/util/string.go) and treats 0 as invalid. All initialization paths set these fields to proper values (default to 1), and step mode logic depends on value validation, not on detecting unset fields.

internal/crun/cmd.go (2)

88-89: LGTM on SetInterspersed(false).

This correctly stops flag parsing at the first positional argument, ensuring that flags meant for the user's executable aren't consumed by crun.


48-49: LGTM on new --output and --err flags.

The new flags follow the same pattern as --input with sensible defaults ("all"). The short flags -o and -e are intuitive and align with common conventions.

Also applies to: 113-114

internal/cfored/server.go (2)

337-351: LGTM on expanded supervisor request handling.

The new cases for TASK_ERR_OUTPUT, TASK_X11_CONN, TASK_X11_OUTPUT, TASK_X11_EOF, and TASK_EXIT_STATUS are properly forwarded using forwardRemoteIoToCrun. The fallthrough pattern for X11-related types is appropriate since they share the same forwarding logic.


420-424: LGTM on consolidated reply sending with error handling.

Moving the Send call after the switch and handling connection errors by transitioning to SupervisorUnReg is a clean improvement that ensures proper state management when the supervisor connection breaks.

internal/cfored/crun_server.go (3)

148-149: LGTM on per-task routing maps.

The craned_tasks_map and task_craned_map enable efficient per-task routing for directed input forwarding, which is essential for the new --input task ID feature.


549-562: LGTM on per-task I/O forwarding logic.

The conditional routing based on TaskId presence correctly sends input to either a specific craned (via forwardCrunRequestToSingleSupervisor) or broadcasts to all (via forwardCrunRequestToSupervisor).


705-750: LGTM on the waitingAck loop with gotReply flag.

The refactored loop properly handles IO messages during the ack-waiting phase, preventing channel fullness from blocking supervisors. The gotReply flag ensures CompletionAck is only sent when actually received.

internal/crun/crun.go (4)

980-1083: LGTM on ParseFilePattern enhancements.

The function now correctly distinguishes between local and remote file patterns by detecting remote-only tokens (%N, %n, %t). The backslash escape handling and hostname-based node resolution are well-implemented.


1142-1221: LGTM on StartIOForward refactoring.

The new implementation cleanly handles all combinations of input/output/error flag values:

  • "all" → stdin/stdout/stderr
  • Task ID → filtered terminal I/O
  • File pattern → file-based I/O

The context-based cancellation with stopStepCtx, stopReadCtx, and stopWriteCtx provides proper lifecycle management.


1584-1605: LGTM on PTY mode flag overrides.

When --pty is used, forcing inputFlag="0", outputFlag="0", errorFlag="none" is correct since PTY multiplexes all I/O through a single channel for the primary task.


816-861: stopStepCb() is thread-safe and idempotent.

In FileWriterRoutine, stopStepCb() is a context.CancelFunc and is safe to call multiple times. While the control flow within a single FileWriterRoutine invocation prevents consecutive calls (due to return and break statements), multiple concurrent FileWriterRoutine goroutines may invoke it simultaneously. This is safe because context.CancelFunc is explicitly designed to be idempotent and thread-safe.

protos/Crane.proto (5)

477-479: LGTM on deprecating execution_node.

Properly marking the field as deprecated with [deprecated = true] maintains backward compatibility while signaling to consumers that this field should no longer be used.


729-734: LGTM on TaskResAllocatedReply enhancements.

Adding craned_task_map and ntasks_total enables the crun client to properly route per-task I/O. The nested Tasks message is a clean approach for the repeated field in the map value.


823-827: LGTM on new StreamCrunReply types and messages.

The new enum values (TASK_EXIT_STATUS, TASK_X11_CONN, TASK_X11_EOF, TASK_ERR_OUTPUT_FORWARD) and corresponding payload messages are well-structured and support the enhanced X11 forwarding and per-task exit status reporting features.

Also applies to: 861-900


904-965: LGTM on StreamTaskIORequest expansions.

The new request types and messages (TaskErrOutputReq, TaskX11FwdConnReq, TaskX11OutputEofReq, TaskExitStatus) properly support the supervisor-to-cfored communication for the enhanced I/O forwarding features.


980-994: LGTM on StreamTaskIOReply additions.

Adding optional uint32 task_id to TaskInputReq and eof/local_id to TaskX11InputReq enables the supervisor to properly route inputs to specific tasks or X11 sessions.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/crun/crun.go (1)

1039-1083: Mixed local/remote tokens are discarded rather than properly handled or rejected.

When a pattern contains both local tokens (e.g., %j) and remote tokens (e.g., %t), the function applies the local token replacements but then discards the result once isLocalFile is set to false. Either validate that patterns cannot mix local and remote tokens, or clarify how remote handling should receive the partially expanded pattern. Currently, a mixed pattern silently returns an empty string with no error or warning.
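One way to implement the "reject mixed patterns" option is sketched below. The token lists, `classifyPattern`, and `errMixedTokens` are assumptions made for illustration; only %j and %t are named in this comment, and the real token sets live in crun.go.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Assumed token sets, for illustration only: this review names %j as a local
// token and %t as a remote one; the actual lists are defined in crun.go.
var (
	localTokens  = []string{"%j"}
	remoteTokens = []string{"%t"}
)

// errMixedTokens is a hypothetical sentinel error.
var errMixedTokens = errors.New("pattern mixes local and remote tokens")

// containsAny reports whether s contains at least one of the tokens.
func containsAny(s string, tokens []string) bool {
	for _, t := range tokens {
		if strings.Contains(s, t) {
			return true
		}
	}
	return false
}

// classifyPattern reports whether a pattern can be expanded locally. A pattern
// that mixes both token kinds is rejected with an error instead of silently
// collapsing to an empty string.
func classifyPattern(pattern string) (isLocal bool, err error) {
	hasLocal := containsAny(pattern, localTokens)
	hasRemote := containsAny(pattern, remoteTokens)
	if hasLocal && hasRemote {
		return false, fmt.Errorf("%q: %w", pattern, errMixedTokens)
	}
	return !hasRemote, nil
}

func main() {
	for _, p := range []string{"out-%j.log", "out-%t.log", "out-%j-%t.log"} {
		local, err := classifyPattern(p)
		fmt.Println(p, local, err)
	}
}
```

If mixed patterns should instead be supported, the same classification step could hand the partially expanded string to the remote side rather than returning an error.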

♻️ Duplicate comments (3)
internal/cfored/crun_server.go (3)

131-135: Unreachable code: log.Fatalf terminates before error return.

This issue was previously flagged. log.Fatalf calls os.Exit(1) after logging, so the return errors.New(...) on line 134 is never reached. Either use log.Errorf to allow the error to be returned, or remove the return statement.
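A minimal sketch of the "log and return" alternative follows. `lookupStep` and `errUnknownStep` are hypothetical names, and the standard library's `log.Printf` stands in for the project's own `log.Errorf`, whose package is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errUnknownStep is a hypothetical sentinel error used only for illustration.
var errUnknownStep = errors.New("unknown step")

// lookupStep (hypothetical) logs the failure and returns an error so the
// caller decides how to react. Had log.Fatalf been used, the process would
// exit inside the logger and the return statement would never run.
func lookupStep(steps map[uint32]string, id uint32) (string, error) {
	name, ok := steps[id]
	if !ok {
		log.Printf("step #%d not found", id)
		return "", fmt.Errorf("step #%d: %w", id, errUnknownStep)
	}
	return name, nil
}

func main() {
	steps := map[uint32]string{1: "batch"}
	if _, err := lookupStep(steps, 2); err != nil {
		fmt.Println("recovered:", err)
	}
}
```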


650-658: Ignored error from HandleSupervisorRequest followed by overwrite.

This issue was previously flagged. On line 651, the error from HandleSupervisorRequest is assigned to err, but line 653 immediately overwrites err with the result of toCrunStream.Send(reply). If HandleSupervisorRequest fails, the error is silently discarded.

🔎 Proposed fix
 // During cancellation, just forward the IO messages from supervisor to crun, ignore errors
 err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+if err != nil {
+	log.Debugf("[Cfored->Crun][Step #%d.%d] HandleSupervisorRequest failed: %s", jobId, stepId, err.Error())
+	continue
+}

-if err = toCrunStream.Send(reply); err != nil {
+if err := toCrunStream.Send(reply); err != nil {

720-728: Same error handling issue during CrunWaitCtldAck state.

This issue was previously flagged. The error from HandleSupervisorRequest on line 721 is ignored and overwritten on line 723.

🔎 Proposed fix
 // During cancellation, just forward the IO messages from supervisor to crun, ignore errors
 err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
+if err != nil {
+	log.Debugf("[Cfored->Crun][Step #%d.%d] HandleSupervisorRequest failed: %s", jobId, stepId, err.Error())
+	gotReply = false
+	break waitingAck
+}

-if err = toCrunStream.Send(reply); err != nil {
+if err := toCrunStream.Send(reply); err != nil {
🧹 Nitpick comments (3)
internal/cfored/crun_server.go (1)

148-149: Consider using camelCase for Go field names.

The fields craned_tasks_map and task_craned_map use snake_case, which is inconsistent with Go naming conventions. Consider renaming to cranedTasksMap and taskCranedMap.

internal/crun/crun.go (2)

449-458: Task ID parsing allows numeric file paths to be ambiguous.

The logic parses m.inputFlag as a task ID if it's numeric and within range. However, if a user legitimately wants to read from a file named "0" or "1", this will be interpreted as a task ID instead. Consider requiring explicit syntax (e.g., "task:0" vs "file:0") to disambiguate.


1100-1104: Previous issue appears resolved.

The past review comment flagged that the log message was inverted. The current code (lines 1100-1104) now logs "Input file is not a local file" when isLocalFile is false, which is correct. However, the routine still calls m.stopStepCb() when it encounters a non-local file pattern. This seems intentional, but a clearer error message explaining why non-local input files are not supported in this context would help.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1674042 and 00baa42.

📒 Files selected for processing (3)
  • internal/cfored/crun_server.go
  • internal/crun/crun.go
  • internal/util/err.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/util/err.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/crun/crun.go (3)
internal/crun/x11.go (1)
  • X11SessionMgr (56-67)
internal/util/string.go (1)
  • ParseGres (1052-1096)
internal/crun/cmd.go (2)
  • FlagOutput (48-48)
  • FlagErr (49-49)
🔇 Additional comments (11)
internal/cfored/crun_server.go (3)

420-427: LGTM!

The bidirectional mapping initialization correctly populates both craned_tasks_map and task_craned_map from the allocation payload.


756-765: LGTM!

The gotReply mechanism correctly prevents attempting to send CompletionAck to a dead crun connection. This improves error handling during the acknowledgment phase.


604-614: Verify reply is valid before sending after error.

If HandleSupervisorRequest returns an error (line 605), the code breaks from the forwarding loop but still attempts to send reply on line 609. Since HandleSupervisorRequest may not have initialized reply properly when an error occurs, sending an uninitialized or partially initialized reply could cause issues.

🔎 Proposed fix
 err := HandleSupervisorRequest(jobId, stepId, taskMsg, &reply)
 if err != nil {
+	log.Errorf("[Supervisor->Cfored->Crun][Step #%d.%d] HandleSupervisorRequest failed: %s", jobId, stepId, err.Error())
+	state = CancelTaskOfDeadCrun
 	break forwarding
 }

 if err = toCrunStream.Send(reply); err != nil {

Likely an incorrect or invalid review comment.

internal/crun/crun.go (8)

66-67: LGTM!

The renamed constants FlagIOForwardALL and FlagIOForwardNONE better reflect their purpose for IO forwarding control.


92-99: LGTM!

The new fields properly support per-task IO routing, X11 forwarding, and context-based cancellation. Field naming follows Go conventions.

Also applies to: 114-122


332-336: LGTM!

The cranedTaskMap initialization correctly extracts task-to-craned mappings from the allocation payload.


540-570: LGTM!

The forwarding logic correctly handles IO messages, X11 forwarding, and task exit statuses with proper cancellation support via stopWriteCtx.


783-892: LGTM!

The new writer routines (StderrWriterRoutine, DiscardRoutine, FileWriterRoutine, StdoutFileWriterRoutine, StderrFileWriterRoutine) properly handle cancellation, create necessary directories, and manage file resources correctly.


1143-1228: LGTM!

The StartIOForward function correctly initializes IO channels and spawns appropriate reader/writer routines based on the input/output/error flags. The logic properly handles "all", "none", task ID, and file path options, with X11 forwarding conditionally enabled.


1382-1385: LGTM!

The error handling for ParseGres is correct and provides clear error messages.


1590-1611: LGTM!

The PTY compatibility check correctly prevents using --input/--output/--err flags with --pty, and the IoMeta fields are properly populated with the flag values.

Comment on lines +550 to +563
req := crunRequest.GetPayloadTaskIoForwardReq()

if req.TaskId != nil {
	node := task_craned_map[*req.TaskId]
	log.Debugf("[Crun->Cfored->Supervisor][Step #%d.%d] Receive TASK_IO_FORWARD Request to task #%d"+
		"on node %s msg size[%d], EOF [%v]",
		jobId, stepId, *req.TaskId, node, len(req.GetMsg()), req.Eof)
	gSupervisorChanKeeper.forwardCrunRequestToSingleSupervisor(jobId, stepId,
		node, crunRequest)
} else {
	log.Debugf("[Crun->Cfored->Supervisor][Step #%d.%d] Receive TASK_IO_FORWARD Request to tasks msg size[%d], EOF [%v]",
		jobId, stepId, len(req.GetMsg()), req.Eof)
	gSupervisorChanKeeper.forwardCrunRequestToSupervisor(jobId, stepId, crunRequest)
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find the forwardCrunRequestToSingleSupervisor function implementation
rg -A 15 "func.*forwardCrunRequestToSingleSupervisor" internal/cfored/

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 1170


🏁 Script executed:

#!/bin/bash
# Also check the forwardCrunRequestToSupervisor for comparison
rg -A 15 "func.*forwardCrunRequestToSupervisor\(" internal/cfored/

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 1245


🏁 Script executed:

#!/bin/bash
# Find where task_craned_map is populated/initialized
rg "task_craned_map\[" internal/cfored/crun_server.go -B 3 -A 1

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 505


🏁 Script executed:

#!/bin/bash
# Check the full context around lines 550-563 including variable initialization
sed -n '540,570p' internal/cfored/crun_server.go

Repository: PKUHPC/CraneSched-FrontEnd

Length of output: 1555


Add defensive check before task-to-node lookup.

When req.TaskId is non-nil, the code performs node := task_craned_map[*req.TaskId] without validating that the task ID exists in the map. In Go, accessing a missing key returns the zero value (an empty string here). This empty string is then passed to forwardCrunRequestToSingleSupervisor, which logs an error when looking up stepChannels[""] but does not return. The function continues and attempts to send through the missing entry, which can panic on a nil pointer dereference (or block forever if the send lands on a nil channel).

Add a check to verify the task ID exists in the map before forwarding, or handle the case where the node is empty.

🤖 Prompt for AI Agents
In internal/cfored/crun_server.go around lines 550 to 563, when req.TaskId is
non-nil the code looks up node := task_craned_map[*req.TaskId] without verifying
the key exists which can yield an empty string and later cause a panic when
forwarding; update this block to perform a safe map lookup (node, ok :=
task_craned_map[*req.TaskId]) and handle the missing-case: if ok is false or
node == "" then log a clear error including the task id and drop/ack the request
(do not call forwardCrunRequestToSingleSupervisor), otherwise proceed to call
forwardCrunRequestToSingleSupervisor with the validated node.
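
The defensive lookup described above can be sketched with Go's comma-ok form. `resolveNode` is a hypothetical helper and the map contents are made up; in the PR the check would sit inline before the call to forwardCrunRequestToSingleSupervisor.

```go
package main

import "fmt"

// resolveNode (hypothetical) returns the craned node hosting a task, or an
// error when the task id is unknown or mapped to an empty node name.
func resolveNode(taskCranedMap map[uint32]string, taskID uint32) (string, error) {
	node, ok := taskCranedMap[taskID]
	if !ok || node == "" {
		return "", fmt.Errorf("task #%d has no craned node mapping", taskID)
	}
	return node, nil
}

func main() {
	// Example data only; real entries come from the allocation payload.
	taskCranedMap := map[uint32]string{0: "cn01", 1: "cn02"}

	for _, id := range []uint32{1, 7} {
		node, err := resolveNode(taskCranedMap, id)
		if err != nil {
			// Drop the request instead of forwarding it to an empty node name.
			fmt.Println("dropping request:", err)
			continue
		}
		fmt.Printf("forwarding task #%d input to %s\n", id, node)
	}
}
```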

Comment on lines +633 to +674
switch cforedReply.Type {
case protos.StreamCrunReply_TASK_IO_FORWARD:
	select {
	case m.chanOutputFromRemote <- cforedReply.GetPayloadTaskIoForwardReply().Msg:
	case <-m.stopWriteCtx.Done():
	}
	return // Still in WaitAck state
case protos.StreamCrunReply_TASK_ERR_OUTPUT_FORWARD:
	select {
	case m.chanErrOutputFromRemote <- cforedReply.GetPayloadTaskIoErrOutputForwardReply().Msg:
	case <-m.stopWriteCtx.Done():
	}
	return

case protos.StreamCrunReply_TASK_X11_CONN:
	fallthrough
case protos.StreamCrunReply_TASK_X11_FORWARD:
	fallthrough
case protos.StreamCrunReply_TASK_X11_EOF:
	m.X11SessionMgr.X11ReplyChan <- cforedReply
	return // Still in WaitAck state

case protos.StreamCrunReply_TASK_EXIT_STATUS:
	exitStatus := cforedReply.GetPayloadTaskExitStatusReply()
	if exitStatus.ExitCode != 0 {
		if exitStatus.Signaled {
			fmt.Fprintf(os.Stderr, "error: task %d: Terminated\n", exitStatus.TaskId)
		} else {
			fmt.Fprintf(os.Stderr, "error: task %d: Exited with exit code %d\n",
				exitStatus.TaskId, exitStatus.ExitCode)
		}
		m.err = int(exitStatus.ExitCode)
	}
	return // Still in WaitAck state

case protos.StreamCrunReply_TASK_CANCEL_REQUEST:
	log.Fatalf("Received TASK_CANCEL_REQUEST in WaitAck state.")

case protos.StreamCrunReply_TASK_COMPLETION_ACK_REPLY:
	log.Debug("Task completed.")
	m.state = End
}

⚠️ Potential issue | 🟡 Minor

StateWaitAck inefficiently processes messages with early returns.

The function handles multiple message types by returning early to stay in the WaitAck state (lines 639, 645, 653, 666). This causes the Run() loop to repeatedly call StateWaitAck, which is inefficient. Consider wrapping the logic in a loop within StateWaitAck to process multiple messages until receiving TASK_COMPLETION_ACK_REPLY.

Additionally, line 669 uses log.Fatalf for TASK_CANCEL_REQUEST, which terminates the entire process. Consider using log.Errorf and transitioning to TaskKilling state instead.

🤖 Prompt for AI Agents
internal/crun/crun.go around lines 633 to 674: StateWaitAck currently returns
from the function for most message cases to remain in WaitAck, causing repeated
Run() invocations; change StateWaitAck to loop reading/processing cforedReply
messages (e.g., for { read reply; switch ... } ) and only exit the loop when
TASK_COMPLETION_ACK_REPLY is received (set m.state = End) or when context
cancellation occurs, replacing the early returns with continues to keep
processing within the same function; also replace the log.Fatalf on
TASK_CANCEL_REQUEST with log.Errorf (or similar non-fatal logging) and
transition the state to TaskKilling (e.g., set m.state = TaskKilling and
break/return appropriately) so the process is not terminated abruptly.
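
The loop-based shape suggested above might look like the following sketch. All types here (`reply`, `replyType`, `state`) are simplified stand-ins for the protobuf messages and the crun state machine, and mapping a closed stream or a cancelled context to the killing state is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the StreamCrunReply types; not the real enum.
type replyType int

const (
	ioForward replyType = iota
	cancelRequest
	completionAck
)

type reply struct {
	Type replyType
	Msg  string
}

// Simplified stand-ins for the crun states.
type state int

const (
	waitAck state = iota
	taskKilling
	end
)

// waitForAck processes replies inside a single call and returns only when the
// next state is known, instead of returning after every forwarded message.
func waitForAck(ctx context.Context, replies <-chan reply, out chan<- string) state {
	for {
		select {
		case <-ctx.Done():
			return taskKilling // assumption: cancellation leads to killing
		case r, ok := <-replies:
			if !ok {
				return taskKilling // assumption: a closed stream is treated as failure
			}
			switch r.Type {
			case ioForward:
				select {
				case out <- r.Msg: // forwarded; keep waiting for the ack
				case <-ctx.Done():
					return taskKilling
				}
			case cancelRequest:
				return taskKilling // non-fatal: change state instead of log.Fatalf
			case completionAck:
				return end
			}
		}
	}
}

// runDemo feeds two IO messages followed by the ack and reports the final
// state together with the number of forwarded messages.
func runDemo() (state, int) {
	replies := make(chan reply, 3)
	out := make(chan string, 3)
	replies <- reply{Type: ioForward, Msg: "hello"}
	replies <- reply{Type: ioForward, Msg: "world"}
	replies <- reply{Type: completionAck}
	return waitForAck(context.Background(), replies, out), len(out)
}

func main() {
	s, forwarded := runDemo()
	fmt.Println(s == end, forwarded)
}
```

Keeping the early-return style is also defensible if Run() is meant to be the only loop; the sketch mainly shows that the cancel case does not need to terminate the process.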

