|
46 | 46 | from pathlib import Path |
47 | 47 | from shutil import rmtree |
48 | 48 | from types import MappingProxyType |
| 49 | +from datetime import datetime |
49 | 50 |
|
50 | 51 | from sqlglot import Dialect, exp |
51 | 52 | from sqlglot.helper import first |
|
126 | 127 | format_tz_datetime, |
127 | 128 | now_timestamp, |
128 | 129 | now, |
| 130 | + to_datetime, |
| 131 | + make_exclusive, |
129 | 132 | ) |
130 | 133 | from sqlmesh.utils.errors import ( |
131 | 134 | CircuitBreakerError, |
@@ -1215,6 +1218,7 @@ def plan( |
1215 | 1218 | diff_rendered: t.Optional[bool] = None, |
1216 | 1219 | skip_linter: t.Optional[bool] = None, |
1217 | 1220 | explain: t.Optional[bool] = None, |
| 1221 | + min_intervals: t.Optional[int] = None, |
1218 | 1222 | ) -> Plan: |
1219 | 1223 | """Interactively creates a plan. |
1220 | 1224 |
|
@@ -1261,6 +1265,8 @@ def plan( |
1261 | 1265 | diff_rendered: Whether the diff should compare raw vs rendered models |
1262 | 1266 | skip_linter: Linter runs by default so this will skip it if enabled |
1263 | 1267 | explain: Whether to explain the plan instead of applying it. |
| 1268 | + min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered |
| 1269 | + on every model when checking for missing intervals |
1264 | 1270 |
|
1265 | 1271 | Returns: |
1266 | 1272 | The populated Plan object. |
@@ -1289,6 +1295,7 @@ def plan( |
1289 | 1295 | diff_rendered=diff_rendered, |
1290 | 1296 | skip_linter=skip_linter, |
1291 | 1297 | explain=explain, |
| 1298 | + min_intervals=min_intervals, |
1292 | 1299 | ) |
1293 | 1300 |
|
1294 | 1301 | plan = plan_builder.build() |
@@ -1338,6 +1345,7 @@ def plan_builder( |
1338 | 1345 | diff_rendered: t.Optional[bool] = None, |
1339 | 1346 | skip_linter: t.Optional[bool] = None, |
1340 | 1347 | explain: t.Optional[bool] = None, |
| 1348 | + min_intervals: t.Optional[int] = None, |
1341 | 1349 | ) -> PlanBuilder: |
1342 | 1350 | """Creates a plan builder. |
1343 | 1351 |
|
@@ -1374,6 +1382,8 @@ def plan_builder( |
1374 | 1382 | enable_preview: Indicates whether to enable preview for forward-only models in development environments. |
1375 | 1383 | run: Whether to run latest intervals as part of the plan application. |
1376 | 1384 | diff_rendered: Whether the diff should compare raw vs rendered models |
| 1385 | + min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered |
| 1386 | + on every model when checking for missing intervals |
1377 | 1387 |
|
1378 | 1388 | Returns: |
1379 | 1389 | The plan builder. |
@@ -1401,6 +1411,7 @@ def plan_builder( |
1401 | 1411 | "run": run, |
1402 | 1412 | "diff_rendered": diff_rendered, |
1403 | 1413 | "skip_linter": skip_linter, |
| 1414 | + "min_intervals": min_intervals, |
1404 | 1415 | } |
1405 | 1416 | user_provided_flags: t.Dict[str, UserProvidedFlags] = { |
1406 | 1417 | k: v for k, v in kwargs.items() if v is not None |
@@ -1523,6 +1534,15 @@ def plan_builder( |
1523 | 1534 | # Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model. |
1524 | 1535 | self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values()) |
1525 | 1536 |
|
| 1537 | + start_override_per_model = self._calculate_start_override_per_model( |
| 1538 | + min_intervals, |
| 1539 | + start or default_start, |
| 1540 | + end or default_end, |
| 1541 | + execution_time or now(), |
| 1542 | + backfill_models, |
| 1543 | + snapshots, |
| 1544 | + ) |
| 1545 | + |
1526 | 1546 | return self.PLAN_BUILDER_TYPE( |
1527 | 1547 | context_diff=context_diff, |
1528 | 1548 | start=start, |
@@ -1553,6 +1573,7 @@ def plan_builder( |
1553 | 1573 | ), |
1554 | 1574 | end_bounded=not run, |
1555 | 1575 | ensure_finalized_snapshots=self.config.plan.use_finalized_state, |
| 1576 | + start_override_per_model=start_override_per_model, |
1556 | 1577 | interval_end_per_model=max_interval_end_per_model, |
1557 | 1578 | console=self.console, |
1558 | 1579 | user_provided_flags=user_provided_flags, |
@@ -2864,6 +2885,58 @@ def _get_plan_default_start_end( |
2864 | 2885 |
|
2865 | 2886 | return default_start, default_end |
2866 | 2887 |
|
| 2888 | + def _calculate_start_override_per_model( |
| 2889 | + self, |
| 2890 | + min_intervals: t.Optional[int], |
| 2891 | + plan_start: t.Optional[TimeLike], |
| 2892 | + plan_end: t.Optional[TimeLike], |
| 2893 | + plan_execution_time: TimeLike, |
| 2894 | + backfill_model_fqns: t.Optional[t.Set[str]], |
| 2895 | + snapshots_by_model_fqn: t.Dict[str, Snapshot], |
| 2896 | + ) -> t.Dict[str, datetime]: |
| 2897 | + if not min_intervals or not backfill_model_fqns or not plan_start: |
| 2898 | + # If there are no models to backfill, there are no intervals to consider for backfill, so we dont need to consider a minimum number |
| 2899 | + # If the plan doesnt have a start date, all intervals are considered already so we dont need to consider a minimum number |
| 2900 | + # If we dont have a minimum number of intervals to consider, then we dont need to adjust the start date on a per-model basis |
| 2901 | + return {} |
| 2902 | + |
| 2903 | + start_overrides = {} |
| 2904 | + |
| 2905 | + plan_execution_time_dt = to_datetime(plan_execution_time) |
| 2906 | + plan_start_dt = to_datetime(plan_start, relative_base=plan_execution_time_dt) |
| 2907 | + plan_end_dt = to_datetime( |
| 2908 | + plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt |
| 2909 | + ) |
| 2910 | + |
| 2911 | + for model_fqn in backfill_model_fqns: |
| 2912 | + snapshot = snapshots_by_model_fqn.get(model_fqn) |
| 2913 | + if not snapshot: |
| 2914 | + continue |
| 2915 | + |
| 2916 | + starting_point = plan_end_dt |
| 2917 | + if node_end := snapshot.node.end: |
| 2918 | + # if we dont do this, if the node end is a date (as opposed to a timestamp) |
| 2919 | + # we end up incorrectly winding back an extra day |
| 2920 | + node_end_dt = make_exclusive(node_end) |
| 2921 | + |
| 2922 | + if node_end_dt < plan_end_dt: |
| 2923 | + # if the model has an end date that has already elapsed, use that as a starting point for calculating min_intervals |
| 2924 | + # instead of the plan end. If we use the plan end, we will return intervals in the future which are invalid |
| 2925 | + starting_point = node_end_dt |
| 2926 | + |
| 2927 | + snapshot_start = snapshot.node.cron_floor(starting_point) |
| 2928 | + |
| 2929 | + for _ in range(min_intervals): |
| 2930 | + # wind back the starting point by :min_intervals intervals to arrive at the minimum snapshot start date |
| 2931 | + snapshot_start = snapshot.node.cron_prev(snapshot_start) |
| 2932 | + |
| 2933 | + # only consider this an override if the wound-back start date is earlier than the plan start date |
| 2934 | + # if it isnt then the plan already covers :min_intervals intervals for this snapshot |
| 2935 | + if snapshot_start < plan_start_dt: |
| 2936 | + start_overrides[model_fqn] = snapshot_start |
| 2937 | + |
| 2938 | + return start_overrides |
| 2939 | + |
2867 | 2940 | def _get_max_interval_end_per_model( |
2868 | 2941 | self, snapshots: t.Dict[str, Snapshot], backfill_models: t.Optional[t.Set[str]] |
2869 | 2942 | ) -> t.Dict[str, int]: |
|
0 commit comments