Skip to content
55 changes: 37 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@

_MAX_COPY_ENTRIES_PER_TABLE = 20

_PROVISIONED_SEGMENT_SIZE = 200 # STL_QUERYTEXT, SVL_STATEMENTTEXT
_SERVERLESS_SEGMENT_SIZE = 4000 # SYS_QUERY_TEXT

# TODO: Boundary detection uses LEN(RTRIM(text)) < segment_size to decide whether to
# add a space when stitching segments. This approach can't distinguish between padding
# and tokens that exactly fill a segment. Edge case: a 199-char token followed by a
# segment boundary gets a spurious space. Future: implement keyword-aware fallback
# with CHR(1) markers (SQL marks boundaries, Python detects keywords) to preserve
# identifiers split mid-word while correctly separating keywords at boundaries.


class RedshiftCommonQuery:
CREATE_TEMP_TABLE_CLAUSE = "create temp table"
Expand Down Expand Up @@ -507,14 +517,24 @@ def stl_scan_based_lineage_query(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
WITH query_txt AS (
SELECT
query,
userid,
RTRIM(LISTAGG(RTRIM(text) || CASE WHEN LEN(RTRIM(text)) < {_PROVISIONED_SEGMENT_SIZE} THEN ' ' ELSE '' END, '')

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM
Just wondering how is the implementation in PR is slightly different to the one suggested here

https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_STATEMENTTEXT.html

select LISTAGG(CASE WHEN LEN(RTRIM(text)) = 0 THEN text ELSE RTRIM(text) END, '') within group (order by sequence) AS query_statement 
from SVL_STATEMENTTEXT where pid=pg_backend_pid();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWS's approach doesn't fix the keyword merge problem. Their condition LEN(RTRIM(text)) = 0 only preserves completely empty segments. A segment like "GROUP " becomes "GROUP" (length 5, not 0), which merges with the next "BY" segment -> "GROUPBY"

Our approach uses segment size as a boundary detector: if LEN(RTRIM(text)) < segment_size, we add a space back because the segment likely reached a natural boundary (not just had trailing spaces stripped).

WITHIN GROUP (ORDER BY sequence)) AS querytxt
FROM STL_QUERYTEXT
WHERE sequence < {_QUERY_SEQUENCE_LIMIT}
GROUP BY query, userid
)
select
distinct cluster,
target_schema,
target_table,
username as username,
source_schema,
source_table,
querytxt as ddl, -- TODO: this querytxt is truncated to 4000 characters
querytxt as ddl,
starttime as timestamp
from
(
Expand Down Expand Up @@ -554,7 +574,7 @@ def stl_scan_based_lineage_query(
) ss
join SVV_TABLE_INFO sti on
sti.table_id = ss.tbl
left join stl_query sq on
left join query_txt sq on
ss.query = sq.query
left join svl_user_info sui on
sq.userid = sui.usesysid
Expand All @@ -566,6 +586,8 @@ def stl_scan_based_lineage_query(
scan_type in (1, 2, 3)
order by cluster, target_schema, target_table, starttime asc
""".format(
_QUERY_SEQUENCE_LIMIT=_QUERY_SEQUENCE_LIMIT,
_PROVISIONED_SEGMENT_SIZE=_PROVISIONED_SEGMENT_SIZE,
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
Expand Down Expand Up @@ -613,12 +635,8 @@ def list_insert_create_queries_sql(
select
query,
pid,
LISTAGG(case
when LEN(RTRIM(text)) = 0 then text
else RTRIM(text)
end) within group (
order by sequence
) as ddl
RTRIM(LISTAGG(RTRIM(text) || CASE WHEN LEN(RTRIM(text)) < {_PROVISIONED_SEGMENT_SIZE} THEN ' ' ELSE '' END, '')
WITHIN GROUP (ORDER BY sequence)) as ddl
from (
select
query,
Expand Down Expand Up @@ -672,6 +690,7 @@ def list_insert_create_queries_sql(
sq.query
""".format(
_QUERY_SEQUENCE_LIMIT=_QUERY_SEQUENCE_LIMIT,
_PROVISIONED_SEGMENT_SIZE=_PROVISIONED_SEGMENT_SIZE,
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
Expand Down Expand Up @@ -714,13 +733,8 @@ def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
xid,
type,
userid,
LISTAGG(case
when LEN(RTRIM(text)) = 0 then text
else RTRIM(text)
end,
'') within group (
order by sequence
) as query_text
RTRIM(LISTAGG(RTRIM(text) || CASE WHEN LEN(RTRIM(text)) < {_PROVISIONED_SEGMENT_SIZE} THEN ' ' ELSE '' END, '')
WITHIN GROUP (ORDER BY sequence)) as query_text
from
SVL_STATEMENTTEXT
where
Expand Down Expand Up @@ -949,7 +963,8 @@ def stl_scan_based_lineage_query(
table_id as source_table_id,
queries.query_id as query_id,
username,
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
RTRIM(LISTAGG(RTRIM(qt."text") || CASE WHEN LEN(RTRIM(qt."text")) < {_SERVERLESS_SEGMENT_SIZE} THEN ' ' ELSE '' END, '')
WITHIN GROUP (ORDER BY sequence)) AS query_text
FROM
"queries" LEFT JOIN
unique_query_text qt ON qt.query_id = queries.query_id
Expand Down Expand Up @@ -987,6 +1002,7 @@ def stl_scan_based_lineage_query(
WHERE source_table_id <> target_table_id
ORDER BY cluster, target_schema, target_table, "timestamp" ASC;
""".format(
_SERVERLESS_SEGMENT_SIZE=_SERVERLESS_SEGMENT_SIZE,
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
Expand Down Expand Up @@ -1037,7 +1053,8 @@ def list_insert_create_queries_sql(
target_table,
username,
query_id,
LISTAGG(CASE WHEN LEN(RTRIM(querytxt)) = 0 THEN querytxt ELSE RTRIM(querytxt) END) WITHIN GROUP (ORDER BY sequence) AS ddl,
RTRIM(LISTAGG(RTRIM(querytxt) || CASE WHEN LEN(RTRIM(querytxt)) < {_SERVERLESS_SEGMENT_SIZE} THEN ' ' ELSE '' END, '')
WITHIN GROUP (ORDER BY sequence)) AS ddl,
ANY_VALUE(session_id) AS session_id,
starttime AS timestamp
FROM
Expand Down Expand Up @@ -1074,6 +1091,7 @@ def list_insert_create_queries_sql(
ORDER BY cluster, query_id, target_schema, target_table, starttime ASC
;
""".format(
_SERVERLESS_SEGMENT_SIZE=_SERVERLESS_SEGMENT_SIZE,
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
Expand Down Expand Up @@ -1118,7 +1136,8 @@ def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
qh.transaction_id AS transaction_id,
qh.start_time AS start_time,
qh.user_id AS userid,
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
RTRIM(LISTAGG(RTRIM(qt."text") || CASE WHEN LEN(RTRIM(qt."text")) < {_SERVERLESS_SEGMENT_SIZE} THEN ' ' ELSE '' END, '')
WITHIN GROUP (ORDER BY sequence)) AS query_text
FROM
SYS_QUERY_HISTORY qh
LEFT JOIN SYS_QUERY_TEXT qt on qt.query_id = qh.query_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def mock_alter_table_rename_cursor(cursor: MagicMock) -> None:

query_vs_cursor_mocker = {
(
"-- DataHub Redshift Source temp table DDL query\nselect\n *\nfrom (\n select\n session_id,\n transaction_id,\n start_time,\n userid,\n REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\\\\\n','\\\\n'), '(CREATE(?:[\\\\n\\\\s\\\\t]+(?:temp|temporary))?(?:[\\\\n\\\\s\\\\t]+)table(?:[\\\\n\\\\s\\\\t]+)[^\\\\n\\\\s\\\\t()-]+)', 0, 1, 'ipe'),'[\\\\n\\\\s\\\\t]+',' ',1,'p') as create_command,\n query_text,\n row_number() over (\n partition by session_id, TRIM(query_text)\n order by start_time desc\n ) rn\n from (\n select\n pid as session_id,\n xid as transaction_id,\n starttime as start_time,\n type,\n query_text,\n userid\n from (\n select\n starttime,\n pid,\n xid,\n type,\n userid,\n LISTAGG(case\n when LEN(RTRIM(text)) = 0 then text\n else RTRIM(text)\n end,\n '') within group (\n order by sequence\n ) as query_text\n from\n SVL_STATEMENTTEXT\n where\n type in ('DDL', 'QUERY')\n AND starttime >= '2024-01-01 12:00:00'\n AND starttime < '2024-01-10 12:00:00'\n AND sequence < 290\n group by\n starttime,\n pid,\n xid,\n type,\n userid\n order by\n starttime,\n pid,\n xid,\n type,\n userid\n asc\n )\n where\n type in ('DDL', 'QUERY')\n )\n where\n (create_command ilike 'create temp table %'\n or create_command ilike 'create temporary table %'\n -- we want to get all the create table statements and not just temp tables if non temp table is created and dropped in the same transaction\n or create_command ilike 'create table %')\n -- Redshift creates temp tables with the following names: volt_tt_%. We need to filter them out.\n and query_text not ilike 'CREATE TEMP TABLE volt_tt_%'\n and create_command not like 'CREATE TEMP TABLE volt_tt_'\n -- We need to filter out our query and it was not possible earlier when we did not have any comment in the query\n and query_text not ilike '%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext%'\n\n)\nwhere\n rn = 1\n"
"-- DataHub Redshift Source temp table DDL query\nselect\n *\nfrom (\n select\n session_id,\n transaction_id,\n start_time,\n userid,\n REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\\\\\n','\\\\n'), '(CREATE(?:[\\\\n\\\\s\\\\t]+(?:temp|temporary))?(?:[\\\\n\\\\s\\\\t]+)table(?:[\\\\n\\\\s\\\\t]+)[^\\\\n\\\\s\\\\t()-]+)', 0, 1, 'ipe'),'[\\\\n\\\\s\\\\t]+',' ',1,'p') as create_command,\n query_text,\n row_number() over (\n partition by session_id, TRIM(query_text)\n order by start_time desc\n ) rn\n from (\n select\n pid as session_id,\n xid as transaction_id,\n starttime as start_time,\n type,\n query_text,\n userid\n from (\n select\n starttime,\n pid,\n xid,\n type,\n userid,\n RTRIM(LISTAGG(RTRIM(text) || CASE WHEN LEN(RTRIM(text)) < 200 THEN ' ' ELSE '' END, '')\n WITHIN GROUP (ORDER BY sequence)) as query_text\n from\n SVL_STATEMENTTEXT\n where\n type in ('DDL', 'QUERY')\n AND starttime >= '2024-01-01 12:00:00'\n AND starttime < '2024-01-10 12:00:00'\n AND sequence < 290\n group by\n starttime,\n pid,\n xid,\n type,\n userid\n order by\n starttime,\n pid,\n xid,\n type,\n userid\n asc\n )\n where\n type in ('DDL', 'QUERY')\n )\n where\n (create_command ilike 'create temp table %'\n or create_command ilike 'create temporary table %'\n -- we want to get all the create table statements and not just temp tables if non temp table is created and dropped in the same transaction\n or create_command ilike 'create table %')\n -- Redshift creates temp tables with the following names: volt_tt_%. We need to filter them out.\n and query_text not ilike 'CREATE TEMP TABLE volt_tt_%'\n and create_command not like 'CREATE TEMP TABLE volt_tt_'\n -- We need to filter out our query and it was not possible earlier when we did not have any comment in the query\n and query_text not ilike '%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext%'\n\n)\nwhere\n rn = 1\n"
): mock_temp_table_cursor,
"select * from test_collapse_temp_lineage": mock_stl_insert_table_cursor,
"\n SELECT transaction_id,\n session_id,\n start_time,\n query_text\n FROM sys_query_history SYS\n WHERE SYS.status = 'success'\n AND SYS.query_type = 'DDL'\n AND SYS.database_name = 'test'\n AND SYS.start_time >= '2024-01-01 12:00:00'\n AND SYS.end_time < '2024-01-10 12:00:00'\n AND SYS.query_text ILIKE '%alter table % rename to %'\n ": mock_alter_table_rename_cursor,
Expand Down
117 changes: 117 additions & 0 deletions metadata-ingestion/tests/unit/redshift/test_redshift_queries.py

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, we are just doing string contains validations

So I would remove the focus on "stitching" and make this just a generic test for redshift queries

  • test_redshift_queries.py
  • class TestProvisionedQueries
  • class TestServerlessQueries
  • ...

So we can add more validations in the future

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revised.

Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""Tests for Redshift query generation and validation.

Covers query patterns used across RedshiftProvisionedQuery and RedshiftServerlessQuery,
including segment stitching strategies that preserve word boundaries when reconstructing
queries from fixed-width character segments (200 bytes provisioned, 4000 bytes serverless).
"""

from datetime import datetime

from datahub.ingestion.source.redshift.query import (
RedshiftProvisionedQuery,
RedshiftServerlessQuery,
)

START_TIME = datetime(2024, 1, 1, 12, 0, 0)
END_TIME = datetime(2024, 1, 10, 12, 0, 0)

# The boundary-aware LISTAGG pattern for 200-byte segments (provisioned).
# Appends a space when the trimmed segment is shorter than the segment size,
# indicating a word boundary was at the segment edge.
PROVISIONED_LISTAGG_PATTERN = (
"RTRIM(LISTAGG(RTRIM(text) "
"|| CASE WHEN LEN(RTRIM(text)) < 200 THEN ' ' ELSE '' END, '')"
)

# The boundary-aware LISTAGG pattern for 4000-byte segments (serverless).
SERVERLESS_LISTAGG_PATTERN_TEXT = (
'RTRIM(LISTAGG(RTRIM(qt."text") '
"|| CASE WHEN LEN(RTRIM(qt.\"text\")) < 4000 THEN ' ' ELSE '' END, '')"
)

SERVERLESS_LISTAGG_PATTERN_QUERYTXT = (
"RTRIM(LISTAGG(RTRIM(querytxt) "
"|| CASE WHEN LEN(RTRIM(querytxt)) < 4000 THEN ' ' ELSE '' END, '')"
)


class TestProvisionedQueries:
def test_list_insert_create_queries_uses_boundary_aware_listagg(self):
sql = RedshiftProvisionedQuery.list_insert_create_queries_sql(
db_name="test_db", start_time=START_TIME, end_time=END_TIME
)
assert PROVISIONED_LISTAGG_PATTERN in sql

def test_temp_table_ddl_query_uses_boundary_aware_listagg(self):
sql = RedshiftProvisionedQuery.temp_table_ddl_query(
start_time=START_TIME, end_time=END_TIME
)
assert PROVISIONED_LISTAGG_PATTERN in sql

def test_stl_scan_based_lineage_uses_boundary_aware_listagg(self):
sql = RedshiftProvisionedQuery.stl_scan_based_lineage_query(
db_name="test_db", start_time=START_TIME, end_time=END_TIME
)
assert PROVISIONED_LISTAGG_PATTERN in sql

def test_stl_scan_based_lineage_uses_cte_not_stl_query(self):
"""The provisioned scan lineage query should use a CTE from STL_QUERYTEXT
instead of stl_query.querytxt (which is truncated to 4000 chars)."""
sql = RedshiftProvisionedQuery.stl_scan_based_lineage_query(
db_name="test_db", start_time=START_TIME, end_time=END_TIME
)
assert "WITH query_txt AS" in sql
assert "STL_QUERYTEXT" in sql
# Should join query_txt CTE (not stl_query table) for querytxt
assert "join query_txt sq" in sql.lower()
# Should NOT join stl_query table directly (only stl_querytext via CTE)
assert "join stl_query " not in sql.lower()

def test_no_old_listagg_pattern_provisioned(self):
"""Ensure the old LISTAGG pattern with LEN(RTRIM(text)) = 0 is gone."""
for sql in [
RedshiftProvisionedQuery.list_insert_create_queries_sql(
db_name="test_db", start_time=START_TIME, end_time=END_TIME
),
RedshiftProvisionedQuery.temp_table_ddl_query(
start_time=START_TIME, end_time=END_TIME
),
]:
assert "LEN(RTRIM(text)) = 0" not in sql


class TestServerlessQueries:
def test_stl_scan_based_lineage_uses_boundary_aware_listagg(self):
sql = RedshiftServerlessQuery.stl_scan_based_lineage_query(
db_name="test_db", start_time=START_TIME, end_time=END_TIME
)
assert SERVERLESS_LISTAGG_PATTERN_TEXT in sql

def test_list_insert_create_queries_uses_boundary_aware_listagg(self):
sql = RedshiftServerlessQuery.list_insert_create_queries_sql(
db_name="test_db", start_time=START_TIME, end_time=END_TIME
)
assert SERVERLESS_LISTAGG_PATTERN_QUERYTXT in sql

def test_temp_table_ddl_query_uses_boundary_aware_listagg(self):
sql = RedshiftServerlessQuery.temp_table_ddl_query(
start_time=START_TIME, end_time=END_TIME
)
assert SERVERLESS_LISTAGG_PATTERN_TEXT in sql

def test_no_old_listagg_pattern_serverless(self):
"""Ensure the old bare LISTAGG(qt."text") pattern is gone for serverless."""
for sql in [
RedshiftServerlessQuery.stl_scan_based_lineage_query(
db_name="test_db", start_time=START_TIME, end_time=END_TIME
),
RedshiftServerlessQuery.list_insert_create_queries_sql(
db_name="test_db", start_time=START_TIME, end_time=END_TIME
),
RedshiftServerlessQuery.temp_table_ddl_query(
start_time=START_TIME, end_time=END_TIME
),
]:
# Should not have bare LISTAGG without RTRIM wrapper
assert 'LISTAGG(qt."text")' not in sql
assert "LEN(RTRIM(querytxt)) = 0" not in sql
Loading