Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ repos:
types: [python]

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.14.14"
rev: "v0.15.0"
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- id: ruff-format

- repo: https://github.com/tox-dev/pyproject-fmt
rev: 'v2.11.1'
rev: 'v2.12.1'
hooks:
- id: pyproject-fmt

5 changes: 4 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""For testing."""

# 3rd party
import sys

import pytest
from twisted.python import log

from pywwa import CTX, CTX_DEFAULTS
from pywwa.database import get_dbconnc
Expand All @@ -10,6 +12,7 @@
# pytest + twisted + click == too much magic for poor me
# So, this effectively disables reactor.stop() from working
CTX_DEFAULTS["shutdown_delay"] = 1800
log.startLogging(sys.stdout)


@pytest.fixture(autouse=True)
Expand Down
7 changes: 7 additions & 0 deletions examples/SHEF/RR3MPX_1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
000
SRUS53 KMPX 031510
RR3MPX
WxCoder
.A LCHM5 260203 C DH0800/TX 19/TN -6/TA 0/PP 0.00
.A1 SF 0.0/SD 4/SW 3.0/DC2602030907

7 changes: 7 additions & 0 deletions examples/SHEF/RR3MPX_2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
000
SRUS53 KMPX 032100
RR3MPX
WxCoder
.AR LCHM5 260203 C DH0800/TX 19/TN -6/TA 0/PP 0.00
.AR1 SF 0.0/SD 4/SW M/DC2602031459

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ lint.select = [
lint.per-file-ignores."goes/*.py" = [
"T201",
]
lint.per-file-ignores."tests/*.py" = [
"T201",
]
lint.per-file-ignores."util/*.py" = [
"T201",
]
Expand Down
9 changes: 6 additions & 3 deletions src/pywwa/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,14 @@ def email_error(exp, message="", trimstr=100):
msg["subject"] = (
f"[pyWWA] {sys.argv[0].rsplit('/', maxsplit=1)[-1]} Traceback -- {hn}"
)
msg["From"] = SETTINGS.get("pywwa_errors_from", "ldm@localhost")
msg["To"] = SETTINGS.get("pywwa_errors_to", "ldm@localhost")
msg["From"] = SETTINGS.get("pywwa_errors_from", "root@localhost")
msg["To"] = SETTINGS.get("pywwa_errors_to", "root@localhost")
if not CTX["disable_email"]:
df = smtp.sendmail(
SETTINGS.get("pywwa_smtp", "smtp"), msg["From"], msg["To"], msg
SETTINGS.get("pywwa_smtp", "localhost"),
msg["From"],
msg["To"],
msg,
)
df.addErrback(LOG.error)
else:
Expand Down
8 changes: 4 additions & 4 deletions src/pywwa/workflows/nexrad3_attr.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ def load_station_table(txn):
LOG.info("Station Table size %s", len(ST.keys()))


def process_data(data):
def process_data(data: bytes) -> dict:
"""I am called when data is ahoy"""
bio = BytesIO()
bio.write(data)
bio.seek(0)
process(bio)
return process(bio)


def process(bio):
def process(bio: BytesIO) -> dict:
"""Process our data, please"""
l3 = Level3File(bio)
ctx = {
Expand All @@ -57,7 +57,7 @@ def process(bio):
if "text" in line:
ctx["lines"].append(line["text"])
df = PGCONN.runInteraction(really_process, ctx)
df.addErrback(common.email_error, ctx)
df.addErrback(common.email_error, str(ctx))
return ctx


Expand Down
13 changes: 9 additions & 4 deletions src/pywwa/workflows/shef.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pyiem.observation import Observation
from pyiem.util import convert_value, utc
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall, deferLater

# Local
Expand Down Expand Up @@ -361,11 +362,13 @@ def insert_raw_inbound(cursor, args) -> int:
return cursor.rowcount


def update_current_queue(element: SHEFElement, product_id: str):
def update_current_queue(
element: SHEFElement, product_id: str
) -> Deferred | None:
"""Update CURRENT_QUEUE with new data."""
# We only want observations
if element.type != "R":
return
return None
args = (
element.station,
element.valid,
Expand All @@ -385,7 +388,7 @@ def update_current_queue(element: SHEFElement, product_id: str):
key, dict(valid=element.valid, value=element.num_value, dirty=True)
)
if element.valid < cur["valid"]:
return
return None
cur["valid"] = element.valid
cur["depth"] = element.depth
cur["value"] = element.num_value
Expand All @@ -395,6 +398,8 @@ def update_current_queue(element: SHEFElement, product_id: str):
cur["product_id"] = product_id
cur["dirty"] = True

return defer


def process_site_time(prod, sid, ts, elements: list[SHEFElement]):
"""Ingest for IEMAccess."""
Expand Down Expand Up @@ -532,7 +537,7 @@ def write_access_records_eb(err, records: list, iemid, entry: ACCESSDB_ENTRY):
common.email_error(err, f"write_access_entry({entry.station}) got {err}")


def process_data(text):
def process_data(text: str):
"""Callback when text is received."""
prod = parser(text, utcnow=common.utcnow(), ugc_provider={})
if prod.warnings:
Expand Down
2 changes: 0 additions & 2 deletions src/pywwa/workflows/sps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

from functools import partial

# 3rd Party
import click
from pyiem.database import get_sqlalchemy_conn
from pyiem.nws.products.sps import parser
from pyiem.nws.ugc import UGCProvider

# Local
from pywwa import LOG, common
from pywwa.database import get_database, load_nwsli
from pywwa.ldm import bridge
Expand Down
2 changes: 0 additions & 2 deletions src/pywwa/workflows/vtec.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@

from functools import partial

# 3rd Party
import click
from pyiem.database import get_sqlalchemy_conn
from pyiem.nws.products.vtec import parser as vtecparser
from pyiem.nws.ugc import UGCProvider
from twisted.mail.smtp import SMTPSenderFactory

# Local
from pywwa import LOG, common
from pywwa.database import get_database, load_nwsli
from pywwa.ldm import bridge
Expand Down
4 changes: 1 addition & 3 deletions src/pywwa/workflows/xteus.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
"""XTEUS Product Parser!"""

# 3rd Party
import click
from pyiem.nws.products.xteus import parser

# Local
from pywwa import common
from pywwa.database import get_database
from pywwa.ldm import bridge


def process_data(txn, data):
def process_data(txn, data: str):
"""Process the product"""
prod = parser(data, utcnow=common.utcnow())
if prod.warnings:
Expand Down
6 changes: 4 additions & 2 deletions tests/test_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test pywwa.common"""

import pytest_twisted
from pyiem.util import utc

from pywwa import CTX, common
Expand Down Expand Up @@ -31,9 +32,10 @@ def test_should_email():
assert not common.email_error(None, None)


@pytest_twisted.inlineCallbacks
def test_email_error():
"""Test that we can email an error."""
common.EMAIL_TIMESTAMPS = []
common.email_error(None, None)
yield common.email_error(None, None)
CTX["disable_email"] = True
common.email_error(None, None)
yield common.email_error(None, None)
4 changes: 2 additions & 2 deletions tests/workflows/test_dsm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ def test_load_stations(cursor):
"""Test station loading."""
# Need to set one station to a bad tzname to exercise an exception
cursor.execute(
"UPDATE stations SET tzname = 'BoG0S' where id = 'MSP' and "
"network = 'MN_ASOS'"
"UPDATE stations SET tzname = 'BoG0S' where id = 'DSM' and "
"network = 'IA_ASOS'"
)
assert cursor.rowcount == 1
dsm.load_stations(cursor)
Expand Down
5 changes: 1 addition & 4 deletions tests/workflows/test_nexrad3_attr.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
"""Test nexrad3_attr."""

# 3rd Party
import pytest

from pywwa.testing import get_example_filepath

# Local
from pywwa.workflows import nexrad3_attr


Expand All @@ -29,7 +26,7 @@ def test_process(cursor):
def test_210910_badvil(cursor):
"""Test that a missing VIL does not cause issues."""
with open(get_example_filepath("NCR_20210911_0023"), "rb") as fh:
ctx = nexrad3_attr.process(fh)
ctx = nexrad3_attr.process_data(fh.read())
assert ctx["nexrad"] == "LAS"
processed = nexrad3_attr.really_process(cursor, ctx)
assert processed == 2
72 changes: 61 additions & 11 deletions tests/workflows/test_shef_parser.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
"""Test shef_parser."""

import datetime
from functools import partial

# 3rd Party
# pylint: disable=no-member-in-module
from unittest import mock
from zoneinfo import ZoneInfo

import pytest

# Local
import pytest_twisted
from psycopg.errors import DeadlockDetected
from pyiem.util import utc
from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.python.failure import Failure

from pywwa import CTX, SETTINGS
Expand Down Expand Up @@ -51,6 +48,62 @@ def sync_workflow(prod, cursor):
shef.process_accessdb_frontend()


@pytest_twisted.inlineCallbacks
def test_gh316_rr3_correction():
"""Test that a RR3 Correction properly happens?"""

def _delete_raw_inbound(txn):
"""Delete any prior data."""
txn.execute("delete from raw_inbound where station = 'LCHM5'")
return txn.rowcount

def _check_entry(txn):
txn.execute(
"select value from raw_inbound where station = 'LCHM5' and "
"key = 'SWIRZZZ' order by updated desc"
)
return txn.fetchall()

def _check_access_entry(txn):
txn.execute(
"select value from current_shef where station = 'LCHM5' and "
"physical_code = 'SW'"
)
return txn.fetchall()

result = yield CTX["HADSDB"].runInteraction(_delete_raw_inbound)
print(result)

CTX["utcnow"] = utc(2026, 2, 3, 16)
shef.process_data(get_example_file("SHEF/RR3MPX_1.txt"))
shef.process_data(get_example_file("SHEF/RR3MPX_2.txt"))
# Update iemaccess current_shef
updated = yield shef.save_current()
assert updated == 7
# Check 0: Ensure the HADSDB queue is done
attempt = 0
for db in ["HADSDB", "ACCESSDB"]:
while CTX[db].threadpool._queue.qsize() > 0:
attempt += 1
print(f"Waiting for {db} queue to drain...")
yield deferLater(reactor, 0.1, lambda: None)
if attempt > 30:
pytest.fail(f"{db} queue did not drain in time")

# The runOperation is touchy with the upsert that happens, lame
yield deferLater(reactor, 1, lambda: None)

# Check 1: see that current_shef is good
result = yield CTX["ACCESSDB"].runInteraction(_check_access_entry)
assert result[0]["value"] is None
# Check 2: Ensure the value in the current_queue is None
assert shef.CURRENT_QUEUE["LCHM5|SWIRZZZ|None"]["value"] is None
# Check 3: Check what is in raw_inbound
result = yield CTX["HADSDB"].runInteraction(_check_entry)
assert len(result) == 2
assert result[0]["value"] is None


def test_accessdb_exception():
"""Test some GIGO raises an exception."""
shef.ACCESSDB_QUEUE[-99] = 0
Expand Down Expand Up @@ -171,11 +224,12 @@ def test_230926_rr8krf(cursor):
shef.insert_raw_inbound(cursor, args)


@pytest_twisted.inlineCallbacks
def test_bad_element_in_current_queue():
"""Test GIGO on current_queue."""
shef.CURRENT_QUEUE.clear()
shef.CURRENT_QUEUE["HI|BYE|HI"] = {"dirty": True}
shef.save_current()
yield shef.save_current()


@pytest.mark.parametrize("database", ["iem"])
Expand Down Expand Up @@ -217,10 +271,6 @@ def test_omit_report(cursor):
assert row["max_tmpf"] == 84
assert row["report"] == ans

CTX["ACCESSDB"].runOperation = partial(run_interaction, cursor)
assert shef.save_current() == 7
assert shef.save_current() == 0


def test_process_site_eb():
"""Test that the errorback works without any side effects."""
Expand Down
13 changes: 13 additions & 0 deletions tests/workflows/test_sps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Exercise the sps workflow."""

import pytest

from pywwa.testing import get_example_file
from pywwa.workflows import sps


@pytest.mark.parametrize("database", ["postgis"])
def test_real_process(cursor):
"""Can we process a real SPS product?"""
data = get_example_file("SPS.txt")
sps.real_process({}, cursor, data)
Loading