|
| 1 | +from __future__ import annotations |
| 2 | +from functools import reduce |
| 3 | +from itertools import pairwise |
| 4 | +from operator import sub |
| 5 | +from typing import Iterable, Iterator, List, Tuple |
| 6 | + |
| 7 | + |
| 8 | +def answer_1(input: List[str]) -> int: |
| 9 | + all_reports = extract_reports(input) |
| 10 | + safe_reports = calculate_safety(all_reports, failure_threshold=0) |
| 11 | + return sum(safe_reports) |
| 12 | + |
| 13 | + |
| 14 | +# def answer_2(input: List[str]) -> int: |
| 15 | +# all_reports = extract_reports(input) |
| 16 | +# safe_reports = calculate_safety(all_reports, failure_threshold=1) # noqa: F841 |
| 17 | +# return sum(safe_reports) |
| 18 | + |
| 19 | + |
| 20 | +def extract_reports(input: Iterable[str], /) -> Iterator[List[int]]: |
| 21 | + levels = [list(map(int, line.split())) for line in input] |
| 22 | + yield from levels |
| 23 | + |
| 24 | + |
| 25 | +def calculate_safety(input: Iterator[List[int]], /, failure_threshold: int = 0) -> Iterator[bool]: |
| 26 | + for _, deltas, directions in filter_reports(input, failure_threshold): |
| 27 | + delta_anomalies = [d for d in deltas if d == 0 or not -4 < d < 4] |
| 28 | + safety_checks = [len(delta_anomalies) == 0, len(set(directions)) == 1] |
| 29 | + yield all(safety_checks) |
| 30 | + |
| 31 | + |
| 32 | +def filter_reports( |
| 33 | + input: Iterator[List[int]], failure_threshold: int, / |
| 34 | +) -> Iterator[Tuple[List[int], List[int], List[int]]]: |
| 35 | + failure_count = 0 |
| 36 | + report = next(input) |
| 37 | + while report: |
| 38 | + try: |
| 39 | + deltas = [reduce(sub, reversed(p)) for p in pairwise(report)] |
| 40 | + directions = [-1 if d < 0 else +1 if d > 0 else 0 for d in deltas] |
| 41 | + metadata = zip(deltas, directions) |
| 42 | + |
| 43 | + # Tolerate bad levels up to a certain threshold |
| 44 | + if failure_count < failure_threshold: |
| 45 | + prevdir = None |
| 46 | + for idx, (delta, curdir) in enumerate(metadata): |
| 47 | + # Criteria: is neither an increase or a decrease |
| 48 | + if delta == 0: |
| 49 | + raise ValueError(idx) |
| 50 | + # Criteria: is neither an increase or a decrease of at least 1 and at most 3 |
| 51 | + elif not -4 < delta < 4: |
| 52 | + raise ValueError(idx) |
| 53 | + # Criteria: is neither all increasing or all decreasing |
| 54 | + elif prevdir is not None and prevdir != curdir: |
| 55 | + raise ValueError(idx) |
| 56 | + prevdir = curdir |
| 57 | + |
| 58 | + # Return values |
| 59 | + yield report, deltas, directions |
| 60 | + |
| 61 | + # Pull next report or STOP |
| 62 | + failure_count = 0 |
| 63 | + report = next(input) |
| 64 | + except ValueError as err: |
| 65 | + # Remove failure and try again |
| 66 | + failure_count += 1 |
| 67 | + del report[err.args[0]] |
| 68 | + continue |
| 69 | + except StopIteration: |
| 70 | + break |
0 commit comments