
[SPARK-51587][PYTHON][SS] Fix an issue where timestamp cannot be used in ListState when multiple state data is involved #50349

Closed · wants to merge 2 commits

Conversation

@bogao007 (Contributor) commented Mar 21, 2025

What changes were proposed in this pull request?

Fix an issue where timestamp cannot be used in ListState when multiple state data is involved.

When transmitting multiple state rows, we use Arrow to construct a record batch from a Pandas DataFrame, but this requires proper type conversion to keep the batch compatible with Spark.

TimestampType was missing from this conversion utility. Since the default timestamp precision in Pandas is nanosecond while Spark uses microsecond, an explicit conversion is needed to make them compatible.
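
For context, a minimal sketch of the shape of the fix (not the exact Spark source; the returned dtype follows the reviewer discussion below):

import numpy as np
from typing import Optional
from pyspark.sql.types import DataType, TimestampType

# Sketch only: pandas defaults timestamps to datetime64[ns], but Spark's
# Arrow path accepts microseconds, so TimestampType should map to
# datetime64[us] rather than the pandas default.
def _to_numpy_type_sketch(dtype: DataType) -> Optional[np.dtype]:
    if dtype == TimestampType():
        return np.dtype("datetime64[us]")
    return None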

Why are the changes needed?

Without this change, using a timestamp type with ListState put() or appendList() fails with the error below:

[UNSUPPORTED_ARROWTYPE] Unsupported arrow type Timestamp(NANOSECOND, null). SQLSTATE: 0A000
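
For reference, a hypothetical repro sketch (the state name, schema, and handler wiring below are illustrative, following the transformWithState Python API, and are not taken from the PR):

import datetime
from pyspark.sql.types import StructField, StructType, TimestampType

schema = StructType([StructField("ts", TimestampType(), True)])

# In StatefulProcessor.init:
#     self.list_state = handle.getListState("timestamps", schema)
#
# Writing several rows at once routes them through the Arrow path
# (_send_arrow_state), which raised UNSUPPORTED_ARROWTYPE before this fix:
rows = [
    (datetime.datetime(2025, 3, 21, 12, 0, 0),),
    (datetime.datetime(2025, 3, 21, 12, 0, 1),),
]
# self.list_state.put(rows)  # or self.list_state.appendList(rows)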

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a new test case.

Was this patch authored or co-authored using generative AI tooling?

No.

@@ -1424,6 +1424,12 @@ def _to_numpy_type(type: DataType) -> Optional["np.dtype"]:
        return np.dtype("float32")
    elif type == DoubleType():
        return np.dtype("float64")
    elif type == TimestampType():
Contributor
The change is inside the base pandas type file; does this mean all pandas-related operators (batch and streaming) are affected?

Contributor

I am confused by the PR title: if this change is inside the base PySpark types file, why does it only impact the timestamp type when "multiple state data" is involved? Why does it work fine when there is only a single state row?

@bogao007 (Author) commented Mar 21, 2025

This utility function is only used by TWS (transformWithState), but we followed an earlier suggestion to move it into the base types file since it does basic type conversion. The issue only occurs when we use Arrow to transmit state data to the JVM side, which is why only the multiple-state-data path is affected.

@bogao007 (Author)

I added more details to the PR description.

Contributor

Thanks! I understand now: list_state_client is the only caller of _send_arrow_state, and _send_arrow_state is the only function using _to_numpy_type.

@@ -1424,6 +1424,12 @@ def _to_numpy_type(type: DataType) -> Optional["np.dtype"]:
        return np.dtype("float32")
    elif type == DoubleType():
        return np.dtype("float64")
    elif type == TimestampType():
Member

Can we move spark_type_to_pandas_dtype from pandas/typedef/typehints here and reuse it?

@bogao007 (Author)

@HyukjinKwon It seems spark_type_to_pandas_dtype uses datetime64[ns] instead of datetime64[us]. That would still hit the same error, since Spark only supports microsecond precision when converting from Arrow. We also have _to_corrected_pandas_type in the same file to reuse, but it likewise uses nanoseconds and would fail here. Any suggestions on how to reuse one of these while still fixing the issue?
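
To illustrate the precision mismatch in isolation (assuming pandas 2.x, which supports non-nanosecond datetime64 dtypes, plus pyarrow):

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"ts": pd.to_datetime(["2025-03-21 12:00:00"])})
print(df["ts"].dtype)  # datetime64[ns] -- the pandas default

batch = pa.RecordBatch.from_pandas(df)
print(batch.schema.field("ts").type)  # timestamp[ns] -> rejected by Spark

# Casting to microsecond precision first yields an Arrow type Spark accepts.
df["ts"] = df["ts"].astype("datetime64[us]")
batch = pa.RecordBatch.from_pandas(df)
print(batch.schema.field("ts").type)  # timestamp[us]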

@bogao007 (Author)

Background: we use Pandas to convert Spark-typed state data into an Arrow record batch:

def _send_arrow_state(self, schema: StructType, state: List[Tuple]) -> None:
    import pyarrow as pa
    import pandas as pd

    column_names = [field.name for field in schema.fields]
    pandas_df = convert_pandas_using_numpy_type(
        pd.DataFrame(state, columns=column_names), schema
    )
    batch = pa.RecordBatch.from_pandas(pandas_df)
    self.serializer.dump_stream(iter([batch]), self.sockfile)
    self.sockfile.flush()
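
For completeness, a hedged sketch of what convert_pandas_using_numpy_type plausibly does with that schema (not the exact Spark source): cast each column to the numpy dtype derived from its Spark field type, so timestamp columns land as datetime64[us] before the Arrow batch is built.

import numpy as np
import pandas as pd
from typing import Optional
from pyspark.sql.types import DataType, StructType, TimestampType

def _to_numpy_type_stub(dtype: DataType) -> Optional[np.dtype]:
    # Stand-in for the helper patched in this PR; only the timestamp
    # branch matters for this illustration.
    if dtype == TimestampType():
        return np.dtype("datetime64[us]")
    return None

def convert_pandas_using_numpy_type_sketch(df: pd.DataFrame, schema: StructType) -> pd.DataFrame:
    # Cast each column whose Spark type has a numpy mapping; leave the
    # rest untouched.
    for field in schema.fields:
        np_type = _to_numpy_type_stub(field.dataType)
        if np_type is not None:
            df[field.name] = df[field.name].astype(np_type)
    return df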

Member

👌

@HyukjinKwon (Member)

@jingz-db does it look good to you?

@HyukjinKwon (Member)

Merged to master.

@HeartSaVioR (Contributor)

Since this is a bugfix for a feature first introduced in Spark 4.0.0, do we think there is still a door to fix this in Spark 4.0.0, or at least in Spark 4.0.1?

cc. @cloud-fan as release manager of Spark 4.0.0

@HyukjinKwon (Member)

Oh yeah, let's backport it. I don't mind.
