Describe the bug
For a bounded Cosmos DB change feed read, the Spark connector can report successful end-of-stream even when the latest continuation token has not reached the partition's planned endLsn.
TransientIOErrorsRetryingIterator receives the planned endLsn, updates lastContinuationToken after every FeedResponse, and applies validateNextLsn to prevent returning records beyond the upper bound. However, when the page iterator reaches EOF, hasNextInternalCore returns Some(false) without comparing the final continuation LSN with endLsn.
The relevant behavior on the current main branch is:
if (hasNext) {
  val feedResponse = feedResponseIterator.next()
  // ...
  lastContinuationToken.set(feedResponse.getContinuationToken)
  // ...
} else {
  Some(false)
}
Source:
https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala
This creates a correctness hole for Spark Structured Streaming:
- the driver plans a bounded partition read from LSN 10 through LSN 20;
- the executor receives pages only through continuation LSN 15;
- the page publisher terminates without an exception;
- TransientIOErrorsRetryingIterator.hasNext returns false;
- Spark considers the input partition complete;
- Spark commits the micro-batch's planned end offset, LSN 20;
- the next micro-batch starts after LSN 20;
- records in the unread portion of the interval can no longer be recovered.
This is particularly dangerous because it is silent. There is no exception, failed Spark task, retry, or indication in the checkpoint that the executor stopped below the driver's planned boundary.
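The sequence above can be modelled in a few lines. This is only an illustrative simulation with made-up data: `EarlyEofSimulation`, `readBatch`, and `eofAt` are hypothetical names, and the start-exclusive, end-inclusive interval is an assumption chosen to match "from LSN 10 through LSN 20" and "starts after LSN 20". It does not call the connector or Spark.

```scala
object EarlyEofSimulation {
  // Illustrative feed: one record per LSN from 11 to 30 (assumed data).
  val feed: Seq[Long] = 11L to 30L

  // Reads the interval (startExclusive, endInclusive]. When eofAt is set,
  // the page sequence silently stops at that LSN, as in the reported bug.
  def readBatch(startExclusive: Long, endInclusive: Long, eofAt: Option[Long]): Seq[Long] = {
    val upper = eofAt.map(e => math.min(e, endInclusive)).getOrElse(endInclusive)
    feed.filter(lsn => lsn > startExclusive && lsn <= upper)
  }

  def main(args: Array[String]): Unit = {
    // Micro-batch 1 is planned as (10, 20] but the reader hits EOF at 15.
    val batch1 = readBatch(10L, 20L, eofAt = Some(15L))
    // The planned end offset is committed regardless of how far the reader got.
    val committedOffset = 20L
    // Micro-batch 2 starts after the committed offset.
    val batch2 = readBatch(committedOffset, 30L, eofAt = None)

    val delivered = (batch1 ++ batch2).toSet
    val missing = feed.filterNot(delivered.contains)
    println(s"missing LSNs: ${missing.mkString(", ")}")
  }
}
```

Running `EarlyEofSimulation.main` prints `missing LSNs: 16, 17, 18, 19, 20`: nothing in the model fails, yet the tail of the first interval is never delivered.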
We observed the corresponding failure shape in a long-running Structured Streaming workload:
- a micro-batch checkpoint advanced a physical feed range across a bounded LSN interval;
- source records whose positions were inside that interval were absent from the sink;
- replaying the same bounded interval later returned those records;
- downstream filtering and sink processing were not involved because the records were absent from the connector output itself.
That production observation does not prove why the original page sequence terminated early. The trigger could be in the Cosmos service response sequence, SDK paging, Reactor, networking, cancellation, or their interaction. The deterministic bug reported here is that the Spark connector accepts early EOF as successful completion and therefore cannot contain such a transient failure.
The same unchecked EOF behavior is present in:
- com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12:4.41.0, where the behavior was initially observed;
- 4.48.0, the latest stable release at the time of filing;
- current main / 4.49.0-beta.1.
Exception or Stack Trace
No exception or stack trace is produced. That is the core of the issue: EOF below endLsn is treated as normal completion.
With the expected assertion in the reproduction below, ScalaTest reports that the expected exception was not thrown because iterator.hasNext returns false.
To Reproduce
- Clone Azure/azure-sdk-for-java at current main.
- Open:
  sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala
- Add the test and helper shown below to TransientIOErrorsRetryingIteratorSpec.
- Add these imports:
  import com.azure.cosmos.implementation.OperationCancelledException
  import java.util.Base64
- Run:
  mvn -pl sdk/cosmos/azure-cosmos-spark_3-5_2-12 -am \
    -Pspark-e2e_3-5 \
    -DwildcardSuites=com.azure.cosmos.spark.TransientIOErrorsRetryingIteratorSpec \
    -DskipITs=true \
    -Dcosmos.test.e2e.skip=true \
    -DskipSpringITs=true \
    -Dgpg.skip=true \
    -Dmaven.javadoc.skip=true \
    -Drevapi.skip=true \
    -Dspotbugs.skip=true \
    -Djacoco.skip=true \
    test
- Observe that the test fails because iterator.hasNext returns false; it does not throw or retry even though the final continuation is LSN 15 and the planned end is LSN 20.
Code Snippet
This test uses the existing generateFeedResponse helper in TransientIOErrorsRetryingIteratorSpec. It creates one empty response carrying a valid change feed continuation at LSN 15, followed by EOF, while the bounded reader expects to reach LSN 20.
"Bounded change feed reads" should
  "not complete when the feed ends before the planned end LSN" in {
  val endLsn = 20L
  val lastReturnedLsn = 15L
  val response = generateFeedResponse("ChangeFeed", 1, -1)
  ModelBridgeInternal.setFeedResponseContinuationToken(
    changeFeedContinuation(lastReturnedLsn),
    response
  )
  val iterator = new TransientIOErrorsRetryingIterator(
    _ => UtilBridgeInternal.createCosmosPagedFlux(
      _ => Flux.fromArray(Array(response))
    ),
    pageSize,
    1,
    None,
    Some(endLsn)
  )
  iterator.maxRetryCount = 0
  // Current behavior: hasNext returns false.
  // Expected behavior: the bounded read must not report completion.
  intercept[OperationCancelledException](iterator.hasNext)
}

// Builds a Base64-encoded change feed continuation whose single range is at the given LSN.
private def changeFeedContinuation(lsn: Long): String = {
  val state =
    s"""{
       |  "V": 1,
       |  "Rid": "testContainer",
       |  "Mode": "INCREMENTAL",
       |  "StartFrom": {
       |    "Type": "BEGINNING"
       |  },
       |  "Continuation": {
       |    "V": 1,
       |    "Rid": "testContainer",
       |    "Continuation": [
       |      {
       |        "token": "$lsn",
       |        "range": {
       |          "min": "",
       |          "max": "FF"
       |        }
       |      }
       |    ],
       |    "Range": {
       |      "min": "",
       |      "max": "FF"
       |    }
       |  }
       |}""".stripMargin
  Base64.getEncoder.encodeToString(state.getBytes("UTF-8"))
}
The reproduction does not require a live Cosmos account because it targets the connector's completion contract directly. A real Cosmos account would be needed to reproduce the lower-level condition that causes a page publisher to terminate early, but that condition should not be able to make the bounded Spark reader commit an unconsumed interval.
Expected behavior
For every bounded change feed input partition, successful EOF should require proof that the latest continuation reached the planned endLsn.
Conceptually:
latest continuation LSN >= planned endLsn
  -> report partition completion
latest continuation LSN < planned endLsn
  -> retry from the latest continuation token, or fail the Spark task
For continuation state containing multiple ranges, completion should require every represented range to reach the boundary. Equivalently, the minimum continuation LSN should be at least endLsn.
If no page was returned, the reader also needs the partition's starting LSN:
- startLsn == endLsn with no page is a valid empty bounded interval;
- startLsn < endLsn with no page must not be accepted as complete.
A retryable exception such as OperationCancelledException would fit the connector's existing transient retry path. The retry can reopen the paged flux from lastContinuationToken. If retries are exhausted, the Spark task should fail so Spark does not commit the planned offset.
The existing validateNextLsn check should remain. It handles the opposite boundary: preventing records with _lsn > endLsn from being emitted. It does not prove that EOF occurred at or beyond endLsn.
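The completion rule described in this section could look roughly like the sketch below. Everything in it is hypothetical and simplified: `Page`, `BoundedPages`, and `PrematureEofException` are illustrative stand-ins rather than connector types, each page is assumed to carry a single continuation LSN, and a real fix would more likely raise a retryable exception inside TransientIOErrorsRetryingIterator so the existing retry path can reopen the feed.

```scala
// Hypothetical page model: one continuation LSN per page (a simplification).
final case class Page(continuationLsn: Long, records: Seq[String])

// Stand-in for whatever retryable exception a real fix would use.
final class PrematureEofException(lastLsn: Long, endLsn: Long)
  extends RuntimeException(s"EOF at LSN $lastLsn before planned end LSN $endLsn")

final class BoundedPages(pages: Iterator[Page], startLsn: Long, endLsn: Option[Long])
  extends Iterator[Page] {

  // Initialised to the start LSN so that "no page returned" is also checked.
  private var lastLsn: Long = startLsn

  override def hasNext: Boolean =
    if (pages.hasNext) true
    else {
      // EOF counts as successful completion only if the bound was reached.
      endLsn.foreach { end =>
        if (lastLsn < end) throw new PrematureEofException(lastLsn, end)
      }
      false
    }

  override def next(): Page = {
    val page = pages.next()
    lastLsn = page.continuationLsn
    page
  }
}
```

With this shape, an unbounded read (`endLsn = None`) keeps its current EOF behavior, while a bounded read that stops at LSN 15 of a planned 20 surfaces an error instead of returning false.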
Screenshots
Not applicable. This is a connector control-flow and checkpoint-correctness issue. The unit reproduction and source link are more precise than a screenshot.
Setup (please complete the following information):
- OS: Linux x86_64 in the affected distributed workload; the unit reproduction is OS-independent
- IDE: N/A; Maven/ScalaTest command-line build
- Library/Libraries:
  - com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12:4.41.0 in the affected workload
  - confirmed by source inspection in 4.48.0
  - confirmed by source inspection on current main / 4.49.0-beta.1
- Java version: Java 17; the affected connector versions also support Java 8 and Java 11
- App Server/Environment: Apache Spark 3.5 Structured Streaming, Scala 2.12, distributed executor environment
- Frameworks: Apache Spark Structured Streaming; no Spring Boot, Micronaut, or Quarkus
If you suspect a dependency version mismatch (e.g. you see NoClassDefFoundError, NoSuchMethodError or similar), please check out Troubleshoot dependency version conflict article first. If it doesn't provide solution for the problem, please provide:
- verbose dependency tree (mvn dependency:tree -Dverbose)
- exception message, full stack trace, and any available logs
This does not appear to be a dependency version mismatch. The behavior follows directly from the connector's EOF branch and is reproducible with the repository's own test infrastructure.
Additional context
Impact:
- The bug can change an at-least-once Structured Streaming source into an effectively at-most-once source for part of a bounded micro-batch interval.
- Once Spark checkpoints the planned end offset, normal incremental processing cannot revisit the skipped portion.
- Watermark and lag monitoring can remain healthy because the checkpoint continues advancing.
- The resulting omission can be sparse rather than a complete partition outage, making count-based monitoring insufficient.
- A later replay can return the missing records, which makes the original run appear nondeterministic.
The connector already has robust retry handling for exceptions and page-retrieval timeouts. The uncovered case bypasses that handling because feedResponseIterator.hasNext == false is interpreted as successful completion.
Suggested validation cases for a fix:
| Start LSN | End LSN | Final continuation | Expected result |
| --- | --- | --- | --- |
| 10 | 20 | 15 | Retry or fail; must not complete |
| 10 | 20 | 20 | Complete |
| 20 | 20 | no page | Complete |
| 10 | 20 | no page | Retry or fail |
| 10 | 20 | multi-range [20, 18] | Retry or fail |
| unbounded | none | any EOF | Preserve existing behavior |
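The validation cases can also be written as data against a small predicate. This is a sketch under stated assumptions: `CompletionCases` and `mayComplete` are hypothetical names, `finalLsns` is `None` when no page was returned and otherwise holds one LSN per range in the continuation state, and the start LSN used for the unbounded row is an arbitrary placeholder.

```scala
object CompletionCases {
  // Hypothetical predicate: true when EOF may be reported as completion.
  def mayComplete(startLsn: Long, endLsn: Option[Long], finalLsns: Option[Seq[Long]]): Boolean =
    endLsn match {
      case None => true // unbounded read: preserve existing EOF behavior
      case Some(end) =>
        finalLsns match {
          // Every represented range must reach the boundary.
          case Some(lsns) if lsns.nonEmpty => lsns.min >= end
          // No page returned: only an empty interval is complete.
          case _ => startLsn >= end
        }
    }

  // (start LSN, end LSN, final continuation LSNs, may complete?)
  val cases: Seq[(Long, Option[Long], Option[Seq[Long]], Boolean)] = Seq(
    (10L, Some(20L), Some(Seq(15L)), false),
    (10L, Some(20L), Some(Seq(20L)), true),
    (20L, Some(20L), None, true),
    (10L, Some(20L), None, false),
    (10L, Some(20L), Some(Seq(20L, 18L)), false),
    (0L, None, Some(Seq(5L)), true) // unbounded; the start LSN here is arbitrary
  )

  def main(args: Array[String]): Unit =
    cases.foreach { case (start, end, finalLsns, expected) =>
      assert(mayComplete(start, end, finalLsns) == expected, s"$start $end $finalLsns")
    }
}
```

A `false` result corresponds to "Retry or fail" in the table; whether the reader retries or fails the task is a separate policy decision that this predicate does not model.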
I searched the repository's open and closed issues for bounded change feed EOF, continuation below endLsn, and Spark change feed missing records, and did not find an existing issue describing this completion invariant.
Related but distinct work:
Information Checklist
Kindly make sure that you have added all of the information above and checked off the required fields; otherwise we will treat the issue as an incomplete report