1616from pyiem .observation import Observation
1717from pyiem .util import convert_value , utc
1818from twisted .internet import reactor
19+ from twisted .internet .defer import Deferred
1920from twisted .internet .task import LoopingCall , deferLater
2021
2122# Local
@@ -361,11 +362,13 @@ def insert_raw_inbound(cursor, args) -> int:
361362 return cursor .rowcount
362363
363364
364- def update_current_queue (element : SHEFElement , product_id : str ):
365+ def update_current_queue (
366+ element : SHEFElement , product_id : str
367+ ) -> Deferred | None :
365368 """Update CURRENT_QUEUE with new data."""
366369 # We only want observations
367370 if element .type != "R" :
368- return
371+ return None
369372 args = (
370373 element .station ,
371374 element .valid ,
@@ -385,7 +388,7 @@ def update_current_queue(element: SHEFElement, product_id: str):
385388 key , dict (valid = element .valid , value = element .num_value , dirty = True )
386389 )
387390 if element .valid < cur ["valid" ]:
388- return
391+ return None
389392 cur ["valid" ] = element .valid
390393 cur ["depth" ] = element .depth
391394 cur ["value" ] = element .num_value
@@ -395,6 +398,8 @@ def update_current_queue(element: SHEFElement, product_id: str):
395398 cur ["product_id" ] = product_id
396399 cur ["dirty" ] = True
397400
401+ return defer
402+
398403
399404def process_site_time (prod , sid , ts , elements : list [SHEFElement ]):
400405 """Ingest for IEMAccess."""
@@ -532,7 +537,7 @@ def write_access_records_eb(err, records: list, iemid, entry: ACCESSDB_ENTRY):
532537 common .email_error (err , f"write_access_entry({ entry .station } ) got { err } " )
533538
534539
535- def process_data (text ):
540+ def process_data (text : str ):
536541 """Callback when text is received."""
537542 prod = parser (text , utcnow = common .utcnow (), ugc_provider = {})
538543 if prod .warnings :
0 commit comments