Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 267 additions & 0 deletions rayapp/anyscale_cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package rayapp

import (
"bytes"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
)

type WorkspaceState int

const (
StateTerminated WorkspaceState = iota
StateStarting
StateRunning
)

var WorkspaceStateName = map[WorkspaceState]string{
StateTerminated: "TERMINATED",
StateStarting: "STARTING",
StateRunning: "RUNNING",
}

func (ws WorkspaceState) String() string {
return WorkspaceStateName[ws]
}

type AnyscaleCLI struct {
token string
}

var errAnyscaleNotInstalled = errors.New("anyscale is not installed")

func NewAnyscaleCLI(token string) *AnyscaleCLI {
return &AnyscaleCLI{token: token}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The token field in the AnyscaleCLI struct is initialized in NewAnyscaleCLI but is never used. The runAnyscaleCLI function executes anyscale commands without passing this token. Authentication seems to rely on environment variables (ANYSCALE_CLI_TOKEN, ANYSCALE_HOST), as suggested by the error message in the Authenticate method.

To avoid confusion and simplify the code, consider removing the token field from the AnyscaleCLI struct and the token parameter from NewAnyscaleCLI.

type AnyscaleCLI struct {}

var errAnyscaleNotInstalled = errors.New("anyscale is not installed")

func NewAnyscaleCLI() *AnyscaleCLI {
	return &AnyscaleCLI{}
}


func isAnyscaleInstalled() bool {
_, err := exec.LookPath("anyscale")
return err == nil
}

func (ac *AnyscaleCLI) Authenticate() error {
cmd := exec.Command("anyscale", "login")
err := cmd.Run()
if err != nil {
return fmt.Errorf("anyscale auth login failed, please set ANYSCALE_CLI_TOKEN & ANYSCALE_HOST env variables: %w", err)
}
return nil
}

// RunAnyscaleCLI runs the anyscale CLI with the given arguments.
// Returns the combined output and any error that occurred.
// Output is displayed to the terminal with colors preserved.
func (ac *AnyscaleCLI) runAnyscaleCLI(args []string) (string, error) {
if !isAnyscaleInstalled() {
return "", errAnyscaleNotInstalled
}

fmt.Println("anyscale cli args: ", args)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The code uses fmt.Println for logging throughout the file (e.g., here and on lines 102, 170, 179, etc.). While useful for debugging during development, this is not ideal for a library or CLI tool because it's inflexible. It forces all log messages to stdout and doesn't allow for log levels (e.g., DEBUG, INFO, ERROR).

Consider using the standard log package or a structured logging library like slog (available in Go 1.21+) or a third-party one (e.g., logrus, zap). This would allow you to control log levels, direct output to stderr (common for logs in CLI tools), and provide more structured information.

cmd := exec.Command("anyscale", args...)

// Capture output while also displaying to terminal with colors
var outputBuf bytes.Buffer
cmd.Stdout = io.MultiWriter(os.Stdout, &outputBuf)
cmd.Stderr = io.MultiWriter(os.Stderr, &outputBuf)

if err := cmd.Run(); err != nil {
return outputBuf.String(), fmt.Errorf("anyscale error: %w", err)
}

return outputBuf.String(), nil
}

// parseComputeConfigName parses the AWS config path and converts it to a config name.
// e.g., "configs/basic-single-node/aws.yaml" -> "basic-single-node-aws"
func parseComputeConfigName(awsConfigPath string) string {
// Get the directory and filename
dir := filepath.Dir(awsConfigPath) // "configs/basic-single-node"
base := filepath.Base(awsConfigPath) // "aws.yaml"
ext := filepath.Ext(base) // ".yaml"
filename := strings.TrimSuffix(base, ext) // "aws"

// Get the last directory component (the config name)
configDir := filepath.Base(dir) // "basic-single-node"

// Combine: "basic-single-node-aws"
return configDir + "-" + filename
}

// CreateComputeConfig creates a new compute config from a YAML file if it doesn't already exist.
// name: the name for the compute config (without version tag)
// configFile: path to the YAML config file
// Returns the output from the CLI and any error.
func (ac *AnyscaleCLI) CreateComputeConfig(name, configFilePath string) (string, error) {
// Check if compute config already exists
if output, err := ac.GetComputeConfig(name); err == nil {
fmt.Printf("Compute config %q already exists, skipping creation\n", name)
return output, nil
}

// Create the compute config since it doesn't exist
args := []string{"compute-config", "create", "-n", name, "-f", configFilePath}
output, err := ac.runAnyscaleCLI(args)
if err != nil {
return output, fmt.Errorf("create compute config failed: %w", err)
}
return output, nil
}

// GetComputeConfig retrieves the details of a compute config by name.
// name: the name of the compute config (optionally with version tag, e.g., "name:1")
// Returns the output from the CLI and any error.
func (ac *AnyscaleCLI) GetComputeConfig(name string) (string, error) {
args := []string{"compute-config", "get", "-n", name}
output, err := ac.runAnyscaleCLI(args)
if err != nil {
return output, fmt.Errorf("get compute config failed: %w", err)
}
return output, nil
}

// ListComputeConfigs lists compute configs with optional filters.
// name: filter by name (optional, empty string for no filter)
// includeShared: include shared compute configs
// maxItems: maximum number of items to return (0 for no limit)
// Returns the output from the CLI and any error.
func (ac *AnyscaleCLI) ListComputeConfigs(name string, includeShared bool, maxItems int) (string, error) {
args := []string{"compute-config", "list"}
if name != "" {
args = append(args, "-n", name)
}
if includeShared {
args = append(args, "--include-shared")
}
if maxItems > 0 {
args = append(args, "--max-items", fmt.Sprintf("%d", maxItems))
}
output, err := ac.runAnyscaleCLI(args)
if err != nil {
return output, fmt.Errorf("list compute configs failed: %w", err)
}
return output, nil
}

func (ac *AnyscaleCLI) createEmptyWorkspace(config *WorkspaceTestConfig) error {
args := []string{"workspace_v2", "create"}
// get image URI and ray version from build ID
imageURI, rayVersion, err := convertBuildIdToImageURI(config.template.ClusterEnv.BuildID)
if err != nil {
return fmt.Errorf("convert build ID to image URI failed: %w", err)
}
args = append(args, "--name", config.workspaceName)
args = append(args, "--image-uri", imageURI)
args = append(args, "--ray-version", rayVersion)

// Use compute config name if set
if config.computeConfig != "" {
args = append(args, "--compute-config", config.computeConfig)
}

output, err := ac.runAnyscaleCLI(args)
if err != nil {
return fmt.Errorf("create empty workspace failed: %w", err)
}
fmt.Println("create empty workspace output:\n", output)
return nil
}

func (ac *AnyscaleCLI) terminateWorkspace(workspaceName string) error {
output, err := ac.runAnyscaleCLI([]string{"workspace_v2", "terminate", "--name", workspaceName})
if err != nil {
return fmt.Errorf("delete workspace failed: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error message "delete workspace failed" is inconsistent with the function name terminateWorkspace and the CLI command workspace_v2 terminate. For clarity and easier debugging, the error message should align with the action being performed.

		return fmt.Errorf("terminate workspace failed: %w", err)

}
fmt.Println("terminate workspace output:\n", output)
return nil
}

func (ac *AnyscaleCLI) copyTemplateToWorkspace(config *WorkspaceTestConfig) error {
output, err := ac.runAnyscaleCLI([]string{"workspace_v2", "push", "--name", config.workspaceName, "--local-dir", config.template.Dir})
if err != nil {
return fmt.Errorf("copy template to workspace failed: %w", err)
}
fmt.Println("copy template to workspace output:\n", output)
return nil
}
Comment on lines +270 to +277
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function copyTemplateToWorkspace appears to be unused. The test runner in test.go uses pushTemplateToWorkspace instead. To keep the codebase clean and avoid confusion, it's best to remove dead code.


func (ac *AnyscaleCLI) pushTemplateToWorkspace(workspaceName, localFilePath string) error {
output, err := ac.runAnyscaleCLI([]string{"workspace_v2", "push", "--name", workspaceName, "--local-dir", localFilePath})
if err != nil {
return fmt.Errorf("push file to workspace failed: %w", err)
}
fmt.Println("push file to workspace output:\n", output)
return nil
}

func (ac *AnyscaleCLI) runCmdInWorkspace(config *WorkspaceTestConfig, cmd string) error {
output, err := ac.runAnyscaleCLI([]string{"workspace_v2", "run_command", "--name", config.workspaceName, cmd})
if err != nil {
return fmt.Errorf("run command in workspace failed: %w", err)
}
fmt.Println("run command in workspace output:\n", output)
return nil
}

func (ac *AnyscaleCLI) startWorkspace(config *WorkspaceTestConfig) error {
output, err := ac.runAnyscaleCLI([]string{"workspace_v2", "start", "--name", config.workspaceName})
if err != nil {
return fmt.Errorf("start workspace failed: %w", err)
}
fmt.Println("start workspace output:\n", output)
return nil
}

func (ac *AnyscaleCLI) getWorkspaceStatus(workspaceName string) (string, error) {
output, err := ac.runAnyscaleCLI([]string{"workspace_v2", "status", "--name", workspaceName})
if err != nil {
return "", fmt.Errorf("get workspace state failed: %w", err)
}
return output, nil
}

func (ac *AnyscaleCLI) waitForWorkspaceState(workspaceName string, state WorkspaceState) (string, error) {
output, err := ac.runAnyscaleCLI([]string{"workspace_v2", "wait", "--name", workspaceName, "--state", state.String()})
if err != nil {
return "", fmt.Errorf("wait for workspace state failed: %w", err)
}
return output, nil
}

func convertBuildIdToImageURI(buildId string) (string, string, error) {
// Convert build ID like "anyscaleray2441-py312-cu128" to "anyscale/ray:2.44.1-py312-cu128"
const prefix = "anyscaleray"
if !strings.HasPrefix(buildId, prefix) {
return "", "", fmt.Errorf("build ID must start with %q: %s", prefix, buildId)
}

// Remove the prefix to get "2441-py312-cu128"
remainder := strings.TrimPrefix(buildId, prefix)

// Find the first hyphen to separate version from suffix
hyphenIdx := strings.Index(remainder, "-")
var versionStr, suffix string
if hyphenIdx == -1 {
versionStr = remainder
suffix = ""
} else {
versionStr = remainder[:hyphenIdx]
suffix = remainder[hyphenIdx:] // includes the hyphen
}

// Parse version: "2441" -> "2.44.1"
// Format: first digit = major, next two = minor, rest = patch
if len(versionStr) < 4 {
return "", "", fmt.Errorf("version string too short: %s", versionStr)
}

major := versionStr[0:1]
minor := versionStr[1:3]
patch := versionStr[3:]

return fmt.Sprintf("anyscale/ray:%s.%s.%s%s", major, minor, patch, suffix), fmt.Sprintf("%s.%s.%s", major, minor, patch), nil
}
Loading