Skip to content

feat: Implement resource competition and queue scheduling mechanisms #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Resource Competition and Queue Scheduling Example

This example demonstrates how to implement resource competition and queue scheduling mechanisms using Temporal, providing a flexible resource pool management system that supports dynamic adjustment of resource pool size, cancellation of resource requests, and monitoring functions.

## Overview

This example implements a resource pool and queue system based on Temporal, featuring the following characteristics:

1. **Parallel Execution**: Supports the parallel execution of multiple workflow instances, making full use of system resources.
2. **Shared Resource Pool**: Provides a limited number of shared resources for workflows to use.
3. **Preemptive Resource Acquisition**: Workflows acquire resources through a preemptive mechanism to ensure efficient utilization.
4. **Waiting Queue**: When resources are unavailable, workflows enter a waiting queue and continue execution after automatically acquiring resources.
5. **Dynamic Adjustment of Resource Pool**: Supports dynamic adjustment of the resource pool size (expansion or reduction) at runtime.
6. **Cancellation of Resource Requests**: Supports cancellation of resource requests in the waiting queue, terminating workflows that are no longer needed.
7. **Real-time Monitoring**: Provides real-time monitoring capabilities to track the status and changes of the resource pool.

## Component Description

- **ResourcePool**: The structure of the resource pool, providing resource acquisition, release, and management functions.
- **ResourcePoolWorkflow**: The workflow that manages resource allocation and queues.
- **SampleWorkflowWithResourcePool**: An example workflow that uses the resource pool, demonstrating resource acquisition and release.
- **UpdateResourcePool**: The functionality to dynamically adjust the size of the resource pool.
- **CancelResourceRequest**: Cancels resource requests in the waiting queue.
- **ResourcePoolInitializer**: An interface for customizing resource pool initialization and scaling behavior.

## Key Features

- **Resource Allocation**: The resource pool workflow manages the allocation of limited resources to ensure efficient utilization.
- **Queuing Mechanism**: Workflows that have not acquired resources enter a waiting queue and continue execution after automatically acquiring resources.
- **Dynamic Scaling**: Supports adjusting the size of the resource pool at runtime to meet varying load demands.
- **Request Cancellation**: Supports terminating resource requests in the waiting queue to avoid unnecessary resource occupation.
- **Signal Communication**: Uses Temporal's signal mechanism for communication between workflows.
- **Persistence**: Even if the system crashes, the state of waiting workflows and resources can be restored.
- **Delayed Scaling Down**: Intelligently waits for resources to be released before completing scaling down, without affecting resources currently in use.

# Usage Example

### Start Workflow

You can start the workflow with the following command:

```bash
go run queue/starter/main.go -test=basic
```

### Query Resource Pool Status

To query the status of the resource pool, you can use the following command:

```bash
go run queue/starter/main.go -test=pool -poolid=resource-pool:{namespace}:{resourceID}
```

### Running Tests

The project includes unit tests, and you can run the tests using the following command:

```bash
go test ./...
```

## Dependencies

- [Temporal Go SDK](https://github.com/temporalio/sdk-go)
- [Testify](https://github.com/stretchr/testify)

## Contribution

Contributions of any kind are welcome! Please submit issues or pull requests.

## License

This project is licensed under the MIT License. For more details, please see the LICENSE file.
92 changes: 92 additions & 0 deletions queue/consume/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package main

import (
"context"
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"github.com/temporalio/samples-go/queue"
)

// helper workflow to cancel resource request
func cancelResourceRequest(ctx workflow.Context, resourcePoolWorkflowID string, targetWorkflowID string) error {
logger := workflow.GetLogger(ctx)
logger.Info("start cancel resource request workflow",
"resourcePoolWorkflowID", resourcePoolWorkflowID,
"targetWorkflowID", targetWorkflowID)

// create resource pool workflow execution reference
execution := workflow.Execution{
ID: resourcePoolWorkflowID,
}

// send cancel command
err := queue.CancelResourceRequest(ctx, execution, targetWorkflowID)
if err != nil {
logger.Error("failed to cancel resource request", "Error", err)
return err
}

logger.Info("successfully canceled resource request")
return nil
}

// helper workflow to update resource pool size
func updateResourcePoolSize(ctx workflow.Context, resourcePoolWorkflowID string, newSize int) error {
logger := workflow.GetLogger(ctx)
logger.Info("start update resource pool size workflow",
"resourcePoolWorkflowID", resourcePoolWorkflowID,
"newSize", newSize)

// create resource pool workflow execution reference
execution := workflow.Execution{
ID: resourcePoolWorkflowID,
}

// send update command
err := queue.UpdateResourcePool(ctx, execution, newSize)
if err != nil {
logger.Error("failed to update resource pool size", "Error", err)
return err
}

logger.Info("successfully updated resource pool size")
return nil
}

func main() {
// create temporal client
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
log.Fatalln("can't create client", err)
}
defer c.Close()
// create sample workflow worker - also need to set context
sampleWorker := worker.New(c, "queue-sample", worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), queue.ClientContextKey, c),
})

// register sample workflow
sampleWorker.RegisterWorkflow(queue.SampleWorkflowWithResourcePool)
// register resource pool management workflow
sampleWorker.RegisterWorkflow(cancelResourceRequest)
sampleWorker.RegisterWorkflow(updateResourcePoolSize)

// start all workers
workerErr := make(chan error, 1)

go func() {
workerErr <- sampleWorker.Run(worker.InterruptCh())
}()

// wait for any worker to fail
err = <-workerErr
if err != nil {
log.Fatalln("worker run failed", err)
}
}
Loading