Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/SHEF/RR3ARX_1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
337
SRUS53 KARX 041300
RR3ARX
WxCoder
.A WACI4 260304 C DH0700/PP 0.00/SF 0.0/SD 6/SW 6.0
.A1 DC2603040656
6 changes: 6 additions & 0 deletions examples/SHEF/RR3ARX_2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
452
SRUS53 KARX 041325
RR3ARX
WxCoder
.AR WACI4 260304 C DH0700/PP 0.00/SF 0.0/SD 6/SW M
.AR1 DC2603040720
27 changes: 12 additions & 15 deletions src/pywwa/workflows/shef.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,6 @@ def update_current_queue(
element: SHEFElement, product_id: str
) -> Deferred | None:
"""Update CURRENT_QUEUE with new data."""
# We only want observations
if element.type != "R":
return None
args = (
element.station,
element.valid,
Expand All @@ -387,16 +384,15 @@ def update_current_queue(
cur = CURRENT_QUEUE.setdefault(
key, dict(valid=element.valid, value=element.num_value, dirty=True)
)
if element.valid < cur["valid"]:
return None
cur["valid"] = element.valid
cur["depth"] = element.depth
cur["value"] = element.num_value
cur["dv_interval"] = element.dv_interval
cur["qualifier"] = element.qualifier
cur["unit_convention"] = element.unit_convention
cur["product_id"] = product_id
cur["dirty"] = True
if element.valid >= cur["valid"]:
cur["valid"] = element.valid
cur["depth"] = element.depth
cur["value"] = element.num_value
cur["dv_interval"] = element.dv_interval
cur["qualifier"] = element.qualifier
cur["unit_convention"] = element.unit_convention
cur["product_id"] = product_id
cur["dirty"] = True

return defer

Expand Down Expand Up @@ -551,8 +547,9 @@ def process_data(text: str):
product_id = prod.get_product_id()
# Update CURRENT_QUEUE
time_threshold = common.utcnow() + P1H
for element in [e for e in prod.data if e.valid < time_threshold]:
update_current_queue(element, product_id)
for element in prod.data:
if element.valid < time_threshold and element.type == "R":
update_current_queue(element, product_id)
# Create a nicer data structure
mydata = restructure_data(prod)
# Chunk thru each of the sites found and do work.
Expand Down
38 changes: 38 additions & 0 deletions tests/workflows/test_shef_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,44 @@ def sync_workflow(prod, cursor):
shef.process_accessdb_frontend()


@pytest_twisted.inlineCallbacks
def test_gh316_rr3_correction_reduex():
"""Test another problem found with a RR3 Correction."""

def _check_access_entry(txn):
txn.execute(
"select value from current_shef where station = 'WACI4' and "
"physical_code = 'SW'"
)
return txn.fetchall()

CTX["utcnow"] = utc(2026, 3, 4, 16)
shef.process_data(get_example_file("SHEF/RR3ARX_1.txt"))
assert shef.CURRENT_QUEUE["WACI4|SWIRZZZ|None"]["value"] == 6.0
shef.process_data(get_example_file("SHEF/RR3ARX_2.txt"))
assert shef.CURRENT_QUEUE["WACI4|SWIRZZZ|None"]["value"] is None

# Update iemaccess current_shef
updated = yield shef.save_current()
assert updated == 4
# Check 0: Ensure the HADSDB queue is done
attempt = 0
for db in ["HADSDB", "ACCESSDB"]:
while CTX[db].threadpool._queue.qsize() > 0:
attempt += 1
print(f"Waiting for {db} queue to drain...")
yield deferLater(reactor, 0.1, lambda: None)
if attempt > 30:
pytest.fail(f"{db} queue did not drain in time")

# The runOperation is touchy with the upsert that happens, lame
yield deferLater(reactor, 1, lambda: None)

# Check 1: see that current_shef is good
result = yield CTX["ACCESSDB"].runInteraction(_check_access_entry)
assert result[0]["value"] is None


@pytest_twisted.inlineCallbacks
def test_gh316_rr3_correction():
"""Test that a RR3 Correction properly happens?"""
Expand Down
Loading