-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathschema_change_handlers.py
More file actions
90 lines (79 loc) · 3.34 KB
/
Copy pathschema_change_handlers.py
File metadata and controls
90 lines (79 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from typing import Callable, Dict, Optional
from dbt_dry_run.exception import SchemaChangeException
from dbt_dry_run.models import OnSchemaChange, Table
from dbt_dry_run.models.dry_run_result import DryRunResult
from dbt_dry_run.models.report import DryRunStatus
from dbt_dry_run.utils import find_missing_fields, build_predicted_fields
def ignore_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunResult:
    """Handle ``on_schema_change: ignore``.

    The schema predicted by the dry run is discarded in favour of the schema of
    the existing target table.

    Unlike the other handlers in this module, there is no early return when the
    dry run produced no table; ``replace_table`` is always called.

    Args:
        dry_run_result: Result of dry running the incremental model.
        target_table: Schema of the already-materialised target table.

    Returns:
        Whatever ``dry_run_result.replace_table(target_table)`` returns.
    """
    updated_result = dry_run_result.replace_table(target_table)
    return updated_result
def append_new_columns_handler(
    dry_run_result: DryRunResult, target_table: Table
) -> DryRunResult:
    """Handle ``on_schema_change: append_new_columns``.

    Passes the predicted fields and the target table's fields to
    ``find_missing_fields``. It then builds the resulting schema from the target
    table plus those fields via ``build_predicted_fields``. Presumably the
    missing fields are the predicted ones the target lacks; confirm against
    ``find_missing_fields``.

    Args:
        dry_run_result: Result of dry running the incremental model.
        target_table: Schema of the already-materialised target table.

    Returns:
        ``dry_run_result`` unchanged if the dry run produced no table.
        Otherwise, ``dry_run_result`` with its table replaced by the merged
        schema.
    """
    predicted_table = dry_run_result.table
    if predicted_table is not None:
        fields_to_add = find_missing_fields(
            predicted_table.fields, target_table.fields
        )
        merged_fields = build_predicted_fields(target_table, fields_to_add)
        return dry_run_result.replace_table(Table(fields=merged_fields))
    # Nothing was predicted (e.g. the dry run did not yield a table): pass through.
    return dry_run_result
def sync_all_columns_handler(
    dry_run_result: DryRunResult, target_table: Table
) -> DryRunResult:
    """Handle ``on_schema_change: sync_all_columns``.

    The allowed top-level column names are those of target columns still
    present in the prediction plus predicted columns the target lacks. As a
    set, this equals the predicted column names.

    Those names, the target table, and the fields ``find_missing_fields``
    reports between the predicted fields and the retained target fields are
    handed to ``build_predicted_fields`` to produce the final schema.

    Args:
        dry_run_result: Result of dry running the incremental model.
        target_table: Schema of the already-materialised target table.

    Returns:
        ``dry_run_result`` unchanged if the dry run produced no table.
        Otherwise, ``dry_run_result`` with its table replaced by the synced
        schema.
    """
    predicted_table = dry_run_result.table
    if predicted_table is None:
        return dry_run_result

    predicted_fields = predicted_table.fields
    predicted_names = {field.name for field in predicted_fields}
    target_names = {field.name for field in target_table.fields}

    # Predicted columns that do not yet exist on the target.
    added_fields = [
        field for field in predicted_fields if field.name not in target_names
    ]
    # Target columns the model still produces.
    kept_target_fields = [
        field for field in target_table.fields if field.name in predicted_names
    ]

    allowed_names = {field.name for field in kept_target_fields + added_fields}
    synced_fields = build_predicted_fields(
        target_table=target_table,
        missing_fields=find_missing_fields(predicted_fields, kept_target_fields),
        included_top_level_field_names=allowed_names,
    )
    return dry_run_result.replace_table(Table(fields=synced_fields))
def fail_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunResult:
    """Handle ``on_schema_change: fail``.

    Compares only the top-level column names of the predicted and target
    schemas. Nested fields and column types are not inspected here.

    Args:
        dry_run_result: Result of dry running the incremental model.
        target_table: Schema of the already-materialised target table.

    Returns:
        ``dry_run_result`` unchanged if the dry run produced no table.

        If the column names match, a new ``DryRunResult`` with the target
        table and the original status and exception.

        Otherwise, a ``DryRunResult`` with no table, ``DryRunStatus.FAILURE``,
        and a ``SchemaChangeException`` listing the added and removed fields.
    """
    if dry_run_result.table is None:
        return dry_run_result

    predicted_names = {field.name for field in dry_run_result.table.fields}
    existing_names = {field.name for field in target_table.fields}
    added_fields = predicted_names - existing_names
    removed_fields = existing_names - predicted_names

    schema_error: Optional[SchemaChangeException] = None
    if added_fields or removed_fields:
        # NOTE(review): these are set reprs, so field order in the message may
        # vary between runs.
        schema_error = SchemaChangeException(
            f"Incremental model has changed schemas. "
            f"Fields added: {added_fields}, "
            f"Fields removed: {removed_fields}"
        )

    if schema_error is None:
        # Column names line up: keep the target schema and the original outcome.
        return DryRunResult(
            node=dry_run_result.node,
            table=target_table,
            status=dry_run_result.status,
            exception=dry_run_result.exception,
        )
    # Column names diverged: fail the node and drop the predicted table.
    return DryRunResult(
        node=dry_run_result.node,
        table=None,
        status=DryRunStatus.FAILURE,
        exception=schema_error,
    )
# Dispatch table from each ``on_schema_change`` setting to the handler that
# reconciles a dry-run result with the existing target table's schema.
# Built from ordered pairs, so insertion order matches the listing below.
ON_SCHEMA_CHANGE_TABLE_HANDLER: Dict[
    OnSchemaChange, Callable[[DryRunResult, Table], DryRunResult]
] = dict(
    [
        (OnSchemaChange.IGNORE, ignore_handler),
        (OnSchemaChange.APPEND_NEW_COLUMNS, append_new_columns_handler),
        (OnSchemaChange.SYNC_ALL_COLUMNS, sync_all_columns_handler),
        (OnSchemaChange.FAIL, fail_handler),
    ]
)