|
| 1 | +from collections.abc import Sequence |
| 2 | +from pathlib import Path |
| 3 | +from typing import Any, NamedTuple, Optional |
| 4 | + |
| 5 | +import click |
| 6 | +from jsonschema import Draft202012Validator, ValidationError |
| 7 | +from yaml.scanner import ScannerError |
| 8 | + |
| 9 | +from dagster_dg.cli.check_utils import error_dict_to_formatted_error |
| 10 | +from dagster_dg.cli.global_options import dg_global_options |
| 11 | +from dagster_dg.component import RemoteComponentRegistry |
| 12 | +from dagster_dg.component_key import ComponentKey, LocalComponentKey |
| 13 | +from dagster_dg.config import normalize_cli_config |
| 14 | +from dagster_dg.context import DgContext |
| 15 | +from dagster_dg.utils import DgClickCommand, DgClickGroup |
| 16 | +from dagster_dg.yaml_utils import parse_yaml_with_source_positions |
| 17 | +from dagster_dg.yaml_utils.source_position import ( |
| 18 | + LineCol, |
| 19 | + SourcePosition, |
| 20 | + SourcePositionTree, |
| 21 | + ValueAndSourcePositionTree, |
| 22 | +) |
| 23 | + |
| 24 | + |
# Root of the `check` command group. Subcommands in this module attach themselves
# with `@check_group.command(...)`.
@click.group(name="check", cls=DgClickGroup)
def check_group() -> None:
    """Commands for checking the integrity of your Dagster code."""
| 28 | + |
| 29 | + |
| 30 | +# ######################## |
| 31 | +# ##### COMPONENT |
| 32 | +# ######################## |
| 33 | + |
# JSON Schema for the top-level structure of a component file. It is checked
# before the `params` payload is validated against the schema of the specific
# component type.
COMPONENT_FILE_SCHEMA = {
    "type": "object",
    "properties": {
        "type": {"type": "string"},
        "params": {"type": "object"},
    },
    # `type` is mandatory: the checker reads it to resolve the component key, so a
    # document without it must be rejected here rather than fail later on `None`.
    # `params` stays optional.
    "required": ["type"],
    # Unknown top-level keys are reported as errors.
    "additionalProperties": False,
}
| 42 | + |
| 43 | + |
| 44 | +def _is_local_component(component_name: str) -> bool: |
| 45 | + return component_name.endswith(".py") |
| 46 | + |
| 47 | + |
def _scaffold_value_and_source_position_tree(
    filename: str, row: int, col: int
) -> ValueAndSourcePositionTree:
    """Build a placeholder tree that carries no value and a single source position.

    The position starts and ends at the same (row, col) in ``filename`` and has no
    children. It is used to report an error at one point of a file that could not
    be parsed into a real source position tree.
    """
    start = LineCol(row, col)
    end = LineCol(row, col)
    position = SourcePosition(filename=filename, start=start, end=end)
    position_tree = SourcePositionTree(position=position, children={})
    return ValueAndSourcePositionTree(value=None, source_position_tree=position_tree)
| 60 | + |
| 61 | + |
class ErrorInput(NamedTuple):
    """A single validation error together with the data needed to render it."""

    # Key of the component type whose params schema produced the error. None for
    # errors that are not tied to the `params` section (YAML parse failures,
    # top-level structure errors, unknown component types).
    component_name: Optional[ComponentKey]
    # The error to display; either produced by jsonschema or constructed by hand.
    error: ValidationError
    # Parsed document (or a scaffolded placeholder) used to locate the error in
    # the source file.
    source_position_tree: ValueAndSourcePositionTree
| 66 | + |
| 67 | + |
@check_group.command(name="component", cls=DgClickCommand)
@click.argument("paths", nargs=-1, type=click.Path(exists=True))
@dg_global_options
@click.pass_context
def component_check_command(
    context: click.Context,
    paths: Sequence[str],
    **global_options: object,
) -> None:
    """Check component files against their schemas, showing validation errors."""
    # Args:
    #   context: click context; used to normalize CLI config and to exit non-zero.
    #   paths: optional filter. When given, only component directories equal to, or
    #       nested under, one of these paths are checked; otherwise all are checked.
    #   global_options: global `dg` options forwarded to `normalize_cli_config`.
    #
    # Prints every collected error and exits with status 1 if any were found;
    # otherwise prints a success message.
    resolved_paths = [Path(path).absolute() for path in paths]
    top_level_component_validator = Draft202012Validator(schema=COMPONENT_FILE_SCHEMA)

    cli_config = normalize_cli_config(global_options, context)
    dg_context = DgContext.for_code_location_environment(Path.cwd(), cli_config)

    validation_errors: list[ErrorInput] = []

    # Collected as (key, document) pairs rather than a dict keyed by component key,
    # so that several component directories resolving to the same key are all
    # validated instead of silently overwriting one another.
    components_to_check: list[tuple[ComponentKey, ValueAndSourcePositionTree]] = []
    local_component_dirs = set()
    for component_dir in dg_context.components_path.iterdir():
        if resolved_paths and not any(
            path == component_dir or path in component_dir.parents for path in resolved_paths
        ):
            continue

        component_path = component_dir / "component.yaml"
        if not component_path.exists():
            continue

        text = component_path.read_text()
        try:
            component_doc_tree = parse_yaml_with_source_positions(
                text, filename=str(component_path)
            )
        # NOTE(review): only ScannerError is handled here; confirm whether other
        # YAML errors (e.g. parser errors) can escape parse_yaml_with_source_positions.
        except ScannerError as se:
            # There is no parsed tree to point into, so scaffold one that marks the
            # (1-based) location reported by the scanner, defaulting to 1:1.
            validation_errors.append(
                ErrorInput(
                    None,
                    ValidationError(f"Unable to parse YAML: {se.context}, {se.problem}"),
                    _scaffold_value_and_source_position_tree(
                        filename=str(component_path),
                        row=se.problem_mark.line + 1 if se.problem_mark else 1,
                        col=se.problem_mark.column + 1 if se.problem_mark else 1,
                    ),
                )
            )
            continue

        # First, validate the top-level structure of the component file
        # (type and params keys) before we try to validate the params themselves.
        top_level_errs = list(top_level_component_validator.iter_errors(component_doc_tree.value))
        if top_level_errs:
            validation_errors.extend(
                ErrorInput(None, err, component_doc_tree) for err in top_level_errs
            )
            continue

        component_key = ComponentKey.from_typename(
            component_doc_tree.value.get("type"), dirpath=component_path.parent
        )
        components_to_check.append((component_key, component_doc_tree))
        if isinstance(component_key, LocalComponentKey):
            local_component_dirs.add(component_dir)

    # Fetch the local component types, if we need any local components
    component_registry = RemoteComponentRegistry.from_dg_context(
        dg_context, local_component_type_dirs=list(local_component_dirs)
    )
    for component_key, component_doc_tree in components_to_check:
        # Guard only the registry lookup: a KeyError raised anywhere else (such as
        # a missing `params` key) must not be reported as an unknown component type.
        try:
            component_type = component_registry.get(component_key)
        except KeyError:
            # No matching component type found
            if isinstance(component_key, LocalComponentKey):
                message = (
                    f"Component type '{component_key.to_typename()}' not found in"
                    f" {component_key.python_file}."
                )
            else:
                message = f"Component type '{component_key.to_typename()}' not found."
            validation_errors.append(
                ErrorInput(None, ValidationError(message), component_doc_tree)
            )
            continue

        json_schema = component_type.component_params_schema or {}
        params_validator = Draft202012Validator(json_schema)

        # A document without a `params` key is validated as if it had empty params.
        has_params = "params" in component_doc_tree.value
        params = component_doc_tree.value["params"] if has_params else {}
        # The reporting code below adds a "params" prefix whenever a component key
        # is attached to the error (presumably to locate the error under that node).
        # With no `params` node in the document, leave the key unset so the error
        # is reported against the document root instead.
        error_owner = component_key if has_params else None
        for err in params_validator.iter_errors(params):
            validation_errors.append(ErrorInput(error_owner, err, component_doc_tree))

    if not validation_errors:
        click.echo("All components validated successfully.")
        return

    for component_name, error, doc_tree in validation_errors:
        click.echo(
            error_dict_to_formatted_error(
                component_name,
                error,
                source_position_tree=doc_tree.source_position_tree,
                prefix=["params"] if component_name else [],
            )
        )
    context.exit(1)
0 commit comments