diff --git a/CHANGELOG.md b/CHANGELOG.md index 421cd2f9..8c4d5b1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ API documentation [Changelog](https://www.kucoin.com/docs-new/change-log) Current synchronized API documentation version [20250313](https://www.kucoin.com/docs-new/change-log#20250313) +# 2025-03-31(Python 1.2.1) +- Optimize WebSocket reconnection logic + ## 2025-03-21(1.2.0) - Update the latest APIs, documentation, etc - Remove range validation in Python diff --git a/README.md b/README.md index cecb1292..e9b4b734 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ The **KuCoin Universal SDK** is the official SDK provided by KuCoin, offering a ### Latest Version: `1.2.0`(Global API version) -### Python Installation +### Python Installation(1.2.1) ```bash pip install kucoin-universal-sdk diff --git a/sdk/python/CHANGELOG.md b/sdk/python/CHANGELOG.md index 421cd2f9..8c4d5b1f 100644 --- a/sdk/python/CHANGELOG.md +++ b/sdk/python/CHANGELOG.md @@ -4,6 +4,9 @@ API documentation [Changelog](https://www.kucoin.com/docs-new/change-log) Current synchronized API documentation version [20250313](https://www.kucoin.com/docs-new/change-log#20250313) +# 2025-03-31(Python 1.2.1) +- Optimize WebSocket reconnection logic + ## 2025-03-21(1.2.0) - Update the latest APIs, documentation, etc - Remove range validation in Python diff --git a/sdk/python/README.md b/sdk/python/README.md index bdae8a31..9588c926 100644 --- a/sdk/python/README.md +++ b/sdk/python/README.md @@ -8,7 +8,7 @@ For an overview of the project and SDKs in other languages, refer to the [Main R ## 📦 Installation -### Latest Version: `1.2.0` +### Latest Version: `1.2.1` Install the Python SDK using `pip`: ```bash diff --git a/sdk/python/kucoin_universal_sdk/generate/version.py b/sdk/python/kucoin_universal_sdk/generate/version.py index a7c94240..b867794d 100644 --- a/sdk/python/kucoin_universal_sdk/generate/version.py +++ b/sdk/python/kucoin_universal_sdk/generate/version.py @@ -1,2 +1,2 @@ -sdk_version = "v1.2.0" -sdk_generate_date = "2025-03-21" +sdk_version = "v1.2.1" +sdk_generate_date = "2025-03-31" diff --git a/sdk/python/kucoin_universal_sdk/internal/infra/default_ws_client.py b/sdk/python/kucoin_universal_sdk/internal/infra/default_ws_client.py index ba7d89d5..23a427b4 100644 --- a/sdk/python/kucoin_universal_sdk/internal/infra/default_ws_client.py +++ b/sdk/python/kucoin_universal_sdk/internal/infra/default_ws_client.py @@ -4,7 +4,7 @@ import time import urllib.parse from queue import Queue, Full, Empty -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Callable import websocket @@ -12,44 +12,40 @@ from kucoin_universal_sdk.model.constants import WsMessageType from kucoin_universal_sdk.model.websocket_option import WebSocketClientOption from kucoin_universal_sdk.model.websocket_option import WebSocketEvent -from ..interfaces.websocket import WsTokenProvider, WsToken - -class WriteMsg: - def __init__(self, msg: WsMessage, timeout: float): - self.msg = msg - self.ts = time.time() - self.timeout = timeout - self.exception = None - self.event = threading.Event() - - def set_exception(self, exception: Exception): - self.exception = exception - self.event.set() - -class WebSocketClient: - def __init__(self, token_provider: WsTokenProvider, options: WebSocketClientOption): +from ..interfaces.websocket import WsTokenProvider, WsToken, WebsocketTransport, WriteMsg + + +class WebSocketClient(WebsocketTransport): + def __init__(self, token_provider: WsTokenProvider, options: WebSocketClientOption, + on_reconnected: Optional[Callable[[], None]], + on_message: Optional[Callable[[WsMessage], None]], + on_event: Callable[[WebSocketEvent, str, str], None]): self.options = options self.conn = None self.conn_lock = threading.Lock() self.connected = threading.Event() self.shutdown = threading.Event() - self.disconnect_event = threading.Event() - self.reconnected_event = threading.Event() self.token_provider = token_provider self.token_info = None self.close_event = threading.Event() - self.reconnect_close_event = threading.Event() - - self.read_msg = Queue(maxsize=options.read_message_buffer) - self.write_msg = Queue(maxsize=options.write_message_buffer) - + self.reconnecting = threading.Event() + # callbacks + self.on_reconnected = on_reconnected + self.on_message = on_message + self.on_event = on_event + # data queues + self.read_msg_queue = Queue(maxsize=options.read_message_buffer) + self.write_msg_queue = Queue(maxsize=options.write_message_buffer) + # ack self.ack_event: Dict[str, WriteMsg] = {} self.ack_event_lock = threading.Lock() - self.metric = {'ping_success': 0, 'ping_err': 0} + # looping thread self.keep_alive_thread = None self.write_thread = None - self.ws_thread = None + self.read_thread = None self.reconnect_thread = None + + self.ws_thread = None self.welcome_received = threading.Event() def start(self): @@ -63,27 +59,124 @@ def start(self): logging.error(f"Failed to start WebSocket client: {err}") raise self.connected.set() - self.notify_event(WebSocketEvent.EVENT_CONNECTED, "") + self.on_event(WebSocketEvent.EVENT_CONNECTED, "", "") self.run() - self.reconnect() + logging.info("Websocket client started") + if not self.reconnect_thread or not self.reconnect_thread.is_alive(): + self.reconnect_thread = threading.Thread(target=self.reconnect_loop, daemon=True) + self.reconnect_thread.start() + + def stop(self): + with self.conn_lock: + self.on_event(WebSocketEvent.EVENT_CLIENT_SHUTDOWN, "", "") + self.shutdown.set() + self.close() + self.reconnect_thread.join() + self.token_provider.close() def run(self): if not self.keep_alive_thread or not self.keep_alive_thread.is_alive(): - self.keep_alive_thread = threading.Thread(target=self.keep_alive, daemon=True) + self.keep_alive_thread = threading.Thread(target=self.keep_alive_loop, daemon=True) self.keep_alive_thread.start() if not self.write_thread or not self.write_thread.is_alive(): - self.write_thread = threading.Thread(target=self.write_message, daemon=True) + self.write_thread = threading.Thread(target=self.write_loop, daemon=True) self.write_thread.start() - def stop(self): - with self.conn_lock: - self.notify_event(WebSocketEvent.EVENT_CLIENT_SHUTDOWN, "") - self.shutdown.set() - self.close() + if not self.read_thread or not self.read_thread.is_alive(): + self.read_thread = threading.Thread(target=self.read_loop, daemon=True) + self.read_thread.start() + + def write_loop(self): + while not self.close_event.is_set(): + data: WriteMsg = None + try: + data = self.write_msg_queue.get(timeout=1) + self.conn.send(data.msg.to_json()) + logging.debug(f"Message sent: {data.msg}") + data.ts = time.time() + except Empty: + continue + except Exception as e: + if data: + with self.ack_event_lock: + data: WriteMsg = self.ack_event.pop(data.msg.id) + data.set_exception(e) + logging.error(f"Error sending message: {e}") + logging.info("Exiting write loop...") + + def read_loop(self): + while not self.close_event.is_set(): + try: + data: WsMessage = self.read_msg_queue.get(timeout=1) + self.on_message(data) + except Empty: + continue + except Exception as e: + logging.error(f"Error process callback message error: {e}") + logging.info("Exiting read loop...") + + def reconnect_loop(self): + logging.info("Reconnecting loop started") + while not self.shutdown.is_set(): + if self.reconnecting.is_set(): + try: + logging.info("Broken WebSocket connection, starting reconnection") + self.on_event(WebSocketEvent.EVENT_TRY_RECONNECT, "", "") + attempt = 0 + + while True: + if not self.options.reconnect or ( + self.options.reconnect_attempts != -1 and attempt >= self.options.reconnect_attempts): + logging.error("Max reconnect attempts reached or reconnect disabled") + self.on_event(WebSocketEvent.EVENT_CLIENT_FAIL, "", "") + break + + logging.info( + f"Reconnecting in {self.options.reconnect_interval} seconds... (attempt {attempt})") + time.sleep(self.options.reconnect_interval) + try: + with self.conn_lock: + self.close() + self.dial() + self.connected.set() + self.on_event(WebSocketEvent.EVENT_CONNECTED, "", "") + self.run() + self.on_reconnected() + logging.info("Reconnect success") + break + except Exception as err: + logging.error(f"Reconnect attempt {attempt} failed: {err}") + attempt += 1 + + + finally: + self.reconnecting.clear() + time.sleep(1) + logging.info("Exiting read loop...") + + def keep_alive_loop(self): + interval = self.token_info.ping_interval / 1000.0 + timeout = self.token_info.ping_timeout / 1000.0 + last_ping_time = time.time() + + while not self.close_event.is_set(): + current_time = time.time() + if current_time - last_ping_time >= interval: + ping_msg = self.new_ping_message() + try: + self.write(ping_msg, timeout=timeout) + except TimeoutError: + logging.error("Heartbeat ping timeout") + except Exception as e: + logging.error(f"Exception in keep_alive: {e}") + last_ping_time = current_time + time.sleep(1) + logging.info("Exiting keep alive loop...") def dial(self): try: + self.welcome_received.clear() token_infos = self.token_provider.get_token() self.token_info = self.random_endpoint(token_infos) query_params = { @@ -93,17 +186,16 @@ def dial(self): url_str = f"{self.token_info.endpoint}?{urllib.parse.urlencode(query_params)}" self.conn = websocket.WebSocketApp( url_str, - on_message=self.on_message, - on_error=self.on_error, - on_close=self.on_close, - on_open=self.on_open, + on_message=self.on_message_cb, + on_error=self.on_error_cb, + on_close=self.on_close_cb, + on_open=self.on_open_cb, ) if not self.ws_thread or not self.ws_thread.is_alive(): self.ws_thread = threading.Thread(target=self.conn.run_forever, daemon=True) self.ws_thread.start() if not self.welcome_received.wait(timeout=5): self.close() - self.disconnect_event.set() raise Exception("Did not receive welcome message") self.close_event.clear() self.shutdown.clear() @@ -112,28 +204,38 @@ def dial(self): logging.error(f"Failed to connect or validate welcome message: {err}") raise - def on_open(self, ws): + def on_open_cb(self, ws): logging.info("WebSocket connection opened.") - def on_message(self, ws, message): + def on_error_cb(self, ws, error): + logging.error(f"WebSocket error: {error}") + + def on_close_cb(self, ws, close_status_code, close_msg): + logging.info(f"WebSocket closed with status code {close_status_code}, message: {close_msg}") + if self.shutdown.is_set(): + return + self.reconnecting.set() + + def on_message_cb(self, ws, message): if logging.root.level <= logging.DEBUG: - logging.debug(f"Received message: {message}") + logging.debug(f"Received message: {message}") + pass m = WsMessage.from_json(message) if m.type == WsMessageType.WELCOME.value: self.welcome_received.set() logging.info("Welcome message received.") elif m.type == WsMessageType.MESSAGE.value: - self.notify_event(WebSocketEvent.EVENT_MESSAGE_RECEIVED, "") + self.on_event(WebSocketEvent.EVENT_MESSAGE_RECEIVED, "", "") try: - logging.debug(f"queue size: {self.read_msg.qsize()}, max size: {self.read_msg.maxsize}") - self.read_msg.put(m, block=False) + logging.debug(f"queue size: {self.read_msg_queue.qsize()}, max size: {self.read_msg_queue.maxsize}") + self.read_msg_queue.put(m, block=False) except Full: - self.notify_event(WebSocketEvent.EVENT_READ_BUFFER_FULL, "") + self.on_event(WebSocketEvent.EVENT_READ_BUFFER_FULL, "", "") logging.warning("Read buffer full") elif m.type == WsMessageType.PONG.value: - self.notify_event(WebSocketEvent.EVENT_PONG_RECEIVED, "") + self.on_event(WebSocketEvent.EVENT_PONG_RECEIVED, "", "") logging.debug("PONG received") self._handle_ack_event(m) @@ -147,18 +249,15 @@ def _handle_ack_event(self, m: WsMessage): with self.ack_event_lock: data: WriteMsg = self.ack_event.pop(m.id, None) if not data: - logging.warning(f"Cannot find ack event, id: {m.id}") + logging.warning(f"Cannot find ack event, id: {m.id}, error message:{m}") return if m.type == WsMessageType.ERROR.value: error = m.raw_data - self.notify_event(WebSocketEvent.EVENT_ERROR_RECEIVED, error) + self.on_event(WebSocketEvent.EVENT_ERROR_RECEIVED, "", error) data.set_exception(error) else: data.event.set() - def read(self): - return self.read_msg - def write(self, ms: WsMessage, timeout: float) -> WriteMsg: logging.debug(f"Write message: {ms}") if not self.connected.is_set(): @@ -168,7 +267,7 @@ def write(self, ms: WsMessage, timeout: float) -> WriteMsg: with self.ack_event_lock: self.ack_event[ms.id] = msg try: - self.write_msg.put(msg) + self.write_msg_queue.put(msg) except Full: logging.warning(f"Write buffer is full for message ID {ms.id}.") self.ack_event.pop(ms.id, None) @@ -179,154 +278,31 @@ def write(self, ms: WsMessage, timeout: float) -> WriteMsg: finally: return msg - def write_message(self): - while not self.close_event.is_set(): - try: - data: WriteMsg = self.write_msg.get(timeout=1) - if self.conn: - self.conn.send(data.msg.to_json()) - logging.debug(f"Message sent: {data.msg}") - data.ts = time.time() - else: - logging.warning("No connection available to send message.") - except Empty: - continue - except Exception as e: - logging.error(f"Error sending message: {e}") - - def keep_alive(self): - interval = self.token_info.ping_interval / 1000.0 - timeout = self.token_info.ping_timeout / 1000.0 - last_ping_time = time.time() - - while not self.shutdown.is_set() and not self.close_event.is_set(): - current_time = time.time() - if current_time - last_ping_time >= interval: - ping_msg = self.new_ping_message() - try: - self.write(ping_msg, timeout=timeout) - self.metric['ping_success'] += 1 - except TimeoutError: - logging.error("Heartbeat ping timeout") - self.metric['ping_err'] += 1 - except Exception as e: - logging.error(f"Exception in keep_alive: {e}") - self.metric['ping_err'] += 1 - last_ping_time = current_time - - time.sleep(1) - - def on_error(self, ws, error): - logging.error(f"WebSocket error: {error}") - self.disconnect_event.set() - - def on_close(self, ws, close_status_code, close_msg): - logging.info(f"WebSocket closed with status code {close_status_code}, message: {close_msg}") - - def reconnect(self): - def reconnect_loop(): - while True: - if self.reconnect_close_event.wait(timeout=1): - return - - if self.disconnect_event.wait(timeout=1): - if self.shutdown.is_set(): - continue - - logging.info("Broken WebSocket connection, starting reconnection") - self.reconnect_close() - self.notify_event(WebSocketEvent.EVENT_TRY_RECONNECT, "") - self.disconnect_event.clear() - self.welcome_received.clear() - - attempt = 0 - if not self.options.reconnect or ( - self.options.reconnect_attempts != -1 and attempt >= self.options.reconnect_attempts): - logging.error("Max reconnect attempts reached or reconnect disabled") - break - - logging.info( - f"Reconnecting in {self.options.reconnect_interval} seconds... (attempt {attempt})") - time.sleep(self.options.reconnect_interval) - - try: - self.dial() - self.notify_event(WebSocketEvent.EVENT_CONNECTED, "") - self.connected.set() - self.run() - self.reconnected_event.set() - continue - except Exception as err: - logging.error(f"Reconnect attempt {attempt} failed: {err}") - attempt += 1 - - self.notify_event(WebSocketEvent.EVENT_CLIENT_FAIL, "") - logging.error("Failed to reconnect after all attempts.") - - self.reconnect_thread = threading.Thread(target=reconnect_loop) - self.reconnect_thread.start() - - def _clear_message_queues(self): - while not self.read_msg.empty(): - self.read_msg.get_nowait() - while not self.write_msg.empty(): - self.write_msg.get_nowait() - - def reconnect_close(self): - if self.connected.is_set(): - self.shutdown.set() - self.disconnect_event.set() - self.connected.clear() - with self.ack_event_lock: - for msg in self.ack_event.values(): - msg.event.set() - self.ack_event.clear() - self._clear_message_queues() - - if self.conn: - self.conn.close() - self.conn = None - self.close_event.set() - logging.info("WebSocket connection closed. Reconnecting...") - self.token_provider.close() - self.write_thread.join() - self.keep_alive_thread.join() - self.ws_thread.join() - self.notify_event(WebSocketEvent.EVENT_DISCONNECTED, "") - def close(self): if self.connected.is_set(): - self.shutdown.set() - self.disconnect_event.set() - self.reconnect_close_event.set() - self.connected.clear() + self.conn.close() + self.conn = None + logging.info("WebSocket connection closed.") + logging.info("Waiting all threads close...") + self.close_event.set() + self.write_thread.join() + self.read_thread.join() + self.keep_alive_thread.join() + self.ws_thread.join() with self.ack_event_lock: for msg in self.ack_event.values(): - msg.event.set() + msg.set_exception(RuntimeError("connection closed")) self.ack_event.clear() - self._clear_message_queues() - - if self.conn: - self.conn.close() - self.conn = None - self.close_event.set() - logging.info("WebSocket connection closed.") - logging.info("Waiting all threads close...") - self.token_provider.close() - self.write_thread.join() - self.keep_alive_thread.join() - self.ws_thread.join() - self.reconnect_thread.join() - self.notify_event(WebSocketEvent.EVENT_DISCONNECTED, "") - logging.info("WebSocket client closed.") - def notify_event(self, event: WebSocketEvent, msg: str, msg2=""): - try: - if self.options.event_callback is not None: - self.options.event_callback(event, msg, msg2) - except Exception as e: - logging.error(f"Exception in notify_event: {e}") + while not self.read_msg_queue.empty(): + self.read_msg_queue.get_nowait() + while not self.write_msg_queue.empty(): + self.write_msg_queue.get_nowait() + + self.on_event(WebSocketEvent.EVENT_DISCONNECTED, "", "") + self.connected.clear() + logging.info("WebSocket client closed.") def random_endpoint(self, tokens: List[WsToken]) -> Optional[WsToken]: if not tokens: diff --git a/sdk/python/kucoin_universal_sdk/internal/infra/default_ws_service.py b/sdk/python/kucoin_universal_sdk/internal/infra/default_ws_service.py index edfa26b7..b8ab47cd 100644 --- a/sdk/python/kucoin_universal_sdk/internal/infra/default_ws_service.py +++ b/sdk/python/kucoin_universal_sdk/internal/infra/default_ws_service.py @@ -1,7 +1,6 @@ import logging -import threading +import time import uuid -import queue from typing import List from kucoin_universal_sdk.model.client_option import ClientOption @@ -13,48 +12,58 @@ from ..infra.default_ws_callback import TopicManager, CallbackManager from ..infra.default_ws_client import WebSocketClient, WriteMsg from ..infra.default_ws_token_provider import DefaultWsTokenProvider -from ..interfaces.websocket import WebSocketService, WebSocketMessageCallback +from ..interfaces.websocket import WebSocketService, WebSocketMessageCallback, WebsocketTransport from ..util.sub import SubInfo + class DefaultWsService(WebSocketService): - def __init__(self, client_option: ClientOption, domain: DomainType, private: bool, sdk_version:str): + def __init__(self, client_option: ClientOption, domain: DomainType, private: bool, sdk_version: str): self.token_transport = DefaultTransport(client_option, sdk_version) ws_option = client_option.websocket_client_option - self.client = WebSocketClient(DefaultWsTokenProvider(self.token_transport, domain, private), ws_option) + self.client: WebsocketTransport = WebSocketClient(DefaultWsTokenProvider(self.token_transport, domain, private), + ws_option, self.on_reconnected, self.on_message, + self.notify_event) self.topic_manager = TopicManager() self.option = ws_option - self.stop_event = threading.Event() self.private = private - def recovery(self): - def recovery_loop(): - while not self.stop_event.wait(timeout=1): - event_triggered = self.client.reconnected_event.is_set() - if self.stop_event.is_set(): - return - if event_triggered: - logging.info("WebSocket client reconnected, resubscribe...") - if self.client.welcome_received.is_set(): - old_topic_manager = self.topic_manager - self.topic_manager = TopicManager() - old_topic_manager.range(lambda _, value: self._resubscribe(value)) + def on_reconnected(self): + logging.info("WebSocket client reconnected, resubscribe...") + old_topic_manager = self.topic_manager + self.topic_manager = TopicManager() + + pending = list() + old_topic_manager.range(lambda _, value: pending.append(value)) - self.client.reconnected_event.clear() - logging.info("Recovery loop exiting...") + for attempt in range(0, self.option.auto_resubscribe_max_attempts): + if len(pending) == 0: + self.notify_event(WebSocketEvent.EVENT_RE_SUBSCRIBE_OK, "", "") + break - self.recovery_thread = threading.Thread(target=recovery_loop, daemon=True) - self.recovery_thread.start() + logging.info(f"[Attempt {attempt}] Resubscribing {len(pending)} items in 5 seconds...") + time.sleep(5) + failed = [] + for cm in pending: + success = self._resubscribe(cm) + if not success: + failed.append(cm) - def _resubscribe(self, callback_manager: CallbackManager): + pending = failed + if pending: + self.notify_event(WebSocketEvent.EVENT_RE_SUBSCRIBE_ERROR, "", "") + logging.info( + f"Resubscribe failed after {self.option.auto_resubscribe_max_attempts} attempts") + + def _resubscribe(self, callback_manager: CallbackManager) -> bool: sub_info_list = callback_manager.get_sub_info() for sub in sub_info_list: try: - sub_id = self.subscribe(sub.prefix, sub.args, sub.callback) - self.notify_event(WebSocketEvent.EVENT_RE_SUBSCRIBE_OK, sub_id) + self.subscribe(sub.prefix, sub.args, sub.callback) except Exception as err: - self.notify_event(WebSocketEvent.EVENT_RE_SUBSCRIBE_ERROR, f"id: {sub_id}, err: {err}") + return False + return True def start(self): try: @@ -62,58 +71,43 @@ def start(self): except Exception as err: logging.error(f"Failed to start client: {err}") raise - self.run() - self.recovery() - def notify_event(self, event, msg, msg2=""): + def notify_event(self, event: WebSocketEvent, msg: str, error: str): try: if self.option.event_callback: - self.option.event_callback(event, msg, msg2) + self.option.event_callback(event, msg, error) except Exception as e: logging.error(f"Exception in notify_event: {e}") - def run(self): - def message_loop(): - while not self.stop_event.is_set(): - try: - msg: WsMessage = self.client.read().get(timeout=1) - if msg is None: - break - if msg.type != WsMessageType.MESSAGE.value: - continue - - callback_manager = self.topic_manager.get_callback_manager(msg.topic) - if callback_manager is None: - logging.error(f"Cannot find callback manager, id: {msg.id}, topic: {msg.topic}") - continue - - cb = callback_manager.get(msg.topic) - if cb is None: - logging.error(f"Cannot find callback for id: {msg.id}, topic: {msg.topic}") - continue - - try: - cb.on_message(msg) - except Exception as e: - logging.error(f"Exception in callback: {e}") - self.notify_event(WebSocketEvent.EVENT_CALLBACK_ERROR, str(e)) - except queue.Empty: - continue - except Exception as e: - logging.error(f"Error in message loop: {e}") - break - - self.message_thread = threading.Thread(target=message_loop, daemon=True) - self.message_thread.start() + def on_message(self, msg: WsMessage): + if msg is None: + return + if msg.type != WsMessageType.MESSAGE.value: + return + + callback_manager = self.topic_manager.get_callback_manager(msg.topic) + if callback_manager is None: + logging.error(f"Cannot find callback manager, id: {msg.id}, topic: {msg.topic}") + return + + cb = callback_manager.get(msg.topic) + if cb is None: + logging.error(f"Cannot find callback for id: {msg.id}, topic: {msg.topic}") + return + + try: + cb.on_message(msg) + except Exception as e: + logging.error(f"Exception in callback: {e}") + self.notify_event(WebSocketEvent.EVENT_CALLBACK_ERROR, msg.id, str(e)) def stop(self): - self.stop_event.set() logging.info("Closing WebSocket client") self.client.stop() - self.recovery_thread.join() - self.message_thread.join() def subscribe(self, prefix: str, args: List[str], callback: WebSocketMessageCallback) -> str: + callback_manager = None + sub_id = None try: if args is None: args = [] @@ -130,26 +124,26 @@ def subscribe(self, prefix: str, args: List[str], callback: WebSocketMessageCall id=sub_id, type=WsMessageType.SUBSCRIBE.value, topic=sub_info.sub_topic(), - privateChannel=self.private, + private_channel=self.private, response=True ) - try: - data: WriteMsg = self.client.write(sub_event, self.option.write_timeout) - event_triggered = data.event.wait(timeout=data.timeout) - if event_triggered: - logging.info(f"ACK received for message ID {data.msg.id}") - data.event.clear() - else: - logging.warning(f"Timeout for message ID {data.msg.id}") - raise TimeoutError(f"Timeout for message ID {data.msg.id}") - if data.exception is not None: - logging.error(f"ERROR received for message ID {data.msg.id}: {data.exception}") - return sub_id - except Exception as err: - raise + data: WriteMsg = self.client.write(sub_event, self.option.write_timeout) + event_triggered = data.event.wait(timeout=data.timeout) + if event_triggered: + logging.info(f"ACK received for subscribe message, id: {data.msg.id}") + data.event.clear() + else: + logging.warning(f"Timeout for subscribe, id: {data.msg.id}") + raise TimeoutError(f"Timeout for subscribe, id: {data.msg.id}") + if data.exception is not None: + logging.error(f"ERROR received for subscribe, id: {data.msg.id}, exception: {data.exception}") + raise data.exception + return sub_id + except Exception as err: - callback_manager.remove(sub_id) + if callback_manager is not None and sub_id is not None: + callback_manager.remove(sub_id) logging.error(f"Subscribe error: {sub_id}, error: {err}") raise @@ -160,14 +154,24 @@ def unsubscribe(self, sub_id: str): sub_event = WsMessage( id=str(uuid.uuid4()), - msg_type=WsMessageType.UNSUBSCRIBE.value, + type=WsMessageType.UNSUBSCRIBE.value, topic=sub_info.sub_topic(), private_channel=self.private, response=True ) try: - self.client.write(sub_event, self.option.write_timeout) + data: WriteMsg = self.client.write(sub_event, self.option.write_timeout) + event_triggered = data.event.wait(timeout=data.timeout) + if event_triggered: + logging.info(f"ACK received for unsubscribe, id: {sub_id}") + data.event.clear() + else: + logging.warning(f"Timeout for unsubscribe, id: {sub_id}") + raise TimeoutError(f"Timeout for unsubscribe, id: {sub_id}") + if data.exception is not None: + logging.error(f"Error received for unsubscribe, id: {sub_id}, exception: {data.exception}") + raise data.exception callback_manager.remove(sub_id) logging.info(f"Unsubscribe success: {sub_id}") except Exception as err: diff --git a/sdk/python/kucoin_universal_sdk/internal/interfaces/websocket.py b/sdk/python/kucoin_universal_sdk/internal/interfaces/websocket.py index 7c9611d5..5520a64a 100644 --- a/sdk/python/kucoin_universal_sdk/internal/interfaces/websocket.py +++ b/sdk/python/kucoin_universal_sdk/internal/interfaces/websocket.py @@ -1,3 +1,5 @@ +import threading +import time from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional @@ -17,6 +19,33 @@ def on_message(self, message: WsMessage): pass +class WriteMsg: + def __init__(self, msg: WsMessage, timeout: float): + self.msg = msg + self.ts = time.time() + self.timeout = timeout + self.exception = None + self.event = threading.Event() + + def set_exception(self, exception: Exception): + self.exception = exception + self.event.set() + + +class WebsocketTransport(ABC): + @abstractmethod + def start(self): + pass + + @abstractmethod + def stop(self): + pass + + @abstractmethod + def write(self, ms: WsMessage, timeout: float) -> WriteMsg: + pass + + class WebSocketService(ABC): @abstractmethod def start(self): @@ -64,11 +93,13 @@ class WsToken(BaseModel): WsToken holds the token and API endpoint for a WebSocket connection. """ token: Optional[str] = Field(default=None, description="The token for authentication") - ping_interval: Optional[int] = Field(default=None, alias="pingInterval", description="Interval between ping messages (in milliseconds)") + ping_interval: Optional[int] = Field(default=None, alias="pingInterval", + description="Interval between ping messages (in milliseconds)") endpoint: Optional[str] = Field(default=None, description="The WebSocket API endpoint") protocol: Optional[str] = Field(default=None, description="Protocol used for WebSocket connection") encrypt: Optional[bool] = Field(default=None, description="Indicates if the connection is encrypted") - ping_timeout: Optional[int] = Field(default=None, alias="pingTimeout", description="Ping timeout duration (in milliseconds)") + ping_timeout: Optional[int] = Field(default=None, alias="pingTimeout", + description="Ping timeout duration (in milliseconds)") @classmethod def from_dict(cls, obj: Optional[Dict[str, Any]]): @@ -80,20 +111,21 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]): _obj = cls.model_validate({ "token": - obj.get("token"), + obj.get("token"), "pingInterval": - obj.get("pingInterval"), + obj.get("pingInterval"), "endpoint": - obj.get("endpoint"), + obj.get("endpoint"), "protocol": - obj.get("protocol"), + obj.get("protocol"), "encrypt": - obj.get("encrypt"), + obj.get("encrypt"), "pingTimeout": - obj.get("pingTimeout") + obj.get("pingTimeout") }) return _obj + class WsTokenProvider(ABC): @abstractmethod def get_token(self) -> List[WsToken]: @@ -103,7 +135,7 @@ def get_token(self) -> List[WsToken]: :return: list of WsToken objects. """ pass - + @abstractmethod def close(self): """ diff --git a/sdk/python/kucoin_universal_sdk/model/common.py b/sdk/python/kucoin_universal_sdk/model/common.py index 99fd4f59..3dc42285 100644 --- a/sdk/python/kucoin_universal_sdk/model/common.py +++ b/sdk/python/kucoin_universal_sdk/model/common.py @@ -127,50 +127,20 @@ class WsMessage(BaseModel): response: Optional[bool] = Field(default=None, description="Indicates if the message is a response.") raw_data: Any = Field(default=None, alias="data", description="Raw message data") - def to_json(self) -> str: - """ - Converts the WebSocket message to JSON string format. + model_config = { + "populate_by_name": True + } - :return: JSON string representation of the message - """ - return json.dumps({ - "id": self.id, - "type": self.type, - "sn": self.sn, - "topic": self.topic, - "subject": self.subject, - "privateChannel": self.private_channel, - "response": self.response, - "data": self.raw_data - }) + def to_json(self) -> str: + return self.model_dump_json(by_alias=True, exclude_none=True) @classmethod - def from_json(cls, json_str: str) -> Optional[WsMessage]: - """ - Converts a JSON string to a WsMessage instance. - - :param json_str: JSON string representation of the message - :return: WsMessage instance or None if the input is invalid - """ - return cls.from_dict(json.loads(json_str)) + def from_json(cls, json_str: str) -> Optional["WsMessage"]: + data = json.loads(json_str) + return cls.model_validate(data) @classmethod - def from_dict( - cls, obj: Optional[Dict[str, Any]]) -> Optional[WsMessage]: + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional["WsMessage"]: if obj is None: return None - - if not isinstance(obj, dict): - return cls.model_validate(obj) - - _obj = cls.model_validate({ - "id": obj.get("id"), - "type": obj.get("type"), - "sn": obj.get("sn"), - "topic": obj.get("topic"), - "subject": obj.get("subject"), - "privateChannel": obj.get("privateChannel"), - "response": obj.get("response"), - "data": obj.get("data"), - }) - return _obj + return cls.model_validate(obj) diff --git a/sdk/python/kucoin_universal_sdk/model/websocket_option.py b/sdk/python/kucoin_universal_sdk/model/websocket_option.py index 0c4e7e35..7c697fae 100644 --- a/sdk/python/kucoin_universal_sdk/model/websocket_option.py +++ b/sdk/python/kucoin_universal_sdk/model/websocket_option.py @@ -1,7 +1,7 @@ from __future__ import annotations from enum import Enum, auto -from typing import Callable, Any, Optional +from typing import Callable, Optional class WebSocketEvent(Enum): @@ -58,12 +58,12 @@ def __init__(self, reconnect: bool = True, reconnect_attempts: int = -1, reconnect_interval: float = 5.0, - token_renew_interval: float = 6 * 60 * 60, # 6 hours in seconds dial_timeout: float = 10.0, read_message_buffer: int = 1024, write_message_buffer: int = 256, write_timeout: float = 5.0, - event_callback: Optional[WebSocketCallback] = None): + event_callback: Optional[WebSocketCallback] = None, + auto_resubscribe_max_attempts: int = 3): """ Initializes WebSocket client options for managing connection behavior and settings. @@ -76,6 +76,7 @@ def __init__(self, write_message_buffer (int): Buffer size for writing messages in the queue. Defaults to 256. write_timeout (int): Timeout for sending messages in seconds. Defaults to 5.0. event_callback (Optional[WebSocketCallback]): A callback function to handle WebSocket events. Defaults to None. + auto_resubscribe_max_attempts(int): Maximum number of retry attempts for automatic resubscription upon failure. After exceeding this limit, failed items will not be retried further. """ self.reconnect = reconnect @@ -86,6 +87,7 @@ def __init__(self, self.write_message_buffer = write_message_buffer self.write_timeout = write_timeout self.event_callback = event_callback + self.auto_resubscribe_max_attempts = auto_resubscribe_max_attempts class WebSocketClientOptionBuilder: @@ -127,5 +129,9 @@ def with_event_callback(self, callback: WebSocketCallback) -> WebSocketClientOpt self.option.event_callback = callback return self + def with_auto_resubscribe_max_attempts(self, auto_resubscribe_max_attempts: int) -> WebSocketClientOptionBuilder: + self.option.auto_resubscribe_max_attempts = auto_resubscribe_max_attempts + return self + def build(self) -> WebSocketClientOption: return self.option diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 075ed990..83ec041e 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -2,7 +2,7 @@ setup( name="kucoin-universal-sdk", - version="1.2.0", + version="1.2.1", description="Official KuCoin Universal SDK", author="KuCoin", author_email="api@kucoin.com",