Skip to content

Commit 91c6093

Browse files
add spark test
Signed-off-by: Aykut Bozkurt <aykut.bozkurt@snowflake.com>
1 parent 0732298 commit 91c6093

2 files changed

Lines changed: 344 additions & 0 deletions

File tree

pg_lake_table/tests/pytests/test_iceberg_ddl.py

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
import psycopg2
33
from utils_pytest import *
4+
from spark_utils import *
45

56
import os
67
import glob
@@ -2174,6 +2175,316 @@ def test_alter_column_type_partitioned_table(pg_conn, s3, with_default_location)
21742175
pg_conn.commit()
21752176

21762177

2178+
def test_alter_column_type_spark_comparison(
2179+
installcheck, spark_session, pg_conn, s3, with_default_location
2180+
):
2181+
"""Create the same table in Spark and pg_lake, perform identical type promotions,
2182+
verify both produce the same results, compare metadata schemas, and confirm
2183+
Spark can read pg_lake's metadata.json after type promotion."""
2184+
if installcheck:
2185+
return
2186+
2187+
pg_schema = "test_type_promo_pg"
2188+
spark_ns = "public"
2189+
2190+
run_command(f"CREATE SCHEMA {pg_schema};", pg_conn)
2191+
2192+
# ── 1. int → bigint ──
2193+
2194+
# Spark side
2195+
spark_session.sql(
2196+
f"CREATE TABLE {spark_ns}.spark_int_promo (a int, b int) USING iceberg"
2197+
)
2198+
spark_session.sql(f"INSERT INTO {spark_ns}.spark_int_promo VALUES (1, 10), (2, 20)")
2199+
spark_session.sql(
2200+
f"ALTER TABLE {spark_ns}.spark_int_promo ALTER COLUMN a TYPE bigint"
2201+
)
2202+
spark_session.sql(f"INSERT INTO {spark_ns}.spark_int_promo VALUES ({2**40}, 30)")
2203+
2204+
# pg_lake side — same operations
2205+
run_command(
2206+
f"CREATE TABLE {pg_schema}.int_tbl (a int, b int) USING iceberg;", pg_conn
2207+
)
2208+
run_command(f"INSERT INTO {pg_schema}.int_tbl VALUES (1, 10), (2, 20);", pg_conn)
2209+
pg_conn.commit()
2210+
run_command(f"ALTER TABLE {pg_schema}.int_tbl ALTER COLUMN a TYPE bigint;", pg_conn)
2211+
pg_conn.commit()
2212+
run_command(f"INSERT INTO {pg_schema}.int_tbl VALUES ({2**40}, 30);", pg_conn)
2213+
pg_conn.commit()
2214+
2215+
# Compare: query both natively
2216+
spark_query = f"SELECT a, b FROM {spark_ns}.spark_int_promo ORDER BY b ASC"
2217+
pg_query = f"SELECT a, b FROM {pg_schema}.int_tbl ORDER BY b ASC"
2218+
2219+
pg_lake_result = assert_query_result_on_spark_and_pg(
2220+
installcheck, spark_session, pg_conn, spark_query, pg_query
2221+
)
2222+
2223+
assert len(pg_lake_result) == 3
2224+
assert pg_lake_result == [[1, 10], [2, 20], [2**40, 30]]
2225+
2226+
# Compare full schemas JSON
2227+
spark_metadata_loc = (
2228+
spark_session.sql(
2229+
f"SELECT file FROM {spark_ns}.spark_int_promo.metadata_log_entries ORDER BY timestamp DESC"
2230+
)
2231+
.collect()[0]
2232+
.file
2233+
)
2234+
spark_json = normalize_json(read_s3_operations(s3, spark_metadata_loc))
2235+
2236+
pg_metadata_loc = run_query(
2237+
f"SELECT metadata_location FROM iceberg_tables "
2238+
f"WHERE table_name = 'int_tbl' AND table_namespace = '{pg_schema}'",
2239+
pg_conn,
2240+
)[0][0]
2241+
pg_json = normalize_json(read_s3_operations(s3, pg_metadata_loc))
2242+
2243+
assert_iceberg_schemas_equal(spark_json, pg_json, "int→bigint")
2244+
2245+
# Verify Spark can read pg_lake's metadata.json
2246+
# Disable vectorized reading: Spark 3.5 / Iceberg 1.4.3 vectorized reader
2247+
# crashes with ClassCastException when an old data file's Parquet physical
2248+
# type (int32) doesn't match the current schema type (int64) after promotion.
2249+
spark_session.conf.set("spark.sql.iceberg.vectorization.enabled", "false")
2250+
spark_register_table(
2251+
installcheck, spark_session, "int_tbl", pg_schema, pg_metadata_loc
2252+
)
2253+
spark_cross_query = f"SELECT a, b FROM {pg_schema}.int_tbl ORDER BY b ASC"
2254+
spark_cross = spark_session.sql(spark_cross_query).collect()
2255+
assert len(spark_cross) == 3
2256+
assert [spark_cross[0].a, spark_cross[0].b] == [1, 10]
2257+
assert [spark_cross[1].a, spark_cross[1].b] == [2, 20]
2258+
assert [spark_cross[2].a, spark_cross[2].b] == [2**40, 30]
2259+
spark_unregister_table(installcheck, spark_session, "int_tbl", pg_schema)
2260+
spark_session.conf.set("spark.sql.iceberg.vectorization.enabled", "true")
2261+
2262+
spark_session.sql(f"DROP TABLE {spark_ns}.spark_int_promo")
2263+
2264+
# ── 2. float → double ──
2265+
2266+
# Spark side
2267+
spark_session.sql(
2268+
f"CREATE TABLE {spark_ns}.spark_float_promo (a float, b int) USING iceberg"
2269+
)
2270+
spark_session.sql(f"INSERT INTO {spark_ns}.spark_float_promo VALUES (1.5, 1)")
2271+
spark_session.sql(
2272+
f"ALTER TABLE {spark_ns}.spark_float_promo ALTER COLUMN a TYPE double"
2273+
)
2274+
spark_session.sql(
2275+
f"INSERT INTO {spark_ns}.spark_float_promo VALUES (1.23456789012345, 2)"
2276+
)
2277+
2278+
# pg_lake side
2279+
run_command(
2280+
f"CREATE TABLE {pg_schema}.float_tbl (a real, b int) USING iceberg;", pg_conn
2281+
)
2282+
run_command(f"INSERT INTO {pg_schema}.float_tbl VALUES (1.5, 1);", pg_conn)
2283+
pg_conn.commit()
2284+
run_command(
2285+
f"ALTER TABLE {pg_schema}.float_tbl ALTER COLUMN a TYPE double precision;",
2286+
pg_conn,
2287+
)
2288+
pg_conn.commit()
2289+
run_command(
2290+
f"INSERT INTO {pg_schema}.float_tbl VALUES (1.23456789012345, 2);", pg_conn
2291+
)
2292+
pg_conn.commit()
2293+
2294+
spark_query = f"SELECT a, b FROM {spark_ns}.spark_float_promo ORDER BY b ASC"
2295+
pg_query = f"SELECT a, b FROM {pg_schema}.float_tbl ORDER BY b ASC"
2296+
2297+
pg_lake_result = assert_query_result_on_spark_and_pg(
2298+
installcheck, spark_session, pg_conn, spark_query, pg_query
2299+
)
2300+
2301+
assert len(pg_lake_result) == 2
2302+
2303+
# Compare full schemas JSON
2304+
spark_metadata_loc = (
2305+
spark_session.sql(
2306+
f"SELECT file FROM {spark_ns}.spark_float_promo.metadata_log_entries ORDER BY timestamp DESC"
2307+
)
2308+
.collect()[0]
2309+
.file
2310+
)
2311+
spark_json = normalize_json(read_s3_operations(s3, spark_metadata_loc))
2312+
2313+
pg_metadata_loc = run_query(
2314+
f"SELECT metadata_location FROM iceberg_tables "
2315+
f"WHERE table_name = 'float_tbl' AND table_namespace = '{pg_schema}'",
2316+
pg_conn,
2317+
)[0][0]
2318+
pg_json = normalize_json(read_s3_operations(s3, pg_metadata_loc))
2319+
2320+
assert_iceberg_schemas_equal(spark_json, pg_json, "float→double")
2321+
2322+
# Verify Spark can read pg_lake's metadata.json (vectorized off, same reason)
2323+
spark_session.conf.set("spark.sql.iceberg.vectorization.enabled", "false")
2324+
spark_register_table(
2325+
installcheck, spark_session, "float_tbl", pg_schema, pg_metadata_loc
2326+
)
2327+
spark_cross_query = f"SELECT a, b FROM {pg_schema}.float_tbl ORDER BY b ASC"
2328+
spark_cross = spark_session.sql(spark_cross_query).collect()
2329+
assert len(spark_cross) == 2
2330+
assert spark_cross[0].a == pytest.approx(1.5, abs=1e-6)
2331+
assert spark_cross[1].a == pytest.approx(1.23456789012345, abs=1e-10)
2332+
spark_unregister_table(installcheck, spark_session, "float_tbl", pg_schema)
2333+
spark_session.conf.set("spark.sql.iceberg.vectorization.enabled", "true")
2334+
2335+
spark_session.sql(f"DROP TABLE {spark_ns}.spark_float_promo")
2336+
2337+
# ── 3. decimal(P,S) → decimal(P',S) where P' > P ──
2338+
2339+
# Spark side
2340+
spark_session.sql(
2341+
f"CREATE TABLE {spark_ns}.spark_dec_promo (a decimal(10,2), b int) USING iceberg"
2342+
)
2343+
spark_session.sql(f"INSERT INTO {spark_ns}.spark_dec_promo VALUES (12345.67, 1)")
2344+
spark_session.sql(
2345+
f"ALTER TABLE {spark_ns}.spark_dec_promo ALTER COLUMN a TYPE decimal(20,2)"
2346+
)
2347+
spark_session.sql(
2348+
f"INSERT INTO {spark_ns}.spark_dec_promo VALUES (123456789012345.67, 2)"
2349+
)
2350+
2351+
# pg_lake side
2352+
run_command(
2353+
f"CREATE TABLE {pg_schema}.dec_tbl (a numeric(10,2), b int) USING iceberg;",
2354+
pg_conn,
2355+
)
2356+
run_command(f"INSERT INTO {pg_schema}.dec_tbl VALUES (12345.67, 1);", pg_conn)
2357+
pg_conn.commit()
2358+
run_command(
2359+
f"ALTER TABLE {pg_schema}.dec_tbl ALTER COLUMN a TYPE numeric(20,2);", pg_conn
2360+
)
2361+
pg_conn.commit()
2362+
run_command(
2363+
f"INSERT INTO {pg_schema}.dec_tbl VALUES (123456789012345.67, 2);", pg_conn
2364+
)
2365+
pg_conn.commit()
2366+
2367+
spark_query = f"SELECT a, b FROM {spark_ns}.spark_dec_promo ORDER BY b ASC"
2368+
pg_query = f"SELECT a, b FROM {pg_schema}.dec_tbl ORDER BY b ASC"
2369+
2370+
pg_lake_result = assert_query_result_on_spark_and_pg(
2371+
installcheck, spark_session, pg_conn, spark_query, pg_query
2372+
)
2373+
2374+
assert len(pg_lake_result) == 2
2375+
2376+
# Compare full schemas JSON
2377+
spark_metadata_loc = (
2378+
spark_session.sql(
2379+
f"SELECT file FROM {spark_ns}.spark_dec_promo.metadata_log_entries ORDER BY timestamp DESC"
2380+
)
2381+
.collect()[0]
2382+
.file
2383+
)
2384+
spark_json = normalize_json(read_s3_operations(s3, spark_metadata_loc))
2385+
2386+
pg_metadata_loc = run_query(
2387+
f"SELECT metadata_location FROM iceberg_tables "
2388+
f"WHERE table_name = 'dec_tbl' AND table_namespace = '{pg_schema}'",
2389+
pg_conn,
2390+
)[0][0]
2391+
pg_json = normalize_json(read_s3_operations(s3, pg_metadata_loc))
2392+
2393+
assert_iceberg_schemas_equal(spark_json, pg_json, "decimal widening")
2394+
2395+
# Verify Spark can read pg_lake's metadata.json
2396+
spark_register_table(
2397+
installcheck, spark_session, "dec_tbl", pg_schema, pg_metadata_loc
2398+
)
2399+
spark_cross_query = f"SELECT a, b FROM {pg_schema}.dec_tbl ORDER BY b ASC"
2400+
spark_cross = spark_session.sql(spark_cross_query).collect()
2401+
assert len(spark_cross) == 2
2402+
assert str(spark_cross[0].a) == "12345.67"
2403+
assert str(spark_cross[1].a) == "123456789012345.67"
2404+
spark_unregister_table(installcheck, spark_session, "dec_tbl", pg_schema)
2405+
2406+
spark_session.sql(f"DROP TABLE {spark_ns}.spark_dec_promo")
2407+
2408+
# ── 4. partitioned table: int → bigint on partition column ──
2409+
2410+
# Spark side
2411+
spark_session.sql(
2412+
f"CREATE TABLE {spark_ns}.spark_part_promo (a int, b int) "
2413+
f"USING iceberg PARTITIONED BY (b)"
2414+
)
2415+
spark_session.sql(
2416+
f"INSERT INTO {spark_ns}.spark_part_promo VALUES (1, 10), (2, 20)"
2417+
)
2418+
spark_session.sql(
2419+
f"ALTER TABLE {spark_ns}.spark_part_promo ALTER COLUMN b TYPE bigint"
2420+
)
2421+
spark_session.sql(f"INSERT INTO {spark_ns}.spark_part_promo VALUES (3, 30)")
2422+
2423+
# pg_lake side
2424+
run_command(
2425+
f"CREATE TABLE {pg_schema}.part_tbl (a int, b int) "
2426+
f"USING iceberg WITH (partition_by = 'b');",
2427+
pg_conn,
2428+
)
2429+
run_command(f"INSERT INTO {pg_schema}.part_tbl VALUES (1, 10), (2, 20);", pg_conn)
2430+
pg_conn.commit()
2431+
run_command(
2432+
f"ALTER TABLE {pg_schema}.part_tbl ALTER COLUMN b TYPE bigint;", pg_conn
2433+
)
2434+
pg_conn.commit()
2435+
run_command(f"INSERT INTO {pg_schema}.part_tbl VALUES (3, 30);", pg_conn)
2436+
pg_conn.commit()
2437+
2438+
spark_query = f"SELECT a, b FROM {spark_ns}.spark_part_promo ORDER BY a ASC"
2439+
pg_query = f"SELECT a, b FROM {pg_schema}.part_tbl ORDER BY a ASC"
2440+
2441+
pg_lake_result = assert_query_result_on_spark_and_pg(
2442+
installcheck, spark_session, pg_conn, spark_query, pg_query
2443+
)
2444+
2445+
assert len(pg_lake_result) == 3
2446+
assert pg_lake_result == [[1, 10], [2, 20], [3, 30]]
2447+
2448+
# Compare full schemas JSON
2449+
spark_metadata_loc = (
2450+
spark_session.sql(
2451+
f"SELECT file FROM {spark_ns}.spark_part_promo.metadata_log_entries ORDER BY timestamp DESC"
2452+
)
2453+
.collect()[0]
2454+
.file
2455+
)
2456+
spark_json = normalize_json(read_s3_operations(s3, spark_metadata_loc))
2457+
2458+
pg_metadata_loc = run_query(
2459+
f"SELECT metadata_location FROM iceberg_tables "
2460+
f"WHERE table_name = 'part_tbl' AND table_namespace = '{pg_schema}'",
2461+
pg_conn,
2462+
)[0][0]
2463+
pg_json = normalize_json(read_s3_operations(s3, pg_metadata_loc))
2464+
2465+
assert_iceberg_schemas_equal(spark_json, pg_json, "partitioned int→bigint")
2466+
2467+
# Verify Spark can read pg_lake's metadata.json (vectorized off, same reason)
2468+
spark_session.conf.set("spark.sql.iceberg.vectorization.enabled", "false")
2469+
spark_register_table(
2470+
installcheck, spark_session, "part_tbl", pg_schema, pg_metadata_loc
2471+
)
2472+
spark_cross_query = f"SELECT a, b FROM {pg_schema}.part_tbl ORDER BY a ASC"
2473+
spark_cross = spark_session.sql(spark_cross_query).collect()
2474+
assert len(spark_cross) == 3
2475+
assert [spark_cross[0].a, spark_cross[0].b] == [1, 10]
2476+
assert [spark_cross[1].a, spark_cross[1].b] == [2, 20]
2477+
assert [spark_cross[2].a, spark_cross[2].b] == [3, 30]
2478+
spark_unregister_table(installcheck, spark_session, "part_tbl", pg_schema)
2479+
spark_session.conf.set("spark.sql.iceberg.vectorization.enabled", "true")
2480+
2481+
spark_session.sql(f"DROP TABLE {spark_ns}.spark_part_promo")
2482+
2483+
# cleanup
2484+
run_command(f"DROP SCHEMA {pg_schema} CASCADE;", pg_conn)
2485+
pg_conn.commit()
2486+
2487+
21772488
def get_current_schema_id(pg_conn, s3, namespace, name):
21782489

21792490
metadata_location = run_query(

test_common/helpers/iceberg.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,3 +798,36 @@ def adjust_object_store_settings(superuser_conn):
798798

799799
run_command("SELECT pg_reload_conf()", superuser_conn)
800800
superuser_conn.commit()
801+
802+
803+
def assert_iceberg_schemas_equal(left_json, right_json, label=""):
804+
"""Compare the full schemas array from two Iceberg metadata JSONs.
805+
806+
Both engines should produce the same number of schema versions with
807+
matching field definitions. Field IDs and schema-ids are ignored
808+
because they may be assigned differently by each engine.
809+
"""
810+
def _norm_fields(fields):
811+
return [
812+
(f["name"],
813+
re.sub(r"\s+", "", f["type"]) if isinstance(f["type"], str) else f["type"],
814+
f.get("required", False))
815+
for f in fields
816+
]
817+
818+
left_schemas = left_json["schemas"]
819+
right_schemas = right_json["schemas"]
820+
821+
assert len(left_schemas) == len(right_schemas), (
822+
f"[{label}] schema count mismatch: "
823+
f"left has {len(left_schemas)}, right has {len(right_schemas)}"
824+
)
825+
826+
for idx, (ls, rs) in enumerate(zip(left_schemas, right_schemas)):
827+
left_fields = _norm_fields(ls["fields"])
828+
right_fields = _norm_fields(rs["fields"])
829+
assert left_fields == right_fields, (
830+
f"[{label}] schema #{idx} field mismatch:\n"
831+
f" left: {left_fields}\n"
832+
f" right: {right_fields}"
833+
)

0 commit comments

Comments
 (0)