Skip to content

Commit f7567e8

Browse files
authored
refactor: Improve EventStreamManager (#101)
1 parent 6f6d595 commit f7567e8

File tree

6 files changed

+304
-356
lines changed

6 files changed

+304
-356
lines changed

flagsmith/flagsmith.py

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import json
21
import logging
32
import typing
4-
from datetime import datetime, timezone
3+
from datetime import timezone
54

5+
import pydantic
66
import requests
77
from flag_engine import engine
88
from flag_engine.environments.models import EnvironmentModel
@@ -188,21 +188,6 @@ def _initialise_local_evaluation(self) -> None:
188188
self.environment_data_polling_manager_thread.start()
189189

190190
def handle_stream_event(self, event: StreamEvent) -> None:
191-
try:
192-
event_data = json.loads(event.data)
193-
except json.JSONDecodeError as e:
194-
raise FlagsmithAPIError("Unable to get valid json from event data.") from e
195-
196-
try:
197-
stream_updated_at = datetime.fromtimestamp(event_data.get("updated_at"))
198-
except TypeError as e:
199-
raise FlagsmithAPIError(
200-
"Unable to get valid timestamp from event data."
201-
) from e
202-
203-
if stream_updated_at.tzinfo is None:
204-
stream_updated_at = stream_updated_at.astimezone(timezone.utc)
205-
206191
if not self._environment:
207192
raise ValueError(
208193
"Unable to access environment. Environment should not be null"
@@ -211,7 +196,7 @@ def handle_stream_event(self, event: StreamEvent) -> None:
211196
if environment_updated_at.tzinfo is None:
212197
environment_updated_at = environment_updated_at.astimezone(timezone.utc)
213198

214-
if stream_updated_at > environment_updated_at:
199+
if event.updated_at > environment_updated_at:
215200
self.update_environment()
216201

217202
def get_environment_flags(self) -> Flags:
@@ -282,17 +267,13 @@ def get_identity_segments(
282267
def update_environment(self) -> None:
283268
try:
284269
self._environment = self._get_environment_from_api()
285-
except (FlagsmithAPIError, requests.RequestException):
270+
except (FlagsmithAPIError, pydantic.ValidationError):
286271
logger.exception("Error updating environment")
287-
self._update_overrides()
288-
289-
def _update_overrides(self) -> None:
290-
if not self._environment:
291-
return
292-
if overrides := self._environment.identity_overrides:
293-
self._identity_overrides_by_identifier = {
294-
identity.identifier: identity for identity in overrides
295-
}
272+
else:
273+
if overrides := self._environment.identity_overrides:
274+
self._identity_overrides_by_identifier = {
275+
identity.identifier: identity for identity in overrides
276+
}
296277

297278
def _get_environment_from_api(self) -> EnvironmentModel:
298279
environment_data = self._get_json_response(self.environment_url, method="GET")
@@ -383,13 +364,9 @@ def _get_json_response(
383364
response = request_method(
384365
url, json=body, timeout=self.request_timeout_seconds
385366
)
386-
if response.status_code != 200:
387-
raise FlagsmithAPIError(
388-
"Invalid request made to Flagsmith API. Response status code: %d",
389-
response.status_code,
390-
)
367+
response.raise_for_status()
391368
return response.json()
392-
except (requests.ConnectionError, json.JSONDecodeError) as e:
369+
except requests.RequestException as e:
393370
raise FlagsmithAPIError(
394371
"Unable to get valid response from Flagsmith API."
395372
) from e

flagsmith/streaming_manager.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1+
import logging
12
import threading
23
import typing
3-
from typing import Callable, Generator, Optional, Protocol, cast
4+
from typing import Callable, Optional
45

6+
import pydantic
57
import requests
68
import sseclient
79

10+
logger = logging.getLogger(__name__)
811

9-
class StreamEvent(Protocol):
10-
data: str
12+
13+
class StreamEvent(pydantic.BaseModel):
14+
updated_at: pydantic.AwareDatetime
1115

1216

1317
class EventStreamManager(threading.Thread):
@@ -34,14 +38,12 @@ def run(self) -> None:
3438
headers={"Accept": "application/json, text/event-stream"},
3539
timeout=self.request_timeout_seconds,
3640
) as response:
37-
sse_client = sseclient.SSEClient(
38-
cast(Generator[bytes, None, None], response)
39-
)
41+
sse_client = sseclient.SSEClient(chunk for chunk in response)
4042
for event in sse_client.events():
41-
self.on_event(event)
43+
self.on_event(StreamEvent.model_validate_json(event.data))
4244

43-
except requests.exceptions.ReadTimeout:
44-
pass
45+
except (requests.RequestException, pydantic.ValidationError):
46+
logger.exception("Error opening or reading from the event stream")
4547

4648
def stop(self) -> None:
4749
self._stop_event.set()

0 commit comments

Comments
 (0)