Skip to content

Commit 4eba0a2

Browse files
authored
129 data downloading issue with a specific gauging station (#131)
* Improve JSON parsing #129
1 parent fb6521a commit 4eba0a2

File tree

2 files changed

+79
-101
lines changed

2 files changed

+79
-101
lines changed

dataretrieval/nwis.py

Lines changed: 79 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import pandas as pd
2020
import requests
2121

22-
from dataretrieval.utils import BaseMetadata, format_datetime, to_str, update_merge
22+
from dataretrieval.utils import BaseMetadata, format_datetime, to_str
2323

2424
from .utils import query
2525

@@ -834,6 +834,7 @@ def get_iv(
834834
response = query_waterservices(
835835
service='iv', format='json', ssl_check=ssl_check, **kwargs
836836
)
837+
837838
df = _read_json(response.json())
838839
return format_response(df, **kwargs), NWIS_Metadata(response, **kwargs)
839840

@@ -1304,67 +1305,88 @@ def _read_json(json):
13041305
A custom metadata object
13051306
13061307
"""
1307-
merged_df = pd.DataFrame()
1308-
1309-
for timeseries in json['value']['timeSeries']:
1310-
site_no = timeseries['sourceInfo']['siteCode'][0]['value']
1311-
param_cd = timeseries['variable']['variableCode'][0]['value']
1312-
# check whether min, max, mean record XXX
1313-
option = timeseries['variable']['options']['option'][0].get('value')
1314-
1315-
# loop through each parameter in timeseries.
1316-
for parameter in timeseries['values']:
1317-
col_name = param_cd
1318-
method = parameter['method'][0]['methodDescription']
1319-
1320-
# if len(timeseries['values']) > 1 and method:
1321-
if method:
1322-
# get method, format it, and append to column name
1323-
method = method.strip('[]()').lower()
1324-
col_name = f'{col_name}_{method}'
1325-
1326-
if option:
1327-
col_name = f'{col_name}_{option}'
1328-
1329-
record_json = parameter['value']
1330-
1331-
if not record_json:
1332-
# no data in record
1333-
continue
1334-
# should be able to avoid this by dumping
1335-
record_json = str(record_json).replace("'", '"')
1336-
1337-
# read json, converting all values to float64 and all qualifiers
1338-
# Lists can't be hashed, thus we cannot df.merge on a list column
1339-
record_df = pd.read_json(
1340-
StringIO(record_json),
1341-
orient='records',
1342-
dtype={'value': 'float64', 'qualifiers': 'unicode'},
1343-
convert_dates=False,
1344-
)
1308+
merged_df = pd.DataFrame(columns=['site_no', 'datetime'])
13451309

1346-
record_df['qualifiers'] = (
1347-
record_df['qualifiers'].str.strip('[]').str.replace("'", '')
1348-
)
1349-
record_df['site_no'] = site_no
1350-
1351-
record_df.rename(
1352-
columns={
1353-
'value': col_name,
1354-
'dateTime': 'datetime',
1355-
'qualifiers': col_name + '_cd',
1356-
},
1357-
inplace=True,
1358-
)
1310+
site_list = [
1311+
ts['sourceInfo']['siteCode'][0]['value'] for ts in json['value']['timeSeries']
1312+
]
13591313

1360-
if merged_df.empty:
1361-
merged_df = record_df
1314+
# create a list of indexes for each change in site no
1315+
# for example, [0, 21, 22] would be the first and last indices
1316+
index_list = [0]
1317+
index_list.extend(
1318+
[i + 1 for i, (a, b) in enumerate(zip(site_list[:-1], site_list[1:])) if a != b]
1319+
)
1320+
index_list.append(len(site_list))
1321+
1322+
for i in range(len(index_list) - 1):
1323+
start = index_list[i] # [0]
1324+
end = index_list[i + 1] # [21]
1325+
1326+
# grab a block containing timeseries 0:21,
1327+
# which are all from the same site
1328+
site_block = json['value']['timeSeries'][start:end]
1329+
if not site_block:
1330+
continue
1331+
1332+
site_no = site_block[0]['sourceInfo']['siteCode'][0]['value']
1333+
site_df = pd.DataFrame(columns=['datetime'])
1334+
1335+
for timeseries in site_block:
1336+
param_cd = timeseries['variable']['variableCode'][0]['value']
1337+
# check whether min, max, mean record XXX
1338+
option = timeseries['variable']['options']['option'][0].get('value')
1339+
1340+
# loop through each parameter in timeseries, then concat to the merged_df
1341+
for parameter in timeseries['values']:
1342+
col_name = param_cd
1343+
method = parameter['method'][0]['methodDescription']
1344+
1345+
# if len(timeseries['values']) > 1 and method:
1346+
if method:
1347+
# get method, format it, and append to column name
1348+
method = method.strip('[]()').lower()
1349+
col_name = f'{col_name}_{method}'
1350+
1351+
if option:
1352+
col_name = f'{col_name}_{option}'
1353+
1354+
record_json = parameter['value']
1355+
1356+
if not record_json:
1357+
# no data in record
1358+
continue
1359+
# should be able to avoid this by dumping
1360+
record_json = str(record_json).replace("'", '"')
1361+
1362+
# read json, converting all values to float64 and all qualifiers
1363+
# Lists can't be hashed, thus we cannot df.merge on a list column
1364+
record_df = pd.read_json(
1365+
StringIO(record_json),
1366+
orient='records',
1367+
dtype={'value': 'float64', 'qualifiers': 'unicode'},
1368+
convert_dates=False,
1369+
)
1370+
1371+
record_df['qualifiers'] = (
1372+
record_df['qualifiers'].str.strip('[]').str.replace("'", '')
1373+
)
13621374

1363-
else:
1364-
merged_df = update_merge(
1365-
merged_df, record_df, na_only=True, on=['site_no', 'datetime']
1375+
record_df.rename(
1376+
columns={
1377+
'value': col_name,
1378+
'dateTime': 'datetime',
1379+
'qualifiers': col_name + '_cd',
1380+
},
1381+
inplace=True,
13661382
)
13671383

1384+
site_df = site_df.merge(record_df, how='outer', on='datetime')
1385+
1386+
# end of site loop
1387+
site_df['site_no'] = site_no
1388+
merged_df = pd.concat([merged_df, site_df])
1389+
13681390
# convert to datetime, normalizing the timezone to UTC when doing so
13691391
if 'datetime' in merged_df.columns:
13701392
merged_df['datetime'] = pd.to_datetime(merged_df['datetime'], utc=True)

dataretrieval/utils.py

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -95,50 +95,6 @@ def format_datetime(df, date_field, time_field, tz_field):
9595
return df
9696

9797

98-
# This function may be deprecated once pandas.update supports joins other than left.
99-
def update_merge(left, right, na_only=False, on=None, **kwargs):
100-
"""Performs a combination update and merge.
101-
102-
Parameters
103-
----------
104-
left: ``pandas.DataFrame``
105-
Original data
106-
right: ``pandas.DataFrame``
107-
Updated data
108-
na_only: bool
109-
If True, only update na values
110-
111-
Returns
112-
-------
113-
df: ``pandas.DataFrame``
114-
Updated data frame
115-
116-
.. todo::
117-
118-
add na_only parameter support
119-
120-
"""
121-
# df = left.merge(right, how='outer',
122-
# left_index=True, right_index=True)
123-
df = left.merge(right, how='outer', on=on, **kwargs)
124-
125-
# check for column overlap and resolve update
126-
for column in df.columns:
127-
# if duplicated column, use the value from right
128-
if column[-2:] == '_x':
129-
name = column[:-2] # find column name
130-
131-
if na_only:
132-
df[name] = df[name + '_x'].fillna(df[name + '_y'])
133-
134-
else:
135-
df[name] = df[name + '_x'].update(df[name + '_y'])
136-
137-
df.drop([name + '_x', name + '_y'], axis=1, inplace=True)
138-
139-
return df
140-
141-
14298
class BaseMetadata:
14399
"""Base class for metadata.
144100

0 commit comments

Comments
 (0)