diff --git a/examples/disconnect_client.py b/examples/disconnect_client.py new file mode 100644 index 000000000..4e3c36143 --- /dev/null +++ b/examples/disconnect_client.py @@ -0,0 +1,25 @@ +"""This example shows how to clear resources owned by a client and how to disconnect a client from the scheduler.""" + +from scaler import Client +from scaler.cluster.combo import SchedulerClusterCombo + + +def main(): + cluster = SchedulerClusterCombo(n_workers=10) + client = Client(address=cluster.get_address()) + # Client.clear() will clear all computation resources owned by the client. All unfinished tasks will be cancelled, + # and all object reference will be invalidated. The client can submit tasks as it wishes. + client.clear() + + # Once disconnect is called, this client is invalidated, and no tasks can be installed on this client. + # Should the user wish to initiate more tasks, they should instantiate another Client. The scheduler's running + # state will not be affected by this method. + client.disconnect() + + # The user may also choose to shutdown the scheduler while disconnecting client from the scheduler. Such a request + # is not guaranteed to succeed, as the scheduler can only be closed when it is not running under "protected" mode. + # client.shutdown() + + +if __name__ == "__main__": + main() diff --git a/examples/graphtask_client.py b/examples/graphtask_client.py new file mode 100644 index 000000000..0cba3ca8e --- /dev/null +++ b/examples/graphtask_client.py @@ -0,0 +1,51 @@ +"""This example shows how to utilize graph task functionality provided by scaler.""" + +from scaler import Client +from scaler.cluster.combo import SchedulerClusterCombo + + +def inc(i): + return i + 1 + + +def add(a, b): + return a + b + + +def minus(a, b): + return a - b + + +# A graph task is defined as a dict with str as the key type and val_t as the value type, where val_t is defined as +# follows: +# Union[Any, Tuple[Union[Callable, str], ...] +# Each value can be one of the following: +# - a basic data type (int, List, etc.), +# - a callable, +# - a tuple of the form (Callable, key_t val1, key_t val2, ...) +# that represents a function call. +graph = { + "a": 2, + "b": 2, + "c": (inc, "a"), # c = a + 1 = 2 + 1 = 3 + "d": (add, "a", "b"), # d = a + b = 2 + 2 = 4 + "e": (minus, "d", "c"), # e = d - c = 4 - 3 = 1 + "f": add, +} + + +def main(): + # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py + cluster = SchedulerClusterCombo(n_workers=1) + client = Client(address=cluster.get_address()) + + # See graph's definition for more detail. + # The result is a dictionary that contains the requested keys. + # Each value provided in the graph will be evaluated and passed back. + result = client.get(graph, keys=["a", "b", "c", "d", "e", "f"]) + print(result.get("e")) + print(result) # {'a': 2, 'b': 2, 'c': 3, 'd': 4, 'e': 1, 'f': } + + +if __name__ == "__main__": + main() diff --git a/examples/graphtask_nested_client.py b/examples/graphtask_nested_client.py new file mode 100644 index 000000000..bd447485e --- /dev/null +++ b/examples/graphtask_nested_client.py @@ -0,0 +1,40 @@ +"""This example shows how to build graph dynamically in the remote side""" + +from scaler import Client +from scaler.cluster.combo import SchedulerClusterCombo + + +def minus(a, b): + return a - b + + +def fibonacci(clnt: Client, n: int): + if n == 0: + return 0 + elif n == 1: + return 1 + else: + # Dynamically building graph in the worker side is okay. + # BE WARNED! You are not suppose to use it like that. This is to demonstrate the ability instead of intention + # of what graph can do. This should rarely be done. Redesign if you find yourself in this position. With the + # ability to dynamically build a graph, one can even concatenate the source graph to its child (as long as they + # evaluate to a value). + fib_graph = {"n": n, "one": 1, "two": 2, "n_minus_one": (minus, "n", "one"), "n_minus_two": (minus, "n", "two")} + res = clnt.get(fib_graph, keys=["n_minus_one", "n_minus_two"]) + n_minus_one = res.get("n_minus_one") + n_minus_two = res.get("n_minus_two") + a = clnt.submit(fibonacci, clnt, n_minus_one) + b = clnt.submit(fibonacci, clnt, n_minus_two) + return a.result() + b.result() + + +def main(): + # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py + cluster = SchedulerClusterCombo(n_workers=10) + client = Client(address=cluster.get_address()) + result = client.submit(fibonacci, client, 8).result() + print(result) # 21 + + +if __name__ == "__main__": + main() diff --git a/examples/map_client.py b/examples/map_client.py new file mode 100644 index 000000000..85ba3b5c4 --- /dev/null +++ b/examples/map_client.py @@ -0,0 +1,31 @@ +""" +This example shows how to use the Client.map() method. +Client.map() allows the user to invoke a callable many times with different values. +For more information on the map operation, refer to +https://en.wikipedia.org/wiki/Map_(higher-order_function) +""" + +import math + +from scaler import Client +from scaler.cluster.combo import SchedulerClusterCombo + + +def main(): + # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py + cluster = SchedulerClusterCombo(n_workers=10) + client = Client(address=cluster.get_address()) + + # map each integer in [0, 100) through math.sqrt() + # the first parameter is the function to call, and the second is a list of argument tuples + # (x,) denotes a tuple of length one + results = client.map(math.sqrt, [(x,) for x in range(100)]) + + # Collect the results and sums them + result = sum(results) + + print(result) + + +if __name__ == "__main__": + main() diff --git a/examples/nested_client.py b/examples/nested_client.py new file mode 100644 index 000000000..11c8ed02c --- /dev/null +++ b/examples/nested_client.py @@ -0,0 +1,29 @@ +"""This example shows how to created nested tasks. Please see graphtask_nested_client.py for more information.""" + +from scaler import Client +from scaler.cluster.combo import SchedulerClusterCombo + + +# Calculate fibonacci sequence with nested client. +# Each intermediate call in the recursive process is submitted to the client. +def fibonacci(client: Client, n: int): + if n == 0: + return 0 + elif n == 1: + return 1 + else: + a = client.submit(fibonacci, client, n - 1) + b = client.submit(fibonacci, client, n - 2) + return a.result() + b.result() + + +def main(): + # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py + cluster = SchedulerClusterCombo(n_workers=1) + client = Client(address=cluster.get_address()) + result = client.submit(fibonacci, client, 8).result() + print(result) # 21 + + +if __name__ == "__main__": + main() diff --git a/examples/readme.md b/examples/readme.md new file mode 100644 index 000000000..01197f73c --- /dev/null +++ b/examples/readme.md @@ -0,0 +1,26 @@ + +## Scaler Examples + +If you wish to run examples in current working directory, prefix `python run_xxx.py` with `PYTHONPATH=..` + +Ensure that the scheduler and cluster are set up before running clients. + +- `disconnect_client.py` + Shows how to disconnect a client from scheduler +- `graphtask_client.py` + Shows how to send a graph based task to scheduler +- `graphtask_nested_client.py` + Shows how to dynamically build graph in the remote end +- `map_client.py` + Shows how to use client.map +- `nested_client.py` + Shows how to send a nested task to scheduler +- `simple_client.py` + Shows how to send a basic task to scheduler +- `simple_scheduler.py` + Shows how to initialize a Scheduler +- `simple_cluster.py` + Shows how to initialize a Cluster + +```{include} disconnect_client.py +``` diff --git a/examples/send_object_client.py b/examples/send_object_client.py new file mode 100644 index 000000000..1cec6162d --- /dev/null +++ b/examples/send_object_client.py @@ -0,0 +1,39 @@ +""" +This example demonstrates how to use the Client.send_object() method. +This method is used to submit large objects to the cluster. +Users can then reuse this object without needing to retransmit it multiple times. +""" + +from scaler import Client +from scaler.cluster.combo import SchedulerClusterCombo + +large_object = [1, 2, 3, 4, 5] + + +def query(object_reference, idx): + return object_reference[idx] + + +def main(): + # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py + cluster = SchedulerClusterCombo(n_workers=1) + client = Client(address=cluster.get_address()) + + # Send the "large" to the cluster for reuse. Providing a name for the object is optional. + # This method returns a reference to the object that we can use in place of the original object. + large_object_ref = client.send_object(large_object, name="large_object") + + # Reuse through object reference + # Note that this example is not very interesting, since query is essentially a cheap operation that should be done + # in local end. We chose this operation since it demonstrates that operation and operator defined on the original + # type (list) can be applied to the reference. + fut1 = client.submit(query, large_object_ref, 0) + fut2 = client.submit(query, large_object_ref, 1) + + # Get the result from the future. + print(fut1.result()) + print(fut2.result()) + + +if __name__ == "__main__": + main() diff --git a/examples/simple_client.py b/examples/simple_client.py new file mode 100644 index 000000000..faf7a96ef --- /dev/null +++ b/examples/simple_client.py @@ -0,0 +1,49 @@ +""" +This example demonstrates the most basic implementation to work with scaler +Scaler applications have three parts - scheduler, cluster, and client. +Scheduler is used to schedule works send from client to cluster. +Cluster, composed of 1 or more worker(s), are used to execute works. +Client is used to send tasks to scheduler. + +This example shows a client sends 100 tasks, where each task represents the +execution of math.sqrt function and get back the results. +""" + +import math + +from scaler import Client +from scaler.cluster.combo import SchedulerClusterCombo + + +def main(): + # Instantiate a SchedulerClusterCombo which contains a scheduler and a cluster that contains n_workers workers. In + # this case, there are 10 workers. There are more options to control the behavior of SchedulerClusterCombo, you can + # check them out in other examples. + cluster = SchedulerClusterCombo(n_workers=10) + + # Instantiate a Client that represents a client aforementioned. One may submit task using client. + # Since client is sending task(s) to the scheduler, we need to know the address that the scheduler has. In this + # case, we can get the address using cluster.get_address() + client = Client(address=cluster.get_address()) + # Submits 100 tasks + futures = [ + # In each iteration of the loop, we submit one task to the scheduler. Each task represents the execution of a + # function defined by you. + # Note: Users are responsible to correctly provide the argument(s) of a function that the user wish to call. + # Fail to do so results in exception. + # This is to demonstrate client.submit(). A better way to implement this particular case is to use + # client.map(). See `map_client.py` for more detail. + client.submit(math.sqrt, i) + for i in range(0, 100) + ] + + # Each call to Client.submit returns a future. Users are expected to keep the future until the task has been + # finished, or cancelled. The future returned by Client.submit is the only way to get results from corresponding + # tasks. In this case, future.result() will return a float, but this can be any type should the user wish. + result = sum(future.result() for future in futures) + + print(result) # 661.46 + + +if __name__ == "__main__": + main() diff --git a/examples/simple_cluster.py b/examples/simple_cluster.py new file mode 100644 index 000000000..0d3553690 --- /dev/null +++ b/examples/simple_cluster.py @@ -0,0 +1,53 @@ +""" +This example shows how to instantiate a Cluster using the Python API. +For an example on how to instantiate a Scheduler, see simple_scheduler.py +""" + +from scaler import Cluster +from scaler.io.config import ( + DEFAULT_GARBAGE_COLLECT_INTERVAL_SECONDS, + DEFAULT_HARD_PROCESSOR_SUSPEND, + DEFAULT_HEARTBEAT_INTERVAL_SECONDS, + DEFAULT_TASK_TIMEOUT_SECONDS, + DEFAULT_TRIM_MEMORY_THRESHOLD_BYTES, + DEFAULT_WORKER_DEATH_TIMEOUT, +) +from scaler.utility.network_util import get_available_tcp_port +from scaler.utility.zmq_config import ZMQConfig + + +def main(): + N_WORKERS = 8 + # Initialize a Cluster. + cluster = Cluster( + worker_io_threads=1, + address=ZMQConfig.from_string(f"tcp://127.0.0.1:{get_available_tcp_port()}"), + worker_names=[str(i) for i in range(N_WORKERS - 1)], + heartbeat_interval_seconds=DEFAULT_HEARTBEAT_INTERVAL_SECONDS, + task_timeout_seconds=DEFAULT_TASK_TIMEOUT_SECONDS, + death_timeout_seconds=DEFAULT_WORKER_DEATH_TIMEOUT, + garbage_collect_interval_seconds=DEFAULT_GARBAGE_COLLECT_INTERVAL_SECONDS, + trim_memory_threshold_bytes=DEFAULT_TRIM_MEMORY_THRESHOLD_BYTES, + hard_processor_suspend=DEFAULT_HARD_PROCESSOR_SUSPEND, + event_loop="builtin", # Or "uvloop" + logging_paths=("/dev/stdout",), + logging_level="DEBUG", # other choices are "INFO", "WARNING", "ERROR", "CRITICAL" + logging_config_file=None, + ) + + # Start the cluster. The cluster will begin accepting tasks from the scheduler. + cluster.start() + + # Shut down the cluster. Cluster subclasses Process and can be shutdown using `.terminate()`, or arbitrary signals + # can be sent using `.kill()` + cluster.terminate() + + # Wait for the cluster's process to terminate. + cluster.join() + + # Release resources. Must be called after `.join()` has returned. + cluster.close() + + +if __name__ == "__main__": + main() diff --git a/examples/simple_scheduler.py b/examples/simple_scheduler.py new file mode 100644 index 000000000..d5acd23c3 --- /dev/null +++ b/examples/simple_scheduler.py @@ -0,0 +1,47 @@ +"""This example demonstrates how to start a scheduler using the Python API.""" + +from scaler import Scheduler +from scaler.io.config import ( + DEFAULT_CLIENT_TIMEOUT_SECONDS, + DEFAULT_IO_THREADS, + DEFAULT_LOAD_BALANCE_SECONDS, + DEFAULT_LOAD_BALANCE_TRIGGER_TIMES, + DEFAULT_MAX_NUMBER_OF_TASKS_WAITING, + DEFAULT_OBJECT_RETENTION_SECONDS, + DEFAULT_PER_WORKER_QUEUE_SIZE, + DEFAULT_WORKER_TIMEOUT_SECONDS, +) +from scaler.scheduler.config import SchedulerConfig +from scaler.utility.network_util import get_available_tcp_port +from scaler.utility.zmq_config import ZMQConfig + + +def main(): + # First we need a SchedulerConfig as the parameter to Scheduler's ctor. + + # scaler provides a set of default to use. Kindly follow the comments there for detailed explanations. + # Note, these defaults aims to be a starting point. You should change the defaults according to your use case. + # Arguments that you would change are most likely "event_loop", "io_threads", "protected", and + # "per_worker_queue_size". + config = SchedulerConfig( + event_loop="builtin", # Either "builtin", or "uvloop" + address=ZMQConfig.from_string(f"tcp://127.0.0.1:{get_available_tcp_port()}"), + io_threads=DEFAULT_IO_THREADS, # Consider increasing this number if your workload is IO-heavy + max_number_of_tasks_waiting=DEFAULT_MAX_NUMBER_OF_TASKS_WAITING, + per_worker_queue_size=DEFAULT_PER_WORKER_QUEUE_SIZE, + client_timeout_seconds=DEFAULT_CLIENT_TIMEOUT_SECONDS, + worker_timeout_seconds=DEFAULT_WORKER_TIMEOUT_SECONDS, + object_retention_seconds=DEFAULT_OBJECT_RETENTION_SECONDS, + load_balance_seconds=DEFAULT_LOAD_BALANCE_SECONDS, + load_balance_trigger_times=DEFAULT_LOAD_BALANCE_TRIGGER_TIMES, + protected=False, # When false, clients can shutdown the scheduler. + ) + + # Then we put config into Scheduler. Unlike Cluster, scheduler should always be long running. Therefore, we don't + # provide API to close scheduler. The only way to shutdown a scheduler is through Client.shutdown, which shutdowns + # the scheduler if "protected" member variable is set to False + _ = Scheduler(config) + + +if __name__ == "__main__": + main() diff --git a/tests/test_examples.py b/tests/test_examples.py new file mode 100644 index 000000000..2699a971e --- /dev/null +++ b/tests/test_examples.py @@ -0,0 +1,21 @@ +import os +import unittest +from glob import glob + +from scaler.utility.logging.utility import setup_logger +from tests.utility import logging_test_name + + +class TestExamples(unittest.TestCase): + def setUp(self) -> None: + setup_logger() + logging_test_name(self) + + def tearDown(self) -> None: + pass + + def test_examples(self): + basic_examples = glob("examples/*.py") + prefix = "PYTHONPATH=. python " + for example in basic_examples: + assert os.system(prefix + example) == 0