|
| 1 | +"""Snowflake-style cost model layered on top of a ``ReplayReport``. |
| 2 | +
|
| 3 | +Translates wall-clock, thread-idleness, and the critical-path floor into |
| 4 | +dollar amounts the user can reason about. Kept intentionally small and |
| 5 | +primitive-driven so a future ``whatif`` simulator can call ``compute_cost`` |
| 6 | +twice (baseline + simulated) and diff the two ``CostReport``s without |
| 7 | +having to fabricate a full replay. |
| 8 | +""" |
| 9 | + |
| 10 | +from __future__ import annotations |
| 11 | + |
| 12 | +from dataclasses import dataclass |
| 13 | + |
| 14 | +from dbt_dag_opt.errors import InvalidArtifactError |
| 15 | +from dbt_dag_opt.replay import ReplayReport |
| 16 | + |
# Credits consumed per hour for each warehouse size shorthand. The table
# starts at 1.0 for "XS" and doubles with every step up in size.
SNOWFLAKE_CREDITS_PER_HOUR: dict[str, float] = {
    size: float(2**step)
    for step, size in enumerate(
        ("XS", "S", "M", "L", "XL", "2XL", "3XL", "4XL", "5XL", "6XL")
    )
}

# Price of one credit in USD, for callers that do not supply their own rate.
DEFAULT_RATE_PER_CREDIT_USD: float = 2.0

# Shortest billable duration; ``compute_cost`` rounds shorter runs up to this
# when minimum billing is enabled.
MIN_BILLING_SECONDS: float = 60.0

# Long-form size names (already upper-cased, separators removed) mapped to the
# shorthand keys of ``SNOWFLAKE_CREDITS_PER_HOUR``.
_SIZE_ALIASES: dict[str, str] = {
    # Named sizes.
    "XSMALL": "XS",
    "SMALL": "S",
    "MEDIUM": "M",
    "LARGE": "L",
    "XLARGE": "XL",
    # Numbered multiples of X-Large.
    "2XLARGE": "2XL",
    "3XLARGE": "3XL",
    "4XLARGE": "4XL",
    "5XLARGE": "5XL",
    "6XLARGE": "6XL",
}
| 45 | + |
| 46 | + |
@dataclass(frozen=True)
class CostInputs:
    """Plain numeric inputs consumed by ``compute_cost``.

    Deliberately independent of ``ReplayReport``: a ``whatif`` caller can
    build one from simulated figures without running the parse pipeline.
    Instances are immutable.
    """

    # Elapsed run time in seconds; ``compute_cost`` clamps negatives to 0.
    wall_clock_seconds: float
    # Busy seconds summed over all threads; subtracted from
    # wall-clock x threads to obtain idle thread-seconds.
    total_cpu_seconds: float
    # Number of worker threads; ``compute_cost`` treats values below 1 as 1.
    thread_count: int
    # Length of the critical path in seconds (the floor the run cannot beat).
    critical_path_seconds: float
    # Warehouse burn rate in credits per hour.
    credits_per_hour: float
    # Price of a single credit in USD.
    rate_per_credit_usd: float
    # When true, billed and floor durations are raised to the billing minimum.
    apply_minimum_billing: bool = True
    # Shorthand size label (e.g. "XL") when the rate came from a named size,
    # otherwise None.
    warehouse_size_label: str | None = None
| 63 | + |
| 64 | + |
@dataclass(frozen=True)
class CostReport:
    """Immutable result of ``compute_cost`` for one set of ``CostInputs``."""

    # The inputs this report was computed from, kept for traceability.
    inputs: CostInputs
    # Seconds actually charged (wall-clock, possibly raised to the minimum).
    billed_seconds: float
    # USD charged per second: credits/hour x USD/credit / 3600.
    rate_per_second_usd: float
    # Cost of the run as billed.
    run_cost_usd: float
    # Cost of a run that took only the critical path (minimum billing applied
    # when enabled).
    floor_cost_usd: float
    # Run cost minus floor cost, never negative.
    headroom_usd: float
    # Thread-seconds not accounted for by CPU time, never negative.
    idle_thread_seconds: float
    # Idle thread-seconds as a share of wall-clock x threads (0 when the run
    # has no duration).
    waste_fraction: float
    # Share of the run cost attributed to idle threads.
    idle_cost_usd: float
    # True when the billed duration was raised to the billing minimum.
    min_billing_applied: bool
| 77 | + |
| 78 | + |
def credits_per_hour_for(size: str) -> float:
    """Resolve a Snowflake warehouse size name to credits/hour.

    Matching is case-insensitive and ignores ``-``, ``_`` and every kind of
    whitespace (spaces, tabs, newlines). Both shorthand (``"L"``, ``"2XL"``)
    and long form (``"Large"``, ``"2X-Large"``, ``"x_small"``) are accepted.

    Args:
        size: Warehouse size as typed by the user.

    Returns:
        The matching entry of ``SNOWFLAKE_CREDITS_PER_HOUR``.

    Raises:
        InvalidArtifactError: If the normalized name is neither a shorthand
            key nor a known long-form alias.
    """
    # ``str.split()`` without arguments splits on any whitespace run, so
    # re-joining the pieces removes tabs and newlines as well as spaces.
    normalized = "".join(size.upper().split()).replace("-", "").replace("_", "")
    # Long-form names map onto a shorthand key; shorthand passes through.
    shorthand = _SIZE_ALIASES.get(normalized, normalized)
    if shorthand in SNOWFLAKE_CREDITS_PER_HOUR:
        return SNOWFLAKE_CREDITS_PER_HOUR[shorthand]
    raise InvalidArtifactError(
        f"unknown warehouse size: {size!r}. "
        f"Expected one of {sorted(SNOWFLAKE_CREDITS_PER_HOUR)}."
    )
| 94 | + |
| 95 | + |
def compute_cost(inputs: CostInputs) -> CostReport:
    """Turn timing figures and a billing rate into a ``CostReport``.

    Negative durations are clamped to zero and a thread count below one is
    treated as one. When ``inputs.apply_minimum_billing`` is set, both the
    billed duration and the critical-path floor are raised to
    ``MIN_BILLING_SECONDS``.

    Args:
        inputs: Primitive timing and pricing inputs.

    Returns:
        A ``CostReport`` holding the run cost, the critical-path floor cost,
        the headroom between them, and the idle-thread figures.
    """
    usd_per_second = inputs.credits_per_hour * inputs.rate_per_credit_usd / 3600.0

    # Run cost: wall-clock time, padded to the billing minimum if requested.
    wall_seconds = max(0.0, inputs.wall_clock_seconds)
    padded = bool(inputs.apply_minimum_billing) and wall_seconds < MIN_BILLING_SECONDS
    billed_seconds = MIN_BILLING_SECONDS if padded else wall_seconds
    run_cost_usd = billed_seconds * usd_per_second

    # Floor cost: what the run would cost if it took only the critical path.
    critical_seconds = max(0.0, inputs.critical_path_seconds)
    floor_seconds = (
        max(critical_seconds, MIN_BILLING_SECONDS)
        if inputs.apply_minimum_billing
        else critical_seconds
    )
    floor_cost_usd = floor_seconds * usd_per_second

    # Idleness: thread-seconds available minus thread-seconds spent working.
    # Uses the unpadded wall-clock so minimum billing does not count as idle.
    capacity_seconds = wall_seconds * max(1, inputs.thread_count)
    busy_seconds = max(0.0, inputs.total_cpu_seconds)
    idle_seconds = max(0.0, capacity_seconds - busy_seconds)
    if capacity_seconds > 0:
        idle_share = idle_seconds / capacity_seconds
    else:
        # A zero-length run has no capacity to waste.
        idle_share = 0.0

    return CostReport(
        inputs=inputs,
        billed_seconds=billed_seconds,
        rate_per_second_usd=usd_per_second,
        run_cost_usd=run_cost_usd,
        floor_cost_usd=floor_cost_usd,
        headroom_usd=max(0.0, run_cost_usd - floor_cost_usd),
        idle_thread_seconds=idle_seconds,
        waste_fraction=idle_share,
        idle_cost_usd=run_cost_usd * idle_share,
        min_billing_applied=padded,
    )
| 136 | + |
| 137 | + |
def cost_inputs_from_replay(
    report: ReplayReport,
    *,
    warehouse_size: str | None,
    credits_per_hour: float | None,
    rate_per_credit_usd: float,
    apply_minimum_billing: bool,
) -> CostInputs:
    """Adapt a ``ReplayReport`` + user flags into ``CostInputs``.

    Exactly one of ``warehouse_size`` / ``credits_per_hour`` must be set.
    The CLI is expected to enforce this, and it is re-validated here so
    other callers get a clear error as well.

    Args:
        report: Replay result supplying wall-clock, CPU, thread-count and
            critical-path figures.
        warehouse_size: Named warehouse size, resolved through
            ``credits_per_hour_for``; ``None`` when an explicit rate is given.
        credits_per_hour: Explicit credits/hour; ``None`` when a named size
            is given. Must be positive and finite.
        rate_per_credit_usd: Price of one credit in USD, passed through
            unchanged.
        apply_minimum_billing: Passed through to ``CostInputs``.

    Returns:
        ``CostInputs`` ready for ``compute_cost``. ``warehouse_size_label``
        is the shorthand size when a named size was used, otherwise ``None``.

    Raises:
        InvalidArtifactError: If both or neither of ``warehouse_size`` and
            ``credits_per_hour`` are set, if the size is unknown, or if
            ``credits_per_hour`` is not a positive finite number.
    """
    if warehouse_size is not None and credits_per_hour is not None:
        raise InvalidArtifactError(
            "pass either warehouse_size or credits_per_hour, not both"
        )
    if warehouse_size is not None:
        # Resolve first so an unknown size fails before a label is built.
        cph = credits_per_hour_for(warehouse_size)
        # Normalize the label (upper-case; drop whitespace, "-" and "_") and
        # collapse long-form names to their shorthand.
        label = (
            "".join(warehouse_size.upper().split()).replace("-", "").replace("_", "")
        )
        label = _SIZE_ALIASES.get(label, label)
    elif credits_per_hour is not None:
        # The chained comparison is False for NaN, so this rejects NaN and
        # infinity as well as zero and negatives; a plain ``<= 0`` would let
        # NaN through and produce NaN costs downstream.
        if not 0 < credits_per_hour < float("inf"):
            raise InvalidArtifactError("credits_per_hour must be positive and finite")
        cph = float(credits_per_hour)
        label = None
    else:
        raise InvalidArtifactError(
            "cost inputs require warehouse_size or credits_per_hour"
        )

    return CostInputs(
        wall_clock_seconds=report.wall_clock_seconds,
        total_cpu_seconds=report.total_cpu_seconds,
        thread_count=report.thread_count,
        critical_path_seconds=report.critical_path_seconds,
        credits_per_hour=cph,
        rate_per_credit_usd=rate_per_credit_usd,
        apply_minimum_billing=apply_minimum_billing,
        warehouse_size_label=label,
    )
0 commit comments