11"""Module that contains Theta Client class."""
2+ import datetime
23import struct
4+ import threading
35from decimal import Decimal
46from threading import Thread
57from time import sleep
2123from .terminal import check_download , launch_terminal
2224
2325_NOT_CONNECTED_MSG = "You must establish a connection first."
24- _VERSION = '0.7.7 '
26+ _VERSION = '0.7.8 '
2527
2628
2729def _format_strike (strike : float ) -> int :
@@ -193,6 +195,7 @@ def __init__(self):
193195 self .quote = Quote ()
194196 self .open_interest = OpenInterest ()
195197 self .contract = Contract ()
198+ self .date = None
196199
197200
198201class ThetaClient :
@@ -223,7 +226,11 @@ def __init__(self, port: int = 11000, timeout: Optional[float] = 60, launch: boo
223226 self ._stream_server : Optional [socket .socket ] = None # None while disconnected
224227 self .launch = launch
225228 self ._stream_impl = None
229+ self ._stream_responses = {}
230+ self ._counter_lock = threading .Lock ()
231+ self ._stream_req_id = 0
226232
233+ print ('If you require API support, feel free to join our discord server! https://discord.thetadata.us' )
227234 if launch :
228235 if username == "default" or passwd == "default" :
229236 print ('------------------------------------------------------------------------------------------------' )
@@ -290,56 +297,123 @@ def connect_stream(self, callback):
290297 def close_stream (self ):
291298 self ._stream_server .close ()
292299
293- def req_full_trade_stream_opt (self ):
300+ def req_full_trade_stream_opt (self , timeout : int = 5 ):
294301 """from_bytes
295302 """
296303 assert self ._stream_server is not None , _NOT_CONNECTED_MSG
297304
305+ with self ._counter_lock :
306+ req_id = self ._stream_req_id
307+ self ._stream_responses [req_id ] = None
308+ self ._stream_req_id += 1
309+
298310 # send request
299- hist_msg = f"MSG_CODE={ MessageType .STREAM_REQ .value } &sec={ SecType .OPTION .value } &req={ OptionReqType .TRADE .value } \n "
311+ hist_msg = f"MSG_CODE={ MessageType .STREAM_REQ .value } &sec={ SecType .OPTION .value } " \
312+ f"&req={ OptionReqType .TRADE .value } &id={ req_id } \n "
300313 self ._stream_server .sendall (hist_msg .encode ("utf-8" ))
301314
302- def req_full_open_interest_stream (self ):
315+ tries = 0
316+ lim = timeout * 100
317+ while self ._stream_responses [req_id ] is None : # This is kind of dumb.
318+ sleep (.01 )
319+ tries += 1
320+ if tries >= lim :
321+ return StreamResponseType .TIMED_OUT
322+
323+ if self ._stream_responses [req_id ] is not StreamResponseType .SUBSCRIBED :
324+ raise PermissionError ("Invalid permissions for stream request: " + self ._stream_responses [req_id ].name )
325+
326+ def req_full_open_interest_stream (self , timeout : int = 5 ):
303327 """from_bytes
304328 """
305329 assert self ._stream_server is not None , _NOT_CONNECTED_MSG
306330
331+ with self ._counter_lock :
332+ req_id = self ._stream_req_id
333+ self ._stream_responses [req_id ] = None
334+ self ._stream_req_id += 1
335+
307336 # send request
308337 hist_msg = f"MSG_CODE={ MessageType .STREAM_REQ .value } &sec={ SecType .OPTION .value } " \
309- f"&req={ OptionReqType .OPEN_INTEREST .value } \n "
338+ f"&req={ OptionReqType .OPEN_INTEREST .value } &id= { req_id } \n "
310339 self ._stream_server .sendall (hist_msg .encode ("utf-8" ))
311340
312- def req_trade_stream_opt (self , root : str , exp : date , strike : float , right : OptionRight ):
341+ tries = 0
342+ lim = timeout * 100
343+ while self ._stream_responses [req_id ] is None : # This is kind of dumb.
344+ sleep (.01 )
345+ tries += 1
346+ if tries >= lim :
347+ return StreamResponseType .TIMED_OUT
348+
349+ if self ._stream_responses [req_id ] is not StreamResponseType .SUBSCRIBED :
350+ raise PermissionError ("Invalid permissions for stream request: " + self ._stream_responses [req_id ].name )
351+
352+ def req_trade_stream_opt (self , root : str , exp : date = 0 , strike : float = 0 , right : OptionRight = 'C' , timeout : int = 5 ):
313353 """from_bytes
314354 """
315355 assert self ._stream_server is not None , _NOT_CONNECTED_MSG
316356 # format data
317357 strike = _format_strike (strike )
318358 exp_fmt = _format_date (exp )
319359
360+ with self ._counter_lock :
361+ req_id = self ._stream_req_id
362+ self ._stream_responses [req_id ] = None
363+ self ._stream_req_id += 1
364+
320365 # send request
321366 hist_msg = f"MSG_CODE={ MessageType .STREAM_REQ .value } &root={ root } &exp={ exp_fmt } &strike={ strike } " \
322- f"&right={ right .value } &sec={ SecType .OPTION .value } &req={ OptionReqType .TRADE .value } \n "
367+ f"&right={ right .value } &sec={ SecType .OPTION .value } &req={ OptionReqType .TRADE .value } &id= { req_id } \n "
323368 self ._stream_server .sendall (hist_msg .encode ("utf-8" ))
324369
325- def req_quote_stream_opt (self , root : str , exp : date , strike : float , right : OptionRight ):
370+ tries = 0
371+ lim = timeout * 100
372+ while self ._stream_responses [req_id ] is None : # This is kind of dumb.
373+ sleep (.01 )
374+ tries += 1
375+ if tries >= lim :
376+ return StreamResponseType .TIMED_OUT
377+
378+ if self ._stream_responses [req_id ] is not StreamResponseType .SUBSCRIBED :
379+ raise PermissionError ("Invalid permissions for stream request: " + self ._stream_responses [req_id ].name )
380+
381+ def req_quote_stream_opt (self , root : str , exp : date = 0 , strike : float = 0 , right : OptionRight = 'C' , timeout : int = 5 ):
326382 """from_bytes
327383 """
328384 assert self ._stream_server is not None , _NOT_CONNECTED_MSG
329385 # format data
330386 strike = _format_strike (strike )
331387 exp_fmt = _format_date (exp )
332388
389+ with self ._counter_lock :
390+ req_id = self ._stream_req_id
391+ self ._stream_responses [req_id ] = None
392+ self ._stream_req_id += 1
393+
333394 # send request
334395 hist_msg = f"MSG_CODE={ MessageType .STREAM_REQ .value } &root={ root } &exp={ exp_fmt } &strike={ strike } " \
335- f"&right={ right .value } &sec={ SecType .OPTION .value } &req={ OptionReqType .QUOTE .value } \n "
396+ f"&right={ right .value } &sec={ SecType .OPTION .value } &req={ OptionReqType .QUOTE .value } &id= { req_id } \n "
336397 self ._stream_server .sendall (hist_msg .encode ("utf-8" ))
337398
399+ tries = 0
400+ lim = timeout * 100
401+ while self ._stream_responses [req_id ] is None : # This is kind of dumb.
402+ sleep (.01 )
403+ tries += 1
404+ if tries >= lim :
405+ return StreamResponseType .TIMED_OUT
406+
407+ if self ._stream_responses [req_id ] is not StreamResponseType .SUBSCRIBED :
408+ raise PermissionError ("Invalid permissions for stream request: " + self ._stream_responses [req_id ].name )
409+
338410 def _recv_stream (self ):
339411 """from_bytes
340412 """
341413 msg = StreamMsg ()
414+
342415 parse_int = lambda d : int .from_bytes (d , "big" )
416+
343417 while True :
344418 msg .type = StreamMsgType .from_code (parse_int (self ._read_stream (1 )[:1 ]))
345419 msg .contract .from_bytes (self ._read_stream (parse_int (self ._read_stream (1 )[:1 ])))
@@ -351,11 +425,20 @@ def _recv_stream(self):
351425 msg .trade .from_bytes (data )
352426 elif msg .type == StreamMsgType .PING :
353427 self ._read_stream (n_bytes = 4 )
428+ continue
354429 elif msg .type == StreamMsgType .OPEN_INTEREST :
355430 data = self ._read_stream (n_bytes = 8 )
356431 msg .open_interest .from_bytes (data )
357- else :
432+ elif msg .type == StreamMsgType .REQ_RESPONSE :
433+ msg_id = parse_int (self ._read_stream (4 ))
434+ msg_rep = StreamResponseType .from_code (parse_int (self ._read_stream (4 )))
435+ self ._stream_responses [msg_id ] = msg_rep
358436 continue
437+ elif msg .type == StreamMsgType .STOP or msg .type == StreamMsgType .START :
438+ msg .date = datetime .strptime (str (parse_int (self ._read_stream (4 ))), "%Y%m%d" ).date ()
439+ else :
440+ raise ValueError ('undefined msg type' )
441+
359442 self ._stream_impl (msg )
360443
361444 def _read_stream (self , n_bytes : int ) -> bytearray :
0 commit comments