Fix nest-asyncio conflict with uvloop #1

Open · wants to merge 8 commits into base: main

gen.yaml (1 change: 0 additions & 1 deletion)
@@ -15,7 +15,6 @@ python:
  dependencies:
    deepdiff: '>=6.0'
    httpx: '>=0.27.0'
-    nest-asyncio: '>=1.6.0'
    pypdf: '>=4.0'
    requests-toolbelt: '>=1.0.0'
  extraDependencies:

setup.py (1 change: 0 additions & 1 deletion)
@@ -37,7 +37,6 @@
        "jsonpath-python>=1.0.6",
        "marshmallow>=3.19.0",
        "mypy-extensions>=1.0.0",
-        "nest-asyncio>=1.6.0",
        "packaging>=23.1",
        "pypdf>=4.0",
        "python-dateutil>=2.8.2",

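Both manifests drop nest-asyncio because the hook below no longer needs to re-enter an already-running event loop. For context, nest_asyncio can only patch the pure-Python asyncio loop; under uvloop (for example, a FastAPI service run by uvicorn) the old nest_asyncio.apply() call fails. A minimal sketch of that conflict, assuming uvloop and nest-asyncio are installed; the script is illustrative only and not part of this PR:

import asyncio

import nest_asyncio
import uvloop


async def main() -> None:
    # Inside a running uvloop loop, nest_asyncio refuses to patch it:
    # uvloop's loop is not the pure-Python asyncio loop it knows how
    # to make re-entrant, so apply() raises a ValueError.
    try:
        nest_asyncio.apply()
    except ValueError as exc:
        print(f"nest_asyncio could not patch the uvloop loop: {exc}")


if __name__ == "__main__":
    uvloop.install()     # use uvloop as the event loop implementation
    asyncio.run(main())
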
src/unstructured_client/_hooks/custom/split_pdf_hook.py (59 changes: 48 additions & 11 deletions)
@@ -8,7 +8,8 @@
from typing import Any, Coroutine, Optional, Tuple, Union

import httpx
-import nest_asyncio
+import aiohttp
+from concurrent.futures import ThreadPoolExecutor
import requests
from pypdf import PdfReader
from requests_toolbelt.multipart.decoder import MultipartDecoder
@@ -58,6 +59,44 @@ def get_optimal_split_size(num_pages: int, concurrency_level: int) -> int:
    return max(split_size, MIN_PAGES_PER_SPLIT)


+def run_async_in_thread(coro):
+    """
+    Execute an asyncio coroutine in a separate thread with a new event loop.
+
+    This function creates a new thread and runs the provided coroutine in that
+    thread using a new event loop. This is useful for running asyncio code in
+    environments where you can't or don't want to modify the main event loop,
+    or when you need to run asyncio code from synchronous contexts.
+
+    Args:
+        coro (Coroutine): The asyncio coroutine to be executed.
+
+    Returns:
+        Any: The result of the coroutine execution.
+
+    Raises:
+        Any exception that the coroutine might raise will be propagated.
+
+    Note:
+        - This function creates a new thread for each call.
+        - The new event loop is properly shut down after the coroutine completes.
+        - This approach isolates the asyncio operations from the main thread's
+          event loop.
+    """
+
+    def wrapper():
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        try:
+            return loop.run_until_complete(coro)
+        finally:
+            loop.run_until_complete(loop.shutdown_asyncgens())
+            loop.close()
+
+    with ThreadPoolExecutor(max_workers=1) as executor:
+        future = executor.submit(wrapper)
+        return future.result()


class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorHook):
    """
    A hook class that splits a PDF file into multiple pages and sends each page as
@@ -69,10 +108,7 @@ class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorHook):
    """

    def __init__(self) -> None:
-        # This allows us to use an event loop in an env with an existing loop
-        # Temporary fix until we can improve the async splitting behavior
-        nest_asyncio.apply()

        self.client: Optional[requests.Session] = None
        self.coroutines_to_execute: dict[
            str, list[Coroutine[Any, Any, requests.Response]]
@@ -202,7 +238,7 @@ def before_request(
        )

        async def call_api_partial(page):
-            async with httpx.AsyncClient() as client:
+            async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
                status_code, json_response = await request_utils.call_api_async(
                    client=client,
                    original_request=request,
@@ -270,11 +306,12 @@ def _await_elements(
        tasks = self.coroutines_to_execute.get(operation_id)
        if tasks is None:
            return None

-        ioloop = asyncio.get_event_loop()
-        task_responses: list[requests.Response] = ioloop.run_until_complete(
-            run_tasks(tasks)
-        )
+        async def run_tasks_async():
+            async with aiohttp.ClientSession() as _:
+                return await asyncio.gather(*tasks)
+
+        task_responses: list[requests.Response] = run_async_in_thread(run_tasks_async())

        if task_responses is None:
            return None
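
With the helper in place, the hook never touches the caller's event loop; the gathered page requests run on a throwaway loop owned by a single worker thread. A minimal sketch of that pattern, assuming only httpx is installed; run_async_in_thread is copied from the diff above, while fetch_page, run_all, and the example URL are hypothetical placeholders rather than SDK code:

import asyncio
from concurrent.futures import ThreadPoolExecutor

import httpx


def run_async_in_thread(coro):
    """Run `coro` on a fresh event loop inside a single worker thread."""

    def wrapper():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.close()

    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(wrapper).result()


async def fetch_page(client: httpx.AsyncClient, url: str) -> int:
    # Stand-in for one per-page partition request fired by the hook.
    response = await client.get(url)
    return response.status_code


async def run_all(urls: list[str]) -> list[int]:
    # Stand-in for gathering the hook's per-page coroutines.
    async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
        return await asyncio.gather(*(fetch_page(client, u) for u in urls))


statuses = run_async_in_thread(run_all(["https://example.com"] * 3))
print(statuses)

Because nothing here touches the caller's loop, the call works from plain synchronous code and also from a thread that already hosts a running loop (uvloop included); the trade-off is that the caller blocks until every page response has arrived, which matches the hook's synchronous interface.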