Skip to content

Update WorkPool's Job Configuration #189

@Blarc

Description

@Blarc

When we create a work pool CRD, the operator creates a work pool deployment. The worker created by that deployment creates a work pool on Prefect's server on startup. The work pool's job configuration uses default values, which is problematic. The CRD should allow setting the job configuration.

Image

If we update the job configuration via Prefect's server UI, the following request body is sent:

{
    "base_job_template": {
        "job_configuration": {
            "command": "{{ command }}",
            "env": "{{ env }}",
            "labels": "{{ labels }}",
            "name": "{{ name }}",
            "namespace": "{{ namespace }}",
            "job_manifest": {
                "apiVersion": "batch/v1",
                "kind": "Job",
                "metadata": {
                    "generateName": "{{ name }}-",
                    "labels": "{{ labels }}",
                    "namespace": "{{ namespace }}"
                },
                "spec": {
                    "backoffLimit": 0,
                    "template": {
                        "spec": {
                            "completions": 1,
                            "containers": [
                                {
                                    "args": "{{ command }}",
                                    "env": "{{ env }}",
                                    "image": "{{ image }}",
                                    "imagePullPolicy": "{{ image_pull_policy }}",
                                    "name": "prefect-job"
                                }
                            ],
                            "parallelism": 1,
                            "restartPolicy": "Never",
                            "serviceAccountName": "{{ service_account_name }}"
                        }
                    },
                    "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
                }
            },
            "cluster_config": "{{ cluster_config }}",
            "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
            "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}",
            "stream_output": "{{ stream_output }}"
        },
        "variables": {
            "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template.",
            "properties": {
                "name": {
                    "anyOf": [
                        {
                            "type": "string"
                        },
                        {
                            "type": "null"
                        }
                    ],
                    "description": "Name given to infrastructure created by a worker.",
                    "title": "Name"
                },
                "env": {
                    "additionalProperties": {
                        "anyOf": [
                            {
                                "type": "string"
                            },
                            {
                                "type": "null"
                            }
                        ]
                    },
                    "description": "Environment variables to set when starting a flow run.",
                    "title": "Environment Variables",
                    "type": "object"
                },
                "labels": {
                    "additionalProperties": {
                        "type": "string"
                    },
                    "description": "Labels applied to infrastructure created by a worker.",
                    "title": "Labels",
                    "type": "object"
                },
                "command": {
                    "anyOf": [
                        {
                            "type": "string"
                        },
                        {
                            "type": "null"
                        }
                    ],
                    "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.",
                    "title": "Command"
                },
                "namespace": {
                    "default": "not-default",
                    "description": "The Kubernetes namespace to create jobs within.",
                    "title": "Namespace",
                    "type": "string"
                },
                "image": {
                    "anyOf": [
                        {
                            "type": "string"
                        },
                        {
                            "type": "null"
                        }
                    ],
                    "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.",
                    "examples": [
                        "docker.io/prefecthq/prefect:3-latest"
                    ],
                    "title": "Image"
                },
                "service_account_name": {
                    "anyOf": [
                        {
                            "type": "string"
                        },
                        {
                            "type": "null"
                        }
                    ],
                    "description": "The Kubernetes service account to use for job creation.",
                    "title": "Service Account Name"
                },
                "image_pull_policy": {
                    "default": "IfNotPresent",
                    "description": "The Kubernetes image pull policy to use for job containers.",
                    "enum": [
                        "IfNotPresent",
                        "Always",
                        "Never"
                    ],
                    "title": "Image Pull Policy",
                    "type": "string"
                },
                "finished_job_ttl": {
                    "anyOf": [
                        {
                            "type": "integer"
                        },
                        {
                            "type": "null"
                        }
                    ],
                    "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.",
                    "title": "Finished Job TTL"
                },
                "job_watch_timeout_seconds": {
                    "anyOf": [
                        {
                            "type": "integer"
                        },
                        {
                            "type": "null"
                        }
                    ],
                    "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely.",
                    "title": "Job Watch Timeout Seconds"
                },
                "pod_watch_timeout_seconds": {
                    "default": 60,
                    "description": "Number of seconds to watch for pod creation before timing out.",
                    "title": "Pod Watch Timeout Seconds",
                    "type": "integer"
                },
                "stream_output": {
                    "default": true,
                    "description": "If set, output will be streamed from the job to local standard output.",
                    "title": "Stream Output",
                    "type": "boolean"
                },
                "cluster_config": {
                    "anyOf": [
                        {
                            "$ref": "#/definitions/KubernetesClusterConfig"
                        },
                        {
                            "type": "null"
                        }
                    ],
                    "description": "The Kubernetes cluster config to use for job creation."
                }
            },
            "type": "object",
            "definitions": {
                "KubernetesClusterConfig": {
                    "block_schema_references": {},
                    "block_type_slug": "kubernetes-cluster-config",
                    "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
                    "properties": {
                        "config": {
                            "description": "The entire contents of a kubectl config file.",
                            "title": "Config",
                            "type": "object"
                        },
                        "context_name": {
                            "description": "The name of the kubectl context to use.",
                            "title": "Context Name",
                            "type": "string"
                        }
                    },
                    "required": [
                        "config",
                        "context_name"
                    ],
                    "secret_fields": [],
                    "title": "KubernetesClusterConfig",
                    "type": "object"
                }
            }
        }
    }
}

I was about to propose adding new fields to the CRD that would allow setting the job configuration; however, this API is quite annoying to deal with. I do think this is quite important, though, because, among other things (service account name, etc.), this is also how we can set the namespace where the jobs will be created.

How should we tackle this? Do we need to create a struct for the whole `base_job_template`? Is there a better way?

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions