diff --git a/api/lmes/v1alpha1/lmevaljob_types.go b/api/lmes/v1alpha1/lmevaljob_types.go index ce3332ac9..47a3fb5d4 100644 --- a/api/lmes/v1alpha1/lmevaljob_types.go +++ b/api/lmes/v1alpha1/lmevaljob_types.go @@ -393,6 +393,33 @@ type PersistentVolumeClaimManaged struct { Size string `json:"size,omitempty"` } +// MLFlowExportType defines what to export to MLFlow +// +kubebuilder:validation:Enum=metrics;artifacts +type MLFlowExportType string + +const ( + // Export evaluation metrics to MLFlow + MLFlowMetricsExport MLFlowExportType = "metrics" + // Export evaluation artifacts to MLFlow + MLFlowArtifactsExport MLFlowExportType = "artifacts" +) + +// MLFlowOutput defines the configuration for MLFlow output +type MLFlowOutput struct { + // TrackingUri is the MLFlow tracking server URI + // +kubebuilder:validation:Pattern=`^https?://[a-zA-Z0-9.-]+(:[0-9]+)?(/.*)?$` + TrackingUri string `json:"trackingUri"` + // ExperimentName is the name of the MLFlow experiment + // +optional + ExperimentName *string `json:"experimentName,omitempty"` + // RunId is the specific MLFlow run ID to use (optional) + // +optional + RunId *string `json:"runId,omitempty"` + // Export defines what to export to MLFlow (metrics, artifacts, or both) + // +optional + Export []MLFlowExportType `json:"export,omitempty"` +} + type Outputs struct { // Use an existing PVC to store the outputs // +optional @@ -400,6 +427,9 @@ type Outputs struct { // Create an operator managed PVC // +optional PersistentVolumeClaimManaged *PersistentVolumeClaimManaged `json:"pvcManaged,omitempty"` + // Export results to MLFlow tracking server + // +optional + MLFlow *MLFlowOutput `json:"mlflow,omitempty"` } func (c *LMEvalContainer) GetSecurityContext() *corev1.SecurityContext { @@ -604,6 +634,37 @@ func (o *Outputs) HasExistingPVC() bool { return o.PersistentVolumeClaimName != nil } +// HasMLFlow returns whether the outputs define MLFlow configuration +func (o *Outputs) HasMLFlow() bool { + return o.MLFlow != nil +} + +// HasMLFlowMetrics returns whether MLFlow export includes metrics +func (m *MLFlowOutput) HasMLFlowMetrics() bool { + if m == nil || len(m.Export) == 0 { + return false + } + for _, export := range m.Export { + if export == MLFlowMetricsExport { + return true + } + } + return false +} + +// HasMLFlowArtifacts returns whether MLFlow export includes artifacts +func (m *MLFlowOutput) HasMLFlowArtifacts() bool { + if m == nil || len(m.Export) == 0 { + return false + } + for _, export := range m.Export { + if export == MLFlowArtifactsExport { + return true + } + } + return false +} + // LMEvalJobStatus defines the observed state of LMEvalJob type LMEvalJobStatus struct { // Important: Run "make" to regenerate code after modifying this file diff --git a/api/lmes/v1alpha1/zz_generated.deepcopy.go b/api/lmes/v1alpha1/zz_generated.deepcopy.go index 77e4c07b7..86a3444c0 100644 --- a/api/lmes/v1alpha1/zz_generated.deepcopy.go +++ b/api/lmes/v1alpha1/zz_generated.deepcopy.go @@ -418,6 +418,36 @@ func (in *LMEvalPodSpec) DeepCopy() *LMEvalPodSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MLFlowOutput) DeepCopyInto(out *MLFlowOutput) { + *out = *in + if in.ExperimentName != nil { + in, out := &in.ExperimentName, &out.ExperimentName + *out = new(string) + **out = **in + } + if in.RunId != nil { + in, out := &in.RunId, &out.RunId + *out = new(string) + **out = **in + } + if in.Export != nil { + in, out := &in.Export, &out.Export + *out = make([]MLFlowExportType, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MLFlowOutput. +func (in *MLFlowOutput) DeepCopy() *MLFlowOutput { + if in == nil { + return nil + } + out := new(MLFlowOutput) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Metric) DeepCopyInto(out *Metric) { *out = *in @@ -517,6 +547,11 @@ func (in *Outputs) DeepCopyInto(out *Outputs) { *out = new(PersistentVolumeClaimManaged) **out = **in } + if in.MLFlow != nil { + in, out := &in.MLFlow, &out.MLFlow + *out = new(MLFlowOutput) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Outputs. diff --git a/cmd/lmes_driver/main.go b/cmd/lmes_driver/main.go index 950c1d665..f27aa3e15 100644 --- a/cmd/lmes_driver/main.go +++ b/cmd/lmes_driver/main.go @@ -50,28 +50,36 @@ func (t *strArrayArg) String() string { } var ( - taskRecipes strArrayArg - customArtifactArgs strArrayArg - taskNames strArrayArg - copy = flag.String("copy", "", "copy this binary to specified destination path") - getStatus = flag.Bool("get-status", false, "Get current status") - shutdown = flag.Bool("shutdown", false, "Shutdown the driver") - outputPath = flag.String("output-path", OutputPath, "output path") - detectDevice = flag.Bool("detect-device", false, "detect available device(s), CUDA or CPU") - commPort = flag.Int("listen-port", driver.DefaultPort, "driver serves APIs on the port") - downloadAssetsS3 = flag.Bool("download-assets-s3", false, "Download assets from S3") - customTaskGitURL = flag.String("custom-task-git-url", "", "Git repository URL for custom tasks") - customTaskGitBranch = flag.String("custom-task-git-branch", "", "Git repository branch for custom tasks") - customTaskGitCommit = flag.String("custom-task-git-commit", "", "Git commit for custom tasks") - customTaskGitPath = flag.String("custom-task-git-path", "", "Custom task path") - allowOnline = flag.Bool("allow-online", false, "Allow LMEval online access") - driverLog = ctrl.Log.WithName("driver") + taskRecipes strArrayArg + customArtifactArgs strArrayArg + taskNames strArrayArg + mlflowExportTypes strArrayArg + copy = flag.String("copy", "", "copy this binary to specified destination path") + getStatus = flag.Bool("get-status", false, "Get current status") + shutdown = flag.Bool("shutdown", false, "Shutdown the driver") + outputPath = flag.String("output-path", OutputPath, "output path") + detectDevice = flag.Bool("detect-device", false, "detect available device(s), CUDA or CPU") + commPort = flag.Int("listen-port", driver.DefaultPort, "driver serves APIs on the port") + downloadAssetsS3 = flag.Bool("download-assets-s3", false, "Download assets from S3") + customTaskGitURL = flag.String("custom-task-git-url", "", "Git repository URL for custom tasks") + customTaskGitBranch = flag.String("custom-task-git-branch", "", "Git repository branch for custom tasks") + customTaskGitCommit = flag.String("custom-task-git-commit", "", "Git commit for custom tasks") + customTaskGitPath = flag.String("custom-task-git-path", "", "Custom task path") + allowOnline = flag.Bool("allow-online", false, "Allow LMEval online access") + mlflowTrackingUri = flag.String("mlflow-tracking-uri", "", "MLFlow tracking server URI") + mlflowExperimentName = flag.String("mlflow-experiment-name", "", "MLFlow experiment name") + mlflowRunId = flag.String("mlflow-run-id", "", "MLFlow run ID") + mlflowSourceName = flag.String("mlflow-source-name", "", "Value for mlflow.source.name tag (e.g. CR name)") + mlflowSourceType = flag.String("mlflow-source-type", "", "Value for mlflow.source.type tag") + mlflowParamsJSON = flag.String("mlflow-params-json", "", "JSON-encoded parameters to send to MLFlow") + driverLog = ctrl.Log.WithName("driver") ) func init() { flag.Var(&taskRecipes, "task-recipe", "task recipe") flag.Var(&customArtifactArgs, "custom-artifact", "A string contains an artifact's type, name and value. Use | as separator") flag.Var(&taskNames, "task-name", "A task name for custom tasks") + flag.Var(&mlflowExportTypes, "mlflow-export-type", "MLFlow export type (metrics, artifacts)") } func main() { @@ -120,21 +128,28 @@ func main() { } driverOpt := driver.DriverOption{ - Context: ctx, - OutputPath: *outputPath, - DetectDevice: *detectDevice, - Logger: driverLog, - TaskRecipes: taskRecipes, - CustomArtifacts: customArtifacts, - Args: args, - CommPort: *commPort, - DownloadAssetsS3: *downloadAssetsS3, - CustomTaskGitURL: *customTaskGitURL, - CustomTaskGitBranch: *customTaskGitBranch, - CustomTaskGitCommit: *customTaskGitCommit, - CustomTaskGitPath: *customTaskGitPath, - TaskNames: taskNames, - AllowOnline: *allowOnline, + Context: ctx, + OutputPath: *outputPath, + DetectDevice: *detectDevice, + Logger: driverLog, + TaskRecipes: taskRecipes, + CustomArtifacts: customArtifacts, + Args: args, + CommPort: *commPort, + DownloadAssetsS3: *downloadAssetsS3, + CustomTaskGitURL: *customTaskGitURL, + CustomTaskGitBranch: *customTaskGitBranch, + CustomTaskGitCommit: *customTaskGitCommit, + CustomTaskGitPath: *customTaskGitPath, + TaskNames: taskNames, + AllowOnline: *allowOnline, + MLFlowTrackingUri: *mlflowTrackingUri, + MLFlowExperimentName: *mlflowExperimentName, + MLFlowRunId: *mlflowRunId, + MLFlowExportTypes: mlflowExportTypes, + MLFlowSourceName: *mlflowSourceName, + MLFlowSourceType: *mlflowSourceType, + MLFlowParamsJSON: *mlflowParamsJSON, } driver, err := driver.NewDriver(&driverOpt) diff --git a/config/crd/bases/trustyai.opendatahub.io_lmevaljobs.yaml b/config/crd/bases/trustyai.opendatahub.io_lmevaljobs.yaml index 033df5e83..7c35adc1d 100644 --- a/config/crd/bases/trustyai.opendatahub.io_lmevaljobs.yaml +++ b/config/crd/bases/trustyai.opendatahub.io_lmevaljobs.yaml @@ -276,6 +276,33 @@ spec: outputs: description: Outputs specifies storage for evaluation results properties: + mlflow: + description: Export results to MLFlow tracking server + properties: + experimentName: + description: ExperimentName is the name of the MLFlow experiment + type: string + export: + description: Export defines what to export to MLFlow (metrics, + artifacts, or both) + items: + description: MLFlowExportType defines what to export to + MLFlow + enum: + - metrics + - artifacts + type: string + type: array + runId: + description: RunId is the specific MLFlow run ID to use (optional) + type: string + trackingUri: + description: TrackingUri is the MLFlow tracking server URI + pattern: ^https?://[a-zA-Z0-9.-]+(:[0-9]+)?(/.*)?$ + type: string + required: + - trackingUri + type: object pvcManaged: description: Create an operator managed PVC properties: diff --git a/controllers/lmes/driver/driver.go b/controllers/lmes/driver/driver.go index a3f7743b1..e1e15f9ab 100644 --- a/controllers/lmes/driver/driver.go +++ b/controllers/lmes/driver/driver.go @@ -73,6 +73,14 @@ type DriverOption struct { CustomTaskGitPath string TaskNames []string AllowOnline bool + // MLFlow configuration + MLFlowTrackingUri string + MLFlowExperimentName string + MLFlowRunId string + MLFlowExportTypes []string + MLFlowSourceName string + MLFlowSourceType string + MLFlowParamsJSON string } type ArtifactType string @@ -443,6 +451,15 @@ func (d *driverImpl) updateCompleteStatus(err error) { var results string results, err = d.getResults() d.status.Results = results + + // Export to MLFlow if configured and no errors + if err == nil { + mlflowErr := d.exportMLFlow() + if mlflowErr != nil { + d.Option.Logger.Error(mlflowErr, "failed to export results to MLFlow") + // Don't fail the job for MLFlow export errors, just log them + } + } } if err != nil { @@ -710,3 +727,94 @@ func (d *driverImpl) fetchGitCustomTasks() error { return nil } + +// exportMLFlow exports evaluation results to MLFlow tracking server if configured +func (d *driverImpl) exportMLFlow() error { + // Check if MLFlow is configured + if d.Option.MLFlowTrackingUri == "" || len(d.Option.MLFlowExportTypes) == 0 { + return nil + } + + d.Option.Logger.Info("Exporting results to MLFlow", + "trackingUri", d.Option.MLFlowTrackingUri, + "experimentName", d.Option.MLFlowExperimentName, + "runId", d.Option.MLFlowRunId, + "exportTypes", d.Option.MLFlowExportTypes, + "sourceName", d.Option.MLFlowSourceName, + "sourceType", d.Option.MLFlowSourceType) + + // Build command arguments + args := []string{ + "/opt/app-root/src/scripts/mlflow_export.py", + "--output-dir", d.Option.OutputPath, + "--tracking-uri", d.Option.MLFlowTrackingUri, + } + + if d.Option.MLFlowExperimentName != "" { + args = append(args, "--experiment-name", d.Option.MLFlowExperimentName) + } + + if d.Option.MLFlowRunId != "" { + args = append(args, "--run-id", d.Option.MLFlowRunId) + } + + if len(d.Option.MLFlowExportTypes) > 0 { + args = append(args, "--export-types") + args = append(args, d.Option.MLFlowExportTypes...) + } + + if d.Option.MLFlowSourceName != "" { + args = append(args, "--source-name", d.Option.MLFlowSourceName) + } + + if d.Option.MLFlowSourceType != "" { + args = append(args, "--source-type", d.Option.MLFlowSourceType) + } + + if d.Option.MLFlowParamsJSON != "" { + args = append(args, "--params-json", d.Option.MLFlowParamsJSON) + } + + // Execute MLFlow export script + cmd := exec.Command("python", args...) + + // Set environment variables for the script + env := append(os.Environ(), + "MLFLOW_TRACKING_URI="+d.Option.MLFlowTrackingUri) + + if d.Option.MLFlowExperimentName != "" { + env = append(env, "MLFLOW_EXPERIMENT_NAME="+d.Option.MLFlowExperimentName) + } + + if d.Option.MLFlowRunId != "" { + env = append(env, "MLFLOW_RUN_ID="+d.Option.MLFlowRunId) + } + + if len(d.Option.MLFlowExportTypes) > 0 { + env = append(env, "MLFLOW_EXPORT_TYPES="+strings.Join(d.Option.MLFlowExportTypes, ",")) + } + + if d.Option.MLFlowSourceName != "" { + env = append(env, "MLFLOW_SOURCE_NAME="+d.Option.MLFlowSourceName) + } + + if d.Option.MLFlowSourceType != "" { + env = append(env, "MLFLOW_SOURCE_TYPE="+d.Option.MLFlowSourceType) + } + + if d.Option.MLFlowParamsJSON != "" { + env = append(env, "MLFLOW_PARAMS_JSON="+d.Option.MLFlowParamsJSON) + } + + cmd.Env = env + + // Capture output and errors + output, err := cmd.CombinedOutput() + if err != nil { + d.Option.Logger.Error(err, "MLFlow export script failed", "output", string(output)) + return fmt.Errorf("MLFlow export failed: %v, output: %s", err, string(output)) + } + + d.Option.Logger.Info("MLFlow export completed successfully", "output", string(output)) + return nil +} diff --git a/controllers/lmes/lmevaljob_controller.go b/controllers/lmes/lmevaljob_controller.go index 0131011d5..159246639 100644 --- a/controllers/lmes/lmevaljob_controller.go +++ b/controllers/lmes/lmevaljob_controller.go @@ -909,7 +909,8 @@ func CreatePod(svcOpts *serviceOptions, job *lmesv1alpha1.LMEvalJob, permConfig }, } - if job.Spec.HasCustomOutput() { + // Only add PVC volume mount if PVC outputs are configured + if job.Spec.HasCustomOutput() && (job.Spec.Outputs.HasManagedPVC() || job.Spec.Outputs.HasExistingPVC()) { outputPVCMount := corev1.VolumeMount{ Name: "outputs", MountPath: OutputPath, @@ -926,7 +927,8 @@ func CreatePod(svcOpts *serviceOptions, job *lmesv1alpha1.LMEvalJob, permConfig }, } - if job.Spec.HasCustomOutput() { + // Only add PVC volume if PVC outputs are configured + if job.Spec.HasCustomOutput() && (job.Spec.Outputs.HasManagedPVC() || job.Spec.Outputs.HasExistingPVC()) { var claimName string if job.Spec.Outputs.HasManagedPVC() { @@ -1423,6 +1425,83 @@ func concatTasks(tasks lmesv1alpha1.TaskList) []string { return append(tasks.TaskNames, recipesName...) } +func buildMLFlowParams(job *lmesv1alpha1.LMEvalJob) string { + if job == nil { + return "" + } + + params := map[string]any{ + "crName": job.Name, + "namespace": job.Namespace, + "model": job.Spec.Model, + "tasks": concatTasks(job.Spec.TaskList), + } + + if len(job.Spec.TaskList.TaskNames) > 0 { + params["taskNames"] = job.Spec.TaskList.TaskNames + } + + if len(job.Spec.TaskList.TaskRecipes) > 0 { + params["taskRecipes"] = job.Spec.TaskList.TaskRecipes + } + + if job.Spec.TaskList.CustomArtifacts != nil { + params["customArtifacts"] = job.Spec.TaskList.CustomArtifacts + } + + if job.Spec.TaskList.HasCustomTasksWithGit() { + params["customTasksGit"] = job.Spec.TaskList.CustomTasks.Source.GitSource + } + + if len(job.Spec.ModelArgs) > 0 { + params["modelArgs"] = job.Spec.ModelArgs + } + + if len(job.Spec.GenArgs) > 0 { + params["genArgs"] = job.Spec.GenArgs + } + + if job.Spec.NumFewShot != nil { + params["numFewShot"] = job.Spec.NumFewShot + } + + if job.Spec.Limit != "" { + params["limit"] = job.Spec.Limit + } + + if job.Spec.LogSamples != nil { + params["logSamples"] = job.Spec.LogSamples + } + + if job.Spec.BatchSize != nil { + params["batchSize"] = job.Spec.BatchSize + } + + if job.Spec.AllowOnline != nil { + params["allowOnline"] = job.Spec.AllowOnline + } + + if job.Spec.AllowCodeExecution != nil { + params["allowCodeExecution"] = job.Spec.AllowCodeExecution + } + + if job.Spec.SystemInstruction != "" { + params["systemInstruction"] = job.Spec.SystemInstruction + } + + if job.Spec.ChatTemplate != nil { + params["chatTemplate"] = job.Spec.ChatTemplate + } + + paramsJSON, err := json.Marshal(params) + if err != nil { + ctrl.Log.WithName("mlflow").Error(err, "failed to marshal MLFlow parameters for LMEvalJob", "jobName", job.Name) + return "" + } + + return string(paramsJSON) +} + func generateCmd(svcOpts *serviceOptions, job *lmesv1alpha1.LMEvalJob, permConfig *PermissionConfig) []string { if job == nil { return nil @@ -1498,6 +1577,31 @@ func generateCmd(svcOpts *serviceOptions, job *lmesv1alpha1.LMEvalJob, permConfi appendArtifactCmd("task", job.Spec.TaskList.CustomArtifacts.GetTasks()) } + // MLFlow configuration + if job.Spec.Outputs != nil && job.Spec.Outputs.HasMLFlow() { + mlflow := job.Spec.Outputs.MLFlow + cmds = append(cmds, "--mlflow-tracking-uri", mlflow.TrackingUri) + + if mlflow.ExperimentName != nil { + cmds = append(cmds, "--mlflow-experiment-name", *mlflow.ExperimentName) + } + + if mlflow.RunId != nil { + cmds = append(cmds, "--mlflow-run-id", *mlflow.RunId) + } + + for _, exportType := range mlflow.Export { + cmds = append(cmds, "--mlflow-export-type", string(exportType)) + } + + cmds = append(cmds, "--mlflow-source-name", fmt.Sprintf("%s/%s", job.Namespace, job.Name)) + cmds = append(cmds, "--mlflow-source-type", lmesv1alpha1.KindName) + + if paramsJSON := buildMLFlowParams(job); paramsJSON != "" { + cmds = append(cmds, "--mlflow-params-json", paramsJSON) + } + } + cmds = append(cmds, "--") return cmds }