|
2 | 2 | import pathlib |
3 | 3 | import typing as t |
4 | 4 | import re |
5 | | -from datetime import date, timedelta |
| 5 | +from datetime import date, timedelta, datetime |
6 | 6 | from tempfile import TemporaryDirectory |
7 | 7 | from unittest.mock import PropertyMock, call, patch |
8 | 8 |
|
|
36 | 36 | from sqlmesh.core.dialect import parse, schema_ |
37 | 37 | from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter |
38 | 38 | from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements |
| 39 | +from sqlmesh.core.plan.definition import Plan |
39 | 40 | from sqlmesh.core.macros import MacroEvaluator, RuntimeStage |
40 | 41 | from sqlmesh.core.model import load_sql_based_model, model, SqlModel, Model |
41 | 42 | from sqlmesh.core.model.cache import OptimizedQueryCache |
@@ -2094,6 +2095,7 @@ def test_plan_audit_intervals(tmp_path: pathlib.Path, caplog): |
2094 | 2095 | plan = ctx.plan( |
2095 | 2096 | environment="dev", auto_apply=True, no_prompts=True, start="2025-02-01", end="2025-02-01" |
2096 | 2097 | ) |
| 2098 | + assert plan.missing_intervals |
2097 | 2099 |
|
2098 | 2100 | date_snapshot = next(s for s in plan.new_snapshots if "date_example" in s.name) |
2099 | 2101 | timestamp_snapshot = next(s for s in plan.new_snapshots if "timestamp_example" in s.name) |
@@ -2304,3 +2306,114 @@ def test_dev_environment_virtual_update_with_environment_statements(tmp_path: Pa |
2304 | 2306 | updated_statements[0].before_all[1] |
2305 | 2307 | == "CREATE TABLE IF NOT EXISTS metrics (metric_name VARCHAR(50), value INT)" |
2306 | 2308 | ) |
| 2309 | + |
| 2310 | + |
| 2311 | +def test_plan_min_intervals(tmp_path: Path): |
| 2312 | + init_example_project(tmp_path, engine_type="duckdb", dialect="duckdb") |
| 2313 | + |
| 2314 | + context = Context( |
| 2315 | + paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) |
| 2316 | + ) |
| 2317 | + |
| 2318 | + current_time = to_datetime("2020-02-01 00:00:01") |
| 2319 | + |
| 2320 | + # initial state of example project |
| 2321 | + context.plan(auto_apply=True, execution_time=current_time) |
| 2322 | + |
| 2323 | + (tmp_path / "models" / "daily_model.sql").write_text(""" |
| 2324 | + MODEL ( |
| 2325 | + name sqlmesh_example.daily_model, |
| 2326 | + kind INCREMENTAL_BY_TIME_RANGE ( |
| 2327 | + time_column event_date |
| 2328 | + ), |
| 2329 | + start '2020-01-01', |
| 2330 | + cron '@daily' |
| 2331 | + ); |
| 2332 | +
|
| 2333 | + select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds |
| 2334 | + """) |
| 2335 | + |
| 2336 | + (tmp_path / "models" / "weekly_model.sql").write_text(""" |
| 2337 | + MODEL ( |
| 2338 | + name sqlmesh_example.weekly_model, |
| 2339 | + kind INCREMENTAL_BY_TIME_RANGE ( |
| 2340 | + time_column event_date |
| 2341 | + ), |
| 2342 | + start '2020-01-01', |
| 2343 | + cron '@weekly' |
| 2344 | + ); |
| 2345 | +
|
| 2346 | + select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds |
| 2347 | + """) |
| 2348 | + |
| 2349 | + (tmp_path / "models" / "monthly_model.sql").write_text(""" |
| 2350 | + MODEL ( |
| 2351 | + name sqlmesh_example.monthly_model, |
| 2352 | + kind INCREMENTAL_BY_TIME_RANGE ( |
| 2353 | + time_column event_date |
| 2354 | + ), |
| 2355 | + start '2020-01-01', |
| 2356 | + cron '@monthly' |
| 2357 | + ); |
| 2358 | +
|
| 2359 | + select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds |
| 2360 | + """) |
| 2361 | + |
| 2362 | + context.load() |
| 2363 | + |
| 2364 | + # initial state - backfill from 2020-01-01 -> now() (2020-01-02 00:00:01) on new models |
| 2365 | + plan = context.plan(execution_time=current_time) |
| 2366 | + |
| 2367 | + assert to_datetime(plan.start) == to_datetime("2020-01-01 00:00:00") |
| 2368 | + assert to_datetime(plan.end) == to_datetime("2020-02-01 00:00:00") |
| 2369 | + assert to_datetime(plan.execution_time) == to_datetime("2020-02-01 00:00:01") |
| 2370 | + |
| 2371 | + def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, datetime]]: |
| 2372 | + snapshot_id = context.get_snapshot(name, raise_if_missing=True).snapshot_id |
| 2373 | + snapshot_intervals = next( |
| 2374 | + si for si in plan.missing_intervals if si.snapshot_id == snapshot_id |
| 2375 | + ) |
| 2376 | + return [(to_datetime(s), to_datetime(e)) for s, e in snapshot_intervals.merged_intervals] |
| 2377 | + |
| 2378 | + # check initial intervals - should be full time range between start and execution time |
| 2379 | + assert len(plan.missing_intervals) == 3 |
| 2380 | + |
| 2381 | + assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [ |
| 2382 | + (to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00")) |
| 2383 | + ] |
| 2384 | + assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [ |
| 2385 | + ( |
| 2386 | + to_datetime("2020-01-01 00:00:00"), |
| 2387 | + to_datetime("2020-01-26 00:00:00"), |
| 2388 | + ) # last week in 2020-01 hasnt fully elapsed yet |
| 2389 | + ] |
| 2390 | + assert _get_missing_intervals(plan, "sqlmesh_example.monthly_model") == [ |
| 2391 | + (to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00")) |
| 2392 | + ] |
| 2393 | + |
| 2394 | + # now, create a dev env for "1 day ago" |
| 2395 | + plan = context.plan( |
| 2396 | + environment="pr_env", |
| 2397 | + start="1 day ago", |
| 2398 | + execution_time=current_time, |
| 2399 | + min_intervals=1, |
| 2400 | + ) |
| 2401 | + |
| 2402 | + # this should pick up last day for daily model, last week for weekly model and last month for the monthly model |
| 2403 | + assert len(plan.missing_intervals) == 3 |
| 2404 | + |
| 2405 | + assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [ |
| 2406 | + (to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00")) |
| 2407 | + ] |
| 2408 | + assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [ |
| 2409 | + ( |
| 2410 | + to_datetime("2020-01-19 00:00:00"), # last completed week |
| 2411 | + to_datetime("2020-01-26 00:00:00"), |
| 2412 | + ) |
| 2413 | + ] |
| 2414 | + assert _get_missing_intervals(plan, "sqlmesh_example.monthly_model") == [ |
| 2415 | + ( |
| 2416 | + to_datetime("2020-01-01 00:00:00"), # last completed month |
| 2417 | + to_datetime("2020-02-01 00:00:00"), |
| 2418 | + ) |
| 2419 | + ] |
0 commit comments