Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,13 @@ func (j *Job) Validate() error {
}
}

if j.Executor == "shell" && j.ExecutorConfig["mem_limit"] != "" {
err := validateMemoryLimit(j.ExecutorConfig["mem_limit"])
if err != nil {
return fmt.Errorf("Error parsing job memory limit value: %v", err)
}
}

return nil
}

Expand Down Expand Up @@ -556,3 +563,74 @@ func findParentJobInChildJobs(jobs []*Job, job *Job) bool {
}
return false
}

// validateMemoryLimit validates a memory limit string and returns an error if invalid.
// Accepts formats like "1024", "1024MB", "1GB", "512KB", etc.
func validateMemoryLimit(limit string) error {
if limit == "" {
return nil // Empty limit is valid (no limit)
}

// Try to parse as a plain number (bytes)
if value, err := strconv.ParseInt(limit, 10, 64); err == nil {
if value <= 0 {
return fmt.Errorf("memory limit must be greater than 0")
}
return nil
}

// Try to parse with units
limit = strings.ToUpper(strings.TrimSpace(limit))

// Extract the numeric part and unit
var numStr string
var unit string

// Find where the number ends and unit begins
i := 0
for i < len(limit) && (limit[i] >= '0' && limit[i] <= '9' || limit[i] == '.') {
i++
}

if i == 0 {
return fmt.Errorf("invalid memory limit format: %s", limit)
}

numStr = limit[:i]
unit = limit[i:]

// Parse the numeric part
value, err := strconv.ParseFloat(numStr, 64)
if err != nil {
return fmt.Errorf("invalid numeric value in memory limit: %s", numStr)
}

if value <= 0 {
return fmt.Errorf("memory limit must be greater than 0")
}

// Validate and convert unit to bytes
var multiplier int64
switch unit {
case "", "B", "BYTES":
multiplier = 1
case "KB", "K":
multiplier = 1024
case "MB", "M":
multiplier = 1024 * 1024
case "GB", "G":
multiplier = 1024 * 1024 * 1024
case "TB", "T":
multiplier = 1024 * 1024 * 1024 * 1024
default:
return fmt.Errorf("unsupported memory unit: %s (supported: B, KB, MB, GB, TB)", unit)
}

// Check for overflow
bytes := int64(value * float64(multiplier))
if bytes <= 0 {
return fmt.Errorf("memory limit too large or causes overflow")
}

return nil
}
124 changes: 124 additions & 0 deletions plugin/shell/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,51 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp
defer slowTimer.Stop()
}

// Parse memory limit if specified
memLimit, err := parseMemoryLimit(args.Config["mem_limit"])
if err != nil {
return nil, fmt.Errorf("shell: Error parsing job memory limit: %v", err)
}

var memLimitExceededMessage string
var memLimitExceeded bool

quit := make(chan int)

// Start memory monitoring if limit is set
if memLimit > 0 {
go func() {
ticker := time.NewTicker(1 * time.Second) // Check every second
defer ticker.Stop()

for {
select {
case <-quit:
return
case <-ticker.C:
// Get memory usage for the process and its children
_, totalMem, err := GetTotalCPUMemUsage(cmd.Process.Pid)
if err != nil {
// Process might have already finished
continue
}

if totalMem > float64(memLimit) {
// Memory limit exceeded, kill the process
err := processKill(cmd)
if err != nil {
memLimitExceededMessage = fmt.Sprintf("shell: Job '%s' memory usage (%.0f bytes) exceeding defined limit (%d bytes). SIGKILL returned error. Job may not have been killed", command, totalMem, memLimit)
} else {
memLimitExceededMessage = fmt.Sprintf("shell: Job '%s' memory usage (%.0f bytes) exceeding defined limit (%d bytes). Job was killed", command, totalMem, memLimit)
}
memLimitExceeded = true
return
}
}
}
}()
}

// FIXME: Debug metrics collection
// quit := make(chan int)

Expand All @@ -150,6 +195,13 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp
}
}

if memLimitExceeded {
_, err := output.Write([]byte(memLimitExceededMessage))
if err != nil {
log.Printf("Error writing output on memory limit exceeded event: %v", err)
}
}

// Warn if buffer is overwritten
if output.TotalWritten() > output.Size() {
log.Printf("shell: Script '%s' generated %d bytes of output, truncated to %d", command, output.TotalWritten(), output.Size())
Expand Down Expand Up @@ -190,3 +242,75 @@ func buildCmd(command string, useShell bool, env []string, cwd string) (cmd *exe
cmd.Dir = cwd
return
}

// parseMemoryLimit converts a memory limit string to bytes.
// Accepts formats like "1024", "1024MB", "1GB", "512KB", etc.
// Returns 0 if no limit is specified (empty string).
func parseMemoryLimit(limit string) (int64, error) {
if limit == "" {
return 0, nil // No limit
}

// Try to parse as a plain number (bytes)
if value, err := strconv.ParseInt(limit, 10, 64); err == nil {
if value <= 0 {
return 0, fmt.Errorf("memory limit must be greater than 0")
}
return value, nil
}

// Try to parse with units
limit = strings.ToUpper(strings.TrimSpace(limit))

// Extract the numeric part and unit
var numStr string
var unit string

// Find where the number ends and unit begins
i := 0
for i < len(limit) && (limit[i] >= '0' && limit[i] <= '9' || limit[i] == '.') {
i++
}

if i == 0 {
return 0, fmt.Errorf("invalid memory limit format: %s", limit)
}

numStr = limit[:i]
unit = limit[i:]

// Parse the numeric part
value, err := strconv.ParseFloat(numStr, 64)
if err != nil {
return 0, fmt.Errorf("invalid numeric value in memory limit: %s", numStr)
}

if value <= 0 {
return 0, fmt.Errorf("memory limit must be greater than 0")
}

// Validate and convert unit to bytes
var multiplier int64
switch unit {
case "", "B", "BYTES":
multiplier = 1
case "KB", "K":
multiplier = 1024
case "MB", "M":
multiplier = 1024 * 1024
case "GB", "G":
multiplier = 1024 * 1024 * 1024
case "TB", "T":
multiplier = 1024 * 1024 * 1024 * 1024
default:
return 0, fmt.Errorf("unsupported memory unit: %s (supported: B, KB, MB, GB, TB)", unit)
}

// Check for overflow
bytes := int64(value * float64(multiplier))
if bytes <= 0 {
return 0, fmt.Errorf("memory limit too large or causes overflow")
}

return bytes, nil
}
89 changes: 89 additions & 0 deletions plugin/shell/shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,92 @@ func Test_buildCmdWithCustomEnvironmentVariables(t *testing.T) {
assert.Equal(t, "Toto\nHo\n", string(out))

}

func Test_parseMemoryLimit(t *testing.T) {
tests := []struct {
name string
input string
expected int64
expectError bool
}{
{
name: "empty string returns no limit",
input: "",
expected: 0,
expectError: false,
},
{
name: "plain bytes",
input: "1024",
expected: 1024,
expectError: false,
},
{
name: "KB unit",
input: "1KB",
expected: 1024,
expectError: false,
},
{
name: "MB unit",
input: "1MB",
expected: 1024 * 1024,
expectError: false,
},
{
name: "GB unit",
input: "2GB",
expected: 2 * 1024 * 1024 * 1024,
expectError: false,
},
{
name: "lowercase units",
input: "512mb",
expected: 512 * 1024 * 1024,
expectError: false,
},
{
name: "decimal values",
input: "1.5GB",
expected: int64(1.5 * 1024 * 1024 * 1024),
expectError: false,
},
{
name: "invalid format",
input: "invalid",
expected: 0,
expectError: true,
},
{
name: "negative value",
input: "-100MB",
expected: 0,
expectError: true,
},
{
name: "zero value",
input: "0",
expected: 0,
expectError: true,
},
{
name: "unsupported unit",
input: "100PB",
expected: 0,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := parseMemoryLimit(tt.input)

if tt.expectError {
assert.Error(t, err, "Expected error for input: %s", tt.input)
} else {
assert.NoError(t, err, "Expected no error for input: %s", tt.input)
assert.Equal(t, tt.expected, result, "Expected %d bytes for input: %s", tt.expected, tt.input)
}
})
}
}
34 changes: 34 additions & 0 deletions website/docs/usage/executors/shell.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The shell executor runs commands on the target node's operating system. It can:
| `env` | No | Environment variables in the format "KEY1=value1,KEY2=value2" |
| `cwd` | No | The working directory to run the command from |
| `timeout` | No | Maximum execution time after which the job is forcefully terminated |
| `mem_limit` | No | Maximum memory usage after which the job is forcefully terminated. Supports units: B, KB, MB, GB, TB |

## Basic Usage Examples

Expand Down Expand Up @@ -84,6 +85,31 @@ The shell executor runs commands on the target node's operating system. It can:
}
```

### Setting a Memory Limit

```json
{
"executor": "shell",
"executor_config": {
"command": "/opt/scripts/memory_intensive_task.sh",
"mem_limit": "512MB"
}
}
```

### Combining Timeout and Memory Limit

```json
{
"executor": "shell",
"executor_config": {
"command": "/opt/scripts/resource_intensive_task.sh",
"timeout": "30m",
"mem_limit": "1GB"
}
}
```

## Advanced Examples

### Running a Multi-line Script
Expand Down Expand Up @@ -181,6 +207,14 @@ If jobs are timing out:
- Optimize the command to run more efficiently
- Consider breaking large jobs into smaller, chained jobs

### Memory Limits

If jobs are being killed due to memory limits:
- Adjust the `mem_limit` parameter to accommodate the job's memory requirements
- Optimize the command to use less memory (e.g., process data in chunks)
- Monitor memory usage patterns to set appropriate limits
- Consider if the job should run on a node with more available memory

### Environment Variables

If environment variables aren't working:
Expand Down
Loading