Conversation

@jiateoh (Contributor) commented Oct 13, 2025

What changes were proposed in this pull request?

This is a branch-4.0 PR for #52539. Description is copied and updated below (4.0 has a slightly different test setup and only provides pandas tests).

Fix the TransformWithState StateServer's parseProtoMessage method to fully read the desired message using the DataInputStream readFully API rather than read (InputStream/FilterInputStream), which may return after reading only the bytes currently available and therefore may not deliver the full message. readFully (DataInputStream) keeps reading until it has filled the provided buffer.

In addition to the API documentation above, this StackOverflow answer also illustrates the difference between the two APIs: https://stackoverflow.com/a/25900095
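The difference can be demonstrated with a short, self-contained sketch. ChunkedStream below is a hypothetical stand-in for a socket that delivers data in small chunks; it is not part of the PR:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyDemo {
    // Hypothetical stream that delivers at most 10 bytes per read() call,
    // simulating a socket whose data arrives in small chunks.
    static class ChunkedStream extends FilterInputStream {
        ChunkedStream(InputStream in) { super(in); }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 10));
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[100];

        // InputStream.read: a single call may stop short of the requested length.
        byte[] partial = new byte[100];
        int n = new ChunkedStream(new ByteArrayInputStream(payload)).read(partial, 0, 100);
        System.out.println("read returned " + n + " bytes");  // 10

        // DataInputStream.readFully: loops internally until the buffer is full.
        byte[] full = new byte[100];
        new DataInputStream(new ChunkedStream(new ByteArrayInputStream(payload))).readFully(full);
        System.out.println("readFully filled all " + full.length + " bytes");
    }
}
```

A single `read` call here returns at most 10 bytes, while `readFully` keeps calling the underlying stream until the whole 100-byte buffer is filled (or throws EOFException if the stream ends first).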

Why are the changes needed?

For large state values used in the TransformWithState API, inputStream.read is not guaranteed to read messageLen bytes of data, as documented in the InputStream API. For large values, read can return prematurely, leaving messageBytes only partially filled and yielding a corrupted, likely unparseable proto message.

This is not a common scenario: testing indicated that the proto messages had to be fairly large to trigger the error consistently. The test case I added uses 512KB strings in the state value updates.

Does this PR introduce any user-facing change?

No

How was this patch tested?

(Note: compared to original PR, this 4.0 branch organizes tests differently and only supports the pandas tests)

Added a new test case using 512KB strings:

  • Value state update
  • List state update with 3 (different) values (note: list state provides a multi-value update API, so this message is even larger than the other two)
  • Map state update with single key/value

```
build/sbt -Phive -Phive-thriftserver -DskipTests package
python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPandasTests'
```

The configured data size (512KB) triggers an incomplete read while still completing in a reasonable time (within 30s on my laptop). I separately tested a larger input size of 4MB, which took about 30 minutes and which I considered too expensive to include in the test.
Below are sample test results from using read only (i.e., without the fix) plus a check comparing message length against bytes read (the test code is included in this commit but reverted later for the PR). The check is no longer required after the readFully fix, as readFully handles it internally.

```
TransformWithStateInPandasTests
    pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating map state value: TESTING: Failed to read message bytes: expected 524369 bytes, but only read 261312 bytes
```

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-sonnet-4-5-20250929)

…es in TransformWithStateInPySparkStateServer

Fix the TransformWithState StateServer's `parseProtoMessage` method to fully read the desired message using the [DataInputStream `readFully` API](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/io/DataInput.html#readFully(byte%5B%5D)) rather than `read` (InputStream/FilterInputStream), which may return after reading only the bytes currently available and therefore may not deliver the full message. [`readFully` (DataInputStream)](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/io/DataInput.html#readFully(byte%5B%5D)) keeps reading until it has filled the provided buffer.

In addition to the linked API above, this StackOverflow post also illustrates the difference between the two APIs: https://stackoverflow.com/a/25900095

For large state values used in the TransformWithState API, `inputStream.read` is not guaranteed to read `messageLen` bytes of data, as documented in the InputStream API. For large values, `read` can return prematurely, leaving `messageBytes` only partially filled and yielding a corrupted, likely unparseable proto message.
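As a hedged illustration of the pattern the fix applies, a length-prefixed message read with `readFully` can be sketched as follows (`readMessage` and the framing are illustrative names, not the actual `parseProtoMessage` code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LengthPrefixedRead {
    // Illustrative sketch: read a length prefix, then the message body.
    static byte[] readMessage(DataInputStream input) throws IOException {
        int messageLen = input.readInt();
        byte[] messageBytes = new byte[messageLen];
        // Before the fix: input.read(messageBytes) could return with the
        // buffer only partially filled. readFully loops until messageLen
        // bytes are read, or throws EOFException if the stream ends first.
        input.readFully(messageBytes);
        return messageBytes;
    }

    public static void main(String[] args) throws IOException {
        // Round-trip a small payload through the length-prefixed framing.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(out);
        byte[] payload = "state-value".getBytes("UTF-8");
        dos.writeInt(payload.length);
        dos.write(payload);

        byte[] back = readMessage(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
        System.out.println(new String(back, "UTF-8"));  // state-value
    }
}
```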

This is not a common scenario: testing indicated that the proto messages had to be fairly large to trigger the error consistently. The test case I added uses 512KB strings in the state value updates.

No

Added a new test case using 512KB strings:

- Value state update
- List state update with 3 (different) values (note: list state provides a multi-value update API, so this message is even larger than the other two)
- Map state update with single key/value

```
build/sbt -Phive -Phive-thriftserver -DskipTests package
python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPandasTests'
python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPySparkTests'
```

The configured data size (512KB) triggers an incomplete read while still completing in a reasonable time (within 30s on my laptop). I separately tested a larger input size of 4MB, which took about 30 minutes and which I considered too expensive to include in the test.
Below are sample test results from using `read` only (i.e., without the fix) plus a check comparing message length against bytes read ([test code is included in this commit](apache@b68cfd7) but reverted later for the PR). The check is no longer required after the `readFully` fix, as `readFully` handles it internally.
```
    TransformWithStateInPandasTests
        pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating map state value: TESTING: Failed to read message bytes: expected 524369 bytes, but only read 261312 bytes

    TransformWithStateInPySparkTests
        pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating value state: TESTING: Failed to read message bytes: expected 524336 bytes, but only read 392012 bytes

```

Generated-by: Claude Code (claude-sonnet-4-5-20250929)

Closes apache#52539 from jiateoh/tws_readFully_fix.

Lead-authored-by: Jason Teoh <[email protected]>
Co-authored-by: Jason Teoh <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
@jiateoh force-pushed the tws_readFully_fix-4.0 branch from 9b1805f to 6aafca2 on October 13, 2025 20:49
@jiateoh jiateoh changed the title [WIP] Tws read fully fix 4.0 [SPARK-53870][PYTHON][SS] Fix partial read bug for large proto messages in TransformWithStateInPySparkStateServer (branch-4.0 PR) Oct 13, 2025
@jiateoh jiateoh marked this pull request as ready for review October 13, 2025 21:40
@jiateoh (Contributor, Author) commented Oct 13, 2025

@HeartSaVioR Can you help review this PR when you get a chance? This is the 4.0 follow up for #52539 to resolve merge conflicts as requested in this comment.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-53870][PYTHON][SS] Fix partial read bug for large proto messages in TransformWithStateInPySparkStateServer (branch-4.0 PR) [SPARK-53870][PYTHON][SS][4.0] Fix partial read bug for large proto messages in TransformWithStateInPySparkStateServer Oct 13, 2025
@dongjoon-hyun (Member) left a comment

+1, LGTM (Pending CIs). Thank you, @jiateoh .

@HeartSaVioR (Contributor) left a comment

+1 pending CI. Thanks!
