@@ -244,8 +244,7 @@ def main():
244244 # Process any entries on the worker queue.
245245 if r .llen (worker_queue ) > 0 :
246246 blobs = r .lrange (worker_queue , 0 , - 1 )
247- resources = [ResourcePath (f"s3://{ b .decode ()} " ) for b in blobs if
248- b"_guider" not in b ]
247+ resources = [ResourcePath (f"s3://{ b .decode ()} " ) for b in blobs if b"_guider" not in b ]
249248 guiders = [ResourcePath (f"s3://{ b .decode ()} " ) for b in blobs if b"_guider" in b ]
250249
251250 # Ingest if we have resources
@@ -297,11 +296,15 @@ def main():
297296 except Exception :
298297 logger .exception ("Error while ingesting %s" , guiders )
299298
300- # Atomically grab the next entry from the bucket queue, blocking until
301- # one exists.
302- r .blmove (redis_queue , worker_queue , 0 , "RIGHT" , "LEFT" )
303- # Be greedy and take as many entries as exist up to max
299+ # If we have any retries, don't wait for new images
304300 n = r .llen (worker_queue )
301+ if n == 0 :
302+ # Atomically grab the next entry from the bucket queue, blocking
303+ # until one exists.
304+ r .blmove (redis_queue , worker_queue , 0 , "RIGHT" , "LEFT" )
305+ n = 1
306+
307+ # Be greedy and take as many entries as exist up to max
305308 while n < max_ingests and r .lmove (redis_queue , worker_queue , "RIGHT" , "LEFT" ):
306309 n += 1
307310
0 commit comments