22
33import decimal
44import logging
5- from typing import Any , Callable , Dict , Mapping , NamedTuple , Optional , Set , Type
5+ from collections .abc import Callable , Mapping
6+ from typing import Any , NamedTuple
67
78import workflows
89from workflows .transport import middleware
@@ -20,9 +21,9 @@ class CommonTransport:
2021 subscriptions and transactions."""
2122
2223 __callback_interceptor = None
23- __subscriptions : Dict [int , Dict [str , Any ]] = {}
24+ __subscriptions : dict [int , dict [str , Any ]] = {}
2425 __subscription_id : int = 0
25- __transactions : Set [int ] = set ()
26+ __transactions : set [int ] = set ()
2627 __transaction_id : int = 0
2728
2829 log = logging .getLogger ("workflows.transport" )
@@ -32,14 +33,14 @@ class CommonTransport:
3233 #
3334
3435 def __init__ (
35- self , middleware : list [Type [middleware .BaseTransportMiddleware ]] = None
36+ self , middleware : list [type [middleware .BaseTransportMiddleware ]] = None
3637 ):
3738 if middleware is None :
3839 self .middleware = []
3940 else :
4041 self .middleware = middleware
4142
42- def add_middleware (self , middleware : Type [middleware .BaseTransportMiddleware ]):
43+ def add_middleware (self , middleware : type [middleware .BaseTransportMiddleware ]):
4344 self .middleware .insert (0 , middleware )
4445
4546 @classmethod
@@ -99,7 +100,7 @@ def mangled_callback(header, message):
99100
100101 @middleware .wrap
101102 def subscribe_temporary (
102- self , channel_hint : Optional [ str ] , callback : MessageCallback , ** kwargs
103+ self , channel_hint : str | None , callback : MessageCallback , ** kwargs
103104 ) -> TemporarySubscription :
104105 """Listen to a new queue that is specifically created for this connection,
105106 and has a limited lifetime. Notify for messages via callback function.
@@ -320,7 +321,7 @@ def broadcast_status(self, status: dict) -> None:
320321 raise NotImplementedError
321322
322323 @middleware .wrap
323- def ack (self , message , subscription_id : Optional [ int ] = None , ** kwargs ):
324+ def ack (self , message , subscription_id : int | None = None , ** kwargs ):
324325 """Acknowledge receipt of a message. This only makes sense when the
325326 'acknowledgement' flag was set for the relevant subscription.
326327 :param message: ID of the message to be acknowledged, OR a dictionary
@@ -351,7 +352,7 @@ def ack(self, message, subscription_id: Optional[int] = None, **kwargs):
351352 self ._ack (message_id , subscription_id = subscription_id , ** kwargs )
352353
353354 @middleware .wrap
354- def nack (self , message , subscription_id : Optional [ int ] = None , ** kwargs ):
355+ def nack (self , message , subscription_id : int | None = None , ** kwargs ):
355356 """Reject receipt of a message. This only makes sense when the
356357 'acknowledgement' flag was set for the relevant subscription.
357358 :param message: ID of the message to be rejected, OR a dictionary
@@ -380,7 +381,7 @@ def nack(self, message, subscription_id: Optional[int] = None, **kwargs):
380381 self ._nack (message_id , subscription_id = subscription_id , ** kwargs )
381382
382383 @middleware .wrap
383- def transaction_begin (self , subscription_id : Optional [ int ] = None , ** kwargs ) -> int :
384+ def transaction_begin (self , subscription_id : int | None = None , ** kwargs ) -> int :
384385 """Start a new transaction.
385386 :param **kwargs: Further parameters for the transport layer.
386387 :return: A transaction ID that can be passed to other functions.
@@ -462,7 +463,7 @@ def _subscribe_broadcast(self, sub_id: int, channel, callback, **kwargs):
462463 def _subscribe_temporary (
463464 self ,
464465 sub_id : int ,
465- channel_hint : Optional [ str ] ,
466+ channel_hint : str | None ,
466467 callback : MessageCallback ,
467468 ** kwargs ,
468469 ) -> str :
@@ -530,7 +531,7 @@ def _nack(self, message_id, subscription_id, **kwargs):
530531 raise NotImplementedError ("Transport interface not implemented" )
531532
532533 def _transaction_begin (
533- self , transaction_id : int , * , subscription_id : Optional [ int ] = None , ** kwargs
534+ self , transaction_id : int , * , subscription_id : int | None = None , ** kwargs
534535 ) -> None :
535536 """Start a new transaction.
536537 :param transaction_id: ID for this transaction in the transport layer.
0 commit comments