diff --git a/bokeh_django/consumers.py b/bokeh_django/consumers.py index 4bbebaa..adf385d 100644 --- a/bokeh_django/consumers.py +++ b/bokeh_django/consumers.py @@ -214,8 +214,11 @@ def application_context(self) -> ApplicationContext: if self._application_context is None: self._application_context = self.scope["url_route"]["kwargs"]["app_context"] - if self._application_context.io_loop is None: - raise RuntimeError("io_loop should already been set") + # Explicitly set io_loop here (likely running in multi-worker environment) + if self._application_context._loop is None: + self._application_context._loop = IOLoop.current() + log.debug("io_loop has been re-set") + return self._application_context async def connect(self): @@ -223,21 +226,21 @@ async def connect(self): subprotocols = self.scope["subprotocols"] if len(subprotocols) != 2 or subprotocols[0] != 'bokeh': - self.close() + await self.close() raise RuntimeError("Subprotocol header is not 'bokeh'") token = subprotocols[1] if token is None: - self.close() + await self.close() raise RuntimeError("No token received in subprotocol header") - now = calendar.timegm(dt.datetime.utcnow().utctimetuple()) + now = calendar.timegm(dt.datetime.now(dt.UTC).utctimetuple()) payload = get_token_payload(token) if 'session_expiry' not in payload: - self.close() + await self.close() raise RuntimeError("Session expiry has not been provided") elif now >= payload['session_expiry']: - self.close() + await self.close() raise RuntimeError("Token is expired.") elif not check_token_signature(token, signed=False, @@ -252,7 +255,7 @@ def on_fully_opened(future): # this isn't really an error (unless we have a # bug), it just means a client disconnected # immediately, most likely. - log.debug("Failed to fully open connlocksection %r", e) + log.debug("Failed to fully open connection %r", e) future = self._async_open(token) @@ -263,7 +266,9 @@ def on_fully_opened(future): await self.accept("bokeh") async def disconnect(self, close_code): - self.connection.session.destroy() + if hasattr(self, "connection"): + self.connection.session.destroy() + await super().disconnect(close_code) async def receive(self, text_data) -> None: fragment = text_data @@ -277,8 +282,19 @@ async def receive(self, text_data) -> None: async def _async_open(self, token: str) -> None: try: session_id = get_session_id(token) - await self.application_context.create_session_if_needed(session_id, self.request, token) - session = self.application_context.get_session(session_id) + + # Ensure io_loop is set before creating session (likely running in multi-worker environment) + if self._application_context._loop is None: + self._application_context._loop = IOLoop.current() + log.debug("io_loop has been re-set") + + # Try to create or get session + try: + session = await self.application_context.create_session_if_needed(session_id, self.request, token) + + except Exception as e: + log.error("Error creating session: %s", e) + raise e protocol = Protocol() self.receiver = Receiver(protocol) @@ -292,7 +308,7 @@ async def _async_open(self, token: str) -> None: except Exception as e: log.error("Could not create new server session, reason: %s", e) - self.close() + await self.close() raise e msg = self.connection.protocol.create('ACK')