|
1 | 1 | import pytest |
2 | 2 | from chispa.dataframe_comparer import assert_df_equality # type: ignore |
3 | | -from pyspark.errors import AnalysisException |
4 | 3 | from pyspark.sql import SparkSession |
5 | 4 | from pyspark.sql.types import IntegerType, StringType |
6 | 5 |
|
|
11 | 10 | create_partially_filled_dataset, |
12 | 11 | register_schema_to_dataset, |
13 | 12 | ) |
| 13 | +from typedspark._core.spark_imports import SPARK_CONNECT, AnalysisException |
14 | 14 | from typedspark._utils.register_schema_to_dataset import register_schema_to_dataset_with_alias |
15 | 15 |
|
16 | 16 |
|
@@ -40,7 +40,7 @@ def test_register_schema_to_dataset(spark: SparkSession): |
40 | 40 | df_b = create_partially_filled_dataset(spark, Job, {Job.a: [1, 2, 3]}) |
41 | 41 |
|
42 | 42 | with pytest.raises(AnalysisException): |
43 | | - df_a.join(df_b, Person.a == Job.a) |
| 43 | + df_a.join(df_b, Person.a == Job.a).show() |
44 | 44 |
|
45 | 45 | person = register_schema_to_dataset(df_a, Person) |
46 | 46 | job = register_schema_to_dataset(df_b, Job) |
@@ -69,13 +69,21 @@ def test_register_schema_to_dataset_with_alias(spark: SparkSession): |
69 | 69 | }, |
70 | 70 | ) |
71 | 71 |
|
72 | | - with pytest.raises(AnalysisException): |
| 72 | + def self_join_without_register_schema_to_dataset_with_alias(): |
73 | 73 | df_a = df.alias("a") |
74 | 74 | df_b = df.alias("b") |
75 | 75 | schema_a = register_schema_to_dataset(df_a, Person) |
76 | 76 | schema_b = register_schema_to_dataset(df_b, Person) |
77 | | - df_a.join(df_b, schema_a.a == schema_b.b) |
| 77 | + df_a.join(df_b, schema_a.a == schema_b.b).show() |
| 78 | + |
| 79 | + # Spark Connect does not raise an AnalysisException for this self-join, whereas regular Spark does |
| 80 | + if SPARK_CONNECT: |
| 81 | + self_join_without_register_schema_to_dataset_with_alias() |
| 82 | + else: |
| 83 | + with pytest.raises(AnalysisException): |
| 84 | + self_join_without_register_schema_to_dataset_with_alias() |
78 | 85 |
|
| 86 | + # with regular Spark, the self-join has to go through register_schema_to_dataset_with_alias |
79 | 87 | df_a, schema_a = register_schema_to_dataset_with_alias(df, Person, "a") |
80 | 88 | df_b, schema_b = register_schema_to_dataset_with_alias(df, Person, "b") |
81 | 89 | joined = df_a.join(df_b, schema_a.a == schema_b.b) |
|
0 commit comments