diff --git a/.gitignore b/.gitignore index 894a44c..1630138 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,6 @@ venv.bak/ # mypy .mypy_cache/ + +*_pb2.py +*_pb2_grpc.py diff --git a/requirements.txt b/requirements.txt index 915391b..03aa603 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -grpcio>=1.14.2 +grpcio>=1.14.2 --no-binary grpcio grpcio-tools>=1.14.1 \ No newline at end of file diff --git a/run_example_service.py b/run_example_service.py index 77a18e2..6e6e736 100644 --- a/run_example_service.py +++ b/run_example_service.py @@ -11,28 +11,45 @@ from service import registry -logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - %(name)s - %(message)s") +logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - " + "%(name)s - %(message)s") log = logging.getLogger("run_example_service") def main(): parser = argparse.ArgumentParser(description="Run services") - parser.add_argument("--no-daemon", action="store_false", dest="run_daemon", help="do not start the daemon") + parser.add_argument("--no-daemon", + action="store_false", + dest="run_daemon", + help="do not start the daemon") parser.add_argument("--daemon-config", dest="daemon_config", help="Path of daemon configuration file, without config it won't be started", - required=False - ) - parser.add_argument("--ssl", action="store_true", dest="run_ssl", help="start the daemon with SSL") + required=False) + parser.add_argument("--ssl", + action="store_true", + dest="run_ssl", + help="start the daemon with SSL") + parser.add_argument("--mp", + help="number of concurrent processes", + metavar="NUMBER_OF_PROCESSES", + default=1, + type=int, + required=False) args = parser.parse_args() root_path = pathlib.Path(__file__).absolute().parent - + # All services modules go here service_modules = ["service.example_service"] - + # Call for all the services listed in service_modules - all_p = start_all_services(root_path, service_modules, args.run_daemon, args.daemon_config, args.run_ssl) - + all_p = start_all_services(root_path, + service_modules, + args.run_daemon, + args.daemon_config, + args.run_ssl, + args.mp) + # Continuous checking all subprocess try: while True: @@ -46,26 +63,29 @@ def main(): raise -def start_all_services(cwd, service_modules, run_daemon, daemon_config, run_ssl): +def start_all_services(cwd, service_modules, + run_daemon, daemon_config, run_ssl, mp): """ Loop through all service_modules and start them. For each one, an instance of Daemon "snetd" is created. snetd will start with configs from "snetd.config.json" """ all_p = [] - for i, service_module in enumerate(service_modules): + for _, service_module in enumerate(service_modules): service_name = service_module.split(".")[-1] - log.info("Launching {} on port {}".format(str(registry[service_name]), service_module)) - all_p += start_service(cwd, service_module, run_daemon, daemon_config, run_ssl) + log.info("Launching {} on port {}".format(service_module, + str(registry[service_name]))) + all_p += start_service(cwd, service_module, + run_daemon, daemon_config, run_ssl, mp) return all_p -def start_service(cwd, service_module, run_daemon, daemon_config, run_ssl): +def start_service(cwd, service_module, run_daemon, daemon_config, run_ssl, mp): """ Starts SNET Daemon ("snetd") and the python module of the service at the passed gRPC port. """ - + def add_ssl_configs(conf): """Add SSL keys to snetd.config.json""" with open(conf, "r") as f: @@ -74,19 +94,23 @@ def add_ssl_configs(conf): snetd_configs["ssl_key"] = "/opt/singnet/.certs/privkey.pem" with open(conf, "w") as f: json.dump(snetd_configs, f, sort_keys=True, indent=4) - + all_p = [] if run_daemon: if daemon_config: all_p.append(start_snetd(str(cwd), daemon_config)) else: - for idx, config_file in enumerate(glob.glob("./snetd_configs/*.json")): + for _, config_file in enumerate(glob.glob("./snetd_configs/*.json")): if run_ssl: add_ssl_configs(config_file) all_p.append(start_snetd(str(cwd), config_file)) service_name = service_module.split(".")[-1] grpc_port = registry[service_name]["grpc"] - p = subprocess.Popen([sys.executable, "-m", service_module, "--grpc-port", str(grpc_port)], cwd=str(cwd)) + p = subprocess.Popen([sys.executable, + "-m", service_module, + "--grpc-port", str(grpc_port), + "--mp", str(mp)], + cwd=str(cwd)) all_p.append(p) return all_p diff --git a/service/common.py b/service/common.py deleted file mode 100644 index 21aa4e5..0000000 --- a/service/common.py +++ /dev/null @@ -1,29 +0,0 @@ -import argparse -import os.path -import time - -from service import registry - - -def common_parser(script_name): - parser = argparse.ArgumentParser(prog=script_name) - service_name = os.path.splitext(os.path.basename(script_name))[0] - parser.add_argument("--grpc-port", - help="port to bind gRPC service to", - default=registry[service_name]['grpc'], - type=int, - required=False) - return parser - - -# From gRPC docs: -# Because start() does not block you may need to sleep-loop if there is nothing -# else for your code to do while serving. -def main_loop(grpc_handler, args): - server = grpc_handler(port=args.grpc_port) - server.start() - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - server.stop(0) diff --git a/service/example_service.py b/service/example_service.py index 97dd523..4285f5f 100644 --- a/service/example_service.py +++ b/service/example_service.py @@ -1,18 +1,29 @@ -import sys +from concurrent import futures +import contextlib +import datetime import logging +import multiprocessing +import time +import socket +import sys +import argparse +import os import grpc -import concurrent.futures as futures -import service.common +from service import registry # Importing the generated codes from buildproto.sh import service.service_spec.example_service_pb2_grpc as grpc_bt_grpc from service.service_spec.example_service_pb2 import Result -logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - %(name)s - %(message)s") -log = logging.getLogger("example_service") +logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s]" + " - %(name)s - %(message)s") +_LOGGER = logging.getLogger("example_service") +_ONE_DAY = datetime.timedelta(days=1) +_PROCESS_COUNT = multiprocessing.cpu_count() +_THREAD_CONCURRENCY = _PROCESS_COUNT """ Simple arithmetic service to test the Snet Daemon (gRPC), dApp and/or Snet-CLI. @@ -41,16 +52,18 @@ value: 924.0 """ - # Create a class to be added to the gRPC server # derived from the protobuf codes. + + class CalculatorServicer(grpc_bt_grpc.CalculatorServicer): def __init__(self): + self.pid = os.getpid() self.a = 0 self.b = 0 self.result = 0 # Just for debugging purpose. - log.debug("CalculatorServicer created") + _LOGGER.debug("[{}] CalculatorServicer created".format(self.pid)) # The method that will be exposed to the snet-cli call command. # request: incoming data @@ -64,7 +77,10 @@ def add(self, request, context): self.result = Result() self.result.value = self.a + self.b - log.debug("add({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("[{}] add({},{})={}".format(self.pid, + self.a, + self.b, + self.result.value)) return self.result def sub(self, request, context): @@ -73,7 +89,10 @@ def sub(self, request, context): self.result = Result() self.result.value = self.a - self.b - log.debug("sub({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("[{}] sub({},{})={}".format(self.pid, + self.a, + self.b, + self.result.value)) return self.result def mul(self, request, context): @@ -82,7 +101,10 @@ def mul(self, request, context): self.result = Result() self.result.value = self.a * self.b - log.debug("mul({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("[{}] mul({},{})={}".format(self.pid, + self.a, + self.b, + self.result.value)) return self.result def div(self, request, context): @@ -91,10 +113,21 @@ def div(self, request, context): self.result = Result() self.result.value = self.a / self.b - log.debug("div({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("[{}] div({},{})={}".format(self.pid, + self.a, + self.b, + self.result.value)) return self.result +def wait_forever(server): + try: + while True: + time.sleep(_ONE_DAY.total_seconds()) + except KeyboardInterrupt: + server.stop(None) + + # The gRPC serve function. # # Params: @@ -103,17 +136,62 @@ def div(self, request, context): # # Add all your classes to the server here. # (from generated .py files by protobuf compiler) -def serve(max_workers=10, port=7777): - server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) +def run_server(grpc_port=7777): + options = (('grpc.so_reuseport', 1),) + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY,), + options=options) grpc_bt_grpc.add_CalculatorServicer_to_server(CalculatorServicer(), server) - server.add_insecure_port("[::]:{}".format(port)) - return server + server.add_insecure_port("[::]:{}".format(grpc_port)) + server.start() + wait_forever(server) + + +@contextlib.contextmanager +def reserve_port(grpc_port=7777): + """Find and reserve a port for all subprocesses to use.""" + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: + raise RuntimeError("Failed to set SO_REUSEPORT.") + sock.bind(('', grpc_port)) + try: + yield sock.getsockname()[1] + finally: + sock.close() + + +def main(): + """ Runs the gRPC server to communicate with the SNET Daemon. """ + parser = argparse.ArgumentParser(prog=__file__) + service_name = os.path.splitext(os.path.basename(__file__))[0] + parser.add_argument("--grpc-port", + help="port to bind gRPC service to", + default=registry[service_name]['grpc'], + type=int, + required=False) + parser.add_argument("--mp", + help="number of concurrent processes", + metavar="NUMBER_OF_PROCESSES", + default=1, + type=int, + required=False) + args = parser.parse_args() + + num_processes = _PROCESS_COUNT if args.mp > _PROCESS_COUNT else args.mp + with reserve_port(args.grpc_port) as port: + sys.stdout.flush() + workers = [] + for _ in range(num_processes): + # NOTE: It is imperative that the worker subprocesses be forked before + # any gRPC servers start up. See + # https://github.com/grpc/grpc/issues/16001 for more details. + worker = multiprocessing.Process(target=run_server, args=(port,)) + worker.start() + workers.append(worker) + for worker in workers: + worker.join() if __name__ == "__main__": - """ - Runs the gRPC server to communicate with the Snet Daemon. - """ - parser = service.common.common_parser(__file__) - args = parser.parse_args(sys.argv[1:]) - service.common.main_loop(serve, args) + main() diff --git a/test_example_service.py b/test_example_service.py index bdda5ae..2f40608 100644 --- a/test_example_service.py +++ b/test_example_service.py @@ -16,11 +16,14 @@ test_flag = True # Example Service - Arithmetic - endpoint = input("Endpoint (localhost:{}): ".format(registry["example_service"]["grpc"])) if not test_flag else "" + endpoint = input("Endpoint (localhost:{}): ".format( + registry["example_service"]["grpc"])) if not test_flag else "" if endpoint == "": - endpoint = "localhost:{}".format(registry["example_service"]["grpc"]) + endpoint = "localhost:{}".format( + registry["example_service"]["grpc"]) - grpc_method = input("Method (add|sub|mul|div): ") if not test_flag else "mul" + grpc_method = input( + "Method (add|sub|mul|div): ") if not test_flag else "mul" a = float(input("Number 1: ") if not test_flag else "12") b = float(input("Number 2: ") if not test_flag else "7")