Skip to content

NEXUS-703: remove nest_asyncio loop #239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions _test_unstructured_client/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,9 @@ def test_partition_strategies(split_pdf, strategy, client, doc_path):
assert len(response.elements)


@pytest.fixture(scope="session")
def event_loop():
    """Session-scoped event loop so session-scoped async fixtures can all share one loop."""
    session_loop = asyncio.get_event_loop_policy().new_event_loop()
    yield session_loop
    session_loop.close()


@pytest.mark.parametrize("split_pdf", [True, False])
@pytest.mark.parametrize("error", [(500, ServerError), (403, SDKError), (422, HTTPValidationError)])
def test_partition_handling_server_error(error, split_pdf, monkeypatch, doc_path, event_loop):
def test_partition_handling_server_error(error, split_pdf, monkeypatch, doc_path):
"""
Mock different error responses, assert that the client throws the correct error
"""
Expand Down
4 changes: 3 additions & 1 deletion _test_unstructured_client/unit/test_split_pdf_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def test_unit_get_page_range_returns_valid_range(page_range, expected_result):

async def _request_mock(
async_client: httpx.AsyncClient, # not used by mock
limiter: asyncio.Semaphore, # not used by mock
fails: bool,
content: str) -> requests.Response:
response = requests.Response()
Expand Down Expand Up @@ -405,6 +406,7 @@ async def test_unit_disallow_failed_coroutines(

async def _fetch_canceller_error(
async_client: httpx.AsyncClient, # not used by mock
limiter: asyncio.Semaphore, # not used by mock
fails: bool,
content: str,
cancelled_counter: Counter):
Expand All @@ -414,7 +416,7 @@ async def _fetch_canceller_error(
print("Doesn't fail")
else:
print("Fails")
return await _request_mock(async_client=async_client, fails=fails, content=content)
return await _request_mock(async_client=async_client, limiter=limiter, fails=fails, content=content)
except asyncio.CancelledError:
cancelled_counter.update(["cancelled"])
print(cancelled_counter["cancelled"])
Expand Down
1 change: 0 additions & 1 deletion gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ python:
aiofiles: '>=24.1.0'
cryptography: '>=3.1'
httpx: '>=0.27.0'
nest-asyncio: '>=1.6.0'
pypdf: '>=4.0'
requests-toolbelt: '>=1.0.0'
authors:
Expand Down
14 changes: 1 addition & 13 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ dependencies = [
"cryptography >=3.1",
"eval-type-backport >=0.2.0",
"httpx >=0.27.0",
"nest-asyncio >=1.6.0",
"pydantic >=2.10.3",
"pypdf >=4.0",
"python-dateutil >=2.8.2",
Expand Down
46 changes: 31 additions & 15 deletions src/unstructured_client/_hooks/custom/split_pdf_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import tempfile
import uuid
from collections.abc import Awaitable
from concurrent import futures
from functools import partial
from pathlib import Path
from typing import Any, Coroutine, Optional, Tuple, Union, cast, Generator, BinaryIO

import aiofiles
import httpx
import nest_asyncio # type: ignore
from httpx import AsyncClient
from pypdf import PdfReader, PdfWriter

Expand Down Expand Up @@ -56,6 +56,11 @@
HI_RES_STRATEGY = 'hi_res'
MAX_PAGE_LENGTH = 4000

def _run_coroutines_in_separate_thread(
coroutines_task: Coroutine[Any, Any, list[tuple[int, httpx.Response]]],
) -> list[tuple[int, httpx.Response]]:
return asyncio.run(coroutines_task)


async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]:
response = await coro
Expand All @@ -64,7 +69,8 @@ async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Respons

async def run_tasks(
coroutines: list[partial[Coroutine[Any, Any, httpx.Response]]],
allow_failed: bool = False
allow_failed: bool = False,
concurrency_level: int = 10,
) -> list[tuple[int, httpx.Response]]:
"""Run a list of coroutines in parallel and return the results in order.

Expand All @@ -80,13 +86,14 @@ async def run_tasks(
# Use a variable to adjust the httpx client timeout, or default to 30 minutes
# When we're able to reuse the SDK to make these calls, we can remove this var
# The SDK timeout will be controlled by parameter
limiter = asyncio.Semaphore(concurrency_level)
client_timeout_minutes = 60
if timeout_var := os.getenv("UNSTRUCTURED_CLIENT_TIMEOUT_MINUTES"):
client_timeout_minutes = int(timeout_var)
client_timeout = httpx.Timeout(60 * client_timeout_minutes)

async with httpx.AsyncClient(timeout=client_timeout) as client:
armed_coroutines = [coro(async_client=client) for coro in coroutines] # type: ignore
armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines] # type: ignore
if allow_failed:
responses = await asyncio.gather(*armed_coroutines, return_exceptions=False)
return list(enumerate(responses, 1))
Expand Down Expand Up @@ -163,8 +170,10 @@ def __init__(self) -> None:
self.coroutines_to_execute: dict[
str, list[partial[Coroutine[Any, Any, httpx.Response]]]
] = {}
self.concurrency_level: dict[str, int] = {}
self.api_successful_responses: dict[str, list[httpx.Response]] = {}
self.api_failed_responses: dict[str, list[httpx.Response]] = {}
self.executors: dict[str, futures.ThreadPoolExecutor] = {}
self.tempdirs: dict[str, tempfile.TemporaryDirectory] = {}
self.allow_failed: bool = DEFAULT_ALLOW_FAILED
self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA
Expand Down Expand Up @@ -268,10 +277,6 @@ def before_request(
logger.warning("Splitting is currently incompatible with uvloop. Continuing without splitting.")
return request

# This allows us to use an event loop in an env with an existing loop
# Temporary fix until we can improve the async splitting behavior
nest_asyncio.apply()

# This is our key into coroutines_to_execute
# We need to pass it on to after_success so
# we know which results are ours
Expand Down Expand Up @@ -315,13 +320,15 @@ def before_request(
fallback_value=DEFAULT_ALLOW_FAILED,
)

concurrency_level = form_utils.get_split_pdf_concurrency_level_param(
self.concurrency_level[operation_id] = form_utils.get_split_pdf_concurrency_level_param(
form_data,
key=PARTITION_FORM_CONCURRENCY_LEVEL_KEY,
fallback_value=DEFAULT_CONCURRENCY_LEVEL,
max_allowed=MAX_CONCURRENCY_LEVEL,
)
limiter = asyncio.Semaphore(concurrency_level)

executor = futures.ThreadPoolExecutor(max_workers=1)
self.executors[operation_id] = executor

self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data(
form_data,
Expand All @@ -344,7 +351,7 @@ def before_request(
page_count = page_range_end - page_range_start + 1

split_size = get_optimal_split_size(
num_pages=page_count, concurrency_level=concurrency_level
num_pages=page_count, concurrency_level=self.concurrency_level[operation_id]
)

# If the doc is small enough, and we aren't slicing it with a page range:
Expand Down Expand Up @@ -387,7 +394,6 @@ def before_request(
# in `after_success`.
coroutine = partial(
self.call_api_partial,
limiter=limiter,
operation_id=operation_id,
pdf_chunk_request=pdf_chunk_request,
pdf_chunk_file=pdf_chunk_file,
Expand Down Expand Up @@ -605,10 +611,16 @@ def _await_elements(self, operation_id: str) -> Optional[list]:
if tasks is None:
return None

ioloop = asyncio.get_event_loop()
task_responses: list[tuple[int, httpx.Response]] = ioloop.run_until_complete(
run_tasks(tasks, allow_failed=self.allow_failed)
)
concurrency_level = self.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL)
coroutines = run_tasks(tasks, allow_failed=self.allow_failed, concurrency_level=concurrency_level)

# sending the coroutines to a separate thread to avoid blocking the current event loop
# this operation should be removed when the SDK is updated to support async hooks
executor = self.executors.get(operation_id)
if executor is None:
raise RuntimeError("Executor not found for operation_id")
task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines)
task_responses = task_responses_future.result()

if task_responses is None:
return None
Expand Down Expand Up @@ -715,6 +727,10 @@ def _clear_operation(self, operation_id: str) -> None:
"""
self.coroutines_to_execute.pop(operation_id, None)
self.api_successful_responses.pop(operation_id, None)
self.concurrency_level.pop(operation_id, None)
executor = self.executors.pop(operation_id, None)
if executor is not None:
executor.shutdown(wait=True)
tempdir = self.tempdirs.pop(operation_id, None)
if tempdir:
tempdir.cleanup()
Loading