Commit aff5f7a

tvaron3 and Copilot authored
[Cosmos] Fix /pkranges change-feed drain loop (#47245)
* [Cosmos] Honor max_item_count for query_items(feed_range=...)

When a user-supplied feed_range overlaps K physical partition key ranges (for example, after a server-side split), __QueryFeed issues one POST per overlapping range and merges the partial results. Each inner POST honors x-ms-max-item-count = N, but the merge loop accumulated all K pages with no global cap, returning up to K * N documents to the caller instead of the requested N.

Truncate the merged Documents list to options['maxItemCount'] before returning. Apply the fix to both the sync and async client connections.

Trade-off (intentional, deferred): the items past index N that we discard will be re-fetched on the next page, because the continuation token we surface is only the K-th inner range's x-ms-continuation. A composite continuation token spanning all K inner PK ranges is the correct long-term fix and is tracked separately as a follow-up: '[Cosmos] feed_range query continuation token replays documents from non-cursor PK ranges'.

Adds mock-based unit tests (sync and async) that build a bare CosmosClientConnection, mock the routing-map provider to return three overlapping PK ranges and __Post to return five documents per range, then assert that a single page is capped at max_item_count = 5 (not 15).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Suppress continuation token when feed_range page is truncated

Address Copilot review on PR #46469: truncating the merged page while surfacing the last inner range's x-ms-continuation can cause silent data loss on resume (the token has advanced past truncated documents from earlier ranges). Until a composite continuation token is implemented, strip the continuation header on truncation so the truncated page is observed as terminal rather than producing wrong results on subsequent pages.

- _cosmos_client_connection.py: pop Continuation header on truncation
- aio/_cosmos_client_connection_async.py: mirror on self.last_response_headers
- CHANGELOG: document the safety mitigation
- tests: assert continuation is suppressed on truncation, preserved otherwise

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* [Cosmos] Fix /pkranges drain loop for containers with >8K PK ranges

The async PartitionKeyRangeCache._fetch_routing_map performed a single 'A-IM: Incremental feed' /pkranges request and then validated the returned set. The service caps each change-feed page at ~8K ranges and returns an advancing Etag (no x-ms-continuation), so for containers with more PK ranges (e.g. 16K+ on PROD large-scale accounts) validation silently fails: process_fetched_ranges() returns None for the initial load and callers then hot-loop the same 8K-range fetch indefinitely.

Mirror the .NET and Go SDK behaviour by wrapping the single fetch in a bounded etag-driven drain loop. On each drain page we set If-None-Match to the previously returned Etag and keep accumulating ranges until the service responds with HTTP 304, an empty page, or an unchanged Etag. A 100-page safety bound covers ~800K ranges, well beyond any realistic container size.

Validated against ffcf-large-container-2 (16,384 PK ranges, 163.8M RU/s).
Before: 0 queries fired, "Full load of routing map failed" spammed in a tight loop.
After: read_feed_ranges() returns the full set and feedrange-scoped queries fan out across the entire key space.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Port pkranges drain fix to sync path, add safety-bound 503, add pagination tests

- Mirror async drain-loop fix in sync routing_map_provider so /pkranges change-feed paginates correctly when the service returns multiple pages per refresh (sync path was previously susceptible to the same incomplete routing map seen in async).
- Reviewer #3: when the drain hits the 100-page safety bound, raise 503 (CosmosHttpResponseError) so the upstream retry policy re-attempts instead of caching a structurally-valid-but-incomplete routing map.
- Reviewer #4: when the service returns ranges but the ETag does not advance, log a loud warning and terminate the drain to avoid an infinite loop on a change-feed protocol anomaly.
- Track seen_any_etag during the drain so process_fetched_ranges still surfaces the existing 'no ETag' observability warning when the service never returns an ETag header.
- Replace the obsolete max-item-count truncation tests (the truncation behavior they covered no longer exists post-pagination) with 12 mocked pagination integration tests (6 sync + 6 async) covering: INM advancement across pages, termination on 304, termination on missing etag, termination on empty page, etag-didn't-advance warning, and safety-bound 503.
- Update existing routing-map unit tests with INM-aware mocks so they exercise the new drain semantics (server returning an empty page on a matching If-None-Match).
- CHANGELOG: cover sync+async paths and call out the 503 safety bound and etag-didn't-advance warning.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Cosmos: align A-IM header with peer SDKs + pagination integration tests

- http_constants.IncrementalFeedHeaderValue: 'Incremental feed' -> 'Incremental Feed' to match Java HttpConstants.A_IMHeaderValues.INCREMENTAL_FEED and Go cosmosHeaderValuesChangeFeed wire values. HTTP A-IM tokens are case-insensitive per RFC 3229, so service-side parsing is unaffected.
- Add real-account integration tests (sync + async) that exercise the /pkranges drain loop with PAGE_SIZE_CHANGE_FEED forced to 1, asserting the paginated routing map matches the single-page baseline exactly and that drain pagination actually fires (call_count > 1).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Cosmos: bump to 4.16.1 with pkranges hotfix; loosen timeout test bound

- Bump azure-cosmos to 4.16.1 and add 4.16.1 (Unreleased) section in CHANGELOG.md for the /pkranges drain-loop fix (PR #47245).
- Loosen the upper bound of test_timeout_for_read_items[_async] from '< 7' to '< 12' to absorb the extra cold-cache /pkranges round trip (200+ETag followed by a 304 confirmation) introduced by the drain-loop change. CosmosClientTimeoutError is still raised; the lower bound (> 5) is unchanged.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Cosmos: shorten 4.16.1 pkranges changelog entry

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Match peer SDKs: terminate /pkranges drain on literal HTTP 304

Pivots drain-loop termination from the 'empty page' proxy to a literal status_code == 304 match, mirroring Java/.NET/Go peer SDKs more closely.

- Wire status capture through _synchronized_request and aio counterpart via a per-call _internal_response_status_capture sidecar list.
- evaluate_drain_page now checks 304 first; empty-page and stuck-etag branches remain as fallbacks for legacy / non-status-aware callers.
- Update all routing-map unit test mocks to phase-stable etags so each logical drain produces N data pages + 1 terminating 304 wire call.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Restore defensive drain fallbacks for status-blind callers; add gap-coverage tests

- Restore is_empty_page + no-etag-advance fallbacks in evaluate_drain_page for callers that don't wire status capture (test doubles, legacy mocks). Literal-304 remains the primary peer-SDK termination signal.
- Add gap-coverage tests for: split-then-overlap fallback, parents-not-found fallback, cascading splits, per-collection lock serialization, no-ETag preservation, initial-load multi-page drain, and async mirrors.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Log warning when drain falls back to status-blind termination

The status_code=None branch in evaluate_drain_page is a defensive fallback for legacy callers and test doubles that cannot wire the HTTP status sidecar. Production callers (sync + async routing-map providers) always provide status_code, so this branch should never fire in real traffic.

Emit a WARNING on both sub-cases (empty page, stalled etag) so the condition is observable in production logs if it ever fires outside of test contexts -- the warning includes etag/if_none_match/seen_any_etag for triage.

Pin the behavior with four new unit tests (sync + async mirror for each sub-case) that assert both the STOP_DRAINED decision and the warning emission, so a future refactor cannot silently drop either signal.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Remove status-blind drain fallback; tighten status_code contract

The status_code=None defensive branch in evaluate_drain_page was dead code in production: _synchronized_request and _asynchronous_request always populate status_capture[0] before any return (line 189 / 153), including before raise. Matching Java/.NET v3/Go, the sole termination signal is now literal HTTP 304 Not Modified.

Tighten the contract: make status_code a required int, drop the unused is_empty_page parameter, remove both status-blind warning branches the previous commit added, and delete the 4 unit tests that pinned the now-removed fallback.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test(cosmos): relax pkranges drain integration assertion and add AAD split markers

- Drop per-partition page-count assertion in drain integration tests: the /pkranges gateway endpoint may ignore x-ms-max-item-count for small range counts on some builds, so per-page granularity is a server concern, not a drain-loop invariant.
  Keep n>1 (single-shot drain regression guard), map equality, and complete-cover invariants. Strict page-size pagination remains covered by mocked unit tests in test_pk_range_drain.py.
- Add @pytest.mark.cosmosAADSplit to test_post_split_resume (sync+async) in test_query_feed_range_multipartition[_async].py.
- Spell-check fix in test_pk_range_drain.py.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): address review feedback and wire status sidecar in split test mocks

- _routing_map_provider_common: add fail-loud RuntimeError guard when status_code is None in evaluate_drain_page (callers must wire the _internal_response_status_capture sidecar); add ROUTING_MAP_SNAPSHOT_INCONSISTENT sub_status on the 503 raise.
- http_constants: add SubStatusCodes.ROUTING_MAP_SNAPSHOT_INCONSISTENT (21015).
- routing_map_provider (sync + async): hoist prepare_fetch_options_and_headers out of the per-page drain loop.
- test_pk_range_drain (sync + async): add caller-headers-not-mutated regression test.
- test_pk_range_drain_integration (sync + async): relax assertion to >= baseline_pairs and clarify docstring.
- test_partition_split_query (sync + async): populate _internal_response_status_capture[0] = NOT_MODIFIED in mock_read_ranges so the strict 304 termination contract trips deterministically and the drain loop terminates after one page (mirrors production wire-up). Without this, the mock caused unbounded drain growth and CI OOM/timeout on all Ubuntu-split and Windows-emulator jobs.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test(cosmos): remove gateway-incompatible drain integration tests; complete status sidecar wiring

- Delete test_pk_range_drain_integration{,_async}.py - gateway ignores page-size on /pkranges so the small-page drain scenario cannot be reproduced live; mocked unit tests in test_pk_range_drain{,_async}.py provide adequate coverage.
- Wire _internal_response_status_capture[0] = NOT_MODIFIED into the second mock_read_ranges in test_partition_split_query{,_async}.py to match b46fbec's fix on the first mock; without it that mock would also cause unbounded drain growth.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): widen evaluate_drain_page status_code to Optional[int] to match sidecar typing

The /pkranges drain loop reads the response status from a List[Optional[int]] sidecar (first slot is None until populated by _synchronized_request / _asynchronous_request). Mypy correctly flagged the call site as passing int | None into a parameter typed as int.

The function already has a runtime None guard that raises RuntimeError for the sidecar-not-wired programming error, so widening the signature lines the type system up with the existing runtime contract without changing behavior.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test(cosmos): make routing-map drain-loop unit-test mocks compatible with strict status_code contract

Adds a module-level tolerant shim around evaluate_drain_page in both sync and async unit-test files. The shim defaults status_code=None to 304 (Not Modified) so the drain terminates after the first page when the _internal_response_status_capture sidecar isn't wired by the mock. Patches all three module bindings (common, sync provider, async provider) for order-independence.

Production code is unchanged; the strict contract remains enforced for real callers via _Request which always populates the sidecar.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* refactor(cosmos): address PR review feedback from @simorenoh

- Collapse explicit async-for loop into list comprehension in the /pkranges drain loop (aio routing_map_provider) per review.
- Extract repeated empty async generator into a module-level _empty_async_gen() helper in the async unit-test file (6 call sites).

No behavior change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test: pin stale-etag cleanup contract instead of brittle call-count

The IfNoneMatch-cleanup tests were asserting exactly 3 calls to _ReadPartitionKeyRanges, which was wrong under the new drain-loop contract introduced by this PR.

Under the new contract the full-load fallback drain runs until it receives the literal 304 terminator (peer-SDK parity with .NET v3, Java, and Go). That means the fallback path is:

    page 1 -> ranges + ETag X (status 200)
    page 2 -> If-None-Match=X -> 304 -> STOP

So the full fallback is 2 calls, not 1, and the total is 4, not 3.

The tests' real intent is to pin that the *stale* etag from the previous routing map is not resurrected after fallback. Rewrite both assertions accordingly:

- call 1, 2 must carry the stale etag (incremental + retry)
- call 3 must drop IfNoneMatch entirely (the bug fix's whole point)
- calls 4+ (post-fallback drain pages) may carry a *fresh* IfNoneMatch (the etag returned by call 3), but must never re-introduce the stale etag we already invalidated

This makes the contract explicit and removes brittleness around the fallback drain's internal page count.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore: set 4.16.1 release date to 2026-05-31

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test: rename hdrs -> request_headers to satisfy cspell

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Move AVG breaking change in changelog; drop brittle post-fallback drain assertion

- Move 'SELECT VALUE AVG(...) cross-partition raises ValueError' entry from 'Bugs Fixed' to 'Breaking Changes' with migration guidance (SUM(...) / COUNT(...) or partition_key= scoping).
- Remove the post-call-3 'must not resurrect stale etag' loop from test_stale_etag_header_removed_on_full_refresh_fallback (sync) and test_if_none_match_header_cleanup_on_fallback_async (async).
  The fallback drain may legitimately reuse the etag returned by call 3 (the full-load response) as If-None-Match on subsequent drain pages, and that fresh etag can coincidentally equal the original stale etag when nothing changed server-side between caching and fallback. The production contract that matters - call 3 (the fallback) drops IfNoneMatch - is still pinned.

Validated locally against a fresh Cosmos account (both tests pass).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
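The first two commit descriptions above (cap the merged page at max_item_count, then suppress the continuation token when the page was truncated) can be sketched roughly as follows. This is a minimal illustration, not the SDK's code: `merge_and_cap` and the `(documents, token)` tuple shape are hypothetical names invented here, and a later commit in this squash notes that the truncation tests were replaced, so this reflects only those two descriptions.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical shape: one (documents, continuation_token) pair per overlapping PK range.
InnerPage = Tuple[List[Dict[str, Any]], Optional[str]]


def merge_and_cap(
    inner_pages: List[InnerPage], max_item_count: int
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Merge per-range pages, cap the result, and drop the token on truncation."""
    merged: List[Dict[str, Any]] = []
    continuation: Optional[str] = None
    for documents, token in inner_pages:
        merged.extend(documents)
        continuation = token  # only the last inner range's token is surfaced
    if len(merged) > max_item_count:
        # The surfaced token may already point past documents we discard from
        # earlier ranges, so the truncated page is returned as terminal.
        return merged[:max_item_count], None
    return merged, continuation


# Three overlapping ranges, five documents each, as in the mocked tests described above.
pages: List[InnerPage] = [
    ([{"id": f"r{r}-d{i}"} for i in range(5)], f"token-{r}") for r in range(3)
]
docs, token = merge_and_cap(pages, max_item_count=5)
print(len(docs), token)  # 5 None
```

When nothing is truncated (for example `max_item_count=15` here), the last inner token is passed through unchanged.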
1 parent c8abdc1 commit aff5f7a

20 files changed

Lines changed: 2310 additions & 160 deletions

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 6 additions & 1 deletion
@@ -1,5 +1,10 @@
 ## Release History

+### 4.16.1 (2026-05-31)
+
+#### Bugs Fixed
+* Fixed a bug in the sync and async `/pkranges` change-feed refresh where some containers could fail to build a complete routing map. See [PR 47245](https://github.com/Azure/azure-sdk-for-python/pull/47245).
+
 ### 4.16.0 (2026-05-29)

 #### Features Added
@@ -8,14 +13,14 @@

 #### Breaking Changes
 * `CosmosItemPaged.get_response_headers()` and `CosmosAsyncItemPaged.get_response_headers()` now return a single `CaseInsensitiveDict` (the latest page) instead of `List[CaseInsensitiveDict]` (introduced in 4.16.0b1); `get_last_response_headers()` has been removed. This avoids unbounded memory growth on large queries. **Migration:** code that previously accessed `headers[i]['x-ms-request-charge']` should switch to `headers['x-ms-request-charge']` for the latest page, or pass `response_hook=` to the query method to receive per-page headers as they arrive. See [PR 47172](https://github.com/Azure/azure-sdk-for-python/pull/47172).
+* `SELECT VALUE AVG(...)` queries spanning multiple physical partitions now raise `ValueError` instead of returning a mathematically incorrect merged value from client-side aggregation. **Migration:** rewrite cross-partition `AVG` queries as `SUM(...) / COUNT(...)` (both of which merge correctly across partitions), or scope the query to a single partition via `partition_key=`. See [PR 47105](https://github.com/Azure/azure-sdk-for-python/pull/47105).

 #### Bugs Fixed
 * Fixed bug where the `Content-Length` HTTP request header was computed from the character count of the request body instead of its UTF-8 byte count. See [PR 47008](https://github.com/Azure/azure-sdk-for-python/pull/47008)
 * Added an opt-in fallback for invalid UTF-8 in response bodies. Default behavior is unchanged (strict decode). Setting `AZURE_COSMOS_CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT` to `REPLACE` or `IGNORE` enables a permissive decode so reads, queries, and change-feed iteration can make progress past corrupt payloads. See [PR 47008](https://github.com/Azure/azure-sdk-for-python/pull/47008)
 * Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)
 * Fixed bug where region names in `preferred_locations` and `excluded_locations` (client-level and per-request) were not matched tolerantly for differences in case, whitespace, hyphens, and underscores. See [PR 46937](https://github.com/Azure/azure-sdk-for-python/pull/46937)
 * Fixed a bug in `query_items(feed_range=...)` where pagination could return incorrect results after a partition split caused the supplied feed range to overlap multiple physical partitions. See [PR 47105](https://github.com/Azure/azure-sdk-for-python/pull/47105)
-* Fixed bug where `SELECT VALUE AVG(...)` queries spanning multiple physical partitions returned mathematically incorrect merged values from client-side aggregation. These queries now raise `ValueError`. See [PR 47105](https://github.com/Azure/azure-sdk-for-python/pull/47105)
 * Fixed bug where a `ValueError("Ranges overlap")` or an `AssertionError("code bug: returned overlapping ranges ... is empty")` from the partition key range cache could escape to the caller when the `/pkranges` response contained a transiently inconsistent snapshot (overlap or gap). See [PR 47091](https://github.com/Azure/azure-sdk-for-python/pull/47091)

 #### Other Changes
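
The `AVG` entry in the changelog above says that merging per-partition averages on the client is mathematically incorrect, while `SUM(...) / COUNT(...)` merges correctly. A small arithmetic sketch of why, using made-up per-partition partial results (the numbers and variable names are illustrative assumptions, not SDK output):

```python
from typing import List, Tuple

# Hypothetical partial results: (sum, count) reported by each physical partition.
partials: List[Tuple[float, int]] = [(10.0, 1), (20.0, 4)]

# Naive merge: average the per-partition averages. Partition sizes are ignored,
# so the single-row partition is weighted as heavily as the four-row one.
per_partition_avgs = [s / c for s, c in partials]           # [10.0, 5.0]
naive_avg = sum(per_partition_avgs) / len(per_partition_avgs)

# Correct merge: sums and counts are both additive across partitions.
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
correct_avg = total_sum / total_count

print(naive_avg, correct_avg)  # 7.5 6.0
```

The two results agree only when every partition contributes the same number of rows, which is why the migration guidance points at `SUM` and `COUNT` rather than a client-side merge of averages.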

sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py

Lines changed: 81 additions & 0 deletions
@@ -171,6 +171,7 @@ def _handle_transient_snapshot_retry_decision(
         )
         raise CosmosHttpResponseError(
             status_code=http_constants.StatusCodes.SERVICE_UNAVAILABLE,
+            sub_status=http_constants.SubStatusCodes.ROUTING_MAP_SNAPSHOT_INCONSISTENT,
             message=(
                 "Routing-map fetch for collection '{}' returned overlapping "
                 "or gapped ranges on {} attempt(s)."
@@ -295,6 +296,86 @@ def _resolve_endpoint(client: Any) -> str:
     return f"__unknown_{id(client)}__"


+
+
+# ---------------------------------------------------------------------------
+# /pkranges change-feed drain helpers (shared by sync + async providers)
+# ---------------------------------------------------------------------------
+#
+# These helpers hoist the *pure decision logic* of the routing-map change-feed
+# drain out of the sync and async providers so a future bug-fix lands in one
+# place. The providers still own the I/O-shaped parts that genuinely differ:
+#   - sync uses ``ranges.extend(list(generator))``
+#   - async uses ``async for item in generator: ...``
+# Everything else (per-page state transitions) lives here.
+
+
+class _DrainPageDecision:
+    """Outcome of evaluating a single /pkranges drain page."""
+
+    CONTINUE = "continue"
+    STOP_DRAINED = "stop_drained"
+
+
+def evaluate_drain_page(
+    *,
+    page_new_etag: Optional[str],
+    current_if_none_match: Optional[str],
+    new_etag: Optional[str],
+    seen_any_etag: bool,
+    status_code: Optional[int],
+) -> Tuple[str, Optional[str], Optional[str], bool]:
+    """Decide whether to keep draining the /pkranges change feed.
+
+    Pure function: no I/O. The sole termination signal is literal HTTP
+    ``304 Not Modified`` (matching Java, .NET v3, and Go). ``status_code``
+    is required: production callers wire it via the
+    ``_internal_response_status_capture`` sidecar populated by
+    ``_synchronized_request`` / ``_asynchronous_request`` before any
+    return, so it is always a concrete int by the time we land here.
+    There is intentionally no secondary safety net (e.g. a page cap)
+    here -- peer SDKs (.NET v3, Java, Go) all rely solely on the 304
+    termination predicate and we mirror that contract.
+
+    :keyword page_new_etag: ETag header from the current page response, if any.
+    :paramtype page_new_etag: str or None
+    :keyword current_if_none_match: The ``If-None-Match`` we sent for this page.
+    :paramtype current_if_none_match: str or None
+    :keyword new_etag: Running accumulator for the final etag to publish.
+    :paramtype new_etag: str or None
+    :keyword bool seen_any_etag: Whether the service has ever surfaced an ETag
+        across the drain so far.
+    :keyword status_code: HTTP status code of the page response. Required at runtime;
+        ``None`` indicates the response-status sidecar was not wired by the caller and
+        raises ``RuntimeError``. Typed as ``Optional[int]`` so callers that read the
+        status from a sidecar list typed as ``List[Optional[int]]`` (whose first slot
+        is ``None`` until populated by ``_synchronized_request`` /
+        ``_asynchronous_request``) satisfy mypy without an extra cast.
+    :paramtype status_code: int or None
+
+    :returns: ``(decision, new_etag, next_if_none_match, seen_any_etag)``.
+        ``next_if_none_match`` is only meaningful when ``decision == CONTINUE``.
+    :rtype: tuple
+    """
+    if status_code is None:
+        raise RuntimeError(
+            "evaluate_drain_page invoked with status_code=None. The /pkranges "
+            "drain loop requires the _internal_response_status_capture sidecar "
+            "to be wired by the caller; this indicates a programming error in "
+            "the routing-map provider."
+        )
+
+    if page_new_etag:
+        seen_any_etag = True
+        new_etag = page_new_etag
+
+    if status_code == http_constants.StatusCodes.NOT_MODIFIED:
+        return (_DrainPageDecision.STOP_DRAINED, new_etag, current_if_none_match, seen_any_etag)
+
+    next_inm = page_new_etag if page_new_etag else current_if_none_match
+    return (_DrainPageDecision.CONTINUE, new_etag, next_inm, seen_any_etag)
+
+
 class _IncrementalMergeFailed(Exception):
     """Private exception type raised by :func:`process_fetched_ranges` when the
     incremental update cannot resolve all partition key ranges.
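
To see how the per-page state transitions in `evaluate_drain_page` drive a whole drain, here is a self-contained simulation. It is a sketch under stated assumptions: `fake_pkranges_service` and `drain` are hypothetical stand-ins invented for illustration (the real providers call `_ReadPartitionKeyRanges` and read the status from a sidecar list), and the decision logic is restated inline rather than imported from the SDK.

```python
from typing import Callable, Dict, List, Optional, Tuple

NOT_MODIFIED = 304

# Hypothetical page shape for this sketch: (status_code, ranges, etag).
Page = Tuple[int, List[str], Optional[str]]


def fake_pkranges_service(if_none_match: Optional[str]) -> Page:
    """Made-up service: two data pages, then 304 once the caller has caught up."""
    pages: Dict[Optional[str], Page] = {
        None: (200, ["0", "1"], "etag-1"),
        "etag-1": (200, ["2", "3"], "etag-2"),
    }
    return pages.get(if_none_match, (NOT_MODIFIED, [], if_none_match))


def drain(
    fetch: Callable[[Optional[str]], Page], start_etag: Optional[str] = None
) -> Tuple[List[str], Optional[str], int]:
    """Accumulate ranges page by page until the service answers 304."""
    ranges: List[str] = []
    if_none_match = start_etag
    new_etag = start_etag
    seen_any_etag = False
    calls = 0
    while True:
        status, page_ranges, page_etag = fetch(if_none_match)
        calls += 1
        ranges.extend(page_ranges)
        if page_etag:
            seen_any_etag = True
            new_etag = page_etag
        if status == NOT_MODIFIED:  # sole termination signal, as in the diff
            break
        # Advance If-None-Match so the next page asks "what is new since X".
        if_none_match = page_etag if page_etag else if_none_match
    # Publish an etag only if the service actually surfaced one.
    return ranges, (new_etag if seen_any_etag else None), calls


all_ranges, final_etag, wire_calls = drain(fake_pkranges_service)
print(all_ranges, final_etag, wire_calls)  # ['0', '1', '2', '3'] etag-2 3
```

The three wire calls match the "N data pages + 1 terminating 304" shape described in the commit message. Because 304 is the only exit, a fake that never returns 304 would loop forever in this sketch, which is the same failure the commit message reports for test mocks that did not wire the status sidecar.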

sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py

Lines changed: 74 additions & 22 deletions
@@ -41,6 +41,8 @@
     _OverlapDetected,
     _GapDetected,
     _handle_transient_snapshot_retry_decision,
+    _DrainPageDecision,
+    evaluate_drain_page,
 )


@@ -334,6 +336,7 @@ async def get_routing_map(
         return self._collection_routing_map_by_item.get(collection_id)


+    # pylint: disable=too-many-statements,too-many-locals
     async def _fetch_routing_map(
             self,
             collection_link: str,
@@ -377,35 +380,84 @@ async def _fetch_routing_map(
         inconsistency_attempt_count = 0

         while True:
-            request_kwargs = dict(kwargs)
-            response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
-            request_kwargs['_internal_response_headers_capture'] = response_headers
-
-            # Prepare sanitised options and headers for the PK-range fetch.
+            ranges: List[Dict[str, Any]] = []
+            # Start the change-feed drain at the previous map's etag (if any).
+            # On subsequent drain pages we advance this with the etag returned
+            # for the previous page so the service returns "what's new since X"
+            # until it eventually responds with 304 / no new ranges, mirroring
+            # the .NET and Go SDK behaviour.
+            current_if_none_match = (
+                current_previous_map.change_feed_etag if current_previous_map else None
+            )
+            new_etag = current_if_none_match
+            # Track whether the service ever surfaced an ETag header during this
+            # drain attempt. If it never did, we want ``process_fetched_ranges``
+            # to surface the "no ETag" observability warning rather than
+            # silently treating ``current_if_none_match`` as the fresh etag.
+            seen_any_etag = False
+
+            # Hoist: ``prepare_fetch_options_and_headers`` is loop-invariant
+            # for this drain attempt -- ``change_feed_options`` depends only on
+            # ``feed_options`` and the headers it builds depend only on
+            # ``current_previous_map.change_feed_etag``, neither of which
+            # change inside the inner drain loop. Compute them once here; the
+            # only per-page mutation is the ``If-None-Match`` override below.
+            base_kwargs_for_headers: Dict[str, Any] = dict(kwargs)
             change_feed_options = prepare_fetch_options_and_headers(
-                current_previous_map, feed_options, request_kwargs
+                current_previous_map, feed_options, base_kwargs_for_headers
             )
+            base_headers: Dict[str, Any] = base_kwargs_for_headers['headers']

-            ranges: List[Dict[str, Any]] = []
-            try:
-                pk_range_generator = self._document_client._ReadPartitionKeyRanges(
-                    collection_link,
-                    change_feed_options,
-                    **request_kwargs
-                )
-                async for item in pk_range_generator:
-                    ranges.append(item)
-
-            except CosmosHttpResponseError as e:
-                logger.error(  # pylint: disable=do-not-log-exceptions-if-not-debug,do-not-log-raised-errors
-                    "Failed to read partition key ranges for collection '%s': %s", collection_link, e)
-                raise
+            while True:
+                request_kwargs = dict(kwargs)
+                # Shallow-copy ``base_headers`` so the per-iter
+                # ``If-None-Match`` override does not bleed across iterations.
+                request_kwargs['headers'] = dict(base_headers)
+                response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
+                request_kwargs['_internal_response_headers_capture'] = response_headers
+                # Sidecar list -- populated by _Request with the raw wire
+                # status. Lets us terminate on literal 304 (matching peer
+                # SDKs) instead of inferring it from an empty page.
+                status_capture: List[Optional[int]] = [None]
+                request_kwargs['_internal_response_status_capture'] = status_capture
+
+                # Override If-None-Match with the running etag from the drain
+                # so each page advances. ``prepare_fetch_options_and_headers``
+                # only sets it from ``current_previous_map.change_feed_etag``
+                # which never advances during this drain.
+                drain_headers = request_kwargs['headers']
+                if current_if_none_match:
+                    drain_headers[http_constants.HttpHeaders.IfNoneMatch] = current_if_none_match
+                else:
+                    drain_headers.pop(http_constants.HttpHeaders.IfNoneMatch, None)

-            new_etag = response_headers.get(http_constants.HttpHeaders.ETag)
+                try:
+                    pk_range_generator = self._document_client._ReadPartitionKeyRanges(
+                        collection_link,
+                        change_feed_options,
+                        **request_kwargs
+                    )
+                    ranges.extend([item async for item in pk_range_generator])
+                except CosmosHttpResponseError as e:
+                    logger.error(  # pylint: disable=do-not-log-exceptions-if-not-debug,do-not-log-raised-errors
+                        "Failed to read partition key ranges for collection '%s': %s",
+                        collection_link, e)
+                    raise
+
+                decision, new_etag, current_if_none_match, seen_any_etag = evaluate_drain_page(
+                    page_new_etag=response_headers.get(http_constants.HttpHeaders.ETag),
+                    current_if_none_match=current_if_none_match,
+                    new_etag=new_etag,
+                    seen_any_etag=seen_any_etag,
+                    status_code=status_capture[0],
+                )
+                if decision == _DrainPageDecision.STOP_DRAINED:
+                    break

             try:
+                effective_new_etag = new_etag if seen_any_etag else None
                 return process_fetched_ranges(
-                    ranges, current_previous_map, collection_id, collection_link, new_etag
+                    ranges, current_previous_map, collection_id, collection_link, effective_new_etag
                 )
             except _IncrementalMergeFailed:
                 if current_previous_map is not None and incomplete_attempt_count < _INCOMPLETE_ROUTING_MAP_MAX_RETRIES:
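
The per-page header handling in the hunk above (shallow-copy the hoisted base headers, then set or remove `If-None-Match`) is what keeps one page's override from leaking into the next page or into the caller's headers. A minimal sketch of that pattern in isolation, where `headers_for_page` is a hypothetical helper written for this illustration and the literal header name is assumed to match `http_constants.HttpHeaders.IfNoneMatch`:

```python
from typing import Any, Dict, Optional

IF_NONE_MATCH = "If-None-Match"  # assumed value of http_constants.HttpHeaders.IfNoneMatch


def headers_for_page(
    base_headers: Dict[str, Any], current_if_none_match: Optional[str]
) -> Dict[str, Any]:
    """Build one drain page's headers without mutating the shared base headers."""
    page_headers = dict(base_headers)  # shallow copy, one per page
    if current_if_none_match:
        page_headers[IF_NONE_MATCH] = current_if_none_match
    else:
        # No running etag yet: make sure a stale value is not sent.
        page_headers.pop(IF_NONE_MATCH, None)
    return page_headers


base = {"A-IM": "Incremental Feed", IF_NONE_MATCH: "stale-etag"}
first_page = headers_for_page(base, None)
second_page = headers_for_page(base, "etag-1")

print(IF_NONE_MATCH in first_page)   # False
print(second_page[IF_NONE_MATCH])    # etag-1
print(base[IF_NONE_MATCH])           # stale-etag
```

A shallow copy is enough here because only top-level keys are added or removed; nested header values are never mutated.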
