Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions pg_lake_engine/src/pgduck/rewrite_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,31 @@ RewriteQueryTreeForPGDuckMutator(Node *node, RewriteQueryTreeContext * context)
}
}

/*
* DuckDB does not have a jsonb[] type (only json[]). Remap any remaining
* JSONBARRAYOID references to JSONARRAYOID so that both the FDW deparse
* path (deparse.c) and the ruleutils deparse path (pg_get_querydef) emit
* "json[]" instead of "jsonb[]".
*
* We only touch value/output nodes here (Const, RelabelType, Param,
* ArrayExpr), NOT operator or function nodes, to avoid breaking operator
* resolution which depends on matching type OIDs.
*
* This must run after RewriteConst/RewriteParam so that the RelabelType
* nodes they create are also caught.
*/
if (IsA(node, Const) && ((Const *) node)->consttype == JSONBARRAYOID)
((Const *) node)->consttype = JSONARRAYOID;
else if (IsA(node, RelabelType) && ((RelabelType *) node)->resulttype == JSONBARRAYOID)
((RelabelType *) node)->resulttype = JSONARRAYOID;
else if (IsA(node, Param) && ((Param *) node)->paramtype == JSONBARRAYOID)
((Param *) node)->paramtype = JSONARRAYOID;
Copy link
Copy Markdown
Collaborator

@sfc-gh-mslot sfc-gh-mslot Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks messy. How do we map JSONB to JSON? Seems like we'd want to do the same thing here?

Iirc we create an alias in DuckDB, so do we actually need to do anything special for arrays w.r.t. deparsing/rewriting?

else if (IsA(node, ArrayExpr) && ((ArrayExpr *) node)->array_typeid == JSONBARRAYOID)
{
((ArrayExpr *) node)->array_typeid = JSONARRAYOID;
((ArrayExpr *) node)->element_typeid = JSONOID;
}

return node;
}

Expand Down
9 changes: 1 addition & 8 deletions pg_lake_engine/src/pgduck/type.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,7 @@ GetDuckDBTypeForPGType(PGType postgresType)
char typtype = get_typtype(pgTypeId);

if (OidIsValid(elementType))
{
/*
* DuckDB doesn't have JSONB type, so we map it to JSON type. However,
* in case of JSONB[], we have not done the mapping yet, so we prevent
* pushdown of JSONB[].
*/
return elementType == JSONBOID ? DUCKDB_TYPE_INVALID : DUCKDB_TYPE_LIST;
}
return DUCKDB_TYPE_LIST;

if (IsMapTypeOid(pgTypeId))
return DUCKDB_TYPE_MAP;
Expand Down
13 changes: 13 additions & 0 deletions pg_lake_table/src/fdw/deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,19 @@ is_foreign_pathkey(PlannerInfo *root,
static char *
deparse_type_name(Oid type_oid, int32 typemod)
{
/*
* DuckDB does not have a jsonb[] type. Map it to json[] which DuckDB
* understands. We do this at deparse time rather than rewriting OIDs
* in the expression tree, because changing OIDs would break operator
* and function lookup.
Copy link
Copy Markdown
Collaborator

@sfc-gh-mslot sfc-gh-mslot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't the real deparse though, we still parse the output of this (foreignscan case). Not sure I understand the effect of this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just giving Claude a chance to bang its head against this for a bit, so it could definitely be a bit offbase here. (I guess I'll make this a draft so you don't waste your time on it until things are happy.)

*
* Scalar jsonb is handled differently: rewrite_query.c wraps jsonb
* expressions in __lake__internal__nsp__.jsonb() function calls, so
* we do not need to remap JSONBOID here.
*/
if (type_oid == JSONBARRAYOID)
type_oid = JSONARRAYOID;

bits16 flags = FORMAT_TYPE_TYPEMOD_GIVEN;

if (!is_builtin(type_oid))
Expand Down
9 changes: 6 additions & 3 deletions pg_lake_table/src/planner/insert_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,12 @@ TransformPushdownableInsertSelect(Query *query)
* we need to do it here as well because the transformation to a
* SELECT query after before the rewrite phase.
*/
Oid nullAttributeOid =
column->atttypid == JSONBOID ?
JSONOID : column->atttypid;
Oid nullAttributeOid = column->atttypid;

if (nullAttributeOid == JSONBOID)
nullAttributeOid = JSONOID;
else if (nullAttributeOid == JSONBARRAYOID)
nullAttributeOid = JSONARRAYOID;

/* create a NULL value of the column type */
Const *nullConst = makeNullConst(nullAttributeOid,
Expand Down
102 changes: 101 additions & 1 deletion pg_lake_table/tests/pytests/test_jsonb_pushdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def test_jsonb_pushdown(s3, pg_conn, extension, with_default_location):
assert err is not None
pg_conn.rollback()

# we currently do not support pusing down jsonb array
# col::jsonb[] is a cast from text[] to jsonb[] which is not pushdownable
res = run_query(
'explain (verbose) SELECT * FROM text_array_table WHERE col::jsonb[] =ARRAY[\'{"name": "Alice", "age": 30}\'::jsonb]::jsonb[];',
pg_conn,
Expand All @@ -305,3 +305,103 @@ def test_jsonb_pushdown(s3, pg_conn, extension, with_default_location):

run_command("DROP SCHEMA test_jsonb_pushdown CASCADE", pg_conn)
pg_conn.commit()


def test_jsonb_array_pushdown(s3, pg_conn, extension, with_default_location):
"""Test that jsonb[] columns can be pushed down and produce correct results."""
location = "s3://" + TEST_BUCKET + "/test_jsonb_array_pushdown"

run_command(
"""
CREATE SCHEMA test_jsonb_array_pushdown;
SET search_path TO test_jsonb_array_pushdown;

CREATE TABLE jsonb_array_table(id int, col jsonb[]) USING iceberg;
""",
pg_conn,
)
pg_conn.commit()

# insert jsonb[] data with various value types
run_command(
"""
INSERT INTO jsonb_array_table VALUES
(1, ARRAY['{"name": "Alice", "age": 30}'::jsonb, '{"name": "Bob", "age": 25}'::jsonb]),
(2, ARRAY['[1,2,3]'::jsonb, '"hello"'::jsonb, '42'::jsonb]),
(3, ARRAY['null'::jsonb, 'true'::jsonb, 'false'::jsonb]),
(4, NULL),
(5, ARRAY[]::jsonb[]);
""",
pg_conn,
)
pg_conn.commit()

# also store the same data in a heap table for comparison
run_command(
"""
CREATE TABLE heap_jsonb_array_table AS
SELECT * FROM jsonb_array_table;
""",
pg_conn,
)
pg_conn.commit()

# basic SELECT pushdown: verify data roundtrips correctly
res_iceberg = run_query(
"SELECT id, col FROM jsonb_array_table ORDER BY id", pg_conn
)
res_heap = run_query(
"SELECT id, col FROM heap_jsonb_array_table ORDER BY id", pg_conn
)
assert len(res_iceberg) == len(res_heap)
for r_ice, r_heap in zip(res_iceberg, res_heap):
assert str(r_ice) == str(r_heap), f"Mismatch: {r_ice} vs {r_heap}"

# verify pushdown happens for SELECT
res = run_query("EXPLAIN (VERBOSE) SELECT * FROM jsonb_array_table", pg_conn)
assert "Custom Scan (Query Pushdown)" in str(res)

# verify WHERE with jsonb[] equality
res = run_query(
"""SELECT id FROM jsonb_array_table
WHERE col = ARRAY['{"name": "Alice", "age": 30}'::jsonb, '{"name": "Bob", "age": 25}'::jsonb]
ORDER BY id""",
pg_conn,
)
assert len(res) == 1
assert res[0]["id"] == 1

# verify NULL handling
res = run_query(
"SELECT id FROM jsonb_array_table WHERE col IS NULL ORDER BY id", pg_conn
)
assert len(res) == 1
assert res[0]["id"] == 4

# verify empty array
res = run_query(
"SELECT id FROM jsonb_array_table WHERE col = ARRAY[]::jsonb[] ORDER BY id",
pg_conn,
)
assert len(res) == 1
assert res[0]["id"] == 5

# INSERT .. SELECT with jsonb[] pushdown
run_command(
"""
CREATE TABLE jsonb_array_table_copy(id int, col jsonb[]) USING iceberg;
INSERT INTO jsonb_array_table_copy SELECT * FROM jsonb_array_table;
""",
pg_conn,
)
pg_conn.commit()

res_copy = run_query(
"SELECT id, col FROM jsonb_array_table_copy ORDER BY id", pg_conn
)
assert len(res_copy) == len(res_iceberg)
for r_copy, r_orig in zip(res_copy, res_iceberg):
assert str(r_copy) == str(r_orig), f"Copy mismatch: {r_copy} vs {r_orig}"

run_command("DROP SCHEMA test_jsonb_array_pushdown CASCADE", pg_conn)
pg_conn.commit()
Loading