Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2e43e14
[Cosmos Spark] Surface empty change feed pages to avoid end-to-end ti…
tvaron3 May 26, 2026
8616057
Add PR link to CHANGELOG entries
tvaron3 May 26, 2026
08f690b
Restore noChanges short-circuit in isFullyDrained when emptyPagesAllo…
tvaron3 May 26, 2026
48a9e99
Address pass-3 review: defense-in-depth on NO_RETRY + DRY cleanup
tvaron3 May 27, 2026
0304dfe
Address pass-4 review: lock contract on data-page non-termination
tvaron3 May 27, 2026
34d3da4
Shrink bridge accessor surface; reclassify CHANGELOG entry
tvaron3 May 27, 2026
255bad9
Address pass-5 review (Kushagra + Annie): drift hazard, test coverage…
tvaron3 May 28, 2026
5683b08
De-flake CosmosContainerChangeFeedTest.asyncChangeFeedPrefetching
tvaron3 May 28, 2026
635d86d
change
xinlian12 May 28, 2026
0fd65d3
add one more change
xinlian12 May 28, 2026
277db10
Fix asyncChangeFeedPrefetching FULL_FIDELITY: insert AFTER subscribe
tvaron3 May 29, 2026
2e62c35
Merge pull request #1 from xinlian12/tvaron3/spark-allow-empty-pages
tvaron3 May 29, 2026
7fa6a1a
Address @xinlian12 review: rename changefeed flag, fix broken test, a…
tvaron3 May 29, 2026
c7478c9
Stabilize asyncChangeFeedPrefetching on Windows EmulatorTcp
tvaron3 May 29, 2026
578ddc8
Revert asyncChangeFeedPrefetching to original Thread.sleep shape + ad…
tvaron3 May 29, 2026
098f378
Remove stale TODO-style comment on surfaceOrRetryNoChangesPage
tvaron3 May 29, 2026
87a90bb
Merge branch 'main' into tvaron3/spark-allow-empty-pages
tvaron3 May 29, 2026
e8804d2
Merge remote-tracking branch 'upstream/main' into tvaron3/spark-allow…
tvaron3 Jun 2, 2026
3985e2f
Revert change-feed half; keep query-path empty-pages fix and CF optio…
tvaron3 Jun 4, 2026
92c1e94
Merge customOptions in inheritNonContinuationFieldsFrom to preserve t…
tvaron3 Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
Comment thread
tvaron3 marked this conversation as resolved.
Outdated

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ private case class ChangeFeedPartitionReader
.setEndLSN(options, this.partition.endLsn.get)
}

// Bubble empty pages up to the iterator so the per-page end-to-end timeout
// applies to each individual page rather than being exceeded by serial
// empty-page drains inside ChangeFeedFetcher.
ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor
Comment thread
tvaron3 marked this conversation as resolved.
Outdated
.setAllowEmptyPages(options, true)

options.setCustomItemSerializer(itemDeserializer)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ private case class ItemsPartitionReader
.getCosmosQueryRequestOptionsAccessor
.disallowQueryPlanRetrieval(new CosmosQueryRequestOptions())

// Bubble empty pages up to the iterator so the per-page end-to-end timeout
// applies to each individual page rather than being exceeded by serial
// empty-page drains inside ParallelDocumentQueryExecutionContext.
ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
.getCosmosQueryRequestOptionsAccessor
.setAllowEmptyPages(queryOptions, true)

private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
ThroughputControlHelper.populateThroughputControlGroupName(
ImplementationBridgeHelpers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,68 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr
factoryCallCount.get shouldEqual 1
}

"TransientIOErrors" should "drain long runs of empty pages without hitting the end-to-end timeout" in {
// Regression test for the empty-page drain scenario: when the SDK is configured with
// emptyPagesAllowed=true the iterator must surface many consecutive empty
// pages without busy-waiting beyond the per-page end-to-end timeout. Even
// with hundreds of empty pages followed by real data, the iterator should
// return all real rows.
val emptyLeadingPages = 200
val realPages = 5
val totalPages = emptyLeadingPages + realPages
val iterator = new TransientIOErrorsRetryingIterator(
continuationToken => generateMockedCosmosPagedFluxWithEmptyPrefix(
continuationToken, totalPages, emptyLeadingPages),
pageSize,
1,
None,
None
)
iterator.maxRetryIntervalInMs = 5

// 2 producers (Left/Right) each emit realPages * pageSize rows
iterator.count(_ => true) shouldEqual (realPages * pageSize * 2)
}

private def generateMockedCosmosPagedFluxWithEmptyPrefix
(
continuationToken: String,
initialPageCount: Int,
leadingEmptyPageCount: Int
) = {

val leftProducer = generateFeedResponseFluxWithEmptyPrefix(
"Left", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
val rightProducer = generateFeedResponseFluxWithEmptyPrefix(
"Right", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
val toBeMerged = Array(leftProducer, rightProducer).toIterable.asJava
val mergedFlux = Flux.mergeSequential(toBeMerged, 1, 2)
UtilBridgeInternal.createCosmosPagedFlux(_ => mergedFlux)
}

private def generateFeedResponseFluxWithEmptyPrefix
(
prefix: String,
pageCount: Int,
leadingEmptyPageCount: Int,
requestContinuationToken: Option[String]
): Flux[FeedResponse[SparkRowItem]] = {

// generateFeedResponse uses documentStartIndex=-1 as the "emit an empty page" sentinel.
val emptyPageSentinel = -1
val firstDataPageStartIndex = 1

val responses = Array.range(1, pageCount + 1)
.map(i => generateFeedResponse(
prefix,
i,
if (i <= leadingEmptyPageCount) emptyPageSentinel else firstDataPageStartIndex))
.filter(response => requestContinuationToken.isEmpty ||
requestContinuationToken.get < response.getContinuationToken)

Flux.fromArray(responses)
}

private val objectMapper = new ObjectMapper

@throws[JsonProcessingException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -334,6 +336,14 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro

@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT)
public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception {
// De-flaked: previously this test relied on `.subscribe()` + `Thread.sleep(3000)` for
// both subscriptions, which raced both with the continuation propagation between the
// two subscriptions AND with the page-arrival cadence on slower CI runners. Now the
// first subscription is awaited via a CountDownLatch on a known minimum page count
// before the second is started, and the final bounded `take(2, true)` block is awaited
// via `.blockLast(...)` rather than fire-and-forget.
final long awaitSeconds = 30L;

this.createContainer(
(cp) -> {
if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) {
Expand All @@ -354,11 +364,31 @@ public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exc
AtomicInteger count = new AtomicInteger(0);
insertDocuments(5, 20);
AtomicReference<String> continuation = new AtomicReference<>("");
createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> {

// First subscription: drain at least 3 pages deterministically before proceeding.
final int firstMinPages = 3;
CountDownLatch firstLatch = new CountDownLatch(firstMinPages);
AtomicReference<Throwable> firstError = new AtomicReference<>();
reactor.core.Disposable firstSub = createdContainer.asyncContainer
.queryChangeFeed(options, ObjectNode.class)
.handle((r) -> {
count.incrementAndGet();
continuation.set(r.getContinuationToken());
}
).byPage().subscribe();
firstLatch.countDown();
})
.byPage()
.subscribe(r -> { /* page consumed by handle() */ }, firstError::set);
try {
assertThat(firstLatch.await(awaitSeconds, TimeUnit.SECONDS))
.as("first change-feed subscription should produce at least %d pages within %d seconds",
firstMinPages, awaitSeconds)
.isTrue();
assertThat(firstError.get()).as("first subscription must not error").isNull();
// intent of the original assertion: prefetch surfaces more than 2 pages
assertThat(count.get()).isGreaterThan(2);
} finally {
firstSub.dispose();
}

CosmosChangeFeedRequestOptions optionsFF = null;
if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) {
Expand All @@ -367,25 +397,46 @@ public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exc
optionsFF = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(continuation.get())
.setMaxItemCount(10).allVersionsAndDeletes();
createdContainer.asyncContainer.queryChangeFeed(optionsFF, ObjectNode.class).handle((r) -> {
count.incrementAndGet();
continuation.set(r.getContinuationToken());

final int secondMinPages = 3;
CountDownLatch secondLatch = new CountDownLatch(secondMinPages);
AtomicReference<Throwable> secondError = new AtomicReference<>();
reactor.core.Disposable secondSub = createdContainer.asyncContainer
.queryChangeFeed(optionsFF, ObjectNode.class)
.handle((r) -> {
count.incrementAndGet();
continuation.set(r.getContinuationToken());
secondLatch.countDown();
})
.byPage()
.subscribe(r -> { /* page consumed by handle() */ }, secondError::set);
try {
assertThat(secondLatch.await(awaitSeconds, TimeUnit.SECONDS))
.as("FULL_FIDELITY resume-from-continuation subscription should produce at least %d pages within %d seconds",
secondMinPages, awaitSeconds)
.isTrue();
assertThat(secondError.get()).as("second subscription must not error").isNull();
assertThat(count.get()).isGreaterThan(2);
} finally {
secondSub.dispose();
}
).byPage().subscribe();
}
Thread.sleep(3000);
assertThat(count.get()).isGreaterThan(2);

if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) {
// full fidelity is only from now so need to insert more documents
insertDocuments(5, 20);
}
count.set(0);
// should only get two pages
createdContainer.asyncContainer.queryChangeFeed(changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)? optionsFF
: options, ObjectNode.class).handle((r) -> count.incrementAndGet())
.byPage().take(2, true).subscribe();
Thread.sleep(3000);
// should only get two pages — `take(2, true)` bounds the upstream request, so the
// pipeline completes naturally after 2 pages. Use blockLast() instead of fire-and-forget
// to wait for that completion deterministically.
createdContainer.asyncContainer
.queryChangeFeed(changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY) ? optionsFF : options,
ObjectNode.class)
.handle((r) -> count.incrementAndGet())
.byPage()
.take(2, true)
.blockLast(Duration.ofSeconds(awaitSeconds));
assertThat(count.get()).isEqualTo(2);
}

Expand Down Expand Up @@ -1097,6 +1148,78 @@ public void changeFeedQueryCompleteAfterAvailableNow(
}
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT * 5)
public void changeFeedQuery_emptyPagesAllowed_surfacesNoChangesPagesAndTerminates() {
// End-to-end guard for IcM 51000001033272: when the SDK is opted into emptyPagesAllowed=true
// (via the friend-API bridge accessor — the same opt-in the Cosmos Spark connector uses),
// change-feed reads against a multi-partition container must:
// (a) surface 304/noChanges pages individually to the caller, AND
// (b) terminate via the FeedRangeCompositeContinuationImpl >4*(size+1) consecutive-304
// defense rather than poll indefinitely.
//
// This is the integration-level pin for the contract that ChangeFeedFetcher.nextPageInternal
// branch 2 explicitly calls disableShouldFetchMore() on NO_RETRY noChanges. Without that
// arm, a caller that drained the flux to completion would hang.
String testContainerId = UUID.randomUUID().toString();
try {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
CosmosAsyncContainer testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
// throughput high enough to provision multiple physical partitions
11000);

// Sparse workload: a few docs spread across partitions; most physical partitions
// will return 304 / noChanges on read, exercising the empty-page surfacing path.
insertDocuments(/* partitionCount */ 3, /* documentCount */ 2, testContainer);

CosmosChangeFeedRequestOptions options =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
ImplementationBridgeHelpers
.CosmosChangeFeedRequestOptionsHelper
.getCosmosChangeFeedRequestOptionsAccessor()
.setAllowEmptyPages(options, true);

AtomicInteger totalPagesObserved = new AtomicInteger(0);
AtomicInteger totalDocsObserved = new AtomicInteger(0);
AtomicInteger emptyPagesObserved = new AtomicInteger(0);

// Drain a bounded slice of the change feed - the iteration must terminate within
// a reasonable page count via the SDK's consecutive-304 defense.
testContainer.queryChangeFeed(options, JsonNode.class)
.byPage(1)
.take(100)
.doOnNext(response -> {
totalPagesObserved.incrementAndGet();
int pageSize = response.getResults().size();
totalDocsObserved.addAndGet(pageSize);
if (pageSize == 0) {
emptyPagesObserved.incrementAndGet();
}
})
.blockLast();

// (a) at least some empty pages must have surfaced - the whole point of the opt-in
assertThat(emptyPagesObserved.get())
.describedAs("emptyPagesAllowed=true must surface 304/noChanges pages individually")
.isGreaterThan(0);
// (b) all inserted docs must be observed - empty-page surfacing must not interfere
// with data-page emission
assertThat(totalDocsObserved.get())
.describedAs("all inserted documents must surface")
.isEqualTo(6);
// (c) iteration must have terminated (we didn't hit the take(100) cap, otherwise
// we'd be polling indefinitely - that's the regression the defense-in-depth arm prevents)
assertThat(totalPagesObserved.get())
.describedAs("iteration must terminate via consecutive-304 defense, not hit the take(100) cap")
.isLessThan(100);
} finally {
safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
}
}

void insertDocuments(
int partitionCount,
int documentCount) {
Expand Down
Loading
Loading