Skip to content

Commit 6b81735

Browse files
authored
fix: Address some issues in the split pdf logic (#170)
This is a copy of [the fixes](#165) that went into the release/0.25.x branch, updated slightly for the v2 SDK. The main branch is already retrying split requests, but it does not have an updated httpx timeout value, and the retries are not logged.

# To verify

Same as the linked PR, we can mock arbitrary errors from the server and verify the behavior. Apply this patch to your local unstructured-api, and run `make run-web-app`. Whenever we send a request starting at page 3, the server will sleep for a bit and then return a 502 error.

```
diff --git a/prepline_general/api/general.py b/prepline_general/api/general.py
index 2ca56270..0624076c 100644
*** a/unstructured-api/prepline_general/api/general.py
--- b/unstructured-api/prepline_general/api/general.py
***************
*** 812,817 ****
--- 814,822 ----
  )

+ global num_bounces
+ num_bounces = 2
+
  @elasticapm.capture_span()
  @router.post(
      "/general/v0/general",
***************
*** 835,840 ****
--- 840,854 ----
      # For new parameters - add them in models/form_params.py
      form_params: GeneralFormParams = Depends(GeneralFormParams.as_form),
  ):
+
+     global num_bounces
+     page_num = form_params.starting_page_number or 1
+     if num_bounces > 0 and page_num == 3:
+         num_bounces -= 1
+         time.sleep(10)
+         raise HTTPException(status_code=502, detail="BOUNCE")
+
      # -- must have a valid API key --
      if api_key_env := os.environ.get("UNSTRUCTURED_API_KEY"):
          api_key = request.headers.get("unstructured-api-key")
```

Use the main branch to make a client request using layout-parser-paper, which should default to 2-page splits, with the second split starting at page 3. Because the httpx timeout is not specified, this split will raise a ReadTimeout error and get stuck in a retry loop. The retries are not logged, so this will appear to be a client hang.
```
from unstructured_client import UnstructuredClient
from unstructured_client.models import shared, operations

filename = "_sample_docs/layout-parser-paper.pdf"

s = UnstructuredClient(
    server_url="http://localhost:5000",
)

with open(filename, "rb") as f:
    files=shared.Files(
        content=f.read(),
        file_name=filename,
    )

req = operations.PartitionRequest(
    shared.PartitionParameters(
        files=files,
        strategy="fast",
        split_pdf_page=True,
    ),
)

resp = s.general.partition(req)

if num_elements := len(resp.elements):
    print(f"Succeeded with {num_elements}")
```

Finally, make the same request against this branch. The bad split should be retried the allotted 2 times, with logging, and then the doc will succeed.
1 parent 974f74b commit 6b81735

File tree

12 files changed

+170
-165
lines changed

12 files changed

+170
-165
lines changed

Diff for: _test_unstructured_client/integration/test_decorators.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -291,19 +291,27 @@ async def test_split_pdf_requests_do_retry(monkeypatch):
291291
"""
292292
Test that when we split a pdf, the split requests will honor retryable errors.
293293
"""
294+
mock_endpoint_called = False
294295
number_of_split_502s = 2
295296
number_of_last_page_502s = 2
296297

297-
async def mock_send(_, request):
298+
async def mock_send(_, request: httpx.Request):
298299
"""
299300
Return a predefined number of 502s for requests with certain starting_page_number values.
300301
301-
This is because N-1 splits are sent off in the hook logic. These need explicit retry handling.
302-
The final split is returned to the SDK and gets the built in retry code.
302+
This is to make sure specific portions of the doc are retried properly.
303303
304304
We want to make sure both code paths are retried.
305305
"""
306+
# Assert that the SDK issues our no-op request
307+
# returned by the BeforeRequestHook
308+
nonlocal mock_endpoint_called
309+
if request.url.host == "no-op":
310+
mock_endpoint_called = True
311+
return Response(200, request=request)
312+
306313
request_body = request.read()
314+
307315
decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type"))
308316
form_data = form_utils.parse_form_data(decoded_body)
309317

@@ -360,4 +368,6 @@ async def mock_send(_, request):
360368

361369
assert number_of_split_502s == 0
362370
assert number_of_last_page_502s == 0
371+
assert mock_endpoint_called
372+
363373
assert res.status_code == 200

Diff for: _test_unstructured_client/integration/test_integration_freemium.py

+23
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,29 @@ def test_partition_handling_server_error(error, split_pdf, monkeypatch, doc_path
101101
response = client.general.partition(request=req)
102102

103103

104+
@pytest.mark.asyncio
105+
async def test_partition_async_returns_elements(client, doc_path):
106+
filename = "layout-parser-paper.pdf"
107+
with open(doc_path / filename, "rb") as f:
108+
files = shared.Files(
109+
content=f.read(),
110+
file_name=filename,
111+
)
112+
113+
req = operations.PartitionRequest(
114+
partition_parameters=shared.PartitionParameters(
115+
files=files,
116+
strategy="fast",
117+
languages=["eng"],
118+
split_pdf_page=True,
119+
)
120+
)
121+
122+
response = await client.general.partition_async(request=req)
123+
assert response.status_code == 200
124+
assert len(response.elements)
125+
126+
104127
def test_uvloop_partitions_without_errors(client, doc_path):
105128
async def call_api():
106129
filename = "layout-parser-paper-fast.pdf"

Diff for: _test_unstructured_client/unit/test_split_pdf_hook.py

+2-22
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from collections import Counter
66
from typing import Coroutine
77

8+
import httpx
89
import pytest
910
import requests
1011
from requests_toolbelt import MultipartDecoder, MultipartEncoder
@@ -27,18 +28,6 @@
2728
from unstructured_client.models import shared
2829

2930

30-
def test_unit_sdk_init():
31-
"""Test sdk init method properly sets the client."""
32-
hook = SplitPdfHook()
33-
# This is a fake URL, test doesn't make an API call
34-
test_url = "http://localhost:5000"
35-
test_client = requests.Session()
36-
37-
hook.sdk_init(test_url, test_client)
38-
39-
assert hook.client == test_client
40-
41-
4231
def test_unit_clear_operation():
4332
"""Test clear operation method properly clears request/response data."""
4433
hook = SplitPdfHook()
@@ -105,21 +94,12 @@ def test_unit_prepare_request_headers():
10594
def test_unit_create_response():
10695
"""Test create response method properly overrides body elements and Content-Length header."""
10796
test_elements = [{"key": "value"}, {"key_2": "value"}]
108-
test_response = requests.Response()
109-
test_response.status_code = 200
110-
test_response._content = b'[{"key_2": "value"}]'
111-
test_response.headers = requests.structures.CaseInsensitiveDict(
112-
{
113-
"Content-Type": "application/json",
114-
"Content-Length": len(test_response._content),
115-
}
116-
)
11797

11898
expected_status_code = 200
11999
expected_content = b'[{"key": "value"}, {"key_2": "value"}]'
120100
expected_content_length = "38"
121101

122-
response = request_utils.create_response(test_response, test_elements)
102+
response = request_utils.create_response(test_elements)
123103

124104
assert response.status_code, expected_status_code
125105
assert response._content, expected_content

Diff for: src/unstructured_client/_hooks/custom/clean_server_url_hook.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from urllib.parse import ParseResult, urlparse, urlunparse
33

44
from unstructured_client._hooks.types import SDKInitHook
5-
from unstructured_client.httpclient import HttpClient
5+
from unstructured_client.httpclient import HttpClient, AsyncHttpClient
66

77

88
class CleanServerUrlSDKInitHook(SDKInitHook):
@@ -25,9 +25,9 @@ def clean_server_url(self, base_url) -> str:
2525
return urlunparse(parsed_url._replace(path=""))
2626

2727
def sdk_init(
28-
self, base_url: str, client: HttpClient
29-
) -> Tuple[str, HttpClient]:
28+
self, base_url: str, client: HttpClient, async_client: AsyncHttpClient
29+
) -> Tuple[str, HttpClient, AsyncHttpClient]:
3030
"""Concrete implementation for SDKInitHook."""
3131
cleaned_url = self.clean_server_url(base_url)
3232

33-
return cleaned_url, client
33+
return cleaned_url, client, async_client

Diff for: src/unstructured_client/_hooks/custom/logger_hook.py

+9-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
SDKInitHook,
1212
AfterSuccessHook,
1313
)
14-
from unstructured_client.httpclient import HttpClient
14+
from unstructured_client.httpclient import HttpClient, AsyncHttpClient
1515
from collections import defaultdict
1616

1717
logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME)
@@ -46,17 +46,20 @@ def log_retries(self, response: Optional[httpx.Response], error: Optional[Excep
4646

4747

4848
def sdk_init(
49-
self, base_url: str, client: HttpClient
50-
) -> Tuple[str, HttpClient]:
49+
self, base_url: str, client: HttpClient, async_client: AsyncHttpClient
50+
) -> Tuple[str, HttpClient, AsyncHttpClient]:
5151
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
52-
return base_url, client
52+
return base_url, client, async_client
5353

5454
def after_success(
5555
self, hook_ctx: AfterSuccessContext, response: httpx.Response
5656
) -> Union[httpx.Response, Exception]:
5757
self.retries_counter.pop(hook_ctx.operation_id, None)
58-
# NOTE: In case of split page partition this means - at least one of the splits was partitioned successfully
59-
logger.info("Successfully partitioned the document.")
58+
# Note(austin) - pdf splitting returns a mock request
59+
# so we always reach the AfterSuccessHook
60+
# This doesn't mean the splits succeeded
61+
# Need to revisit our logging strategy
62+
# logger.info("Successfully partitioned the document.")
6063
return response
6164

6265
def after_error(

Diff for: src/unstructured_client/_hooks/custom/request_utils.py

+18-38
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,16 @@ async def call_api_async(
6969
headers={**original_headers, "Content-Type": body.content_type},
7070
)
7171

72-
async with limiter:
73-
response = await send_request_async_with_retries(client, new_request)
74-
return response
72+
one_second = 1000
73+
one_minute = 1000 * 60
7574

76-
77-
async def send_request_async_with_retries(client: httpx.AsyncClient, request: httpx.Request):
78-
# Hardcode the retry config until we can
79-
# properly reuse the SDK logic
80-
# (Values are in ms)
8175
retry_config = utils.RetryConfig(
8276
"backoff",
8377
utils.BackoffStrategy(
84-
initial_interval=3000, # 3 seconds
85-
max_interval=1000 * 60 * 12, # 12 minutes
86-
exponent=1.88,
87-
max_elapsed_time=1000 * 60 * 30 # 30 minutes
78+
initial_interval = one_second * 3,
79+
max_interval = one_minute * 12,
80+
max_elapsed_time = one_minute * 30,
81+
exponent = 1.88,
8882
),
8983
retry_connection_errors=True
9084
)
@@ -96,14 +90,14 @@ async def send_request_async_with_retries(client: httpx.AsyncClient, request: ht
9690
]
9791

9892
async def do_request():
99-
return await client.send(request)
93+
return await client.send(new_request)
10094

101-
response = await utils.retry_async(
102-
do_request,
103-
utils.Retries(retry_config, retryable_codes)
104-
)
105-
106-
return response
95+
async with limiter:
96+
response = await utils.retry_async(
97+
do_request,
98+
utils.Retries(retry_config, retryable_codes)
99+
)
100+
return response
107101

108102

109103
def prepare_request_headers(
@@ -145,34 +139,20 @@ def prepare_request_payload(form_data: FormData) -> FormData:
145139
return payload
146140

147141

148-
def create_response(response: httpx.Response, elements: list) -> httpx.Response:
142+
def create_response(elements: list) -> httpx.Response:
149143
"""
150144
Creates a modified response object with updated content.
151145
152146
Args:
153-
response: The original response object.
154147
elements: The list of elements to be serialized and added to
155148
the response.
156149
157150
Returns:
158151
The modified response object with updated content.
159152
"""
160-
response_copy = copy.deepcopy(response)
153+
response = httpx.Response(status_code=200, headers={"Content-Type": "application/json"})
161154
content = json.dumps(elements).encode()
162155
content_length = str(len(content))
163-
response_copy.headers.update({"Content-Length": content_length})
164-
setattr(response_copy, "_content", content)
165-
return response_copy
166-
167-
168-
def log_after_split_response(status_code: int, split_number: int):
169-
if status_code == 200:
170-
logger.info(
171-
"Successfully partitioned set #%d, elements added to the final result.",
172-
split_number,
173-
)
174-
else:
175-
logger.warning(
176-
"Failed to partition set #%d, its elements will be omitted in the final result.",
177-
split_number,
178-
)
156+
response.headers.update({"Content-Length": content_length})
157+
setattr(response, "_content", content)
158+
return response

0 commit comments

Comments
 (0)