From c8fc4ce8abd2348493fc7c9e4084d9647dee3641 Mon Sep 17 00:00:00 2001
From: byeongjo-kim
Date: Tue, 27 Sep 2022 18:53:03 +0900
Subject: [PATCH 1/5] Use the torchserve api to decide when to set
 model.ready=True

---
 .../kserve/kserve_wrapper/TorchserveModel.py  | 45 ++++++++++++++++++-
 kubernetes/kserve/kserve_wrapper/__main__.py  |  6 +++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
index d31079bf79..968c94cc31 100644
--- a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
+++ b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
@@ -1,5 +1,8 @@
 """ The torchserve side inference end-points request are handled to
 return a KServe side response """
+import os
+import time
+import requests
 import json
 import logging
 import pathlib
@@ -19,6 +22,7 @@
 
 REGISTER_URL_FORMAT = "{0}/models?initial_workers=1&url={1}"
 UNREGISTER_URL_FORMAT = "{0}/models/{1}"
+READINESS_URL_FORMAT = "{0}/models/{1}"
 
 PREDICTOR_URL_FORMAT = "http://{0}/v1/models/{1}:predict"
 EXPLAINER_URL_FORMAT = "http://{0}/v1/models/{1}:explain"
@@ -139,5 +143,42 @@ def load(self) -> bool:
                 "More than one model file is detected, "
                 f"Only one is allowed within model_dir: {existing_paths}"
             )
-        self.ready = True
-        return self.ready
+
+        num_try = 0
+        model_load_max_try = int(os.environ.get('MODEL_LOAD_MAX_TRY', 10))
+        model_load_delay = int(os.environ.get('MODEL_LOAD_DELAY', 30))
+        model_load_timeout = int(os.environ.get('MODEL_LOAD_TIMEOUT', 5))
+
+        while num_try < model_load_max_try and not self.ready:
+            num_try = num_try + 1
+            logging.info(f'Loading {self.name} .. {num_try} of {model_load_max_try} tries..')
+
+            try:
+                response = requests.get(
+                    READINESS_URL_FORMAT.format(self.management_address, self.name),
+                    timeout=model_load_timeout
+                ).json()
+
+                default_verison = response[0]
+                workers = default_verison['workers']
+                workers_status = [worker['id'] for worker in workers if worker['status']=='READY']
+
+                if len(workers_status) == len(workers_status):
+                    logging.info(f'The model {self.name} is ready')
+                    self.ready = True
+
+            except (requests.ConnectionError,
+                    requests.Timeout,
+                    requests.ConnectTimeout,
+                    requests.ReadTimeout) as e:
+                logging.info(f'The model {self.name} is not ready')
+
+            except Exception as e:
+                logging.info(e)
+                logging.info(f'Failed loading model {self.name}')
+                break
+
+            logging.info(f'Sleep {model_load_delay} seconds for load {self.name}..')
+            time.sleep(model_load_delay)
+
+        return self.ready
\ No newline at end of file
diff --git a/kubernetes/kserve/kserve_wrapper/__main__.py b/kubernetes/kserve/kserve_wrapper/__main__.py
index 93d7622d7a..da2d2d1945 100644
--- a/kubernetes/kserve/kserve_wrapper/__main__.py
+++ b/kubernetes/kserve/kserve_wrapper/__main__.py
@@ -97,6 +97,12 @@ def parse_config():
         # By default model.load() is called on first request. Enabling load all
         # model in TS config.properties, all models are loaded at start and the
         # below method sets status to true for the models.
+        # However, if model.ready=true is set before the torchserve handler
+        # has finished all preparations for loading the model (e.g.,
+        # downloading pretrained weights from online storage), requests may
+        # fail. Therefore, the ready status is determined using the describe
+        # model api provided by torchserve.
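+        # With the defaults MODEL_LOAD_MAX_TRY=10 and MODEL_LOAD_DELAY=30 used
+        # by TorchserveModel.load, this polling can block startup for roughly
+        # five minutes (10 tries x 30 s) before giving up on readiness.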
+        model.load()
         models.append(model)
     registeredModels = TSModelRepository(

From 099b37cf5ed34b302fd2109fb73dec424374634a Mon Sep 17 00:00:00 2001
From: byeongjo-kim
Date: Fri, 30 Sep 2022 17:34:25 +0900
Subject: [PATCH 2/5] Add customized describe model api option to check the
 handler's status

---
 .../kserve/kserve_wrapper/TorchserveModel.py | 21 ++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
index 968c94cc31..0528e5227b 100644
--- a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
+++ b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
@@ -22,7 +22,7 @@
 
 REGISTER_URL_FORMAT = "{0}/models?initial_workers=1&url={1}"
 UNREGISTER_URL_FORMAT = "{0}/models/{1}"
-READINESS_URL_FORMAT = "{0}/models/{1}"
+READINESS_URL_FORMAT = "{0}/models/{1}?customized={2}"
 
 PREDICTOR_URL_FORMAT = "http://{0}/v1/models/{1}:predict"
 EXPLAINER_URL_FORMAT = "http://{0}/v1/models/{1}:explain"
@@ -145,27 +145,31 @@ def load(self) -> bool:
         )
 
         num_try = 0
+        customized_value = os.environ.get('IS_CUSTOMIZED', 'false')
         model_load_max_try = int(os.environ.get('MODEL_LOAD_MAX_TRY', 10))
         model_load_delay = int(os.environ.get('MODEL_LOAD_DELAY', 30))
         model_load_timeout = int(os.environ.get('MODEL_LOAD_TIMEOUT', 5))
-
         while num_try < model_load_max_try and not self.ready:
             num_try = num_try + 1
             logging.info(f'Loading {self.name} .. {num_try} of {model_load_max_try} tries..')
 
             try:
                 response = requests.get(
-                    READINESS_URL_FORMAT.format(self.management_address, self.name),
+                    READINESS_URL_FORMAT.format(self.management_address, self.name, customized_value),
                     timeout=model_load_timeout
                 ).json()
 
                 default_verison = response[0]
+
                 workers = default_verison['workers']
                 workers_status = [worker['id'] for worker in workers if worker['status']=='READY']
 
-                if len(workers_status) == len(workers_status):
-                    logging.info(f'The model {self.name} is ready')
-                    self.ready = True
+                worker_ready = False
+                if len(workers_status) > 0:
+                    worker_ready = True
+
+                self.ready = worker_ready if customized_value == 'false' \
+                    else worker_ready and 'customizedMetadata' in default_verison
 
             except (requests.ConnectionError,
                     requests.Timeout,
                     requests.ConnectTimeout,
                     requests.ReadTimeout) as e:
                 logging.info(f'The model {self.name} is not ready')
 
             except Exception as e:
                 logging.info(e)
                 logging.info(f'Failed loading model {self.name}')
                 break
 
             logging.info(f'Sleep {model_load_delay} seconds for load {self.name}..')
             time.sleep(model_load_delay)
 
-        return self.ready
\ No newline at end of file
+        if self.ready:
+            logging.info(f'The model {self.name} is ready')
+
+        return self.ready

From c6412d0566293d5e2633f6f98d38401c4d8d349b Mon Sep 17 00:00:00 2001
From: byeongjo-kim
Date: Fri, 30 Sep 2022 17:57:24 +0900
Subject: [PATCH 3/5] Rename the IS_CUSTOMIZED env var to MODEL_LOAD_CUSTOMIZED

---
 kubernetes/kserve/kserve_wrapper/TorchserveModel.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
index 0528e5227b..bbe4dd3fba 100644
--- a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
+++ b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
@@ -145,7 +145,7 @@ def load(self) -> bool:
         )
 
         num_try = 0
-        customized_value = os.environ.get('IS_CUSTOMIZED', 'false')
+        model_load_customized = os.environ.get('MODEL_LOAD_CUSTOMIZED', 'false')
         model_load_max_try = int(os.environ.get('MODEL_LOAD_MAX_TRY', 10))
         model_load_delay = int(os.environ.get('MODEL_LOAD_DELAY', 30))
         model_load_timeout = int(os.environ.get('MODEL_LOAD_TIMEOUT', 5))
@@ -155,7 +155,7 @@ def load(self) -> bool:
 
             try:
                 response = requests.get(
-                    READINESS_URL_FORMAT.format(self.management_address, self.name, customized_value),
+                    READINESS_URL_FORMAT.format(self.management_address, self.name, model_load_customized),
                     timeout=model_load_timeout
                 ).json()
 
@@ -168,7 +168,7 @@ def load(self) -> bool:
                 if len(workers_status) > 0:
                     worker_ready = True
 
-                self.ready = worker_ready if customized_value == 'false' \
+                self.ready = worker_ready if model_load_customized == 'false' \
                     else worker_ready and 'customizedMetadata' in default_verison
 
             except (requests.ConnectionError,

From 39d5140508b4419a7bc931043913e491a983afc9 Mon Sep 17 00:00:00 2001
From: Ankith Gunapal
Date: Fri, 8 Dec 2023 11:11:56 -0800
Subject: [PATCH 4/5] Update TorchserveModel.py to fix lint failure

---
 kubernetes/kserve/kserve_wrapper/TorchserveModel.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
index 7e3810ea47..a5ae6ca86e 100644
--- a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
+++ b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
@@ -3,7 +3,6 @@
 import os
 import time
 import requests
-import json
 import logging
 import pathlib
 from enum import Enum

From 0d7edcc4e50afd8340b063f140e96c1c69098557 Mon Sep 17 00:00:00 2001
From: ByeongjoKim
Date: Wed, 24 Jan 2024 20:11:50 +0900
Subject: [PATCH 5/5] Reformatted TorchserveModel.py

---
 .../kserve/kserve_wrapper/TorchserveModel.py  | 57 +++++++++++--------
 1 file changed, 34 insertions(+), 23 deletions(-)

diff --git a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
index a5ae6ca86e..acf93a851d 100644
--- a/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
+++ b/kubernetes/kserve/kserve_wrapper/TorchserveModel.py
@@ -1,16 +1,16 @@
 """ The torchserve side inference end-points request are handled to
 return a KServe side response """
-import os
-import time
-import requests
 import logging
+import os
 import pathlib
+import time
 from enum import Enum
 from typing import Dict, Union
 
 import grpc
 import inference_pb2_grpc
 import kserve
+import requests
 from gprc_utils import from_ts_grpc, to_ts_grpc
 from inference_pb2 import PredictionResponse
 from kserve.errors import ModelMissingError
@@ -156,47 +156,58 @@ def load(self) -> bool:
             raise ModelMissingError(model_path)
 
         num_try = 0
-        model_load_customized = os.environ.get('MODEL_LOAD_CUSTOMIZED', 'false')
-        model_load_max_try = int(os.environ.get('MODEL_LOAD_MAX_TRY', 10))
-        model_load_delay = int(os.environ.get('MODEL_LOAD_DELAY', 30))
-        model_load_timeout = int(os.environ.get('MODEL_LOAD_TIMEOUT', 5))
+        model_load_customized = os.environ.get("MODEL_LOAD_CUSTOMIZED", "false")
+        model_load_max_try = int(os.environ.get("MODEL_LOAD_MAX_TRY", 10))
+        model_load_delay = int(os.environ.get("MODEL_LOAD_DELAY", 30))
+        model_load_timeout = int(os.environ.get("MODEL_LOAD_TIMEOUT", 5))
         while num_try < model_load_max_try and not self.ready:
             num_try = num_try + 1
-            logging.info(f'Loading {self.name} .. {num_try} of {model_load_max_try} tries..')
+            logging.info(
+                f"Loading {self.name} .. {num_try} of {model_load_max_try} tries.."
+ ) try: response = requests.get( - READINESS_URL_FORMAT.format(self.management_address, self.name, model_load_customized), - timeout=model_load_timeout + READINESS_URL_FORMAT.format( + self.management_address, self.name, model_load_customized + ), + timeout=model_load_timeout, ).json() default_verison = response[0] - workers = default_verison['workers'] - workers_status = [worker['id'] for worker in workers if worker['status']=='READY'] + workers = default_verison["workers"] + workers_status = [ + worker["id"] for worker in workers if worker["status"] == "READY" + ] worker_ready = False if len(workers_status) > 0: worker_ready = True - self.ready = worker_ready if model_load_customized == 'false' \ - else worker_ready and 'customizedMetadata' in default_verison + self.ready = ( + worker_ready + if model_load_customized == "false" + else worker_ready and "customizedMetadata" in default_verison + ) - except (requests.ConnectionError, - requests.Timeout, - requests.ConnectTimeout, - requests.ReadTimeout) as e: - logging.info(f'The model {self.name} is not ready') + except ( + requests.ConnectionError, + requests.Timeout, + requests.ConnectTimeout, + requests.ReadTimeout, + ) as e: + logging.info(f"The model {self.name} is not ready") except Exception as e: logging.info(e) - logging.info(f'Failed loading model {self.name}') + logging.info(f"Failed loading model {self.name}") break - logging.info(f'Sleep {model_load_delay} seconds for load {self.name}..') + logging.info(f"Sleep {model_load_delay} seconds for load {self.name}..") time.sleep(model_load_delay) if self.ready: - logging.info(f'The model {self.name} is ready') - + logging.info(f"The model {self.name} is ready") + return self.ready
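
Note: a minimal standalone sketch of the readiness check these patches add,
handy for exercising the management API by hand. The endpoint
http://127.0.0.1:8081 and the model name "mnist" are placeholder values, and
the env var defaults mirror those in TorchserveModel.load:

    import os
    import time

    import requests

    MANAGEMENT_ADDRESS = "http://127.0.0.1:8081"  # placeholder management endpoint
    MODEL_NAME = "mnist"  # placeholder model name

    max_try = int(os.environ.get("MODEL_LOAD_MAX_TRY", 10))
    delay = int(os.environ.get("MODEL_LOAD_DELAY", 30))
    timeout = int(os.environ.get("MODEL_LOAD_TIMEOUT", 5))

    ready = False
    for num_try in range(1, max_try + 1):
        try:
            # Describe the default version of the model; the response is a
            # list with one entry per model version, each carrying a
            # "workers" array.
            response = requests.get(
                f"{MANAGEMENT_ADDRESS}/models/{MODEL_NAME}", timeout=timeout
            ).json()
            workers = response[0]["workers"]
            # Ready as soon as at least one worker reports READY, matching
            # the len(workers_status) > 0 check in the patches.
            ready = any(worker["status"] == "READY" for worker in workers)
        except (requests.ConnectionError, requests.Timeout):
            pass
        if ready:
            break
        time.sleep(delay)

    print(f"{MODEL_NAME} ready: {ready}")

With MODEL_LOAD_CUSTOMIZED=true the wrapper additionally appends
?customized=true to the describe request and requires a "customizedMetadata"
field in the response, which a custom torchserve handler can provide; the
sketch above covers only the plain worker-status check.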