Skip to content

Common core.py and session.py file added to handle and process protoc… #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/nasdaq_protocols/itch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from nasdaq_protocols import soup

from .core import (
ItchMessageId,
Message
)
from .session import (
Expand All @@ -15,7 +14,6 @@


__all__ = [
'ItchMessageId',
'Message',
'OnItchMessageCoro',
'OnItchCloseCoro',
Expand Down
32 changes: 4 additions & 28 deletions src/nasdaq_protocols/itch/core.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,17 @@
import attrs
from nasdaq_protocols.common import Serializable, Byte, CommonMessage, logable
from nasdaq_protocols.soup_app import SoupAppMessage
from nasdaq_protocols.common import logable


__all__ = [
'ItchMessageId',
'Message'
]
APP_NAME = 'ITCH'


@attrs.define(auto_attribs=True, hash=True)
class ItchMessageId(Serializable):
indicator: int

@classmethod
def from_bytes(cls, bytes_: bytes) -> tuple[int, 'ItchMessageId']:
return 1, ItchMessageId(Byte.from_bytes(bytes_)[1])

def to_bytes(self) -> tuple[int, bytes]:
return Byte.to_bytes(self.indicator)

def __str__(self):
return f'indicator={self.indicator}'


@attrs.define
@logable
class Message(CommonMessage, msg_id_cls=ItchMessageId, app_name=APP_NAME):
def __init_subclass__(cls, *args, **kwargs):
cls.log.debug('itch.core.Message subclassing %s, params = %s', cls.__name__, str(kwargs))

if 'app_name' not in kwargs:
kwargs['app_name'] = APP_NAME

kwargs['msg_id_cls'] = ItchMessageId

if 'indicator' in kwargs:
kwargs['msg_id'] = ItchMessageId(kwargs['indicator'])
class Message(SoupAppMessage, app_name=APP_NAME):

def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
73 changes: 13 additions & 60 deletions src/nasdaq_protocols/itch/session.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,40 @@
import asyncio
from typing import Callable, Type, Awaitable
from typing import Callable, Awaitable, Type

import attrs
from nasdaq_protocols.common import DispatchableMessageQueue, logable
from nasdaq_protocols import soup
from nasdaq_protocols.soup_app.session import BaseClientSession, SessionId
from .core import Message


__all__ = [
'OnItchMessageCoro',
'OnItchCloseCoro',
'ItchSessionId',
'ClientSession'
]

OnItchMessageCoro = Callable[[Type[Message]], Awaitable[None]]
OnItchCloseCoro = Callable[[], Awaitable[None]]


@attrs.define(auto_attribs=True)
class ItchSessionId:
class ItchSessionId(SessionId):
soup_session_id: soup.SoupSessionId = None

def __str__(self):
if self.soup_session_id:
return f'itch-{self.soup_session_id}'
return 'itch-nosoup'
protocol_name: str = "itch"


@attrs.define(auto_attribs=True)
@logable
class ClientSession:
soup_session: soup.SoupClientSession
on_msg_coro: OnItchMessageCoro = None
on_close_coro: OnItchCloseCoro = None
closed: bool = False
_session_id: ItchSessionId = None
_close_event: asyncio.Event = None
_message_queue: DispatchableMessageQueue = None

def __attrs_post_init__(self):
self._session_id = ItchSessionId(self.soup_session.session_id)
self._message_queue = DispatchableMessageQueue(self._session_id, self.on_msg_coro)
self.soup_session.set_handlers(on_msg_coro=self._on_soup_message, on_close_coro=self._on_soup_close)
self.soup_session.start_dispatching()

async def receive_message(self):
"""
Asynchronously receive a message from the itch session.

This method blocks until a message is received by the session.
"""
return await self._message_queue.get()

async def close(self):
"""
Asynchronously close the itch session.
"""
if self._close_event or self.closed:
self.log.debug('%s> closing in progress..', self._session_id)
return
self._close_event = asyncio.Event()
self.soup_session.initiate_close()
await self._close_event.wait()
self.log.debug('%s> closed.', self._session_id)
class ClientSession(BaseClientSession):
"""ITCH protocol client session implementation."""

async def _on_soup_message(self, message: soup.SoupMessage):
if isinstance(message, soup.SequencedData):
self.log.debug('%s> incoming sequenced bytes_', self._session_id)
await self._message_queue.put(
self.decode(message.data)[1]
)
def _create_session_id(self):
return ItchSessionId(self.soup_session.session_id)

async def _on_soup_close(self):
await self._message_queue.stop()
if self.on_close_coro is not None:
await self.on_close_coro()
if self._close_event:
self._close_event.set()
self.closed = True
def send_message(self, msg: Message):
raise NotImplementedError("ITCH protocol does not support sending messages")

@classmethod
def decode(cls, bytes_: bytes):
def decode(cls, bytes_: bytes): # pylint: disable=W0221
"""
Decode the given bytes into an itch message.
Decode the given bytes into an ITCH message.
"""
return Message.from_bytes(bytes_)
2 changes: 0 additions & 2 deletions src/nasdaq_protocols/ouch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from nasdaq_protocols import soup

from .core import (
OuchMessageId,
Message
)
from .session import (
Expand All @@ -15,7 +14,6 @@


__all__ = [
'OuchMessageId',
'Message',
'OnOuchMessageCoro',
'OnOuchCloseCoro',
Expand Down
40 changes: 4 additions & 36 deletions src/nasdaq_protocols/ouch/core.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,17 @@
from abc import ABC

import attrs
from nasdaq_protocols.common import Serializable, Byte, CommonMessage, logable
from nasdaq_protocols.soup_app import SoupAppMessage
from nasdaq_protocols.common import logable


__all__ = [
'OuchMessageId',
'Message'
]
APP_NAME = 'OUCH'


@attrs.define(auto_attribs=True, hash=True)
class OuchMessageId(Serializable, ABC):
indicator: int
direction: str = 'outgoing'

def to_bytes(self) -> tuple[int, bytes]:
return Byte.to_bytes(self.indicator)

@classmethod
def from_bytes(cls, bytes_: bytes) -> tuple[int, 'OuchMessageId']:
return 1, OuchMessageId(Byte.from_bytes(bytes_)[1])

def __str__(self):
return f'indicator={self.indicator}, direction={self.direction}'


@attrs.define
@logable
class Message(CommonMessage, msg_id_cls=OuchMessageId, app_name=APP_NAME):

IncomingMsgClasses = []
OutgoingMsgsClasses = []

def __init_subclass__(cls, *args, **kwargs):
cls.log.debug('ouch.core.Message subclassing %s, params = %s', cls.__name__, str(kwargs))

if 'app_name' not in kwargs:
kwargs['app_name'] = APP_NAME

kwargs['msg_id_cls'] = OuchMessageId
class Message(SoupAppMessage, app_name=APP_NAME):

if all(k in kwargs for k in ['direction', 'indicator']):
kwargs['msg_id'] = OuchMessageId(kwargs['indicator'], kwargs['direction'])
container = cls.IncomingMsgClasses if kwargs['direction'] == 'incoming' else cls.OutgoingMsgsClasses
container.append(cls)
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
73 changes: 9 additions & 64 deletions src/nasdaq_protocols/ouch/session.py
Original file line number Diff line number Diff line change
@@ -1,90 +1,35 @@
import asyncio
from typing import Callable, Type, Awaitable
from typing import Callable, Awaitable, Type

import attrs
from nasdaq_protocols.common import DispatchableMessageQueue, logable
from nasdaq_protocols import soup
from nasdaq_protocols.soup_app.session import BaseClientSession, SessionId
from .core import Message


__all__ = [
'OnOuchMessageCoro',
'OnOuchCloseCoro',
'OuchSessionId',
'ClientSession'
]

OnOuchMessageCoro = Callable[[Type[Message]], Awaitable[None]]
OnOuchCloseCoro = Callable[[], Awaitable[None]]


@attrs.define(auto_attribs=True)
class OuchSessionId:
class OuchSessionId(SessionId):
soup_session_id: soup.SoupSessionId = None

def __str__(self):
if self.soup_session_id:
return f'ouch-{self.soup_session_id}'
return 'ouch-nosoup'
protocol_name: str = "ouch"


@attrs.define(auto_attribs=True)
@logable
class ClientSession:
soup_session: soup.SoupClientSession
on_msg_coro: OnOuchMessageCoro = None
on_close_coro: OnOuchCloseCoro = None
closed: bool = False
_session_id: OuchSessionId = None
_close_event: asyncio.Event = None
_message_queue: DispatchableMessageQueue = None

def __attrs_post_init__(self):
self._session_id = OuchSessionId(self.soup_session.session_id)
self._message_queue = DispatchableMessageQueue(self._session_id, self.on_msg_coro)
self.soup_session.set_handlers(on_msg_coro=self._on_soup_message, on_close_coro=self._on_soup_close)
self.soup_session.start_dispatching()

async def receive_message(self):
"""
Asynchronously receive a message from the ouch session.

This method blocks until a message is received by the session.
"""
return await self._message_queue.get()

def send_message(self, msg: Message):
"""
Send a message to the Ouch Server.
"""
self.soup_session.send_unseq_data(msg.to_bytes()[1])

async def close(self):
"""
Asynchronously close the ouch session.
"""
if self._close_event or self.closed:
self.log.debug('%s> closing in progress..', self._session_id)
return
self._close_event = asyncio.Event()
self.soup_session.initiate_close()
await self._close_event.wait()
self.log.debug('%s> closed.', self._session_id)

async def _on_soup_message(self, message: soup.SoupMessage):
if isinstance(message, soup.SequencedData):
self.log.debug('%s> incoming sequenced bytes_', self._session_id)
await self._message_queue.put(self.decode(message.data)[1])
class ClientSession(BaseClientSession):

async def _on_soup_close(self):
await self._message_queue.stop()
if self.on_close_coro is not None:
await self.on_close_coro()
if self._close_event:
self._close_event.set()
self.closed = True
def _create_session_id(self):
return OuchSessionId(self.soup_session.session_id)

def decode(self, bytes_: bytes):
"""
Decode the given bytes into an itch message.
Decode the given bytes into an OUCH message.
"""
return Message.from_bytes(bytes_)
1 change: 0 additions & 1 deletion src/nasdaq_protocols/soup/_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ def deserialize(self) -> Any:

_, msg = SoupMessage.from_bytes(self._buffer[:siz + 2])
self._buffer = self._buffer[siz + 2:]
buff_len -= (siz+2)

return msg, msg.is_logout(), msg.is_heartbeat()
9 changes: 9 additions & 0 deletions src/nasdaq_protocols/soup_app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .core import (
SoupAppMessageId,
SoupAppMessage
)

__all__ = [
'SoupAppMessageId',
'SoupAppMessage'
]
43 changes: 43 additions & 0 deletions src/nasdaq_protocols/soup_app/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import attrs
from nasdaq_protocols.common import Serializable, Byte, CommonMessage, logable


__all__ = [
'SoupAppMessageId',
'SoupAppMessage'
]


@attrs.define(auto_attribs=True, hash=True)
class SoupAppMessageId(Serializable):
indicator: int
direction: str = 'outgoing'

@classmethod
def from_bytes(cls, bytes_: bytes) -> tuple[int, 'SoupAppMessageId']:
return 1, cls(Byte.from_bytes(bytes_)[1])

def to_bytes(self) -> tuple[int, bytes]:
return Byte.to_bytes(self.indicator)

def __str__(self):
return f'indicator={self.indicator}, direction={self.direction}'


@attrs.define
@logable
class SoupAppMessage(CommonMessage):
IncomingMsgClasses = []
OutgoingMsgsClasses = []

def __init_subclass__(cls, *args, **kwargs):
cls.log.debug('%s subclassing %s, params = %s', cls.__mro__[1].__name__, cls.__name__, str(kwargs))

app_name = kwargs.get('app_name')
kwargs['app_name'] = app_name
kwargs['msg_id_cls'] = SoupAppMessageId

if 'indicator' in kwargs and 'direction' in kwargs:
kwargs['msg_id'] = SoupAppMessageId(kwargs['indicator'], kwargs['direction'])

super().__init_subclass__(**kwargs)
Loading