Skip to content

Commit 0c71787

Browse files
authored
sse(stream): fix stream payload and frequency (#52)
* fix: invalid json for sse payload * fix: only stream a message if environment changes
1 parent 17c4b98 commit 0c71787

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

src/sse.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import json
23
from datetime import datetime
34
from functools import lru_cache
45
from typing import Optional
@@ -65,9 +66,15 @@ async def stream_environment_changes(
6566
request: Request, environment_key: str, settings: Settings = Depends(get_settings)
6667
):
6768
started_at = datetime.now()
69+
last_updated_at = None
6870

6971
async def get_updated_at() -> Optional[int]:
72+
nonlocal last_updated_at
7073
updated_at = redis_connection.get(environment_key)
74+
if last_updated_at == updated_at:
75+
return None
76+
77+
last_updated_at = updated_at
7178
return updated_at
7279

7380
async def event_generator():
@@ -84,7 +91,7 @@ async def event_generator():
8491
if updated_at := await get_updated_at():
8592
yield {
8693
"event": "environment_updated",
87-
"data": {"updated_at": float(updated_at)},
94+
"data": json.dumps({"updated_at": float(updated_at)}),
8895
"retry": settings.retry_timeout,
8996
}
9097
await asyncio.sleep(settings.stream_delay)

tests/test_sse.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
def get_settings_override():
1515
return Settings(
16-
max_stream_age=3, stream_delay=1, sse_authentication_token=auth_token
16+
max_stream_age=3, stream_delay=0.2, sse_authentication_token=auth_token
1717
)
1818

1919

@@ -88,6 +88,7 @@ async def test_stream_changes(client):
8888
await asyncio.sleep(1)
8989

9090
second_last_updated_at = datetime.now()
91+
9192
# Next, let's update the environment once again
9293
await ac.post(
9394
f"/sse/environments/{environment_key}/queue-change",
@@ -97,10 +98,10 @@ async def test_stream_changes(client):
9798
# Finally, let's wait for the stream to finish
9899
response = await stream_response_task
99100

100-
# Then
101+
# Then - we only got two messages
101102
expected_response = (
102-
"event: environment_updated\r\ndata: {'updated_at': %0.6f}\r\nretry: 15000\r\n\r\nevent:"
103-
" environment_updated\r\ndata: {'updated_at': %0.6f}\r\nretry: 15000\r\n\r\n"
103+
'event: environment_updated\r\ndata: {"updated_at": %0.6f}\r\nretry: 15000\r\n\r\nevent:'
104+
' environment_updated\r\ndata: {"updated_at": %0.6f}\r\nretry: 15000\r\n\r\n'
104105
% (first_last_updated_at.timestamp(), second_last_updated_at.timestamp())
105106
)
106107
assert response.status_code == 200

0 commit comments

Comments
 (0)