Skip to content

Commit

Permalink
Merge pull request #36 from charlie-becker/HRRR_Zarr
Browse files Browse the repository at this point in the history
HRRR-Zarr Streaming
  • Loading branch information
djgagne authored Jun 22, 2021
2 parents 2506198 + 5be2910 commit 817a570
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 0 deletions.
5 changes: 5 additions & 0 deletions bin/hsdata
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ def process_ensemble_member(run_date, member, config):
print("Starting", run_date, member)
start_date = run_date + timedelta(hours=config.start_hour)
end_date = run_date + timedelta(hours=config.end_hour)

if config.ensemble_name == "HRRR-ZARR":
if hasattr(config, "HRRR_alt_end_hour") and run_date.hour in config.HRRR_alt_run_hours:
end_date = run_date + timedelta(hours=config.HRRR_alt_end_hour)

if hasattr(config, "mask_file"):
mask_file = config.mask_file
else:
Expand Down
68 changes: 68 additions & 0 deletions config/HRRR_AWS_Stream.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python
import numpy as np
import pandas as pd
from hagelslag.processing.ObjectMatcher import shifted_centroid_distance
from hagelslag.processing.ObjectMatcher import centroid_distance, time_distance

# NOTE: HRRR variables must be listed in the following format:
# {HRRR_VARIABLE_NAME}-{HRRR_level}
# For example, Composite Reflectivity (REFC) which is at the (entire_atmosphere) level
# would be listed as 'REFC-entire_atmosphere'

# 'ensemble_name' must be listed as 'HRRR-ZARR'
# 'model_path' must be "hrrrzarr/sfc/"
# 'end_hour' must be no more than n-1 number of forecast hours in model

# To support model runs with different forecast lengths, use 'HRRR_alt_end_hour' for an alternative
# forecast length that is used for each model run hour listed in 'HRRR_alt_run_hours'. If neither is
# provided, 'end_hour' will be used for all model runs.

## output path
scratch_path = "/glade/scratch/dgagne/HRRR_objects/HRRR_AWS_realtime_test_060421/"

# Historical runs
#date_index = pd.date_range(start='2021-03-15', end='2021-03-22', freq='1H', closed='left', tz='UTC').to_pydatetime()

# Real Time runs
# Use pd.Timedelta to correspond with delay in data availability from hour that script is submitted
date_index = pd.DatetimeIndex([pd.Timestamp.utcnow().strftime("%Y-%m-%d-%H")]) - pd.Timedelta(hours=3)

ensemble_members = ['oper']

config = dict(dates=date_index,
              start_hour=1,
              end_hour=17,
              HRRR_alt_end_hour=47,
              # Key name must be HRRR_alt_run_hours: the driver script (bin/hsdata) checks
              # run_date.hour against config.HRRR_alt_run_hours. The previous name
              # (HRRR_alt_hours) was never read and caused an AttributeError instead.
              HRRR_alt_run_hours=[0, 6, 12, 18],
              watershed_variable="REFC-entire_atmosphere",
              ensemble_name="HRRR-ZARR",
              ensemble_members=ensemble_members,
              model_path="hrrrzarr/sfc/",
              segmentation_approach="hyst",
              model_watershed_params=(35, 50),
              size_filter=12,
              gaussian_window=1,
              mrms_path=None,
              mrms_variable="MESH_Max_60min_00.50",
              mrms_watershed_params=(13, 1, 125, 100, 100),
              object_matcher_params=([shifted_centroid_distance], np.array([1.0]),
                                     np.array([24000])),
              track_matcher_params=([centroid_distance, time_distance],
                                    np.array([80000, 2])),
              storm_variables=["REFC-entire_atmosphere", "MXUPHL_1hr_max_fcst-5000_2000m_above_ground"],
              potential_variables=[],
              tendency_variables=[],
              shape_variables=["area", "eccentricity", "major_axis_length", "minor_axis_length", "orientation"],
              variable_statistics=["mean", "max", "min"],
              csv_path=scratch_path + "track_data_hrrr_3km_csv_refl/",
              geojson_path=scratch_path + "track_data_hrrr_3km_json_refl/",
              nc_path=scratch_path + "track_data_hrrr_3km_nc_refl/",
              patch_radius=48,
              unique_matches=True,
              closest_matches=True,
              match_steps=True,
              train=False,
              single_step=True,
              label_type="gamma",
              model_map_file="/glade/u/home/dgagne/hagelslag/mapfiles/hrrr_map_2016.txt",
              mask_file=None)
17 changes: 17 additions & 0 deletions hagelslag/data/HRRRZarrModelGrid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import numpy as np
import pandas as pd
from hagelslag.data.ZarrModelGrid import ZarrModelGrid
from os.path import join


class HRRRZarrModelGrid(ZarrModelGrid):
    """Reads a single HRRR variable from Zarr-format model output streamed off AWS.

    Thin adapter over ZarrModelGrid that reorders the constructor arguments to
    match the calling convention used by hagelslag's data loaders.

    Args:
        run_date (ISO date string or datetime-like): Initialization time of the model run.
        variable (str): Variable name in "{VAR}-{level}" form, e.g. 'REFC-entire_atmosphere'.
        start_date (ISO date string or datetime-like): First forecast timestep to extract.
        end_date (ISO date string or datetime-like): Last forecast timestep to extract.
        path (str): Base path for the AWS bucket.
        frequency (str): Spacing between model time steps (pandas offset alias, default "1H").
    """

    def __init__(self, run_date, variable, start_date, end_date, path, frequency="1H"):
        # Forward frequency to the parent: previously it was only stored locally and
        # the parent __init__ silently reset self.frequency to its own "1H" default,
        # discarding any non-default value. The parent also converts and assigns all
        # the other attributes, so no duplicate assignments are needed here.
        super(HRRRZarrModelGrid, self).__init__(path, run_date, start_date, end_date,
                                                variable, frequency=frequency)
9 changes: 9 additions & 0 deletions hagelslag/data/ModelOutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def load_data(self):
self.path)
self.data, self.units = mg.load_data()
mg.close()

elif self.ensemble_name.upper() == "NCARSTORM":
mg = NCARStormEventModelGrid(self.run_date,
self.variable,
Expand All @@ -181,6 +182,14 @@ def load_data(self):
self.path)
self.data, self.units = mg.load_data()
mg.close()

elif self.ensemble_name.upper() == "HRRR-ZARR":
mg = HRRRZarrModelGrid(self.run_date,
self.variable,
self.start_date,
self.end_date,
self.path)
self.data, self.units = mg.load_data()
else:
print(self.ensemble_name + " not supported.")

Expand Down
67 changes: 67 additions & 0 deletions hagelslag/data/ZarrModelGrid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import numpy as np
import pandas as pd
from pandas import date_range
from os.path import exists, join
import s3fs
import xarray as xr
from datetime import timedelta


class ZarrModelGrid(object):
    """
    Base class for reading 2D model output grids from HRRR Zarr data streamed off of AWS.

    Given an AWS bucket name, loads the values of a single variable from a model run.
    Supports model output in Zarr format.

    Attributes:
        path (str): Base Path for AWS Bucket.
        variable (str): Variable in "{VAR}-{level}" form, e.g. 'REFC-entire_atmosphere'.
        run_date (pd.Timestamp): Initialization time of the model run.
        start_date (pd.Timestamp): Date of the first timestep extracted.
        end_date (pd.Timestamp): Date of the last timestep extracted.
        frequency (str): Spacing between model time steps (pandas offset alias).
        valid_dates (pd.DatetimeIndex): All model timesteps.
        forecast_hours (np.ndarray): Whole-hour lead times of each timestep.
    """
    def __init__(self,
                 path,
                 run_date,
                 start_date,
                 end_date,
                 variable,
                 frequency="1H"):
        self.path = path
        self.variable = variable
        self.run_date = pd.to_datetime(run_date)
        self.start_date = pd.to_datetime(start_date)
        self.end_date = pd.to_datetime(end_date)
        self.frequency = frequency
        self.valid_dates = date_range(start=self.start_date,
                                      end=self.end_date,
                                      freq=self.frequency)
        # Lead time of each valid date relative to initialization, in whole hours.
        # total_seconds()-based conversion replaces astype("timedelta64[h]"), which
        # modern pandas rejects (only s/ms/us/ns resolutions are supported).
        self.forecast_hours = ((self.valid_dates - self.run_date).total_seconds()
                               // 3600).astype(int)

    def load_data(self):
        """Stream this run's forecast grids for the configured variable from AWS.

        Returns:
            tuple: (array, units) — array is a float32 numpy array holding the
            variable at every forecast step; units is the dataset's ``units``
            attribute, or "" when the dataset provides none.
        """
        # Split "{VAR}-{level}" into locals instead of mutating self.variable,
        # so load_data can safely be called more than once on the same object.
        name_level = self.variable.split('-')
        variable, level = name_level[0], name_level[1]
        fs = s3fs.S3FileSystem(anon=True)
        run_date_str = self.run_date.strftime("%Y%m%d")
        init_hour = self.run_date.strftime("%H")  # model initialization hour, zero-padded
        path = join(self.path, run_date_str, f'{run_date_str}_{init_hour}z_fcst.zarr',
                    level, variable, level)
        f = s3fs.S3Map(root=path, s3=fs, check=False)
        ds = xr.open_mfdataset([f], engine='zarr').load()
        array = ds[variable].values.astype('float32')
        units = ds[variable].attrs['units'] if hasattr(ds[variable], 'units') else ""
        return array, units

0 comments on commit 817a570

Please sign in to comment.