File tree Expand file tree Collapse file tree 2 files changed +6
-22
lines changed
Expand file tree Collapse file tree 2 files changed +6
-22
lines changed Original file line number Diff line number Diff line change @@ -2199,31 +2199,17 @@ def _get_node_pubsub(self, node):
21992199 return pubsub
22002200
22012201 def _sharded_message_generator (self ):
2202- """
2203- Iterate through pubsubs until a complete cycle is done.
2204- """
2205- while True :
2202+ for _ in range (len (self .node_pubsub_mapping )):
22062203 pubsub = next (self ._pubsubs_generator )
2207-
2208- # None marks end of cycle
2209- if pubsub is None :
2210- break
2211-
22122204 message = pubsub .get_message ()
22132205 if message is not None :
22142206 return message
2215-
22162207 return None
22172208
22182209 def _pubsubs_generator (self ):
2219- """
2220- Generator that yields pubsubs in round-robin fashion.
2221- Yields None to mark cycle boundaries.
2222- """
22232210 while True :
22242211 current_nodes = list (self .node_pubsub_mapping .values ())
22252212 yield from current_nodes
2226- yield None # Cycle marker
22272213
22282214 def get_sharded_message (
22292215 self , ignore_subscribe_messages = False , timeout = 0.0 , target_node = None
Original file line number Diff line number Diff line change @@ -959,20 +959,18 @@ def test_generator_handles_concurrent_mapping_changes(self, r):
959959 assert msg is not None
960960 assert msg ["type" ] == "ssubscribe"
961961
962- # Get initial mapping size (if available)
963- initial_size = 0
964- if hasattr (pubsub , "node_pubsub_mapping" ):
965- initial_size = len (pubsub .node_pubsub_mapping )
962+ # Get initial mapping size (cluster pubsub only)
963+ assert hasattr (pubsub , "node_pubsub_mapping" ), "Test requires ClusterPubSub"
964+ initial_size = len (pubsub .node_pubsub_mapping )
966965
967966 # Subscribe to second channel (modifies mapping during potential iteration)
968967 pubsub .ssubscribe (channel2 )
969968 msg = wait_for_message (pubsub , timeout = 1.0 , func = pubsub .get_sharded_message )
970969 assert msg is not None
971970 assert msg ["type" ] == "ssubscribe"
972971
973- # Verify mapping was updated (if available)
974- if hasattr (pubsub , "node_pubsub_mapping" ):
975- assert len (pubsub .node_pubsub_mapping ) >= initial_size
972+ # Verify mapping was updated
973+ assert len (pubsub .node_pubsub_mapping ) >= initial_size
976974
977975 # Publish and read messages - should not raise RuntimeError
978976 r .spublish (channel1 , "msg1" )
You can’t perform that action at this time.
0 commit comments