@@ -84,6 +84,11 @@ def __init__(
         self.llm_provider = llm_provider
         self.config = config
         self.vision_prompt_text = None
+        self.vlm_batch_size = self.config.vlm_batch_size or 5
+        self.max_concurrent_vlm_tasks = (
+            self.config.max_concurrent_vlm_tasks or 5
+        )
+        self.semaphore = None

     async def process_page(self, image, page_num: int) -> dict[str, str]:
         """Process a single PDF page using the vision model."""
@@ -213,6 +218,15 @@ async def process_page(self, image, page_num: int) -> dict[str, str]:
                 "content": f"Error processing page: {str(e)}",
             }

+    async def process_and_yield(self, image, page_num: int):
+        """Process a page and yield the result."""
+        async with self.semaphore:
+            result = await self.process_page(image, page_num)
+            return {
+                "content": result.get("content", "") or "",
+                "page_number": page_num,
+            }
+
     async def ingest(
         self, data: str | bytes, **kwargs
     ) -> AsyncGenerator[dict[str, str | int], None]:
@@ -228,9 +242,9 @@ async def ingest(
             )
             logger.info("Retrieved vision prompt text from database.")

-        try:
-            batch_size = self.config.vlm_batch_size or 5
+        self.semaphore = asyncio.Semaphore(self.max_concurrent_vlm_tasks)

+        try:
             if isinstance(data, str):
                 pdf_info = pdf2image.pdfinfo_from_path(data)
             else:
@@ -240,52 +254,95 @@ async def ingest(
             max_pages = pdf_info["Pages"]
             logger.info(f"PDF has {max_pages} pages to process")

-            # Convert and process each batch of rasterized pages
-            for batch_start in range(0, max_pages, batch_size):
-                batch_end = min(batch_start + batch_size, max_pages)
-                logger.info(
-                    f"Processing batch: pages {batch_start + 1}-{batch_end}/{max_pages}"
+            # Create a task queue to process pages in order
+            pending_tasks = []
+            completed_tasks = []
+            next_page_to_yield = 1
+
+            # Process pages with a sliding window, in batches
+            for batch_start in range(1, max_pages + 1, self.vlm_batch_size):
+                batch_end = min(
+                    batch_start + self.vlm_batch_size - 1, max_pages
+                )
+                logger.debug(
+                    f"Preparing batch of pages {batch_start}-{batch_end}/{max_pages}"
                 )

+                # Convert the batch of pages to images
                 if isinstance(data, str):
-                    batch_images = pdf2image.convert_from_path(
+                    images = pdf2image.convert_from_path(
                         data,
                         dpi=150,
-                        first_page=batch_start + 1,
+                        first_page=batch_start,
                         last_page=batch_end,
                     )
                 else:
                     pdf_bytes = BytesIO(data)
-                    batch_images = pdf2image.convert_from_bytes(
+                    images = pdf2image.convert_from_bytes(
                         pdf_bytes.getvalue(),
                         dpi=150,
-                        first_page=batch_start + 1,
+                        first_page=batch_start,
                         last_page=batch_end,
                     )

-                batch_tasks = []
-                for i, image in enumerate(batch_images):
-                    page_num = batch_start + i + 1
-                    batch_tasks.append(self.process_page(image, page_num))
-
-                # Process the batch concurrently
-                batch_results = await asyncio.gather(*batch_tasks)
-
-                for i, result in enumerate(batch_results):
-                    page_num = batch_start + i + 1
-                    yield {
-                        "content": result.get("content", "") or "",
-                        "page_number": page_num,
-                    }
-
-                # Force garbage collection after each batch
-                import gc
+                # Create tasks for each page in the batch
+                for i, image in enumerate(images):
+                    page_num = batch_start + i
+                    task = asyncio.create_task(
+                        self.process_and_yield(image, page_num)
+                    )
+                    task.page_num = page_num  # Store page number for sorting
+                    pending_tasks.append(task)
+
+                # Check if any tasks have completed and yield them in order
+                while pending_tasks:
+                    # Get the first done task without waiting
+                    done_tasks, pending_tasks_set = await asyncio.wait(
+                        pending_tasks,
+                        timeout=0.01,
+                        return_when=asyncio.FIRST_COMPLETED,
+                    )

-                gc.collect()
+                    if not done_tasks:
+                        break
+
+                    # Add completed tasks to our completed list
+                    pending_tasks = list(pending_tasks_set)
+                    completed_tasks.extend(iter(done_tasks))
+
+                    # Sort completed tasks by page number
+                    completed_tasks.sort(key=lambda t: t.page_num)
+
+                    # Yield results in order
+                    while (
+                        completed_tasks
+                        and completed_tasks[0].page_num == next_page_to_yield
+                    ):
+                        task = completed_tasks.pop(0)
+                        yield await task
+                        next_page_to_yield += 1
+
+            # Wait for and yield any remaining tasks in order
+            while pending_tasks:
+                done_tasks, _ = await asyncio.wait(pending_tasks)
+                completed_tasks.extend(done_tasks)
+                pending_tasks = []
+
+                # Sort and yield remaining completed tasks
+                completed_tasks.sort(key=lambda t: t.page_num)
+
+                # Yield results in order
+                while (
+                    completed_tasks
+                    and completed_tasks[0].page_num == next_page_to_yield
+                ):
+                    task = completed_tasks.pop(0)
+                    yield await task
+                    next_page_to_yield += 1

             total_elapsed = time.perf_counter() - ingest_start
             logger.info(
-                f"Completed PDF ingestion in {total_elapsed:.2f} seconds"
+                f"Completed PDF conversion in {total_elapsed:.2f} seconds"
             )

         except Exception as e:
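For reference, here is a minimal, self-contained sketch of the ordering strategy the new code relies on: tasks run with semaphore-bounded concurrency, finish out of order, and are yielded in page order by waiting with asyncio.FIRST_COMPLETED and sorting on a page_num tag attached to each task. This is only an illustration under stated assumptions, not the PR's implementation: fake_process_page, ordered_pages, and main are hypothetical stand-ins, and the batch-wise PDF rasterization with pdf2image is omitted to keep the sketch short.

import asyncio
import random


async def fake_process_page(page_num: int) -> dict:
    # Hypothetical stand-in for the real vision-model call, with variable latency.
    await asyncio.sleep(random.uniform(0.01, 0.1))
    return {"content": f"text of page {page_num}", "page_number": page_num}


async def ordered_pages(total_pages: int, max_concurrent: int = 5):
    """Yield page results in page order while running at most
    max_concurrent page-processing tasks at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(page_num: int) -> dict:
        async with semaphore:
            return await fake_process_page(page_num)

    pending = []
    for page_num in range(1, total_pages + 1):
        task = asyncio.create_task(bounded(page_num))
        task.page_num = page_num  # tag the task so results can be sorted
        pending.append(task)

    completed: list[asyncio.Task] = []
    next_page = 1
    while pending or completed:
        if pending:
            # Wake up as soon as any task finishes, whatever its page number.
            done, pending_set = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            pending = list(pending_set)
            completed.extend(done)
            completed.sort(key=lambda t: t.page_num)
        # Drain every result that is next in page order.
        while completed and completed[0].page_num == next_page:
            yield completed.pop(0).result()
            next_page += 1


async def main():
    async for page in ordered_pages(total_pages=8, max_concurrent=3):
        print(page["page_number"], page["content"])


asyncio.run(main())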