Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion clamor/gateway/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from .connector import *
from .exceptions import *
from .opcodes import *
from .emitter import *
51 changes: 31 additions & 20 deletions clamor/gateway/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import anyio

from .encoding import ENCODERS
from .emitter import Emitter
from clamor.utils import Emitter
from .opcodes import opcodes
from .exceptions import *

Expand Down Expand Up @@ -65,9 +65,14 @@ class DiscordWebsocketClient:
def __init__(self, url: str, **kwargs):
self.url = url
self.encoder = ENCODERS[kwargs.get('encoding', 'json')]
self.zlib_compressed = kwargs.get('zlib_compressed', False)
self.zlib_compressed = kwargs.get('zlib_compressed', True)
self.emitter = Emitter()

# Compression
if self.zlib_compressed:
self.buffer = bytearray()
self.inflator = zlib.decompressobj()

# Websocket connection
self._con = None
self._running = False
Expand All @@ -86,8 +91,9 @@ def __init__(self, url: str, **kwargs):
self._session_id = 0
self._token = ""

self.emitter.add_listener('HELLO', self._on_hello)
self.emitter.add_listener('HEARTBEAT_ACK', self._on_heartbeat_ack)
self.emitter.add_listener(opcodes['HELLO'], self._on_hello)
self.emitter.add_listener(opcodes['HEARTBEAT_ACK'], self._on_heartbeat_ack)
self.emitter.add_listener("READY", self._on_ready)

self.format_url()

Expand All @@ -99,22 +105,23 @@ def format_url(self):
async def _receive(self):
message = await self._con.get_message()
logger.debug("Received message '{}'".format(message))

if self.zlib_compressed:
# handle zlib compression here
pass
else:
# As there are special cases where zlib-compressed payloads also occur, even
# if zlib-stream wasn't specified in the Gateway url, also try to detect them.
is_json = message[0] == '{'
is_etf = message[0] == 131
if not is_json and not is_etf:
message = zlib.decompress(message, 15, self.TEN_MEGABYTES).decode('utf-8')
self.buffer.extend(message)
if self.buffer.endswith(self.ZLIB_SUFFIX):
message = self.inflator.decompress(self.buffer).decode()
self.buffer.clear()
elif message[0] != '{' and message[0] != 131:
message = zlib.decompress(message, 15, self.TEN_MEGABYTES).decode('utf-8')

try:
message = self.encoder.decode(message)
except Exception as e:
raise EncodingError(str(e))

if message.get('s'):
self._last_sequence = message['s']

logger.debug("Decoded message to '{}'".format(message))
return message

Expand All @@ -130,16 +137,15 @@ async def _send(self, opcode: Union[int, str], data):
await self._con.send(json.dumps(payload))

async def _on_hello(self, data):
self._interval = int(data["heartbeat_interval"])
self._interval = data["heartbeat_interval"]
logger.debug("Found heartbeat interval: {}".format(self._interval))
await self._tg.spawn(self._heartbeat_task)

async def _on_heartbeat_ack(self, data):
self._has_ack = True

async def _on_dispatch(self, data, event):
if event == "ready":
self._session_id = int(data["session_id"])
async def _on_ready(self, data):
self._session_id = data["session_id"]

async def _heartbeat(self):
"""|coro|
Expand All @@ -164,7 +170,10 @@ async def _receive_task(self):
"""
while self._running:
message = await self._receive()
await self.emitter.emit(message['op'], message['d'], message.get('t'))
if message['op'] == 0:
await self.emitter.emit(message['t'], message['d'])
else:
await self.emitter.emit(message['op'], message['d'])

async def _heartbeat_task(self):
while self._running:
Expand Down Expand Up @@ -219,7 +228,10 @@ async def connect(self):
await self.on_open()

async def resume(self):
await self.close()
if self._running:
await self.close()

logger.info("Resuming")
async with anysocks.open_connection(self.url) as con:
self._con = con
self._running = True
Expand All @@ -229,7 +241,6 @@ async def resume(self):
'seq': self._last_sequence
}
await self._send('RESUME', payload)
logger.info("Resuming")
await self.on_open()

async def start(self, token: str):
Expand Down
28 changes: 0 additions & 28 deletions clamor/gateway/emitter.py

This file was deleted.

1 change: 1 addition & 0 deletions clamor/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .emitter import *
199 changes: 199 additions & 0 deletions clamor/utils/emitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
# -*- coding: utf-8 -*-

from collections import defaultdict
from enum import Enum
from inspect import iscoroutinefunction
from functools import wraps
from typing import Callable, Coroutine, Union, Any, Dict

from clamor.gateway.exceptions import InvalidListener

from anyio import create_task_group


def check_coroutine(func):
@wraps(func)
def wrapper(self, listener: Callable[..., Coroutine[Any, Any, None]]):
if not iscoroutinefunction(listener):
raise InvalidListener("Listener must be a coroutine")
return func(self, listener)

return wrapper


class Priority(Enum):
BEFORE = 0
NORMAL = 1
AFTER = 2


class ListenerPod:
"""Event listener module

Listeners that all follow a certain event will exist in the same pod. Pods will separate
listeners into self-explanatory categories, before, normal, and after. The listeners will
trigger in order from before to after, with each listener triggering synchronously with
listeners from the same category.

Attributes
----------
before : set
Listeners that will trigger 1st
normal : set
Listeners that will trigger 2nd
after : set
Listeners that will trigger 3rd
"""

def __init__(self):
self.before = set()
self.normal = set()
self.after = set()

def __bool__(self):
return bool(self.before) or bool(self.normal) or bool(self.after)

@check_coroutine
def add_before(self, listener: Callable[..., Coroutine[Any, Any, None]]):
"""Add listener (Before)

Add a coroutine to the before category

Parameters
----------
listener : Coroutine
A coroutine to be triggered on it's respective event
"""
self.before.add(listener)

@check_coroutine
def add_normal(self, listener: Callable[..., Coroutine[Any, Any, None]]):
"""Add listener (Normal)

Add a coroutine to the normal category

Parameters
----------
listener : Coroutine
A coroutine to be triggered on it's respective event
"""
self.normal.add(listener)

@check_coroutine
def add_after(self, listener: Callable[..., Coroutine[Any, Any, None]]):
"""Add listener (After)

Add a coroutine to the after category

Parameters
----------
listener : Coroutine
A coroutine to be triggered on it's respective event
"""
self.after.add(listener)

async def emit(self, data: Dict[str, Any]):
"""Trigger listeners in the pod

All listeners in the before category will be spawned with the appropriate payload, and
once all those have finished, the normal category is triggered, and then the after category.

Parameters
----------
data : dict
The payload provided by discord to be distributed
"""
async with create_task_group() as tg:
for listener in self.before:
await tg.spawn(listener, data)
async with create_task_group() as tg:
for listener in self.normal:
await tg.spawn(listener, data)
async with create_task_group() as tg:
for listener in self.after:
await tg.spawn(listener, data)


class Emitter:
"""Main event emitter

This is what orchestrates all the event pods, adds listeners, removes them and triggers events.
Events can be either an op code, or a name (for opcode 0).

Attributes
----------
listeners : defaultdict(:class `clamor.gateway.emitter.ListenerPod`:)
A default dict that holders event namess to listener pods.
"""

def __init__(self):
self.listeners = defaultdict(ListenerPod)

def add_listener(self, event: Union[int, str],
listener: Callable[..., Coroutine[Any, Any, None]],
order: Priority = Priority.NORMAL):
"""Add a listener

Add a listener to the correct pod and category, which by default is the normal priority.

Parameters
----------
event : int or str
The op code or event to listen too
listener : Coroutine
A coroutine to be triggered on it's respective event
order : :class `clamor.gateway.emitter.Priority`
The order this listener should be triggered in
"""

# Create a pod if one does not exist for the event, then add the listener
# using the respective method, based on the priority.
getattr(self.listeners[event], "add_" + order.name.lower())(listener)

async def emit(self, event: Union[int, str], data):
"""Emit an event

Trigger the corresponding ListenerPod if one exists.

Parameters
----------
event: int or str
The op code or event to listen too
data : dict
The payload provided by discord to be distributed
"""
if self.listeners[event]:
await self.listeners[event].emit(data)

def clear_event(self, event: Union[str, int]):
"""Clear all listeners

Removes all listeners, to matter the category, from the provided event.

Parameters
----------
event : str or int
The op code or event to remove
"""
self.listeners.pop(event)

def remove_listener(self, event: Union[str, int],
listener: Callable[..., Coroutine[Any, Any, None]]):
"""Remove a specific listener from an event

Removes a the provided listener from an event, no matter the category.

Parameters
----------
event : int or str
The op code or event to search
listener : Coroutine
Listener to remove
"""
if self.listeners[event]:
if listener in self.listeners[event].before:
self.listeners[event].before.remove(listener)
if listener in self.listeners[event].normal:
self.listeners[event].normal.remove(listener)
if listener in self.listeners[event].after:
self.listeners[event].after.remove(listener)
Loading