2323from .terminal import check_download , launch_terminal
2424
2525_NOT_CONNECTED_MSG = "You must establish a connection first."
26- _VERSION = '0.8.8 '
26+ _VERSION = '0.8.9 '
2727
2828
2929def _format_strike (strike : float ) -> int :
@@ -224,6 +224,7 @@ def to_string(self) -> str:
224224class StreamMsg :
225225 """Stream Msg"""
226226 def __init__ (self ):
227+ self .client = None
227228 self .type = StreamMsgType .ERROR
228229 self .req_response = None
229230 self .req_response_id = None
@@ -313,14 +314,15 @@ def connect(self):
313314 finally :
314315 self ._server .close ()
315316
316- def connect_stream (self , callback ):
317+ def connect_stream (self , callback ) -> Thread :
317318 """Initiate a connection with the Theta Terminal Stream server.
318319 Requests can only be made inside this generator aka the `with client.connect_stream()` block.
319320 Responses to the provided callback method are recycled, meaning that if you send data received
320321 in the callback method to another thread, you must create a copy of it first.
321322
322323 :raises ConnectionRefusedError: If the connection failed.
323324 :raises TimeoutError: If the timeout is set and has been reached.
325+ :return: The thread that is responsible for receiving messages.
324326 """
325327 for i in range (15 ):
326328 try :
@@ -335,7 +337,9 @@ def connect_stream(self, callback):
335337 sleep (1 )
336338 self ._stream_server .settimeout (self .timeout )
337339 self ._stream_impl = callback
338- Thread (target = self ._recv_stream ).start ()
340+ out = Thread (target = self ._recv_stream )
341+ out .start ()
342+ return out
339343
340344 def close_stream (self ):
341345 self ._stream_server .close ()
@@ -481,39 +485,43 @@ def _recv_stream(self):
481485 """from_bytes
482486 """
483487 msg = StreamMsg ()
484-
488+ msg . client = self
485489 parse_int = lambda d : int .from_bytes (d , "big" )
486490
487491 while True :
488- msg .type = StreamMsgType .from_code (parse_int (self ._read_stream (1 )[:1 ]))
489- msg .contract .from_bytes (self ._read_stream (parse_int (self ._read_stream (1 )[:1 ])))
490-
491- if msg .type == StreamMsgType .QUOTE :
492- msg .quote .from_bytes (self ._read_stream (44 ))
493- elif msg .type == StreamMsgType .TRADE :
494- data = self ._read_stream (n_bytes = 32 )
495- msg .trade .from_bytes (data )
496- elif msg .type == StreamMsgType .OHLCVC :
497- data = self ._read_stream (n_bytes = 36 )
498- msg .ohlcvc .from_bytes (data )
499- elif msg .type == StreamMsgType .PING :
500- self ._read_stream (n_bytes = 4 )
501- continue
502- elif msg .type == StreamMsgType .OPEN_INTEREST :
503- data = self ._read_stream (n_bytes = 8 )
504- msg .open_interest .from_bytes (data )
505- elif msg .type == StreamMsgType .REQ_RESPONSE :
506- msg .req_response_id = parse_int (self ._read_stream (4 ))
507- msg .req_response = StreamResponseType .from_code (parse_int (self ._read_stream (4 )))
508- self ._stream_responses [msg .req_response_id ] = msg .req_response
509- elif msg .type == StreamMsgType .STOP or msg .type == StreamMsgType .START :
510- msg .date = datetime .strptime (str (parse_int (self ._read_stream (4 ))), "%Y%m%d" ).date ()
511- elif msg .type == StreamMsgType .DISCONNECTED or msg .type == StreamMsgType .RECONNECTED :
512- self ._read_stream (4 ) # Future use.
513- else :
514- raise ValueError ('undefined msg type: ' + str (msg .type ))
515-
516- self ._stream_impl (msg )
492+ try :
493+ msg .type = StreamMsgType .from_code (parse_int (self ._read_stream (1 )[:1 ]))
494+ msg .contract .from_bytes (self ._read_stream (parse_int (self ._read_stream (1 )[:1 ])))
495+ if msg .type == StreamMsgType .QUOTE :
496+ msg .quote .from_bytes (self ._read_stream (44 ))
497+ elif msg .type == StreamMsgType .TRADE :
498+ data = self ._read_stream (n_bytes = 32 )
499+ msg .trade .from_bytes (data )
500+ elif msg .type == StreamMsgType .OHLCVC :
501+ data = self ._read_stream (n_bytes = 36 )
502+ msg .ohlcvc .from_bytes (data )
503+ elif msg .type == StreamMsgType .PING :
504+ self ._read_stream (n_bytes = 4 )
505+ continue
506+ elif msg .type == StreamMsgType .OPEN_INTEREST :
507+ data = self ._read_stream (n_bytes = 8 )
508+ msg .open_interest .from_bytes (data )
509+ elif msg .type == StreamMsgType .REQ_RESPONSE :
510+ msg .req_response_id = parse_int (self ._read_stream (4 ))
511+ msg .req_response = StreamResponseType .from_code (parse_int (self ._read_stream (4 )))
512+ self ._stream_responses [msg .req_response_id ] = msg .req_response
513+ elif msg .type == StreamMsgType .STOP or msg .type == StreamMsgType .START :
514+ msg .date = datetime .strptime (str (parse_int (self ._read_stream (4 ))), "%Y%m%d" ).date ()
515+ elif msg .type == StreamMsgType .DISCONNECTED or msg .type == StreamMsgType .RECONNECTED :
516+ self ._read_stream (4 ) # Future use.
517+ else :
518+ raise ValueError ('undefined msg type: ' + str (msg .type ))
519+
520+ self ._stream_impl (msg )
521+ except ConnectionResetError :
522+ msg .type = StreamMsgType .STREAM_DEAD
523+ self ._stream_impl (msg )
524+ return
517525
518526 def _read_stream (self , n_bytes : int ) -> bytearray :
519527 """from_bytes
0 commit comments