-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Expand file tree
/
Copy pathtest_redshift_queries.py
More file actions
117 lines (99 loc) · 4.85 KB
/
Copy pathtest_redshift_queries.py
File metadata and controls
117 lines (99 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Tests for Redshift query generation and validation.
Covers query patterns used across RedshiftProvisionedQuery and RedshiftServerlessQuery,
including segment stitching strategies that preserve word boundaries when reconstructing
queries from fixed-width character segments (200 bytes provisioned, 4000 bytes serverless).
"""
from datetime import datetime
from datahub.ingestion.source.redshift.query import (
RedshiftProvisionedQuery,
RedshiftServerlessQuery,
)
# Fixed, timezone-naive time window handed to every query builder under test.
START_TIME = datetime(year=2024, month=1, day=1, hour=12, minute=0, second=0)
END_TIME = datetime(year=2024, month=1, day=10, hour=12, minute=0, second=0)
# Expected SQL fragments for boundary-aware segment stitching.
#
# Each fragment right-trims a segment and appends a single space when the
# trimmed length is below the segment size, i.e. when a word boundary fell on
# the segment edge; the aggregated result is right-trimmed once more.

# Provisioned: 200-byte segments read from the "text" column.
PROVISIONED_LISTAGG_PATTERN = (
    "RTRIM(LISTAGG("
    "RTRIM(text) || CASE WHEN LEN(RTRIM(text)) < 200 "
    "THEN ' ' ELSE '' END"
    ", '')"
)

# Serverless: 4000-byte segments read from the qt."text" column.
SERVERLESS_LISTAGG_PATTERN_TEXT = (
    "RTRIM(LISTAGG("
    'RTRIM(qt."text") || CASE WHEN LEN(RTRIM(qt."text")) < 4000 '
    "THEN ' ' ELSE '' END"
    ", '')"
)

# Serverless: 4000-byte segments read from the querytxt column.
SERVERLESS_LISTAGG_PATTERN_QUERYTXT = (
    "RTRIM(LISTAGG("
    "RTRIM(querytxt) || CASE WHEN LEN(RTRIM(querytxt)) < 4000 "
    "THEN ' ' ELSE '' END"
    ", '')"
)
class TestProvisionedQueries:
    """Checks on the SQL text generated by ``RedshiftProvisionedQuery``."""

    def test_list_insert_create_queries_uses_boundary_aware_listagg(self):
        """The insert/create listing query embeds the 200-byte stitching pattern."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        generated = RedshiftProvisionedQuery.list_insert_create_queries_sql(
            db_name="test_db", **window
        )
        assert PROVISIONED_LISTAGG_PATTERN in generated

    def test_temp_table_ddl_query_uses_boundary_aware_listagg(self):
        """The temp-table DDL query embeds the 200-byte stitching pattern."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        generated = RedshiftProvisionedQuery.temp_table_ddl_query(**window)
        assert PROVISIONED_LISTAGG_PATTERN in generated

    def test_stl_scan_based_lineage_uses_boundary_aware_listagg(self):
        """The scan-based lineage query embeds the 200-byte stitching pattern."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        generated = RedshiftProvisionedQuery.stl_scan_based_lineage_query(
            db_name="test_db", **window
        )
        assert PROVISIONED_LISTAGG_PATTERN in generated

    def test_stl_scan_based_lineage_uses_cte_not_stl_query(self):
        """The provisioned scan lineage query should use a CTE from STL_QUERYTEXT
        instead of stl_query.querytxt (which is truncated to 4000 chars)."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        generated = RedshiftProvisionedQuery.stl_scan_based_lineage_query(
            db_name="test_db", **window
        )
        # Lower-case once for the case-insensitive join checks below.
        lowered = generated.lower()

        assert "WITH query_txt AS" in generated
        assert "STL_QUERYTEXT" in generated
        # querytxt must come from the query_txt CTE (aliased sq) ...
        assert "join query_txt sq" in lowered
        # ... and never from a direct join on the stl_query table; the trailing
        # space keeps "join stl_querytext" from matching.
        assert "join stl_query " not in lowered

    def test_no_old_listagg_pattern_provisioned(self):
        """Ensure the old LISTAGG pattern with LEN(RTRIM(text)) = 0 is gone."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        legacy_fragment = "LEN(RTRIM(text)) = 0"
        statements = (
            RedshiftProvisionedQuery.list_insert_create_queries_sql(
                db_name="test_db", **window
            ),
            RedshiftProvisionedQuery.temp_table_ddl_query(**window),
        )
        for generated in statements:
            assert legacy_fragment not in generated
class TestServerlessQueries:
    """Checks on the SQL text generated by ``RedshiftServerlessQuery``."""

    def test_stl_scan_based_lineage_uses_boundary_aware_listagg(self):
        """The scan-based lineage query stitches qt."text" with the 4000-byte pattern."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        generated = RedshiftServerlessQuery.stl_scan_based_lineage_query(
            db_name="test_db", **window
        )
        assert SERVERLESS_LISTAGG_PATTERN_TEXT in generated

    def test_list_insert_create_queries_uses_boundary_aware_listagg(self):
        """The insert/create listing query stitches querytxt with the 4000-byte pattern."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        generated = RedshiftServerlessQuery.list_insert_create_queries_sql(
            db_name="test_db", **window
        )
        assert SERVERLESS_LISTAGG_PATTERN_QUERYTXT in generated

    def test_temp_table_ddl_query_uses_boundary_aware_listagg(self):
        """The temp-table DDL query stitches qt."text" with the 4000-byte pattern."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        generated = RedshiftServerlessQuery.temp_table_ddl_query(**window)
        assert SERVERLESS_LISTAGG_PATTERN_TEXT in generated

    def test_no_old_listagg_pattern_serverless(self):
        """Ensure the old bare LISTAGG(qt."text") pattern is gone for serverless."""
        window = {"start_time": START_TIME, "end_time": END_TIME}
        statements = (
            RedshiftServerlessQuery.stl_scan_based_lineage_query(
                db_name="test_db", **window
            ),
            RedshiftServerlessQuery.list_insert_create_queries_sql(
                db_name="test_db", **window
            ),
            RedshiftServerlessQuery.temp_table_ddl_query(**window),
        )
        for generated in statements:
            # A bare LISTAGG lacking the RTRIM wrapper must not appear.
            assert 'LISTAGG(qt."text")' not in generated
            assert "LEN(RTRIM(querytxt)) = 0" not in generated