55offering a robust framework for building real-time applications with Django
66Channels and Django REST Framework. The AsyncJsonWebsocketConsumer serves as the
77foundation for WebSocket connections with integrated authentication, permissions,
8- structured message handling, and group messaging capabilities.
8+ structured message handling, group messaging capabilities, and typed channel events .
99
1010Key features:
1111- DRF-style authentication and permission checking
1212- Structured message handling with Pydantic validation
1313- Automatic group management for pub/sub messaging
14+ - Typed channel event system with discriminated unions
1415- Comprehensive error handling and reporting
1516- Configurable logging and message completion signals
1617- Support for object-level permissions and retrieval
2526import sys
2627import uuid
2728from abc import ABC , abstractmethod
28- from collections .abc import Iterable , Sequence
29+ from collections .abc import Awaitable , Callable , Iterable , Sequence
2930from types import ModuleType
3031from typing import (
3132 Annotated ,
3233 Any ,
3334 Generic ,
3435 Literal ,
36+ TypedDict ,
3537 cast ,
3638 get_args ,
3739 get_origin ,
4042from channels .generic .websocket import (
4143 AsyncJsonWebsocketConsumer as BaseAsyncJsonWebsocketConsumer ,
4244)
45+ from channels .layers import get_channel_layer
4346from django .contrib .auth .models import AnonymousUser , User
4447from django .db .models import Model
4548from rest_framework .authentication import BaseAuthentication
5053)
5154
5255import structlog
56+ from asgiref .sync import async_to_sync
5357from asgiref .typing import WebSocketConnectEvent , WebSocketDisconnectEvent
5458from pydantic import Field , TypeAdapter , ValidationError
5559from typing_extensions import TypeVar , get_original_bases
5660
5761from chanx .constants import MISSING_PYHUMPS_ERROR
5862from chanx .generic .authenticator import ChanxWebsocketAuthenticator , QuerysetLike
5963from chanx .messages .base import (
64+ BaseChannelEvent ,
6065 BaseGroupMessage ,
6166 BaseMessage ,
6267)
8388 "OG" , bound = BaseGroupMessage | None , default = None
8489) # Outgoing group messages
8590M = TypeVar ("M" , bound = Model | None , default = None ) # Object model
91+ Event = TypeVar ("Event" , bound = BaseChannelEvent | None , default = None ) # Channel Events
92+
93+
94+ class EventPayload (TypedDict ):
95+ event_data : dict [str , Any ]
8696
8797
8898class AsyncJsonWebsocketConsumer (
89- Generic [IC , OG , M ], BaseAsyncJsonWebsocketConsumer , ABC
99+ Generic [IC , Event , OG , M ], BaseAsyncJsonWebsocketConsumer , ABC
90100):
91101 """
92102 Base class for asynchronous JSON WebSocket consumers with authentication and permissions.
@@ -99,6 +109,9 @@ class AsyncJsonWebsocketConsumer(
99109 `OUTGOING_GROUP_MESSAGE_SCHEMA` to enable proper validation and handling
100110 of group message broadcasts.
101111
112+ For typed channel events, subclasses can define a union type of channel events
113+ and use the generic type parameter Event to enable type-safe channel event handling.
114+
102115 Attributes:
103116 authentication_classes: DRF authentication classes for connection verification
104117 permission_classes: DRF permission classes for connection authorization
@@ -135,6 +148,7 @@ class AsyncJsonWebsocketConsumer(
135148
136149 # Message schemas
137150 _INCOMING_MESSAGE_SCHEMA : IC
151+ _EVENT_SCHEMA : Event
138152 _OUTGOING_GROUP_MESSAGE_SCHEMA : OG
139153
140154 # Object instance
@@ -164,14 +178,21 @@ def __init_subclass__(cls, *args: Any, **kwargs: Any):
164178 if i < len (type_var_vals ) and var is not None :
165179 type_var_vals [i ] = var
166180
167- incoming_message_schema , outgoing_group_message_schema , _model = (
168- type_var_vals
169- )
181+ (
182+ incoming_message_schema ,
183+ event_schema ,
184+ outgoing_group_message_schema ,
185+ _model ,
186+ ) = type_var_vals
170187 else :
171- incoming_message_schema , outgoing_group_message_schema , _model = (
172- get_args (base )
173- )
188+ (
189+ incoming_message_schema ,
190+ event_schema ,
191+ outgoing_group_message_schema ,
192+ _model ,
193+ ) = get_args (base )
174194 cls ._INCOMING_MESSAGE_SCHEMA = incoming_message_schema
195+ cls ._EVENT_SCHEMA = event_schema
175196 cls ._OUTGOING_GROUP_MESSAGE_SCHEMA = outgoing_group_message_schema
176197 break
177198
@@ -237,6 +258,13 @@ def _setup_message_adapters(self) -> None:
237258 ]
238259 )
239260
261+ self .event_adapter : TypeAdapter [Event ] = TypeAdapter (
262+ Annotated [
263+ self ._EVENT_SCHEMA ,
264+ Field (discriminator = "handler" ),
265+ ]
266+ )
267+
240268 self .outgoing_group_message_adapter : TypeAdapter [OG ] = TypeAdapter (
241269 Annotated [
242270 self ._OUTGOING_GROUP_MESSAGE_SCHEMA ,
@@ -582,6 +610,123 @@ async def send_group_member(self, event: GroupMemberEvent) -> None:
582610 if self .send_completion :
583611 await self .send_message (GroupCompleteMessage ())
584612
613+ # Channel event system methods
614+ @classmethod
615+ async def asend_channel_event (
616+ cls ,
617+ group_name : str ,
618+ event : Event ,
619+ ) -> None :
620+ """
621+ Send a typed channel event to one or more channel groups.
622+
623+ This is a class method that provides a type-safe way to send events through
624+ the channel layer to consumers. It can be called from tasks, views, or other
625+ places where you don't have a consumer instance.
626+
627+ Args:
628+ event: The typed event to send (constrained by the consumer's Event type)
629+ group_name: Group name to send to (required)
630+
631+ Example:
632+ ```python
633+ # From a Django task or view:
634+ await ChatDetailConsumer.send_channel_event(
635+ MemberAddedEvent(
636+ type="member_added",
637+ payload={"member_id": 123, "email": "user@example.com"}
638+ ),
639+ groups=["chat_room_1"],
640+ from_user_pk=request.user.pk
641+ )
642+ ```
643+ """
644+ channel_layer = get_channel_layer ()
645+ assert channel_layer is not None
646+
647+ assert event is not None
648+ await channel_layer .group_send (
649+ group_name ,
650+ {
651+ "type" : "handle_channel_event" ,
652+ "event_data" : event .model_dump (),
653+ },
654+ )
655+
656+ @classmethod
657+ def send_channel_event (
658+ cls ,
659+ group_name : str ,
660+ event : Event ,
661+ ) -> None :
662+ """
663+ Synchronous version of send_channel_event for use in Django tasks/views.
664+
665+ This method provides the same functionality as send_channel_event but
666+ can be called from synchronous code like Django tasks, views, or signals.
667+
668+ Args:
669+ group_name: Group name to send to (required)
670+ event: The typed event to send (constrained by the consumer's Event type)
671+
672+ Example:
673+ ```python
674+ # From a Django task:
675+ ChatDetailConsumer.send_channel_event_sync(
676+ "chat_room_1",
677+ MemberAddedEvent(
678+ type="member_added",
679+ payload={"member_id": 123, "email": "user@example.com"}
680+ ),
681+ )
682+ ```
683+ """
684+ async_to_sync (cls .asend_channel_event )(group_name , event )
685+
686+ async def handle_channel_event (self , event_payload : EventPayload ) -> None :
687+ """
688+ Internal dispatcher for channel events with completion signal.
689+
690+ This method is called by the channel layer when an event is sent to a group
691+ this consumer belongs to. It extracts the event data, checks exclusion rules,
692+ finds the appropriate handler method, and calls it with proper error handling.
693+
694+ Args:
695+ event_payload: The message from the channel layer containing event data
696+ and metadata about the sender
697+ """
698+ event_data_dict : dict [str , Any ] = event_payload .get ("event_data" , {})
699+ event_data = self .event_adapter .validate_python (event_data_dict )
700+
701+ assert event_data is not None
702+
703+ try :
704+ handler_name = event_data .handler
705+ if not handler_name :
706+ await logger .awarning ("Received channel event without handler field" )
707+ return
708+
709+ # Find and call the handler method
710+ handler_method : Callable [[Event ], Awaitable [None ]] | None = getattr (
711+ self , handler_name , None
712+ )
713+ if not callable (handler_method ):
714+ await logger .ainfo (
715+ f"Handler method '{ handler_name } ' is not available for sending event"
716+ )
717+ return
718+
719+ # Handler is async, await it
720+ await handler_method (event_data )
721+
722+ except Exception as e :
723+ await logger .aexception (f"Failed to process channel event: { str (e )} " )
724+ # Don't re-raise to avoid breaking the channel layer
725+ finally :
726+ # Send completion signal if configured
727+ if self .send_completion :
728+ await self .send_message (CompleteMessage ())
729+
585730 # Helper methods
586731
587732 async def _handle_receive_json_and_signal_complete (
0 commit comments