@@ -262,12 +262,26 @@ def run(self) -> None:
262262 LOG .exception ("[Schema Topic] Failed to create %r, retrying" , self .config .topic_name )
263263 self ._stop_schema_reader .wait (timeout = SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS )
264264
265- # Subscribe after topic creation; confluent-kafka 2.13+ misbehaves otherwise.
266- # The first consume() in the main loop drives partition assignment.
265+ # Subscribe to topic AFTER it has been created
266+ # This is critical with confluent-kafka 2.13+ where subscribing to a non-existent topic
267+ # causes the consumer to not pick up messages even after the topic is created
267268 if schema_topic_exists :
268269 LOG .debug ("[Consumer] Subscribing to topic %r after creation" , self .config .topic_name )
269270 self .consumer .subscribe ([self .config .topic_name ])
270271
272+ # Wait for partition assignment to complete by doing an initial poll
273+ # The subscribe() call is asynchronous and partitions are assigned during poll/consume
274+ LOG .debug ("[Consumer] Waiting for partition assignment" )
275+ for _ in range (10 ): # Try up to 10 times with 100ms timeout each = 1 second total
276+ assignment = self .consumer .assignment ()
277+ if assignment :
278+ LOG .debug ("[Consumer] Partitions assigned: %s" , assignment )
279+ break
280+ # Trigger partition assignment by polling
281+ self .consumer .poll (0.1 )
282+ else :
283+ LOG .warning ("[Consumer] No partitions assigned after waiting, will retry during consumption" )
284+
271285 while not self ._stop_schema_reader .is_set ():
272286 if self .offset == OFFSET_UNINITIALIZED :
273287 # Handles also a unusual case of purged schemas topic where starting offset can be > 0
0 commit comments