diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8aff2817..e7513bce 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,14 +10,14 @@ repos: types: [python] - repo: https://github.com/astral-sh/ruff-pre-commit - rev: "v0.14.14" + rev: "v0.15.0" hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] - id: ruff-format - repo: https://github.com/tox-dev/pyproject-fmt - rev: 'v2.11.1' + rev: 'v2.12.1' hooks: - id: pyproject-fmt diff --git a/conftest.py b/conftest.py index c0070ba1..c735efb3 100644 --- a/conftest.py +++ b/conftest.py @@ -1,7 +1,9 @@ """For testing.""" -# 3rd party +import sys + import pytest +from twisted.python import log from pywwa import CTX, CTX_DEFAULTS from pywwa.database import get_dbconnc @@ -10,6 +12,7 @@ # pytest + twisted + click == too much magic for poor me # So, this effectively disables reactor.stop() from working CTX_DEFAULTS["shutdown_delay"] = 1800 +log.startLogging(sys.stdout) @pytest.fixture(autouse=True) diff --git a/examples/SHEF/RR3MPX_1.txt b/examples/SHEF/RR3MPX_1.txt new file mode 100644 index 00000000..1199efd0 --- /dev/null +++ b/examples/SHEF/RR3MPX_1.txt @@ -0,0 +1,7 @@ +000 +SRUS53 KMPX 031510 +RR3MPX +WxCoder +.A LCHM5 260203 C DH0800/TX 19/TN -6/TA 0/PP 0.00 +.A1 SF 0.0/SD 4/SW 3.0/DC2602030907 + diff --git a/examples/SHEF/RR3MPX_2.txt b/examples/SHEF/RR3MPX_2.txt new file mode 100644 index 00000000..c0acf814 --- /dev/null +++ b/examples/SHEF/RR3MPX_2.txt @@ -0,0 +1,7 @@ +000 +SRUS53 KMPX 032100 +RR3MPX +WxCoder +.AR LCHM5 260203 C DH0800/TX 19/TN -6/TA 0/PP 0.00 +.AR1 SF 0.0/SD 4/SW M/DC2602031459 + diff --git a/pyproject.toml b/pyproject.toml index fa31934f..7806ea90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -117,6 +117,9 @@ lint.select = [ lint.per-file-ignores."goes/*.py" = [ "T201", ] +lint.per-file-ignores."tests/*.py" = [ + "T201", +] lint.per-file-ignores."util/*.py" = [ "T201", ] diff --git a/src/pywwa/common.py b/src/pywwa/common.py index f30062d7..e7850c13 100644 --- a/src/pywwa/common.py +++ b/src/pywwa/common.py @@ -205,11 +205,14 @@ def email_error(exp, message="", trimstr=100): msg["subject"] = ( f"[pyWWA] {sys.argv[0].rsplit('/', maxsplit=1)[-1]} Traceback -- {hn}" ) - msg["From"] = SETTINGS.get("pywwa_errors_from", "ldm@localhost") - msg["To"] = SETTINGS.get("pywwa_errors_to", "ldm@localhost") + msg["From"] = SETTINGS.get("pywwa_errors_from", "root@localhost") + msg["To"] = SETTINGS.get("pywwa_errors_to", "root@localhost") if not CTX["disable_email"]: df = smtp.sendmail( - SETTINGS.get("pywwa_smtp", "smtp"), msg["From"], msg["To"], msg + SETTINGS.get("pywwa_smtp", "localhost"), + msg["From"], + msg["To"], + msg, ) df.addErrback(LOG.error) else: diff --git a/src/pywwa/workflows/nexrad3_attr.py b/src/pywwa/workflows/nexrad3_attr.py index 95f2bf7a..2b60135d 100644 --- a/src/pywwa/workflows/nexrad3_attr.py +++ b/src/pywwa/workflows/nexrad3_attr.py @@ -33,15 +33,15 @@ def load_station_table(txn): LOG.info("Station Table size %s", len(ST.keys())) -def process_data(data): +def process_data(data: bytes) -> dict: """I am called when data is ahoy""" bio = BytesIO() bio.write(data) bio.seek(0) - process(bio) + return process(bio) -def process(bio): +def process(bio: BytesIO) -> dict: """Process our data, please""" l3 = Level3File(bio) ctx = { @@ -57,7 +57,7 @@ def process(bio): if "text" in line: ctx["lines"].append(line["text"]) df = PGCONN.runInteraction(really_process, ctx) - df.addErrback(common.email_error, ctx) + df.addErrback(common.email_error, str(ctx)) return ctx diff --git a/src/pywwa/workflows/shef.py b/src/pywwa/workflows/shef.py index e2cb9478..2851664a 100644 --- a/src/pywwa/workflows/shef.py +++ b/src/pywwa/workflows/shef.py @@ -16,6 +16,7 @@ from pyiem.observation import Observation from pyiem.util import convert_value, utc from twisted.internet import reactor +from twisted.internet.defer import Deferred from twisted.internet.task import LoopingCall, deferLater # Local @@ -361,11 +362,13 @@ def insert_raw_inbound(cursor, args) -> int: return cursor.rowcount -def update_current_queue(element: SHEFElement, product_id: str): +def update_current_queue( + element: SHEFElement, product_id: str +) -> Deferred | None: """Update CURRENT_QUEUE with new data.""" # We only want observations if element.type != "R": - return + return None args = ( element.station, element.valid, @@ -385,7 +388,7 @@ def update_current_queue(element: SHEFElement, product_id: str): key, dict(valid=element.valid, value=element.num_value, dirty=True) ) if element.valid < cur["valid"]: - return + return None cur["valid"] = element.valid cur["depth"] = element.depth cur["value"] = element.num_value @@ -395,6 +398,8 @@ def update_current_queue(element: SHEFElement, product_id: str): cur["product_id"] = product_id cur["dirty"] = True + return defer + def process_site_time(prod, sid, ts, elements: list[SHEFElement]): """Ingest for IEMAccess.""" @@ -532,7 +537,7 @@ def write_access_records_eb(err, records: list, iemid, entry: ACCESSDB_ENTRY): common.email_error(err, f"write_access_entry({entry.station}) got {err}") -def process_data(text): +def process_data(text: str): """Callback when text is received.""" prod = parser(text, utcnow=common.utcnow(), ugc_provider={}) if prod.warnings: diff --git a/src/pywwa/workflows/sps.py b/src/pywwa/workflows/sps.py index 54d9dc1b..6f8953c6 100644 --- a/src/pywwa/workflows/sps.py +++ b/src/pywwa/workflows/sps.py @@ -2,13 +2,11 @@ from functools import partial -# 3rd Party import click from pyiem.database import get_sqlalchemy_conn from pyiem.nws.products.sps import parser from pyiem.nws.ugc import UGCProvider -# Local from pywwa import LOG, common from pywwa.database import get_database, load_nwsli from pywwa.ldm import bridge diff --git a/src/pywwa/workflows/vtec.py b/src/pywwa/workflows/vtec.py index 0f22f3e7..a127157e 100644 --- a/src/pywwa/workflows/vtec.py +++ b/src/pywwa/workflows/vtec.py @@ -13,14 +13,12 @@ from functools import partial -# 3rd Party import click from pyiem.database import get_sqlalchemy_conn from pyiem.nws.products.vtec import parser as vtecparser from pyiem.nws.ugc import UGCProvider from twisted.mail.smtp import SMTPSenderFactory -# Local from pywwa import LOG, common from pywwa.database import get_database, load_nwsli from pywwa.ldm import bridge diff --git a/src/pywwa/workflows/xteus.py b/src/pywwa/workflows/xteus.py index 59d1dc1d..76543f78 100644 --- a/src/pywwa/workflows/xteus.py +++ b/src/pywwa/workflows/xteus.py @@ -1,16 +1,14 @@ """XTEUS Product Parser!""" -# 3rd Party import click from pyiem.nws.products.xteus import parser -# Local from pywwa import common from pywwa.database import get_database from pywwa.ldm import bridge -def process_data(txn, data): +def process_data(txn, data: str): """Process the product""" prod = parser(data, utcnow=common.utcnow()) if prod.warnings: diff --git a/tests/test_common.py b/tests/test_common.py index 6dce245d..c8c0247e 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -1,5 +1,6 @@ """Test pywwa.common""" +import pytest_twisted from pyiem.util import utc from pywwa import CTX, common @@ -31,9 +32,10 @@ def test_should_email(): assert not common.email_error(None, None) +@pytest_twisted.inlineCallbacks def test_email_error(): """Test that we can email an error.""" common.EMAIL_TIMESTAMPS = [] - common.email_error(None, None) + yield common.email_error(None, None) CTX["disable_email"] = True - common.email_error(None, None) + yield common.email_error(None, None) diff --git a/tests/workflows/test_dsm_parser.py b/tests/workflows/test_dsm_parser.py index e27da3fc..b7fda187 100644 --- a/tests/workflows/test_dsm_parser.py +++ b/tests/workflows/test_dsm_parser.py @@ -15,8 +15,8 @@ def test_load_stations(cursor): """Test station loading.""" # Need to set one station to a bad tzname to exercise an exception cursor.execute( - "UPDATE stations SET tzname = 'BoG0S' where id = 'MSP' and " - "network = 'MN_ASOS'" + "UPDATE stations SET tzname = 'BoG0S' where id = 'DSM' and " + "network = 'IA_ASOS'" ) assert cursor.rowcount == 1 dsm.load_stations(cursor) diff --git a/tests/workflows/test_nexrad3_attr.py b/tests/workflows/test_nexrad3_attr.py index e8e75dce..8cf939ae 100644 --- a/tests/workflows/test_nexrad3_attr.py +++ b/tests/workflows/test_nexrad3_attr.py @@ -1,11 +1,8 @@ """Test nexrad3_attr.""" -# 3rd Party import pytest from pywwa.testing import get_example_filepath - -# Local from pywwa.workflows import nexrad3_attr @@ -29,7 +26,7 @@ def test_process(cursor): def test_210910_badvil(cursor): """Test that a missing VIL does not cause issues.""" with open(get_example_filepath("NCR_20210911_0023"), "rb") as fh: - ctx = nexrad3_attr.process(fh) + ctx = nexrad3_attr.process_data(fh.read()) assert ctx["nexrad"] == "LAS" processed = nexrad3_attr.really_process(cursor, ctx) assert processed == 2 diff --git a/tests/workflows/test_shef_parser.py b/tests/workflows/test_shef_parser.py index 8fbb7d08..048e07d5 100644 --- a/tests/workflows/test_shef_parser.py +++ b/tests/workflows/test_shef_parser.py @@ -1,18 +1,15 @@ """Test shef_parser.""" import datetime -from functools import partial - -# 3rd Party -# pylint: disable=no-member-in-module from unittest import mock from zoneinfo import ZoneInfo import pytest - -# Local +import pytest_twisted from psycopg.errors import DeadlockDetected from pyiem.util import utc +from twisted.internet import reactor +from twisted.internet.task import deferLater from twisted.python.failure import Failure from pywwa import CTX, SETTINGS @@ -51,6 +48,62 @@ def sync_workflow(prod, cursor): shef.process_accessdb_frontend() +@pytest_twisted.inlineCallbacks +def test_gh316_rr3_correction(): + """Test that a RR3 Correction properly happens?""" + + def _delete_raw_inbound(txn): + """Delete any prior data.""" + txn.execute("delete from raw_inbound where station = 'LCHM5'") + return txn.rowcount + + def _check_entry(txn): + txn.execute( + "select value from raw_inbound where station = 'LCHM5' and " + "key = 'SWIRZZZ' order by updated desc" + ) + return txn.fetchall() + + def _check_access_entry(txn): + txn.execute( + "select value from current_shef where station = 'LCHM5' and " + "physical_code = 'SW'" + ) + return txn.fetchall() + + result = yield CTX["HADSDB"].runInteraction(_delete_raw_inbound) + print(result) + + CTX["utcnow"] = utc(2026, 2, 3, 16) + shef.process_data(get_example_file("SHEF/RR3MPX_1.txt")) + shef.process_data(get_example_file("SHEF/RR3MPX_2.txt")) + # Update iemaccess current_shef + updated = yield shef.save_current() + assert updated == 7 + # Check 0: Ensure the HADSDB queue is done + attempt = 0 + for db in ["HADSDB", "ACCESSDB"]: + while CTX[db].threadpool._queue.qsize() > 0: + attempt += 1 + print(f"Waiting for {db} queue to drain...") + yield deferLater(reactor, 0.1, lambda: None) + if attempt > 30: + pytest.fail(f"{db} queue did not drain in time") + + # The runOperation is touchy with the upsert that happens, lame + yield deferLater(reactor, 1, lambda: None) + + # Check 1: see that current_shef is good + result = yield CTX["ACCESSDB"].runInteraction(_check_access_entry) + assert result[0]["value"] is None + # Check 2: Ensure the value in the current_queue is None + assert shef.CURRENT_QUEUE["LCHM5|SWIRZZZ|None"]["value"] is None + # Check 3: Check what is in raw_inbound + result = yield CTX["HADSDB"].runInteraction(_check_entry) + assert len(result) == 2 + assert result[0]["value"] is None + + def test_accessdb_exception(): """Test some GIGO raises an exception.""" shef.ACCESSDB_QUEUE[-99] = 0 @@ -171,11 +224,12 @@ def test_230926_rr8krf(cursor): shef.insert_raw_inbound(cursor, args) +@pytest_twisted.inlineCallbacks def test_bad_element_in_current_queue(): """Test GIGO on current_queue.""" shef.CURRENT_QUEUE.clear() shef.CURRENT_QUEUE["HI|BYE|HI"] = {"dirty": True} - shef.save_current() + yield shef.save_current() @pytest.mark.parametrize("database", ["iem"]) @@ -217,10 +271,6 @@ def test_omit_report(cursor): assert row["max_tmpf"] == 84 assert row["report"] == ans - CTX["ACCESSDB"].runOperation = partial(run_interaction, cursor) - assert shef.save_current() == 7 - assert shef.save_current() == 0 - def test_process_site_eb(): """Test that the errorback works without any side effects.""" diff --git a/tests/workflows/test_sps.py b/tests/workflows/test_sps.py new file mode 100644 index 00000000..ac48d4b0 --- /dev/null +++ b/tests/workflows/test_sps.py @@ -0,0 +1,13 @@ +"""Exercise the sps workflow.""" + +import pytest + +from pywwa.testing import get_example_file +from pywwa.workflows import sps + + +@pytest.mark.parametrize("database", ["postgis"]) +def test_real_process(cursor): + """Can we process a real SPS product?""" + data = get_example_file("SPS.txt") + sps.real_process({}, cursor, data)