Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions examples/disconnect_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""This example shows how to clear resources owned by a client and how to disconnect a client from the scheduler."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    """Demonstrate Client.clear() and Client.disconnect()."""
    cluster = SchedulerClusterCombo(n_workers=10)
    client = Client(address=cluster.get_address())

    # clear() releases every computation resource owned by this client: all
    # unfinished tasks are cancelled and all object references become invalid.
    # The client itself remains usable and may keep submitting tasks.
    client.clear()

    # disconnect() invalidates the client; no further tasks may be submitted
    # through it. To run more tasks afterwards, instantiate a new Client.
    # The scheduler's running state is not affected by this call.
    client.disconnect()

    # Alternatively, shutdown() asks the scheduler to stop while disconnecting.
    # The request is best-effort: it only succeeds when the scheduler is not
    # running in "protected" mode.
    # client.shutdown()


if __name__ == "__main__":
    main()
51 changes: 51 additions & 0 deletions examples/graphtask_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""This example shows how to utilize graph task functionality provided by scaler."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def inc(i):
    """Return *i* incremented by one."""
    incremented = i + 1
    return incremented


def add(a, b):
    """Return the sum of *a* and *b*."""
    total = a + b
    return total


def minus(a, b):
    """Return *a* minus *b*."""
    difference = a - b
    return difference


# A graph task is a dict mapping str keys to values of type
# Union[Any, Tuple[Union[Callable, str], ...]].
# Each value can be one of the following:
#   - a basic data type (int, List, etc.),
#   - a callable,
#   - a tuple of the form (Callable, key1, key2, ...), which represents a call
#     of the callable on the values of the named entries.
graph = {
    "a": 2,
    "b": 2,
    "c": (inc, "a"),  # c = inc(a) = 2 + 1 = 3
    "d": (add, "a", "b"),  # d = add(a, b) = 2 + 2 = 4
    "e": (minus, "d", "c"),  # e = minus(d, c) = 4 - 3 = 1
    "f": add,  # a bare callable is passed back as-is
}


def main():
    """Submit the graph defined above and print the evaluated entries."""
    # See simple_client.py for how SchedulerClusterCombo and Client work.
    cluster = SchedulerClusterCombo(n_workers=1)
    client = Client(address=cluster.get_address())

    # client.get() evaluates the graph and returns a dict containing the
    # values of the requested keys. See the graph's definition for details.
    result = client.get(graph, keys=["a", "b", "c", "d", "e", "f"])
    print(result.get("e"))
    print(result)  # {'a': 2, 'b': 2, 'c': 3, 'd': 4, 'e': 1, 'f': <function add at 0x70af1e29b4c0>}


if __name__ == "__main__":
    main()
40 changes: 40 additions & 0 deletions examples/graphtask_nested_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""This example shows how to build graph dynamically in the remote side"""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def minus(a, b):
    """Return *a* minus *b* (used as a node in the remote graph)."""
    result = a - b
    return result


def fibonacci(clnt: Client, n: int):
    """Compute fib(n), building a small graph remotely at every step.

    WARNING: dynamically assembling a graph on the worker side is shown here
    only to demonstrate that it is possible -- it is almost never the right
    design, so redesign if you find yourself doing this. With this ability one
    could even concatenate the source graph to its child, as long as the graph
    evaluates to a value.
    """
    if n == 0:
        return 0
    if n == 1:
        return 1
    # Evaluate n - 1 and n - 2 through a graph, then recurse via submit().
    decrement_graph = {
        "n": n,
        "one": 1,
        "two": 2,
        "n_minus_one": (minus, "n", "one"),
        "n_minus_two": (minus, "n", "two"),
    }
    values = clnt.get(decrement_graph, keys=["n_minus_one", "n_minus_two"])
    left = clnt.submit(fibonacci, clnt, values.get("n_minus_one"))
    right = clnt.submit(fibonacci, clnt, values.get("n_minus_two"))
    return left.result() + right.result()


def main():
    """Run the nested-graph fibonacci example and print fib(8)."""
    # See simple_client.py for how SchedulerClusterCombo and Client work.
    cluster = SchedulerClusterCombo(n_workers=10)
    client = Client(address=cluster.get_address())
    result = client.submit(fibonacci, client, 8).result()
    print(result)  # 21


if __name__ == "__main__":
    main()
31 changes: 31 additions & 0 deletions examples/map_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
This example shows how to use the Client.map() method.
Client.map() allows the user to invoke a callable many times with different values.
For more information on the map operation, refer to
https://en.wikipedia.org/wiki/Map_(higher-order_function)
"""

import math

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    """Fan math.sqrt out over [0, 100) with Client.map() and sum the results."""
    # See simple_client.py for how SchedulerClusterCombo and Client work.
    cluster = SchedulerClusterCombo(n_workers=10)
    client = Client(address=cluster.get_address())

    # map() takes the callable to invoke plus a list of argument tuples;
    # (x,) is a one-element tuple, so each call receives a single integer
    # drawn from [0, 100).
    results = client.map(math.sqrt, [(x,) for x in range(100)])

    # Collect the results and sum them.
    total = sum(results)

    print(total)


if __name__ == "__main__":
    main()
29 changes: 29 additions & 0 deletions examples/nested_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""This example shows how to created nested tasks. Please see graphtask_nested_client.py for more information."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


# Compute the fibonacci sequence with nested tasks: every intermediate call in
# the recursion is itself submitted to the client.
def fibonacci(client: Client, n: int):
    if n == 0:
        return 0
    if n == 1:
        return 1
    left = client.submit(fibonacci, client, n - 1)
    right = client.submit(fibonacci, client, n - 2)
    return left.result() + right.result()


def main():
    """Run the nested-task fibonacci example and print fib(8)."""
    # See simple_client.py for how SchedulerClusterCombo and Client work.
    cluster = SchedulerClusterCombo(n_workers=1)
    client = Client(address=cluster.get_address())
    answer = client.submit(fibonacci, client, 8).result()
    print(answer)  # 21


if __name__ == "__main__":
    main()
26 changes: 26 additions & 0 deletions examples/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

## Scaler Examples

If you wish to run the examples from this directory, prefix `python <example>.py` with `PYTHONPATH=..`

Ensure that the scheduler and cluster are set up before running clients.

- `disconnect_client.py`
Shows how to disconnect a client from the scheduler
- `graphtask_client.py`
Shows how to send a graph based task to scheduler
- `graphtask_nested_client.py`
Shows how to dynamically build a graph on the remote end
- `map_client.py`
Shows how to use client.map
- `nested_client.py`
Shows how to send a nested task to scheduler
- `simple_client.py`
Shows how to send a basic task to scheduler
- `simple_scheduler.py`
Shows how to initialize a Scheduler
- `simple_cluster.py`
Shows how to initialize a Cluster

```{include} disconnect_client.py
```
39 changes: 39 additions & 0 deletions examples/send_object_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
This example demonstrates how to use the Client.send_object() method.
This method is used to submit large objects to the cluster.
Users can then reuse this object without needing to retransmit it multiple times.
"""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo

# Stand-in for a payload that is expensive to retransmit on every task.
large_object = [1, 2, 3, 4, 5]


def query(object_reference, idx):
    """Return the element at position *idx* of the referenced object."""
    element = object_reference[idx]
    return element


def main():
    """Upload an object to the cluster once, then reference it from tasks."""
    # See simple_client.py for how SchedulerClusterCombo and Client work.
    cluster = SchedulerClusterCombo(n_workers=1)
    client = Client(address=cluster.get_address())

    # Send the "large" object to the cluster for reuse; naming it is optional.
    # The returned reference can be used in place of the original object.
    large_object_ref = client.send_object(large_object, name="large_object")

    # Reuse the object through its reference. This example is deliberately
    # plain: indexing is a cheap operation normally done locally, but it shows
    # that operations and operators defined on the original type (list) can be
    # applied to the reference.
    fut1 = client.submit(query, large_object_ref, 0)
    fut2 = client.submit(query, large_object_ref, 1)

    # Fetch the results from the futures.
    print(fut1.result())
    print(fut2.result())


if __name__ == "__main__":
    main()
49 changes: 49 additions & 0 deletions examples/simple_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
This example demonstrates the most basic implementation to work with scaler
Scaler applications have three parts - scheduler, cluster, and client.
Scheduler is used to schedule works send from client to cluster.
Cluster, composed of 1 or more worker(s), are used to execute works.
Client is used to send tasks to scheduler.

This example shows a client sends 100 tasks, where each task represents the
execution of math.sqrt function and get back the results.
"""

import math

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    """Submit 100 math.sqrt tasks and print the sum of their results."""
    # A SchedulerClusterCombo bundles a scheduler together with a cluster of
    # n_workers workers -- 10 in this case. Other examples demonstrate more of
    # its configuration options.
    cluster = SchedulerClusterCombo(n_workers=10)

    # The client sends tasks to the scheduler, so it needs the scheduler's
    # address, which cluster.get_address() provides.
    client = Client(address=cluster.get_address())

    # Submit 100 tasks, one per integer in [0, 100). Each task is the
    # execution of a function you define. Callers are responsible for passing
    # arguments the function accepts; failing to do so results in an
    # exception. client.submit() is shown for demonstration -- client.map()
    # fits this particular case better, see map_client.py.
    futures = [client.submit(math.sqrt, i) for i in range(0, 100)]

    # Each submit() call returns a future. Keep the future until its task has
    # finished or been cancelled: it is the only way to get the corresponding
    # result. Here each result is a float, but any type works.
    result = sum(future.result() for future in futures)

    print(result)  # 661.46


if __name__ == "__main__":
    main()
53 changes: 53 additions & 0 deletions examples/simple_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
This example shows how to instantiate a Cluster using the Python API.
For an example on how to instantiate a Scheduler, see simple_scheduler.py
"""

from scaler import Cluster
from scaler.io.config import (
DEFAULT_GARBAGE_COLLECT_INTERVAL_SECONDS,
DEFAULT_HARD_PROCESSOR_SUSPEND,
DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
DEFAULT_TASK_TIMEOUT_SECONDS,
DEFAULT_TRIM_MEMORY_THRESHOLD_BYTES,
DEFAULT_WORKER_DEATH_TIMEOUT,
)
from scaler.utility.network_util import get_available_tcp_port
from scaler.utility.zmq_config import ZMQConfig


def main():
    """Configure, start, and tear down a Cluster via the Python API."""
    N_WORKERS = 8

    # Initialize a Cluster. worker_names is expected to provide one name per
    # worker; use range(N_WORKERS) so the number of names matches the constant
    # (the previous range(N_WORKERS - 1) generated only 7 names for 8 intended
    # workers).
    cluster = Cluster(
        worker_io_threads=1,
        address=ZMQConfig.from_string(f"tcp://127.0.0.1:{get_available_tcp_port()}"),
        worker_names=[str(i) for i in range(N_WORKERS)],
        heartbeat_interval_seconds=DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
        task_timeout_seconds=DEFAULT_TASK_TIMEOUT_SECONDS,
        death_timeout_seconds=DEFAULT_WORKER_DEATH_TIMEOUT,
        garbage_collect_interval_seconds=DEFAULT_GARBAGE_COLLECT_INTERVAL_SECONDS,
        trim_memory_threshold_bytes=DEFAULT_TRIM_MEMORY_THRESHOLD_BYTES,
        hard_processor_suspend=DEFAULT_HARD_PROCESSOR_SUSPEND,
        event_loop="builtin",  # Or "uvloop"
        logging_paths=("/dev/stdout",),
        logging_level="DEBUG",  # other choices are "INFO", "WARNING", "ERROR", "CRITICAL"
        logging_config_file=None,
    )

    # Start the cluster. The cluster will begin accepting tasks from the scheduler.
    cluster.start()

    # Shut down the cluster. Cluster subclasses Process and can be stopped with
    # `.terminate()` (SIGTERM on Unix) or forcefully with `.kill()` (SIGKILL).
    cluster.terminate()

    # Wait for the cluster's process to terminate.
    cluster.join()

    # Release resources. Must be called after `.join()` has returned.
    cluster.close()


if __name__ == "__main__":
    main()
47 changes: 47 additions & 0 deletions examples/simple_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""This example demonstrates how to start a scheduler using the Python API."""

from scaler import Scheduler
from scaler.io.config import (
DEFAULT_CLIENT_TIMEOUT_SECONDS,
DEFAULT_IO_THREADS,
DEFAULT_LOAD_BALANCE_SECONDS,
DEFAULT_LOAD_BALANCE_TRIGGER_TIMES,
DEFAULT_MAX_NUMBER_OF_TASKS_WAITING,
DEFAULT_OBJECT_RETENTION_SECONDS,
DEFAULT_PER_WORKER_QUEUE_SIZE,
DEFAULT_WORKER_TIMEOUT_SECONDS,
)
from scaler.scheduler.config import SchedulerConfig
from scaler.utility.network_util import get_available_tcp_port
from scaler.utility.zmq_config import ZMQConfig


def main():
    """Build a SchedulerConfig and construct a Scheduler from it."""
    # Scheduler's constructor takes a SchedulerConfig.

    # scaler provides a set of defaults to use; kindly follow the comments
    # there for detailed explanations. Note that these defaults aim to be a
    # starting point -- change them according to your use case. The arguments
    # you would most likely change are "event_loop", "io_threads",
    # "protected", and "per_worker_queue_size".
    config = SchedulerConfig(
        event_loop="builtin",  # Either "builtin", or "uvloop"
        address=ZMQConfig.from_string(f"tcp://127.0.0.1:{get_available_tcp_port()}"),
        io_threads=DEFAULT_IO_THREADS,  # Consider increasing this number if your workload is IO-heavy
        max_number_of_tasks_waiting=DEFAULT_MAX_NUMBER_OF_TASKS_WAITING,
        per_worker_queue_size=DEFAULT_PER_WORKER_QUEUE_SIZE,
        client_timeout_seconds=DEFAULT_CLIENT_TIMEOUT_SECONDS,
        worker_timeout_seconds=DEFAULT_WORKER_TIMEOUT_SECONDS,
        object_retention_seconds=DEFAULT_OBJECT_RETENTION_SECONDS,
        load_balance_seconds=DEFAULT_LOAD_BALANCE_SECONDS,
        load_balance_trigger_times=DEFAULT_LOAD_BALANCE_TRIGGER_TIMES,
        protected=False,  # When false, clients can shutdown the scheduler.
    )

    # Hand the config to Scheduler. Unlike Cluster, a scheduler is meant to be
    # long-running, so no close API is provided; the only way to shut one down
    # is Client.shutdown, which succeeds only when "protected" is False.
    _ = Scheduler(config)


if __name__ == "__main__":
    main()
Loading
Loading