Multi-language workflows guide small changes #59 (Merged)
152 changes: 90 additions & 62 deletions guides/workflows/multi-language.mdx
</Card>
</CardGroup>

## Tilebox languages and SDKs

Tilebox supports multiple languages and SDKs for running workflows.
All Tilebox SDKs and workflows are designed to be interoperable, which means it's possible to have a workflow where individual tasks are executed in different languages.
Check out [Languages & SDKs](/sdks/introduction) to learn more about currently available programming SDKs.

## Why multi-language workflows?

You might need to use multiple languages in a single workflow for many reasons, such as:
- You want to use a library that is only available in a specific language (for example xarray in Python)
- You started prototyping in Python, but need to start migrating the compute-intensive parts of your workflow to a different language for performance reasons

## Multi-language workflow example

This guide tackles the first use case: you have a tasking server in Go and want to offload some of the processing to Python.

## Defining tasks in Python and Go

```python Python
class ScheduleImageCapture(Task):
    # The input parameters must match the ones defined in the Go task
    location: tuple[float, float]  # lat_lon
    resolution_m: int
    spectral_bands: list[float]  # spectral bands in nm

    def execute(self, context: ExecutionContext) -> None:
        # Here you can implement your task logic, submit subtasks, etc.
        print(f"Image captured for {self.location} with {self.resolution_m}m resolution and bands {self.spectral_bands}")

    @staticmethod
    def identifier() -> tuple[str, str]:
        # The identifier must match the one defined in the Go task
        return "tilebox.com/schedule_image_capture", "v1.0"
```

```go Go
type ScheduleImageCapture struct {
	// json tags must match the Python task definition
	Location      [2]float64 `json:"location"` // lat_lon
	ResolutionM   int        `json:"resolution_m"`
	SpectralBands []float64  `json:"spectral_bands"` // spectral bands in nm
}

// No need to define the Execute method since we're only submitting the task

// Identifier must match the task identifier in the Python runner
func (t *ScheduleImageCapture) Identifier() workflows.TaskIdentifier {
	return workflows.NewTaskIdentifier("tilebox.com/schedule_image_capture", "v1.0")
}
```
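Since the two task definitions only interoperate through their shared JSON encoding, it can help to sanity-check the wire format once. The snippet below is a hypothetical illustration, not part of the Tilebox SDK: it builds the JSON document implied by the field names (Python) and json tags (Go) above and verifies it survives a round trip.

```python
import json

# Hypothetical payload matching the Python field names and Go json tags above;
# the values are illustrative only
payload = {
    "location": [40.75, -73.98],  # lat_lon
    "resolution_m": 30,
    "spectral_bands": [489.0, 560.6, 666.5],  # nm
}

# A JSON round-trip preserves the structure both SDKs rely on
encoded = json.dumps(payload)
decoded = json.loads(encoded)
print(decoded == payload)  # → True
```

If a field name on either side drifts out of sync with this contract, the task input silently fails to map, so a quick check like this is cheap insurance.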


## Creating a Go server that submits jobs

Write a simple HTTP tasking server in Go with a `/submit` endpoint that accepts requests to submit a `ScheduleImageCapture` job.

<Note>
Both the Go and the Python code use `test-cluster-tZD9Ca2qsqt4V` as the cluster slug. Replace it with your own cluster slug, which you can create in the [Tilebox Console](https://console.tilebox.com/workflows/clusters).
</Note>

```go Go
func main() {
	ctx := context.Background()
	client := workflows.NewClient()

	cluster, err := client.Clusters.Get(ctx, "test-cluster-tZD9Ca2qsqt4V")
	if err != nil {
		log.Fatal(err)
	}

	http.HandleFunc("/submit", submitHandler(client, cluster))

	log.Println("Server starting on http://localhost:8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

// Submit a job based on some query parameters
func submitHandler(client *workflows.Client, cluster *workflows.Cluster) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		latArg := r.URL.Query().Get("lat")
		lonArg := r.URL.Query().Get("lon")
		resolutionArg := r.URL.Query().Get("resolution")
		bandsArg := r.URL.Query().Get("bands[]")

		latFloat, err := strconv.ParseFloat(latArg, 64)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		lonFloat, err := strconv.ParseFloat(lonArg, 64)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		resolutionM, err := strconv.Atoi(resolutionArg)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		var spectralBands []float64
		for _, bandArg := range strings.Split(bandsArg, ",") {
			band, err := strconv.ParseFloat(bandArg, 64)
			if err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			spectralBands = append(spectralBands, band)
		}

		job, err := client.Jobs.Submit(r.Context(), "Schedule Image capture", cluster,
			[]workflows.Task{
				&ScheduleImageCapture{
					Location:      [2]float64{latFloat, lonFloat},
					ResolutionM:   resolutionM,
					SpectralBands: spectralBands,
				},
			},
		)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		_, _ = io.WriteString(w, fmt.Sprintf("Job submitted: %s\n", job.ID))
	}
}
```
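The `bands[]` query parameter is just a comma-separated list of numbers. For reference, the same parsing logic the Go handler performs can be sketched in a few lines of Python — this is an illustration of the parameter format, not part of any SDK:

```python
def parse_bands(bands_arg: str) -> list[float]:
    """Parse a comma-separated bands parameter such as '489.0,560.6,666.5'."""
    return [float(band) for band in bands_arg.split(",")]

print(parse_bands("489.0,560.6,666.5"))  # → [489.0, 560.6, 666.5]
```

Any client or server port of this endpoint only needs to agree on this simple comma-separated convention.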

<Note>
In the same way that you can submit jobs across languages, you can also submit subtasks across languages.
</Note>

## Creating a Python runner

Write a Python script that starts a task runner and registers the `ScheduleImageCapture` task.

```python Python
from tilebox.workflows import Client

def main():
    client = Client()
    runner = client.runner(
        "test-cluster-tZD9Ca2qsqt4V",
        tasks=[
            ScheduleImageCapture,
        ],
    )
    runner.run_forever()

if __name__ == "__main__":
    main()
```

## Testing it

Start the Go server, then start the Python task runner with `uv run runner.py`.
Submit a job to the Go server.

```bash Shell
curl "http://localhost:8080/submit?lat=40.75&lon=-73.98&resolution=30&bands[]=489.0,560.6,666.5"
```
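When submitting from a script instead of curl, the query string can be built with the Python standard library. This is a client-side sketch, independent of the Tilebox SDK; `urlencode` percent-encodes the `bands[]` key and the commas, which Go's query parser decodes back transparently.

```python
from urllib.parse import urlencode

# Hypothetical helper values mirroring the curl example above
params = {
    "lat": 40.75,
    "lon": -73.98,
    "resolution": 30,
    "bands[]": "489.0,560.6,666.5",
}

# Percent-encoding makes the URL safe to pass around without shell quoting
url = "http://localhost:8080/submit?" + urlencode(params)
print(url)
```

You could then issue the request with any HTTP client and inspect the `Job submitted: ...` response body.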

Check the Python runner output; it should print the following line:

```plaintext Output
Image captured for [40.75, -73.98] with 30m resolution and bands [489.0, 560.6, 666.5]
```

## Next Steps
</CardGroup>

As a learning exercise, you can try to change the [News API Workflow](/workflows/concepts/tasks#dependencies-example) to replace the `FetchNews` task with a Go task and keep all the other tasks in Python.
You'll learn how to submit a subtask in a different language than the one the current task is executed in.
8 changes: 4 additions & 4 deletions workflows/caches.mdx
The following code shows an example of how cache groups can be used.

<CodeGroup>
```python Python
from tilebox.workflows import Task, ExecutionContext
import random

class CacheGroupDemo(Task):
    n: int

    def execute(self, context: ExecutionContext):
        ...

class PrintSum(Task):
    ...
```
</CodeGroup>

Submitting a job of the `CacheGroupDemo` and running it with a task runner can be done as follows:

<CodeGroup>
```python Python
# submit a job to test our workflow
job_client = client.jobs()
job_client.submit("cache-groups", CacheGroupDemo(5), cluster="dev-cluster")

# start a runner to execute it
runner = client.runner(
    "dev-cluster",
    tasks=[CacheGroupDemo, ProduceRandomNumbers, ProduceRandomNumber, PrintSum],
    cache=LocalFileSystemCache("/path/to/cache/directory"),
)
runner.run_forever()
```
</CodeGroup>
26 changes: 17 additions & 9 deletions workflows/concepts/clusters.mdx
---

<Accordion title="What is a Cluster?">
Clusters are a logical grouping for [task runners](/workflows/concepts/task-runners).
Using clusters, you can scope certain tasks to a specific group of task runners.
Tasks, which are always submitted to a specific cluster, are only executed on task runners assigned to the same cluster.
</Accordion>

## Use Cases

### Cluster Slug

Each cluster has a unique identifier, combining the cluster's name and an automatically generated identifier.
Use this slug to reference the cluster for other operations, like submitting a job or subtasks.

### Listing Clusters

Expand Down Expand Up @@ -148,13 +151,16 @@ To delete a cluster, use the `delete` method and pass the cluster's slug:

## Jobs Across Different Clusters

When [submitting a job](/workflows/concepts/jobs), you need to specify which cluster the job's root task should be executed on.
This allows you to direct the job to a specific set of task runners.
By default, all sub-tasks within a job are also submitted to the same cluster, but this can be overridden to submit sub-tasks to different clusters if needed.
See the example below for a job that spans across multiple clusters.

<CodeGroup title="Multi-Cluster Workflow Example">
```python Python
from tilebox.workflows import Task, ExecutionContext, Client

class MultiCluster(Task):
    def execute(self, context: ExecutionContext) -> None:
        # this submits a task to the same cluster as the one currently executing this task
        same_cluster = context.submit_subtask(DummyTask())
        # ...

# ...

client = Client()
job_client = client.jobs()
job = job_client.submit(
    "my-job",
    MultiCluster(),
    cluster="testing-CvufcSxcC9SKfe",
)
```
```go Go
import (
	// ...
	"github.com/tilebox/tilebox-go/workflows/v1/subtask"
)

type MultiCluster struct{}

func (t *MultiCluster) Execute(ctx context.Context) error {
	// this submits a task to the same cluster as the one currently executing this task
	sameCluster, err := workflows.SubmitSubtask(ctx, &DummyTask{})
	if err != nil {
		return err
	}
	// ...
}

func main() {
	// ...
	job, err := client.Jobs.Submit(ctx,
		"my-job",
		"testing-CvufcSxcC9SKfe",
		[]workflows.Task{
			&MultiCluster{},
		},
	)
	// ...
}
```
</CodeGroup>

This workflow requires at least two task runners to complete. One must be in the "testing" cluster, and the other must be in the "other-cluster" cluster.
If no task runners are available in the "other-cluster," the task submitted to that cluster will remain queued until a task runner is available.
It won't execute on a task runner in the "testing" cluster, even if the task runner has the `DummyTask` registered.