@@ -85,48 +85,56 @@ class Consumer(object):
8585 """Base consumer class that defines the contract between rejected and
8686 consumer applications.
8787
88- In any of the consumer base classes, if the ``MESSAGE_TYPE `` attribute is
89- set, the ``type `` property of incoming messages will be validated against
90- when a message is received, checking for string equality against the
91- ``MESSAGE_TYPE`` attribute . If they are not matched , the consumer will not
88+ In any of the consumer base classes, if the ``message_type `` is specified
89+ in the configuration (or set with the ``MESSAGE_TYPE `` attribute), the
90+ ``type`` property of incoming messages will be validated against when a
91+ message is received . If there is no match , the consumer will not
9292 process the message and will drop the message without an exception if the
93- ``DROP_INVALID_MESSAGES`` attribute is set to ``True``. If it is ``False``,
94- a :py:class:`MessageException` is raised.
95-
96- If a consumer raises a :py:class:`ProcessingException`, the message that
97- was being processed will be republished to the exchange specified by the
98- ``ERROR_EXCHANGE`` attribute of the consumer's class using the routing key
99- that was last used for the message. The original message body and
100- properties will be used and an additional header
93+ ``drop_invalid_messages`` setting is set to ``True`` in the configuration
94+ (or if the ``DROP_INVALID_MESSAGES`` attribute is set to ``True``).
95+ If it is ``False``, a :exc:`~rejected.consumer.MessageException` is raised.
96+
97+ If a consumer raises a `~rejected.consumer.ProcessingException`, the
98+ message that was being processed will be republished to the exchange
99+ specified by the ``error`` exchange configuration value or the
100+ ``ERROR_EXCHANGE`` attribute of the consumer's class. The message will be
101+ published using the routing key that was last used for the message. The
102+ original message body and properties will be used and an additional header
101103 ``X-Processing-Exceptions`` will be added that will contain the number of
102- times the message has had a `` ProcessingException`` raised for it. In
103- combination with a queue that has ``x-message-ttl`` set and
104- ``x-dead-letter-exchange`` that points to the original exchange for the
104+ times the message has had a :exc:`~rejected.consumer. ProcessingException`
105+ raised for it. In combination with a queue that has ``x-message-ttl`` set
106+ and ``x-dead-letter-exchange`` that points to the original exchange for the
105107 queue the consumer is consuming off of, you can implement a delayed retry
106108 cycle for messages that are failing to process due to external resource or
107109 service issues.
108110
109- If ``ERROR_MAX_RETRY`` is set on the class, the headers for each method
111+ If ``error_max_retry`` is specified in the configuration or
112+ ``ERROR_MAX_RETRY`` is set on the class, the headers for each method
110113 will be inspected and if the value of ``X-Processing-Exceptions`` is
111- greater than or equal to the ``ERROR_MAX_RETRY`` value, the message will
114+ greater than or equal to the specified value, the message will
112115 be dropped.
113116
117+ :param dict settings: The configuration from rejected
118+ :param rejected.process.Process process: The controlling process
119+ :param bool drop_invalid_messages: Drop a message if its type property
120+ doesn't match the specified message type.
121+ :param str|list message_type: Used to validate the message type of a
122+ message before processing. This attribute can be set to a string
123+ that is matched against the AMQP message type or a list of
124+ acceptable message types.
125+ :param error_exchange: The exchange to publish a message raising a
126+ :exc:`~rejected.consumer.ProcessingException` to
127+ :type error_exchange: str
128+ :param int error_max_retry: The number of
129+ :exc:`~rejected.consumer.ProcessingException`s raised on a message
130+ before a message is dropped. If not specified, messages will never be
131+ dropped.
132+
114133 """
115134 DROP_INVALID_MESSAGES = False
116- """Drop a message if its type property doesn't match ``MESSAGE_TYPE``"""
117-
118135 MESSAGE_TYPE = None
119- """Used to validate the message type of a message before processing.
120-
121- This attribute can be set to a string that is matched against the
122- AMQP message type or a list of acceptable message types.
123- """
124-
125136 ERROR_EXCHANGE = 'errors'
126- """The exchange to publish messages that raise `ProcessingException` to"""
127-
128137 ERROR_MAX_RETRY = None
129- """The number of `ProcessingException`s before a message is dropped"""
130138
131139 def __init__ (self , settings , process ,
132140 drop_invalid_messages = DROP_INVALID_MESSAGES ,
@@ -136,20 +144,6 @@ def __init__(self, settings, process,
136144 """Creates a new instance of a Consumer class. To perform
137145 initialization tasks, extend Consumer.initialize
138146
139- :param dict settings: The configuration from rejected
140- :param rejected.process.Process process: The controlling process
141- :param bool drop_invalid_messages: Drop a message if its type property
142- doesn't match the specified message type.
143- :param str|list message_type: Used to validate the message type of a
144- message before processing. This attribute can be set to a string
145- that is matched against the AMQP message type or a list of
146- acceptable message types.
147- :param error_exchange: The exchange to publish `ProcessingException` to
148- :type error_exchange: str
149- :param int error_max_retry: The number of `ProcessingException`s
150- raised on a message before a message is dropped. If not specified,
151- messages will never be dropped.
152-
153147 """
154148 self ._channel = None
155149 self ._drop_invalid = drop_invalid_messages
@@ -788,6 +782,20 @@ class PublishingConsumer(Consumer):
788782 ``DROP_INVALID_MESSAGES`` attribute is set to ``True``. If it is ``False``,
789783 a :py:class:`ConsumerException` is raised.
790784
785+ :param dict settings: The configuration from rejected
786+ :param rejected.process.Process process: The controlling process
787+ :param bool drop_invalid_messages: Drop a message if its type property
788+ doesn't match the specified message type.
789+ :param str|list message_type: Used to validate the message type of a
790+ message before processing. This attribute can be set to a string
791+ that is matched against the AMQP message type or a list of
792+ acceptable message types.
793+ :param error_exchange: The exchange to publish `ProcessingException` to
794+ :type error_exchange: str
795+ :param int error_max_retry: The number of `ProcessingException`s
796+ raised on a message before a message is dropped. If not specified,
797+ messages will never be dropped.
798+
791799 """
792800
793801 def initialize (self ):
@@ -913,6 +921,20 @@ class SmartConsumer(Consumer):
913921 ``DROP_INVALID_MESSAGES`` attribute is set to ``True``. If it is ``False``,
914922 a :py:class:`ConsumerException` is raised.
915923
924+ :param dict settings: The configuration from rejected
925+ :param rejected.process.Process process: The controlling process
926+ :param bool drop_invalid_messages: Drop a message if its type property
927+ doesn't match the specified message type.
928+ :param str|list message_type: Used to validate the message type of a
929+ message before processing. This attribute can be set to a string
930+ that is matched against the AMQP message type or a list of
931+ acceptable message types.
932+ :param error_exchange: The exchange to publish `ProcessingException` to
933+ :type error_exchange: str
934+ :param int error_max_retry: The number of `ProcessingException`s
935+ raised on a message before a message is dropped. If not specified,
936+ messages will never be dropped.
937+
916938 """
917939
918940 @property
@@ -1085,8 +1107,23 @@ def _load_yaml_value(value):
10851107
10861108
10871109class SmartPublishingConsumer (SmartConsumer , PublishingConsumer ):
1088- """PublishingConsumer with serialization built in"""
1110+ """PublishingConsumer with serialization built in
1111+
1112+ :param dict settings: The configuration from rejected
1113+ :param rejected.process.Process process: The controlling process
1114+ :param bool drop_invalid_messages: Drop a message if its type property
1115+ doesn't match the specified message type.
1116+ :param str|list message_type: Used to validate the message type of a
1117+ message before processing. This attribute can be set to a string
1118+ that is matched against the AMQP message type or a list of
1119+ acceptable message types.
1120+ :param error_exchange: The exchange to publish `ProcessingException` to
1121+ :type error_exchange: str
1122+ :param int error_max_retry: The number of `ProcessingException`s
1123+ raised on a message before a message is dropped. If not specified,
1124+ messages will never be dropped.
10891125
1126+ """
10901127 def publish_message (self , exchange , routing_key , properties , body ,
10911128 no_serialization = False ,
10921129 no_encoding = False ):
0 commit comments