-
-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add websocket unittests and put in users app #2921
base: master
Are you sure you want to change the base?
Changes from all commits
bbd6b62
7a9bbd6
1fecdd1
8c2ba0e
ad80fc2
0f86c64
3b01a65
917aae3
1d08db9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import asyncio | ||
import functools | ||
import threading | ||
import time | ||
from contextlib import contextmanager | ||
|
||
from uvicorn.config import Config | ||
from uvicorn.main import ServerState | ||
from uvicorn.protocols.http.h11_impl import H11Protocol | ||
from uvicorn.protocols.websockets.websockets_impl import WebSocketProtocol | ||
|
||
|
||
def run_loop(loop): | ||
loop.run_forever() | ||
loop.close() | ||
|
||
|
||
@contextmanager | ||
def run_server(app, path="/"): | ||
asyncio.set_event_loop(None) | ||
loop = asyncio.new_event_loop() | ||
config = Config(app=app, ws=WebSocketProtocol) | ||
server_state = ServerState() | ||
protocol = functools.partial(H11Protocol, config=config, server_state=server_state) | ||
create_server_task = loop.create_server(protocol, host="127.0.0.1") | ||
server = loop.run_until_complete(create_server_task) | ||
port = server.sockets[0].getsockname()[1] # type: ignore | ||
url = "ws://127.0.0.1:{port}{path}".format(port=port, path=path) | ||
try: | ||
# Run the event loop in a new thread. | ||
thread = threading.Thread(target=run_loop, args=[loop]) | ||
thread.start() | ||
# Return the contextmanager state. | ||
yield url | ||
finally: | ||
# Close the loop from our main thread. | ||
while server_state.tasks: | ||
time.sleep(0.01) | ||
loop.call_soon_threadsafe(loop.stop) | ||
thread.join() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from asyncio import new_event_loop | ||
|
||
import pytest | ||
from websockets import connect | ||
|
||
from {{cookiecutter.project_slug}}.users.tests.async_server import run_server | ||
from {{cookiecutter.project_slug}}.users.websocket import websocket_application as app | ||
|
||
|
||
def test_accept_connection(): | ||
async def open_connection(url): | ||
async with connect(url) as websocket: | ||
return websocket.open | ||
|
||
with run_server(app) as _url: | ||
loop = new_event_loop() | ||
is_open = loop.run_until_complete(open_connection(_url)) | ||
assert is_open | ||
loop.close() | ||
|
||
|
||
@pytest.mark.timeout(10) | ||
def test_ping(): | ||
async def open_connection(url): | ||
async with connect(url) as websocket: | ||
await websocket.send("ping") | ||
return await websocket.recv() | ||
|
||
with run_server(app) as _url: | ||
loop = new_event_loop() | ||
received_message = loop.run_until_complete(open_connection(_url)) | ||
assert received_message == "pong" | ||
loop.close() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,18 @@ | ||
async def websocket_application(scope, receive, send): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand why we need to move this file. Is it solely for having a place to put the tests? I think I'm missing how it would grow in a full application... In a WSGI app, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's moved so that you can access models, like the User model. In my latest project, from ASGI, I created a URLRouter that would allow multiple "websocket_application" with some extra security features (CSRF + Session). The ASGI file points towards the websocket application if the scheme is |
||
event = await receive() | ||
if event["type"] == "websocket.connect": | ||
# TODO Add authentication by reading scope | ||
# and getting sessionid from cookie | ||
await send({"type": "websocket.accept"}) | ||
else: | ||
await send({"type": "websocket.close"}) | ||
return | ||
|
||
while True: | ||
event = await receive() | ||
|
||
if event["type"] == "websocket.connect": | ||
await send({"type": "websocket.accept"}) | ||
|
||
if event["type"] == "websocket.disconnect": | ||
break | ||
|
||
if event["type"] == "websocket.receive": | ||
if event["text"] == "ping": | ||
await send({"type": "websocket.send", "text": "pong!"}) | ||
await send({"type": "websocket.send", "text": "pong"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a lot of boilerplate 😞 Could we extract the main logic into smaller async functions and use
pytest-asyncio
to test them?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that was my initial thought too while developing this. But in my own project code, I do use pytest-asyncio but for a different purpose.
When a
text
is transmitted to the server, I categorize it based on the JSON passed through. You can see this every time GitHub sends a notification to you, the client. It's JSON encoded. For example, GitHub may send:What we'd do is do a switch/case with that key/value "subscribe" (e.g. this is how GitHub auto updates this GitHub issue whenever something new happens). The problem then becomes how many different categories do you want to manage. That's when I split it into a coroutine; when you split it into a coroutine, that's when you can properly "unit" test.