|
| 1 | +from pathlib import Path |
| 2 | + |
| 3 | +from lsprotocol.types import Range, Position |
| 4 | +import typing as t |
| 5 | + |
| 6 | +from sqlmesh.core.dialect import normalize_model_name |
| 7 | +from sqlmesh.core.model.definition import SqlModel |
| 8 | +from sqlmesh.lsp.context import LSPContext |
| 9 | +from sqlglot import exp |
| 10 | + |
| 11 | +from sqlmesh.utils.pydantic import PydanticModel |
| 12 | + |
| 13 | + |
class Reference(PydanticModel):
    """
    A link from a span of text in a document to the file it refers to.

    Attributes:
        range (Range): The span covered by the referring text.
        uri (str): The URI of the file that the text refers to.
    """

    range: Range
    uri: str
| 17 | + |
| 18 | + |
def get_model_definitions_for_a_path(
    lint_context: LSPContext, document_uri: str
) -> t.List[Reference]:
    """
    Get the model references found in the SQL model behind a document.

    Only documents that map to a SQL model are inspected; the models they
    refer to are not restricted to a particular model type.

    Steps:
        - Look up the model registered for the document
        - Find all table objects in its query using find_all exp.Table
        - Build each table's (optionally qualified) name and normalize it
        - Keep the names that appear in the model's depends_on
        - Resolve each kept name to a model and to the file that defines it
        - Compute the range the qualified table name occupies in the document

    :param lint_context: LSP context providing the document-to-model-names map
        and the underlying context used to resolve models
    :param document_uri: Key of the document in ``lint_context.map``; anything
        not ending in ".sql" yields no references
    :return: One Reference per table in the query that resolves to a model with
        a known file path. The list is empty when the document is unknown, has
        no models, is not a SQL model, has no file path or contains no tables.
    """
    # Only sql documents are inspected
    if not document_uri.endswith(".sql"):
        return []

    # A document that was never registered has no references. Catching the
    # KeyError lets the lookup honour that instead of raising before the
    # emptiness check below can run.
    try:
        models = lint_context.map[document_uri]
    except KeyError:
        return []
    if not models:
        return []

    model = lint_context.context.get_model(model_or_snapshot=models[0], raise_if_missing=False)
    # isinstance is also False for None, so this covers a missing model
    if not isinstance(model, SqlModel):
        return []

    # Find all possible references
    tables = list(model.query.find_all(exp.Table))
    if not tables:
        return []

    # Token positions come from this model's own query, so line lengths must be
    # taken from this model's file and not from the file of the referenced model.
    model_path = model._path
    if model_path is None:
        return []
    # Read lazily: documents without any matching reference never touch the disk
    read_file: t.Optional[t.List[str]] = None

    # Loop invariants
    depends_on = model.depends_on
    default_catalog = lint_context.context.default_catalog

    references = []
    for table in tables:
        # An empty name means there is no plain identifier to point at
        table_name = table.name
        if not table_name:
            continue

        # Absent qualifiers are empty strings (never None), so filter on
        # truthiness; otherwise an unqualified table would become ".name".
        reference_name = ".".join(
            part for part in (table.catalog, table.db, table_name) if part
        )
        normalized_reference_name = normalize_model_name(
            reference_name, default_catalog=default_catalog
        )
        if normalized_reference_name not in depends_on:
            continue

        # Get the referenced model and the file that defines it
        referenced_model = lint_context.context.get_model(
            model_or_snapshot=normalized_reference_name, raise_if_missing=False
        )
        if referenced_model is None:
            continue
        referenced_model_path = referenced_model._path
        if referenced_model_path is None:
            continue
        # Fully qualify the path in case it is relative
        path = Path(referenced_model_path).resolve()
        referenced_model_uri = f"file://{path}"

        if read_file is None:
            # NOTE(review): assumes model files are UTF-8 encoded - confirm
            with open(model_path, "r", encoding="utf-8") as file:
                read_file = file.readlines()

        # The range starts out covering the bare table name
        table_meta = TokenPositionDetails.from_meta(table.this.meta)
        table_range = _range_from_token_position_details(table_meta, read_file)
        start_pos = table_range.start
        end_pos = table_range.end

        # Pull the start back to the db qualifier, then to the catalog
        # qualifier; the catalog is checked last so it wins when both exist.
        for qualifier_key in ("db", "catalog"):
            qualifier = table.args.get(qualifier_key)
            if qualifier is None:
                continue
            qualifier_meta = TokenPositionDetails.from_meta(qualifier.meta)
            start_pos = _range_from_token_position_details(qualifier_meta, read_file).start

        references.append(
            Reference(uri=referenced_model_uri, range=Range(start=start_pos, end=end_pos))
        )

    return references
| 110 | + |
| 111 | + |
class TokenPositionDetails(PydanticModel):
    """
    Details about a token's position in the source code.

    Attributes:
        line (int): The line that the token ends on.
        col (int): The column that the token ends on.
        start (int): The start index of the token.
        end (int): The ending index of the token.
    """

    line: int
    col: int
    start: int
    end: int

    @classmethod
    def from_meta(cls, meta: t.Dict[str, int]) -> "TokenPositionDetails":
        """
        Build the position details from a metadata mapping.

        :param meta: Mapping that holds the "line", "col", "start" and "end"
            entries; any other entries are ignored
        :return: The position details populated from those four entries
        :raises KeyError: If one of the four entries is missing
        """
        # Construct through cls so that subclasses get instances of themselves
        return cls(
            line=meta["line"],
            col=meta["col"],
            start=meta["start"],
            end=meta["end"],
        )
| 136 | + |
| 137 | + |
def _range_from_token_position_details(
    token_position_details: TokenPositionDetails, read_file: t.List[str]
) -> Range:
    """
    Convert a TokenPositionDetails object to a Range object.

    The end of the range is taken directly from the token's line and column.
    The start is found by stepping back from the end by the token's length,
    moving onto earlier lines when the token spans more than one line.

    :param token_position_details: Details about a token's position
    :param read_file: List of lines from the file the token was read from;
        lines may or may not keep their trailing newline
    :return: A Range object representing the token's position
    """
    # The line is converted from 1-indexed to 0-indexed. The column is used
    # unchanged as the end character of the range.
    # NOTE(review): presumably the column is the 1-indexed position of the
    # token's last character, which equals a 0-indexed exclusive end - confirm
    end_line_0 = token_position_details.line - 1
    end_col_0 = token_position_details.col

    # Both indices are treated as inclusive, hence the + 1
    token_length = token_position_details.end - token_position_details.start + 1

    # Start from the end position and count backwards
    start_line_0 = end_line_0
    start_col_0 = end_col_0 - token_length

    # A negative column means the token began on an earlier line
    while start_col_0 < 0 and start_line_0 > 0:
        previous_line_0 = start_line_0 - 1
        if previous_line_0 >= len(read_file):
            # The supplied lines do not reach the token's line, so there is
            # nothing to measure; fall through to the clamp below.
            break
        previous_line = read_file[previous_line_0]
        start_line_0 = previous_line_0
        start_col_0 += len(previous_line)
        # Every earlier line ended with exactly one newline in the source.
        # Count it once, and only when the stored line does not include it.
        if not previous_line.endswith("\n"):
            start_col_0 += 1

    # Ensure we don't have negative values
    start_col_0 = max(0, start_col_0)
    return Range(
        start=Position(line=start_line_0, character=start_col_0),
        end=Position(line=end_line_0, character=end_col_0),
    )
0 commit comments