From bab30aa39fa8e2097202b7b7aea59f2eb6e11ebd Mon Sep 17 00:00:00 2001
From: Corentin Musard
Date: Mon, 2 Jun 2025 11:12:51 +0200
Subject: [PATCH] Multi-language workflows guide small changes

---
 guides/workflows/multi-language.mdx | 152 ++++++++++++++++------
 workflows/caches.mdx                |   8 +-
 workflows/concepts/clusters.mdx     |  26 +++--
 workflows/concepts/task-runners.mdx |  18 ++--
 4 files changed, 120 insertions(+), 84 deletions(-)

diff --git a/guides/workflows/multi-language.mdx b/guides/workflows/multi-language.mdx
index c3e598e..13f7088 100644
--- a/guides/workflows/multi-language.mdx
+++ b/guides/workflows/multi-language.mdx
@@ -10,6 +10,12 @@ icon: diagram-project
 
+## Tilebox languages and SDKs
+
+Tilebox supports multiple languages and SDKs for running workflows.
+All Tilebox SDKs and workflows are designed to be interoperable, which means it's possible to have a workflow where individual tasks are executed in different languages.
+Check out [Languages & SDKs](/sdks/introduction) to learn more about currently available programming SDKs.
+
 ## Why multi-language workflows?
 
 You might need to use multiple languages in a single workflow for many reasons, such as:
@@ -18,41 +24,42 @@ You might need to use multiple languages in a single workflow for many reasons,
 - You want to use a library that is only available in a specific language (for example xarray in Python)
 - You started prototyping in Python, but need to start migrating the compute-intensive parts of your workflow to a different language for performance reasons
 
-## Example workflow
+## Multi-language workflow example
 
-This guide will tackle the first use case: you have a backend in Go and want to offload some of the processing to Python.
+This guide will tackle the first use case: you have a tasking server in Go and want to offload some of the processing to Python.
 
 ## Defining tasks in Python and Go
 
 ```python Python
-class TaskingWorkflow(Task):
+class ScheduleImageCapture(Task):
     # The input parameters must match the ones defined in the Go task
-    city: str
-    time: datetime
-    image_resolution: str
+    location: tuple[float, float] # lat_lon
+    resolution_m: int
+    spectral_bands: list[float] # spectral bands in nm
 
     def execute(self, context: ExecutionContext) -> None:
        # Here you can implement your task logic, submit subtasks, etc. 
-        print(f"Tasking workflow executed for {self.city} at {self.time} with resolution {self.image_resolution}")
+        print(f"Image captured for {self.location} with {self.resolution_m}m resolution and bands {self.spectral_bands}")
 
     @staticmethod
     def identifier() -> tuple[str, str]:
         # The identifier must match the one defined in the Go task
-        return "tilebox.com/tasking_workflow", "v1.0"
+        return "tilebox.com/schedule_image_capture", "v1.0"
 ```
 
 ```go Go
-type TaskingWorkflow struct {
-    City            string    `json:"city"` // json tags must match the Python task definition
-    Time            time.Time `json:"time"`
-    ImageResolution string    `json:"image_resolution"`
+type ScheduleImageCapture struct {
+    // json tags must match the Python task definition
+    Location      [2]float64 `json:"location"` // lat_lon
+    ResolutionM   int        `json:"resolution_m"`
+    SpectralBands []float64  `json:"spectral_bands"` // spectral bands in nm
 }
 
 // No need to define the Execute method since we're only submitting the task
 
 // Identifier must match the task identifier in the Python runner
-func (t *TaskingWorkflow) Identifier() workflows.TaskIdentifier {
-    return workflows.NewTaskIdentifier("tilebox.com/tasking_workflow", "v1.0")
+func (t *ScheduleImageCapture) Identifier() workflows.TaskIdentifier {
+    return workflows.NewTaskIdentifier("tilebox.com/schedule_image_capture", "v1.0")
 }
 ```
 
@@ -81,7 +88,7 @@ A couple important points to note:
 
 ## Creating a Go server that submits jobs
 
-Write a simple HTTP server in Go with a `/submit` endpoint that accepts requests to submit a `TaskingWorkflow` job.
+Write a simple HTTP tasking server in Go with a `/submit` endpoint that accepts requests to submit a `ScheduleImageCapture` job.
 
Both the Go and Python code use `test-cluster-tZD9Ca2qsqt4V` as the cluster slug. You should replace it with your own cluster slug, which you can create in the [Tilebox Console](https://console.tilebox.com/workflows/clusters).
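As the matching `json` tags suggest, the task input crosses the language boundary as JSON: the keys the Go struct serializes to are exactly the keys the Python task is constructed from. As a quick illustrative sanity check, using nothing but the standard library (this snippet is not part of either SDK), the payload contract looks like this:

```python Python
import json

# the JSON payload a Go-side submission produces for this task; each key
# matches both a json tag in the Go struct and a field name in the Python task
payload = {
    "location": [40.75, -73.98],
    "resolution_m": 30,
    "spectral_bands": [489.0, 560.6, 666.5],
}
print(json.dumps(payload))
```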
@@ -89,60 +96,81 @@ Write a simple HTTP server in Go with a `/submit` endpoint that accepts requests
 
 ```go Go
 func main() {
-    ctx := context.Background()
-    client := workflows.NewClient()
+    ctx := context.Background()
+    client := workflows.NewClient()
 
-    cluster, err := client.Clusters.Get(ctx, "test-cluster-tZD9Ca2qsqt4V")
-    if err != nil {
-        log.Fatal(err)
-    }
+    cluster, err := client.Clusters.Get(ctx, "test-cluster-tZD9Ca2qsqt4V")
+    if err != nil {
+        log.Fatal(err)
+    }
 
-    http.HandleFunc("/submit", submitHandler(client, cluster))
+    http.HandleFunc("/submit", submitHandler(client, cluster))
 
-    log.Println("Server starting on http://localhost:8080")
-    log.Fatal(http.ListenAndServe(":8080", nil))
+    log.Println("Server starting on http://localhost:8080")
+    log.Fatal(http.ListenAndServe(":8080", nil))
 }
 
 // Submit a job based on some query parameters
 func submitHandler(client *workflows.Client, cluster *workflows.Cluster) http.HandlerFunc {
-    return func(w http.ResponseWriter, r *http.Request) {
-        city := r.URL.Query().Get("city")
-        timeArg := r.URL.Query().Get("time")
-        resolution := r.URL.Query().Get("resolution")
-
-        if city == "" {
-            http.Error(w, "city is required", http.StatusBadRequest)
-            return
-        }
-
-        taskingTime, err := time.Parse(time.RFC3339, timeArg)
-        if err != nil {
-            http.Error(w, err.Error(), http.StatusBadRequest)
-            return
-        }
-
-        job, err := client.Jobs.Submit(r.Context(), fmt.Sprintf("tasking/%s", city), cluster,
-            []workflows.Task{
-                &TaskingWorkflow{
-                    City:            city,
-                    Time:            taskingTime,
-                    ImageResolution: resolution,
-                },
-            },
-        )
-        if err != nil {
-            http.Error(w, err.Error(), http.StatusInternalServerError)
-            return
-        }
-
-        _, _ = io.WriteString(w, fmt.Sprintf("Job submitted: %s\n", job.ID))
-    }
+    return func(w http.ResponseWriter, r *http.Request) {
+        latArg := r.URL.Query().Get("lat")
+        lonArg := r.URL.Query().Get("lon")
+        resolutionArg := r.URL.Query().Get("resolution")
+        bandsArg := r.URL.Query().Get("bands[]")
+
+        latFloat, err := strconv.ParseFloat(latArg, 64)
+        if err != nil {
+            http.Error(w, err.Error(), http.StatusBadRequest)
+            return
+        }
+        lonFloat, err := strconv.ParseFloat(lonArg, 64)
+        if err != nil {
+            http.Error(w, err.Error(), http.StatusBadRequest)
+            return
+        }
+
+        resolutionM, err := strconv.Atoi(resolutionArg)
+        if err != nil {
+            http.Error(w, err.Error(), http.StatusBadRequest)
+            return
+        }
+
+        var spectralBands []float64
+        for _, bandArg := range strings.Split(bandsArg, ",") {
+            band, err := strconv.ParseFloat(bandArg, 64)
+            if err != nil {
+                http.Error(w, err.Error(), http.StatusBadRequest)
+                return
+            }
+            spectralBands = append(spectralBands, band)
+        }
+
+        job, err := client.Jobs.Submit(r.Context(), "Schedule Image capture", cluster,
+            []workflows.Task{
+                &ScheduleImageCapture{
+                    Location:      [2]float64{latFloat, lonFloat},
+                    ResolutionM:   resolutionM,
+                    SpectralBands: spectralBands,
+                },
+            },
+        )
+        if err != nil {
+            http.Error(w, err.Error(), http.StatusInternalServerError)
+            return
+        }
+
+        _, _ = io.WriteString(w, fmt.Sprintf("Job submitted: %s\n", job.ID))
+    }
 }
 ```
+
+
+  In the same way that you can submit jobs across languages, you can also submit subtasks across languages.
+
+
 
 ## Creating a Python runner
 
-Write a Python script that starts a task runner and registers the `TaskingWorkflow` task.
+Write a Python script that starts a task runner and registers the `ScheduleImageCapture` task.
```python Python
 from tilebox.workflows import Client
@@ -152,7 +180,7 @@ def main():
     runner = client.runner(
         "test-cluster-tZD9Ca2qsqt4V",
         tasks=[
-            TaskingWorkflow,
+            ScheduleImageCapture,
         ],
     )
     runner.run_forever()
@@ -161,7 +189,7 @@ if __name__ == "__main__":
     main()
 ```
 
-## Running the workflow
+## Testing it
 
 Start the Go server.
 
@@ -178,13 +206,13 @@ uv run runner.py
 
 Submit a job to the Go server.
 
 ```bash Shell
-curl http://localhost:8080/submit?city=Zurich&time=2025-05-29T08:06:42Z&resolution=30m
+curl "http://localhost:8080/submit?lat=40.75&lon=-73.98&resolution=30&bands[]=489.0,560.6,666.5"
 ```
 
 Check the Python runner output; it should print the following line:
 
 ```plaintext Output
-Tasking workflow executed for Zurich at 2025-05-29T08:06:42Z with resolution 30m
+Image captured for [40.75, -73.98] with 30m resolution and bands [489, 560.6, 666.5]
 ```
 
 ## Next Steps
 
@@ -196,4 +224,4 @@
 
 As a learning exercise, you can try to change the [News API Workflow](/workflows/concepts/tasks#dependencies-example) to replace the `FetchNews` task with a Go task and keep all the other tasks in Python.
 
-You will learn how to use a Go task in the middle of a workflow implemented in Python.
+You'll learn how to submit a subtask in a language different from the one the current task runs in.
diff --git a/workflows/caches.mdx b/workflows/caches.mdx
index 87357e4..005b7da 100644
--- a/workflows/caches.mdx
+++ b/workflows/caches.mdx
@@ -206,7 +206,7 @@ The following code shows an example of how cache groups can be used.
 from tilebox.workflows import Task, ExecutionContext
 import random
 
-class CacheGroupDemoWorkflow(Task):
+class CacheGroupDemo(Task):
     n: int
 
     def execute(self, context: ExecutionContext):
@@ -254,18 +254,18 @@ class PrintSum(Task):
 ```
 
-Submitting a job of the `CacheGroupDemoWorkflow` and running it with a task runner can be done as follows:
+Submitting a `CacheGroupDemo` job and running it with a task runner can be done as follows:
 
 ```python Python
 # submit a job to test our workflow
 job_client = client.jobs()
-job_client.submit("cache-groups", CacheGroupDemoWorkflow(5), cluster="dev-cluster")
+job_client.submit("cache-groups", CacheGroupDemo(5), cluster="dev-cluster")
 
 # start a runner to execute it
 runner = client.runner(
     "dev-cluster",
-    tasks=[CacheGroupDemoWorkflow, ProduceRandomNumbers, ProduceRandomNumber, PrintSum],
+    tasks=[CacheGroupDemo, ProduceRandomNumbers, ProduceRandomNumber, PrintSum],
     cache=LocalFileSystemCache("/path/to/cache/directory"),
 )
 runner.run_forever()
diff --git a/workflows/concepts/clusters.mdx b/workflows/concepts/clusters.mdx
index dd9d831..6180b05 100644
--- a/workflows/concepts/clusters.mdx
+++ b/workflows/concepts/clusters.mdx
@@ -4,7 +4,9 @@ icon: circle-nodes
 ---
 
-  Clusters are a logical grouping for [task runners](/workflows/concepts/task-runners). Using clusters, you can scope certain tasks to a specific group of task runners. Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster.
+  Clusters are a logical grouping for [task runners](/workflows/concepts/task-runners).
+  Using clusters, you can scope certain tasks to a specific group of task runners.
+  Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster.
 
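To make that scoping concrete, here is a minimal sketch built from the `jobs().submit` and `runner` calls used throughout these docs; the task class and cluster slug are placeholders:

```python Python
from tilebox.workflows import Task, ExecutionContext, Client

class MyTask(Task):  # placeholder task, purely for illustration
    def execute(self, context: ExecutionContext) -> None:
        print("picked up by a runner in the same cluster")

client = Client()

# submit the root task to one specific cluster (placeholder slug)
client.jobs().submit("my-job", MyTask(), cluster="processing-AbC123")

# only a runner registered on that same cluster can pick the task up; a runner
# on any other cluster never executes it, even if it has MyTask registered
runner = client.runner("processing-AbC123", tasks=[MyTask])
runner.run_forever()
```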
## Use Cases @@ -70,7 +72,8 @@ Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing') ### Cluster Slug -Each cluster has a unique identifier, combining the cluster's name and an automatically generated identifier. Use this slug to reference the cluster for other operations, like submitting a job or subtasks. +Each cluster has a unique identifier, combining the cluster's name and an automatically generated identifier. +Use this slug to reference the cluster for other operations, like submitting a job or subtasks. ### Listing Clusters @@ -148,13 +151,16 @@ To delete a cluster, use the `delete` method and pass the cluster's slug: ## Jobs Across Different Clusters -When [submitting a job](/workflows/concepts/jobs), you need to specify which cluster the job's root task should be executed on. This allows you to direct the job to a specific set of task runners. By default, all sub-tasks within a job are also submitted to the same cluster, but this can be overridden to submit sub-tasks to different clusters if needed. See the example below for a job that spans across multiple clusters. +When [submitting a job](/workflows/concepts/jobs), you need to specify which cluster the job's root task should be executed on. +This allows you to direct the job to a specific set of task runners. +By default, all sub-tasks within a job are also submitted to the same cluster, but this can be overridden to submit sub-tasks to different clusters if needed. +See the example below for a job that spans across multiple clusters. ```python Python from tilebox.workflows import Task, ExecutionContext, Client -class MultiClusterWorkflow(Task): +class MultiCluster(Task): def execute(self, context: ExecutionContext) -> None: # this submits a task to the same cluster as the one currently executing this task same_cluster = context.submit_subtask(DummyTask()) @@ -176,7 +182,7 @@ client = Client() job_client = client.jobs() job = job_client.submit( "my-job", - MultiClusterWorkflow(), + MultiCluster(), cluster="testing-CvufcSxcC9SKfe", ) ``` @@ -190,9 +196,9 @@ import ( "github.com/tilebox/tilebox-go/workflows/v1/subtask" ) -type MultiClusterWorkflow struct{} +type MultiCluster struct{} -func (t *MultiClusterWorkflow) Execute(ctx context.Context) error { +func (t *MultiCluster) Execute(ctx context.Context) error { // this submits a task to the same cluster as the one currently executing this task sameCluster, err := workflows.SubmitSubtask(ctx, &DummyTask{}) if err != nil { @@ -227,11 +233,13 @@ func main() { "my-job", "testing-CvufcSxcC9SKfe", []workflows.Task{ - &MultiClusterWorkflow{}, + &MultiCluster{}, }, ) } ``` -This workflow requires at least two task runners to complete. One must be in the "testing" cluster, and the other must be in the "other-cluster" cluster. If no task runners are available in the "other-cluster," the task submitted to that cluster will remain queued until a task runner is available. It won't execute on a task runner in the "testing" cluster, even if the task runner has the `DummyTask` registered. +This workflow requires at least two task runners to complete. One must be in the "testing" cluster, and the other must be in the "other-cluster" cluster. +If no task runners are available in the "other-cluster," the task submitted to that cluster will remain queued until a task runner is available. +It won't execute on a task runner in the "testing" cluster, even if the task runner has the `DummyTask` registered. 
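Getting that queued task moving only takes starting a runner on the second cluster. A minimal sketch, assuming the `DummyTask` class from the example above and a placeholder slug for "other-cluster":

```python Python
from tilebox.workflows import Client

client = Client()

# as soon as this runner starts, the DummyTask queued on the second cluster
# gets picked up; replace the placeholder slug with your real cluster slug
runner = client.runner("other-cluster-AbC123", tasks=[DummyTask])
runner.run_forever()
```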
diff --git a/workflows/concepts/task-runners.mdx b/workflows/concepts/task-runners.mdx
index 37de22c..af42fe1 100644
--- a/workflows/concepts/task-runners.mdx
+++ b/workflows/concepts/task-runners.mdx
@@ -163,7 +163,7 @@ Here's an example of a distributed workflow:
 
 ```python Python
 from tilebox.workflows import Task, ExecutionContext
 
-    class DistributedWorkflow(Task):
+    class Distributed(Task):
         def execute(self, context: ExecutionContext) -> None:
             download_task = context.submit_subtask(DownloadData())
             process_task = context.submit_subtask(
@@ -198,9 +198,9 @@ import (
     "github.com/tilebox/tilebox-go/workflows/v1/subtask"
 )
 
-type DistributedWorkflow struct{}
+type Distributed struct{}
 
-func (t *DistributedWorkflow) Execute(ctx context.Context) error {
+func (t *Distributed) Execute(ctx context.Context) error {
     downloadTask, err := workflows.SubmitSubtask(ctx, &DownloadData{})
     if err != nil {
         return fmt.Errorf("failed to submit download subtask: %w", err)
     }
@@ -232,7 +232,7 @@ func (t *ProcessData) Execute(ctx context.Context) error {
 ```
 
-To achieve distributed execution for this workflow, no single task runner capable of executing all three of the tasks is set up. Instead, two task runners, each capable of executing one of the tasks are set up: one in a high-speed network environment and the other with GPU access. When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task. The `DistributedWorkflow` does not require specific hardware, so it can be registered with both runners and executed by either one.
+To achieve distributed execution for this workflow, no single task runner capable of executing all three of the tasks is set up. Instead, two task runners, each capable of executing one of the tasks, are set up: one in a high-speed network environment and the other with GPU access. When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task. The `Distributed` task does not require specific hardware, so it can be registered with both runners and executed by either one.
 
 
@@ -243,7 +243,7 @@ from tilebox.workflows import Client
 client = Client()
 high_network_speed_runner = client.runner(
     "dev-cluster",
-    tasks=[DownloadData, DistributedWorkflow]
+    tasks=[DownloadData, Distributed]
 )
 high_network_speed_runner.run_forever()
 ```
@@ -268,7 +268,7 @@ func main() {
 
     err = highNetworkSpeedRunner.RegisterTasks(
         &DownloadData{},
-        &DistributedWorkflow{},
+        &Distributed{},
     )
     if err != nil {
         slog.Error("failed to register tasks", slog.Any("error", err))
@@ -289,7 +289,7 @@ from tilebox.workflows import Client
 client = Client()
 gpu_runner = client.runner(
     "dev-cluster",
-    tasks=[ProcessData, DistributedWorkflow]
+    tasks=[ProcessData, Distributed]
 )
 gpu_runner.run_forever()
 ```
@@ -314,7 +314,7 @@ func main() {
 
     err = gpuRunner.RegisterTasks(
         &ProcessData{},
-        &DistributedWorkflow{},
+        &Distributed{},
     )
     if err != nil {
         slog.Error("failed to register tasks", slog.Any("error", err))
@@ -328,7 +328,7 @@ func main() {
 
-Now, both `download_task_runner.py` and `gpu_task_runner.py` are started, in parallel, on different machines with the required hardware for each. When `DistributedWorkflow` is submitted, it executes on one of the two runners, and it's submitted sub-tasks are handled by the appropriate runner.
+Now, both `download_task_runner.py` and `gpu_task_runner.py` are started, in parallel, on different machines with the required hardware for each. 
When `Distributed` is submitted, it executes on one of the two runners, and the sub-tasks it submits are handled by the appropriate runner. In this case, since `ProcessData` depends on `DownloadData`, the GPU task runner remains idle until the download completes, then picks up the processing task.
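That waiting behavior comes from the dependency declaration alone, not from any runner configuration. Condensed to a sketch (same task classes as in the example above; `depends_on` is the Python counterpart of the Go-side dependency option from the `subtask` package):

```python Python
from tilebox.workflows import Task, ExecutionContext

class Distributed(Task):
    def execute(self, context: ExecutionContext) -> None:
        download_task = context.submit_subtask(DownloadData())
        # ProcessData only becomes eligible to run once DownloadData finishes,
        # which is why the GPU runner sits idle until the download runner is done
        context.submit_subtask(ProcessData(), depends_on=[download_task])
```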