Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Python/Scripts/Any/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,21 @@ To utilize the checks provided, follow these configuration steps. The script pat
| Path | Scripts/pii_pan.py |
| Args | |
| Snapshot | false |
1. [**SchemaCheck**](schema_check.py)
| Key | Value |
|--------|----------|
| Database | Relational |
```
liquibase checks customize --check-name=CustomCheckTemplate
```
| Prompt | Response |
|--------|----------|
| Short Name | SchemaCheck |
| Severity | 0-4 |
| Description | Ensure objects being referenced contain a schema name. |
| Scope | changelog |
| Message | Ensure objects being referenced contain a schema name. |
| Type | python |
| Path | Scripts/schema_check.py |
| Args | |
| Snapshot | false |
149 changes: 149 additions & 0 deletions Python/Scripts/Any/schema_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
###
### This script checks that all SQL object references include a schema (e.g., dbo.TableName)
###
### Notes:
### 1. Applies to SELECT, INSERT, UPDATE, DELETE, EXEC, CREATE, and ALTER statements
### 2. Ignores literals, column names, and functions
###
import sys
import re
import sqlparse
from sqlparse.sql import Identifier
from sqlparse.tokens import Keyword, DDL, DML, Name
import liquibase_utilities

###
### Retrieve log handler
### Ex. liquibase_logger.info(message)
###
liquibase_logger = liquibase_utilities.get_logger()
###
### Retrieve status handler
###
liquibase_status = liquibase_utilities.get_status()

def extract_object_identifiers(stmt):
"""
Return object identifiers (tables, procs) that should have a schema.
Only takes the first token after relevant keywords to avoid multi-line issues.
"""
identifiers = []
tokens = [t for t in stmt.tokens if not t.is_whitespace]

# Pre-collect all aliases defined in the statement (e.g., "dbo.users u" -> alias "u")
defined_aliases = set()
for token in tokens:
if isinstance(token, Identifier):
alias = token.get_alias()
if alias:
defined_aliases.add(alias.lower())

# Pre-collect cursor names (e.g., DECLARE crs_Year CURSOR ...)
defined_cursors = set()
for i, token in enumerate(tokens):
if token.ttype is Keyword and token.value.upper() == "CURSOR":
if i > 0:
cursor_name = str(tokens[i - 1]).strip().lower()
defined_cursors.add(cursor_name)

for i, token in enumerate(tokens):
is_dml_trigger = (
token.ttype is DML and token.value.upper() in ("UPDATE", "INSERT", "DELETE")
)
is_ddl_trigger = (
token.ttype in (DDL, Keyword) and token.value.upper() in ("CREATE", "ALTER")
)
is_keyword_trigger = (
token.ttype is Keyword and token.value.upper() in (
"FROM", "JOIN", "INTO", "EXEC", "EXECUTE",
"PROCEDURE", "TABLE", "VIEW"
)
)
if not (is_dml_trigger or is_ddl_trigger or is_keyword_trigger):
continue

# Look ahead for next non-whitespace token
j = i + 1
while j < len(tokens) and tokens[j].is_whitespace:
j += 1
if j >= len(tokens):
continue

next_token = tokens[j]

# Skip tokens that are known aliases or cursor names (e.g., UPDATE u ... FROM dbo.users u)
token_str = str(next_token).strip().lower()
if token_str in defined_aliases or token_str in defined_cursors:
continue

# Skip subqueries (next token is an opening parenthesis)
if str(next_token).strip().startswith("("):
continue

if isinstance(next_token, Identifier):
identifiers.append(next_token)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
elif next_token.ttype in (Name, None):
obj_name_str = str(next_token).strip()
if obj_name_str:
identifiers.append(Identifier([next_token]))

return identifiers

def check_schema_qualification(sql_text):
"""
Parses the provided SQL text and returns a list of unqualified object names
that are missing a schema prefix (e.g., TableName instead of dbo.TableName).
Returns an empty list if all objects are properly schema-qualified.
"""
violations = []
statements = sqlparse.parse(sql_text)
for stmt in statements:
stmt_str = str(stmt).strip()
# Skip unsupported statements
if not re.match(r"^\s*(SELECT|INSERT|UPDATE|DELETE|EXEC|EXECUTE|CREATE|ALTER)\b", stmt_str, re.IGNORECASE):
continue
Comment on lines +103 to +104

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

CTE queries (WITH ... AS (...)) are silently skipped — no schema check applied.

The regex requires statements to start with SELECT/INSERT/etc., so any WITH-prefixed CTE entirely bypasses the check. Unqualified table references inside the CTE body (e.g., WITH cte AS (SELECT * FROM UnqualifiedTable)) go unreported.

Adding WITH to the regex is trivial; the harder part is tracking the CTE alias names (cte, etc.) and excluding them from violation detection in the outer query's FROM cte reference to avoid introducing false positives.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Python/Scripts/Any/schema_check.py` around lines 76 - 77, The current check
skips CTEs because the stmt_str regex only accepts statements starting with
SELECT/INSERT/...; update that regex to include WITH so CTE-led statements are
inspected, and implement CTE alias tracking: when processing stmt_str, parse
leading CTE definitions (e.g., match "WITH\s+([^(\s,]+)\s+AS" repeatedly) to
collect alias names, then when your unqualified-table detection logic (the
routine that scans FROM/JOIN table identifiers in the same scope as stmt_str)
runs, exclude any table references that match the collected CTE aliases to avoid
false positives; refer to the existing re.match check on stmt_str and the
variable holding the statement text (stmt_str) when adding the CTE parsing and
exclusion logic.

# Extract identifiers and check for schema prefix
identifiers = extract_object_identifiers(stmt)
for ident in identifiers:
full_name = str(ident).strip().split()[0] # only first token
schema = ident.get_parent_name()
name = ident.get_real_name() or ident.get_name()
# Ignore temp tables, variables, sys objects, etc.
if not name or name.startswith("#") or name.startswith("@") or full_name.lower().startswith("sys."):
continue
# Flag if schema missing
if not schema and "." not in full_name:
violations.append(full_name)
return violations

###
### Retrieve all changes in changeset
###
changes = liquibase_utilities.get_changeset().getChanges()

###
### Loop through all changes
###
for change in changes:
###
### LoadData change types are not currently supported
###
if "loaddatachange" in change.getClass().getSimpleName().lower():
continue
###
### Retrieve raw sql and check for schema qualification violations
###
sql_text = liquibase_utilities.generate_sql(change)
# print(f"RAW SQL: {repr(sql_text)}")
violations = check_schema_qualification(sql_text)
if violations:
msg = f"Missing schema for object '{violations[0]}' in SQL statement."
liquibase_logger.warning(msg)
liquibase_status.fired = True
liquibase_status.message = msg
sys.exit(1)
Comment on lines +139 to +144

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Only the first violation per changeset is reported; remaining violations are silently dropped.

Even though check_schema_qualification returns all unqualified names, the outer loop breaks on the first violating change using violations[0]. A developer with three unqualified tables must fix-and-re-run three times to discover all of them.

💡 Proposed improvement — report all violations before exiting
-    if violations:
-        msg = f"Missing schema for object '{violations[0]}' in SQL statement."
-        liquibase_logger.warning(msg)
-        liquibase_status.fired = True
-        liquibase_status.message = msg
-        sys.exit(1)
+    for v in violations:
+        liquibase_logger.warning(f"Missing schema for object '{v}' in SQL statement.")
+    if violations:
+        msg = f"Missing schema qualification: {', '.join(violations)}"
+        liquibase_status.fired = True
+        liquibase_status.message = msg
+        sys.exit(1)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Python/Scripts/Any/schema_check.py` around lines 111 - 116, The code only
reports violations[0] then exits; change it to report all violations returned by
check_schema_qualification by joining or iterating over the violations list and
including every unqualified object in the log and the liquibase_status.message
before exiting; update the block that references violations, liquibase_logger,
and liquibase_status (the same scope where violations is set by
check_schema_qualification) so the warning message contains all items (e.g.,
comma-separated or multiline) and still sets liquibase_status.fired = True and
sys.exit(1).


###
### Default return code
###
False