diff --git a/pycue/opencue/cuebot.py b/pycue/opencue/cuebot.py index 8d5c530d1..d14fb5160 100644 --- a/pycue/opencue/cuebot.py +++ b/pycue/opencue/cuebot.py @@ -18,7 +18,6 @@ from __future__ import division from __future__ import absolute_import -from builtins import object from random import shuffle import abc import time @@ -68,82 +67,107 @@ logger = logging.getLogger("opencue") -DEFAULT_MAX_MESSAGE_BYTES = 1024 ** 2 * 10 +DEFAULT_MAX_MESSAGE_BYTES = 1024**2 * 10 DEFAULT_GRPC_PORT = 8443 +_DEFAULT_TIMEOUT_MS = 10000 # 10 seconds -if platform.system() != 'Darwin': +if platform.system() != "Darwin": # Avoid spamming users with epoll fork warning messages os.environ["GRPC_POLL_STRATEGY"] = "epoll1" -class Cuebot(object): - """Used to manage the connection to the Cuebot. Normally the connection - to the Cuebot is made automatically as needed so you don't have to explicitly - call Cuebot.connect(). - - If you need to change the host(s) in which the library is connecting to, - you have a couple options. You can set it programmatically with - Cuebot.setHosts or set the CUEBOT_HOSTS environment variable - to a comma delimited list of host names.""" - RpcChannel = None - Hosts = [] - Stubs = {} - Config = opencue.config.load_config_from_file() - Timeout = Config.get('cuebot.timeout', 10000) + +class CuebotConnectionManager: + """Singleton manager for Cuebot connections. + + Manages the connection to the Cuebot server. Normally the connection + to the Cuebot is made automatically as needed so you don't have to explicitly + call init(). + + If you need to change the host(s) in which the library is connecting to, + you have a couple options. You can set it programmatically with + set_hosts or set the CUEBOT_HOSTS environment variable + to a comma delimited list of host names. + """ + + # Singleton instance + _instance = None + _initialized = False PROTO_MAP = { - 'action': filter_pb2, - 'allocation': facility_pb2, - 'comment': comment_pb2, - 'criterion': criterion_pb2, - 'cue': cue_pb2, - 'department': department_pb2, - 'depend': depend_pb2, - 'facility': facility_pb2, - 'filter': filter_pb2, - 'frame': job_pb2, - 'group': job_pb2, - 'host': host_pb2, - 'job': job_pb2, - 'layer': job_pb2, - 'limit': limit_pb2, - 'matcher': filter_pb2, - 'owner': host_pb2, - 'proc': host_pb2, - 'renderPartition': renderPartition_pb2, - 'service': service_pb2, - 'show': show_pb2, - 'subscription': subscription_pb2, - 'task': task_pb2 + "action": filter_pb2, + "allocation": facility_pb2, + "comment": comment_pb2, + "criterion": criterion_pb2, + "cue": cue_pb2, + "department": department_pb2, + "depend": depend_pb2, + "facility": facility_pb2, + "filter": filter_pb2, + "frame": job_pb2, + "group": job_pb2, + "host": host_pb2, + "job": job_pb2, + "layer": job_pb2, + "limit": limit_pb2, + "matcher": filter_pb2, + "owner": host_pb2, + "proc": host_pb2, + "renderPartition": renderPartition_pb2, + "service": service_pb2, + "show": show_pb2, + "subscription": subscription_pb2, + "task": task_pb2, } SERVICE_MAP = { - 'action': filter_pb2_grpc.ActionInterfaceStub, - 'allocation': facility_pb2_grpc.AllocationInterfaceStub, - 'comment': comment_pb2_grpc.CommentInterfaceStub, - 'cue': cue_pb2_grpc.CueInterfaceStub, - 'depend': depend_pb2_grpc.DependInterfaceStub, - 'department': department_pb2_grpc.DepartmentInterfaceStub, - 'facility': facility_pb2_grpc.FacilityInterfaceStub, - 'filter': filter_pb2_grpc.FilterInterfaceStub, - 'frame': job_pb2_grpc.FrameInterfaceStub, - 'group': job_pb2_grpc.GroupInterfaceStub, - 'host': host_pb2_grpc.HostInterfaceStub, - 'job': job_pb2_grpc.JobInterfaceStub, - 'layer': job_pb2_grpc.LayerInterfaceStub, - 'limit': limit_pb2_grpc.LimitInterfaceStub, - 'matcher': filter_pb2_grpc.MatcherInterfaceStub, - 'owner': host_pb2_grpc.OwnerInterfaceStub, - 'proc': host_pb2_grpc.ProcInterfaceStub, - 'renderPartition': renderPartition_pb2_grpc.RenderPartitionInterfaceStub, - 'service': service_pb2_grpc.ServiceInterfaceStub, - 'serviceOverride': service_pb2_grpc.ServiceOverrideInterfaceStub, - 'show': show_pb2_grpc.ShowInterfaceStub, - 'subscription': subscription_pb2_grpc.SubscriptionInterfaceStub, - 'task': task_pb2_grpc.TaskInterfaceStub + "action": filter_pb2_grpc.ActionInterfaceStub, + "allocation": facility_pb2_grpc.AllocationInterfaceStub, + "comment": comment_pb2_grpc.CommentInterfaceStub, + "cue": cue_pb2_grpc.CueInterfaceStub, + "depend": depend_pb2_grpc.DependInterfaceStub, + "department": department_pb2_grpc.DepartmentInterfaceStub, + "facility": facility_pb2_grpc.FacilityInterfaceStub, + "filter": filter_pb2_grpc.FilterInterfaceStub, + "frame": job_pb2_grpc.FrameInterfaceStub, + "group": job_pb2_grpc.GroupInterfaceStub, + "host": host_pb2_grpc.HostInterfaceStub, + "job": job_pb2_grpc.JobInterfaceStub, + "layer": job_pb2_grpc.LayerInterfaceStub, + "limit": limit_pb2_grpc.LimitInterfaceStub, + "matcher": filter_pb2_grpc.MatcherInterfaceStub, + "owner": host_pb2_grpc.OwnerInterfaceStub, + "proc": host_pb2_grpc.ProcInterfaceStub, + "renderPartition": renderPartition_pb2_grpc.RenderPartitionInterfaceStub, + "service": service_pb2_grpc.ServiceInterfaceStub, + "serviceOverride": service_pb2_grpc.ServiceOverrideInterfaceStub, + "show": show_pb2_grpc.ShowInterfaceStub, + "subscription": subscription_pb2_grpc.SubscriptionInterfaceStub, + "task": task_pb2_grpc.TaskInterfaceStub, } - @staticmethod - def init(config=None): + def __new__(cls): + """Singleton pattern implementation.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, config=None): + """Initialize the CuebotConnectionManager instance.""" + if self._initialized: + return + + self.rpc_channel = None + self.hosts = [] + self.stubs = {} + self.config = opencue.config.load_config_from_file() + self.timeout = self.config.get("cuebot.timeout", _DEFAULT_TIMEOUT_MS) + + self.initialize(config) + + # Mark as initialized to prevent re-initialization + self.__class__._initialized = True + + def initialize(self, config=None): """Main init method for setting up the Cuebot object. Sets the communication channel and hosts. @@ -153,97 +177,109 @@ def init(config=None): hosts_env = os.getenv("CUEBOT_HOSTS") if config: - Cuebot.Config = config - Cuebot.Timeout = config.get('cuebot.timeout', Cuebot.Timeout) + self.config = config + self.timeout = config.get("cuebot.timeout", self.timeout) if hosts_env: - Cuebot.setHosts(hosts_env.split(",")) + self.set_hosts(hosts_env.split(",")) else: - facility = os.getenv("CUEBOT_FACILITY", Cuebot.Config.get("cuebot.facility_default")) - Cuebot.setHostWithFacility(facility) - if Cuebot.Hosts is None: - raise CueException('Cuebot host not set. Please ensure CUEBOT_HOSTS is set ' + - 'or a facility_default host is set in the yaml pycue config.') - - @staticmethod - def setChannel(): + facility = os.getenv( + "CUEBOT_FACILITY", self.config.get("cuebot.facility_default") + ) + self.set_host_with_facility(facility) + if not self.hosts: + raise CueException( + "Cuebot host not set. Please ensure CUEBOT_HOSTS is set " + + "or a facility_default host is set in the yaml pycue config." + ) + + def set_channel(self): """Sets the gRPC channel connection""" # gRPC must specify a single host. Randomize host list to balance load across cuebots. - hosts = list(Cuebot.Hosts) + hosts = list(self.hosts) shuffle(hosts) - maxMessageBytes = Cuebot.Config.get('cuebot.max_message_bytes', DEFAULT_MAX_MESSAGE_BYTES) + max_message_bytes = self.config.get( + "cuebot.max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES + ) # create interceptors interceptors = ( RetryOnRpcErrorClientInterceptor( max_attempts=4, - sleeping_policy=ExponentialBackoff(init_backoff_ms=100, - max_backoff_ms=1600, - multiplier=2), + sleeping_policy=ExponentialBackoff( + init_backoff_ms=100, max_backoff_ms=1600, multiplier=2 + ), status_for_retry=(grpc.StatusCode.UNAVAILABLE,), ), ) - connectStr = "Not Defined" + connect_str = "Not Defined" for host in hosts: - if ':' in host: - connectStr = host + if ":" in host: + connect_str = host else: - connectStr = '%s:%s' % ( - host, Cuebot.Config.get('cuebot.grpc_port', DEFAULT_GRPC_PORT)) - # pylint: disable=logging-not-lazy - logger.debug('connecting to gRPC at %s' % connectStr) - # pylint: enable=logging-not-lazy + connect_str = "%s:%s" % ( + host, + self.config.get("cuebot.grpc_port", DEFAULT_GRPC_PORT), + ) + logger.debug("connecting to gRPC at %s" % connect_str) # TODO(bcipriano) Configure gRPC TLS. (Issue #150) try: - Cuebot.RpcChannel = grpc.intercept_channel( - grpc.insecure_channel(connectStr, options=[ - ('grpc.max_send_message_length', maxMessageBytes), - ('grpc.max_receive_message_length', maxMessageBytes)]), - *interceptors) + self.rpc_channel = grpc.intercept_channel( + grpc.insecure_channel( + connect_str, + options=[ + ("grpc.max_send_message_length", max_message_bytes), + ("grpc.max_receive_message_length", max_message_bytes), + ], + ), + *interceptors, + ) # Test the connection - Cuebot.getStub('cue').GetSystemStats( - cue_pb2.CueGetSystemStatsRequest(), timeout=Cuebot.Timeout) + self.get_stub("cue").GetSystemStats( + cue_pb2.CueGetSystemStatsRequest(), timeout=self.timeout + ) # pylint: disable=broad-except except Exception: - logger.warning('Could not establish grpc channel with %s', connectStr) + logger.warning("Could not establish grpc channel with %s", connect_str) continue - atexit.register(Cuebot.closeChannel) + atexit.register(self.close_channel) return None - raise ConnectionException('No grpc connection could be established. ' + - 'Please check configured cuebot hosts: ' + connectStr) + raise ConnectionException( + "No grpc connection could be established. " + + "Please check configured cuebot hosts: " + + connect_str + ) - @staticmethod - def closeChannel(): + def close_channel(self): """Close the gRPC channel, delete it and reset it to None.""" - if Cuebot and Cuebot.RpcChannel is not None: - Cuebot.RpcChannel.close() - del Cuebot.RpcChannel - Cuebot.RpcChannel = None + if self.rpc_channel is not None: + self.rpc_channel.close() + del self.rpc_channel + self.rpc_channel = None - @staticmethod - def resetChannel(): + def reset_channel(self): """Close and reopen the gRPC channel.""" - Cuebot.closeChannel() - Cuebot.setChannel() + self.close_channel() + self.set_channel() - @staticmethod - def setHostWithFacility(facility): + def set_host_with_facility(self, facility): """Sets hosts to connect to based on the provided facility. If an unknown facility is provided, it will fall back to the one listed in cuebot.facility_default :type facility: str :param facility: a facility named in the config file""" - if facility not in list(Cuebot.Config.get("cuebot.facility").keys()): - default = Cuebot.Config.get("cuebot.facility_default") - logger.warning("The facility '%s' does not exist, defaulting to %s", facility, default) + if facility not in list(self.config.get("cuebot.facility").keys()): + default = self.config.get("cuebot.facility_default") + logger.warning( + "The facility '%s' does not exist, defaulting to %s", facility, default + ) facility = default logger.debug("setting facility to: %s", facility) - hosts = Cuebot.Config.get("cuebot.facility")[facility] - Cuebot.setHosts(hosts) + hosts = self.config.get("cuebot.facility")[facility] + self.set_hosts(hosts) - @staticmethod - def setHosts(hosts): + def set_hosts(self, hosts): """Sets the cuebot host names to connect to. :param hosts: a list of hosts or a host @@ -251,21 +287,20 @@ def setHosts(hosts): if isinstance(hosts, str): hosts = [hosts] logger.debug("setting new server hosts to: %s", hosts) - Cuebot.Hosts = hosts - Cuebot.resetChannel() + self.hosts = hosts + self.reset_channel() - @staticmethod - def setTimeout(timeout): + def set_timeout(self, timeout): """Sets the default network timeout. :param timeout: The network connection timeout in millis. :type timeout: int """ logger.debug("setting new server timeout to: %d", timeout) - Cuebot.Timeout = timeout + self.timeout = timeout @classmethod - def getProto(cls, name): + def get_proto(cls, name): """Returns a proto class for the given name.""" proto = cls.PROTO_MAP.get(name) if proto is None: @@ -273,37 +308,154 @@ def getProto(cls, name): return proto @classmethod - def getService(cls, name): + def get_service(cls, name): """Returns the service for the given name.""" service = cls.SERVICE_MAP.get(name) if service is None: raise ValueError("Could not find stub interface for {}.".format(name)) return service - @classmethod - def getStub(cls, name): + def get_stub(self, name): """Get the matching stub from the SERVICE_MAP. Reuse an existing one if possible. :param name: name of stub key for SERVICE_MAP :type name: str""" - if Cuebot.RpcChannel is None: - cls.init() + if self.rpc_channel is None: + self.initialize() + + service = self.get_service(name) + return service(self.rpc_channel) + - service = cls.getService(name) - return service(Cuebot.RpcChannel) +# Lazy proxy class for backward compatibility +class _CuebotLazyProxy: + """Lazy proxy for backward compatibility with the old Cuebot interface. + + This class acts as a proxy that only instantiates the actual CuebotConnectionManager + when methods or properties are first accessed, avoiding premature initialization + during module import. + """ + # Class attributes for backward compatibility + PROTO_MAP = CuebotConnectionManager.PROTO_MAP + SERVICE_MAP = CuebotConnectionManager.SERVICE_MAP + + # Backward compatibility static methods with original signatures + @staticmethod + def init(config=None): + """Legacy static init method for backward compatibility.""" + return CuebotConnectionManager().initialize(config) + + @staticmethod + def setChannel(): + """Legacy static method for backward compatibility.""" + return CuebotConnectionManager().set_channel() + + @staticmethod + def setTimeout(timeout): + """Legacy static method for backward compatibility.""" + return CuebotConnectionManager().set_timeout(timeout) + + @staticmethod + def setHostWithFacility(facility): + """Legacy static method for backward compatibility.""" + return CuebotConnectionManager().set_host_with_facility(facility) + + @staticmethod + def setHosts(hosts): + """Legacy static method for backward compatibility.""" + return CuebotConnectionManager().set_hosts(hosts) + + @staticmethod + def getStub(name): + """Legacy static method for backward compatibility.""" + return CuebotConnectionManager().get_stub(name) + + @staticmethod + def closeChannel(): + """Legacy static method for backward compatibility.""" + return CuebotConnectionManager().close_channel() + + @staticmethod + def resetChannel(): + """Legacy static method for backward compatibility.""" + return CuebotConnectionManager().reset_channel() @staticmethod def getConfig(): - """Gets the Cuebot config object, originally read in from the config file on disk.""" - return Cuebot.Config + """Legacy static method for backward compatibility.""" + return CuebotConnectionManager().config + @classmethod + def getService(cls, name): + """Legacy class method for backward compatibility.""" + return CuebotConnectionManager.get_service(name) -# Python 2/3 compatible implementation of ABC -ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + @classmethod + def getProto(cls, name): + """Legacy class method for backward compatibility.""" + return CuebotConnectionManager.get_proto(name) + + # Backward compatibility properties for old attribute access + @property + def RpcChannel(self): + """Backward compatibility property for RpcChannel.""" + return CuebotConnectionManager().rpc_channel + + @RpcChannel.setter + def RpcChannel(self, value): + """Backward compatibility setter for RpcChannel.""" + CuebotConnectionManager().rpc_channel = value + + @property + def Hosts(self): + """Backward compatibility property for Hosts.""" + return CuebotConnectionManager().hosts + + @Hosts.setter + def Hosts(self, value): + """Backward compatibility setter for Hosts.""" + CuebotConnectionManager().hosts = value + + @property + def Stubs(self): + """Backward compatibility property for Stubs.""" + return CuebotConnectionManager().stubs + + @Stubs.setter + def Stubs(self, value): + """Backward compatibility setter for Stubs.""" + CuebotConnectionManager().stubs = value + + @property + def Config(self): + """Backward compatibility property for Config.""" + return CuebotConnectionManager().config + + @Config.setter + def Config(self, value): + """Backward compatibility setter for Config.""" + CuebotConnectionManager().config = value + + @property + def Timeout(self): + """Backward compatibility property for Timeout.""" + return CuebotConnectionManager().timeout + + @Timeout.setter + def Timeout(self, value): + """Backward compatibility setter for Timeout.""" + CuebotConnectionManager().timeout = value + + def __call__(self, *args, **kwargs): + """Noop to support old `Cuebot()` calls. + + Returns itself. + """ + return self -class SleepingPolicy(ABC): +class SleepingPolicy(abc.ABC): """ Implement policy for sleeping between API retries """ @@ -389,3 +541,7 @@ def intercept_stream_unary( ): return self._intercept_call(continuation, client_call_details, request_iterator) + + +# Backward compatibility namespace as a lazy proxy +Cuebot = _CuebotLazyProxy()