Skip to content

Commit da49488

Browse files
committed
ensure properties and ids contain ISO8601-compliant datetime
1 parent 9216827 commit da49488

File tree

2 files changed

+51
-7
lines changed

2 files changed

+51
-7
lines changed

msc_pygeoapi/connector/elasticsearch_.py

+29-2
Original file line numberDiff line numberDiff line change
@@ -180,21 +180,48 @@ def delete(self, indexes):
180180

181181
return True
182182

183-
def create_template(self, name, settings):
183+
def create_template(self, name, settings, overwrite=False):
184184
"""
185185
create an Elasticsearch index template
186186
187187
:param name: `str` index template name
188188
:param settings: `dict` settings dictionnary for index template
189+
:param overwrite: `bool` indicating whether to overwrite existing
190+
template
189191
190192
:returns: `bool` of index template creation status
191193
"""
192194

193-
if not self.Elasticsearch.indices.exists_template(name=name):
195+
template_exists = self.Elasticsearch.indices.exists_template(name=name)
196+
197+
if template_exists and overwrite:
198+
self.Elasticsearch.indices.delete_template(name=name)
199+
self.Elasticsearch.indices.put_template(name=name, body=settings)
200+
elif template_exists:
201+
LOGGER.warning(f'Template {name} already exists')
202+
return False
203+
else:
194204
self.Elasticsearch.indices.put_template(name=name, body=settings)
195205

196206
return True
197207

208+
def get_template(self, name):
209+
"""
210+
get an Elasticsearch index template
211+
212+
:param name: `str` index template name
213+
214+
:returns: `dict` of index template settings
215+
"""
216+
217+
try:
218+
template = self.Elasticsearch.indices.get_template(name=name)
219+
except NotFoundError:
220+
LOGGER.warning(f'Template {name} not found')
221+
return None
222+
223+
return template
224+
198225
def delete_template(self, name):
199226
"""
200227
delete an Elasticsearch index template

msc_pygeoapi/loader/hydrometric_realtime.py

+22-5
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from msc_pygeoapi.util import (
4242
check_es_indexes_to_delete,
4343
configure_es_connection,
44+
DATETIME_RFC3339_FMT
4445
)
4546

4647

@@ -60,7 +61,7 @@
6061
SETTINGS = {
6162
'order': 0,
6263
'version': 1,
63-
'index_patterns': [INDEX_BASENAME],
64+
'index_patterns': [f'{INDEX_BASENAME}*'],
6465
'settings': {
6566
'number_of_shards': 1,
6667
'number_of_replicas': 0
@@ -98,7 +99,7 @@
9899
},
99100
'DATETIME': {
100101
'type': 'date',
101-
'format': 'strict_date_hour_minute_second||strict_date_optional_time' # noqa
102+
'format': 'date_time_no_millis||strict_date_optional_time' # noqa
102103
},
103104
'DATETIME_LST': {
104105
'type': 'date',
@@ -174,7 +175,24 @@ def __init__(self, conn_config={}):
174175
BaseLoader.__init__(self)
175176

176177
self.conn = ElasticsearchConnector(conn_config)
177-
self.conn.create_template(INDEX_BASENAME, SETTINGS)
178+
179+
index_template = self.conn.get_template(INDEX_BASENAME)
180+
181+
# compare index template mappping with mapping defined in SETTINGS
182+
if index_template:
183+
# if mappings are different, update the index template
184+
if (
185+
index_template[INDEX_BASENAME]['mappings']
186+
!= SETTINGS['mappings']
187+
):
188+
LOGGER.info(
189+
f'Updating {INDEX_BASENAME} index template with mapping changes in provider.'
190+
)
191+
self.conn.create_template(
192+
INDEX_BASENAME, SETTINGS, overwrite=True
193+
)
194+
else:
195+
self.conn.create_template(INDEX_BASENAME, SETTINGS)
178196

179197
self.stations = {}
180198
self.read_stations_list()
@@ -284,11 +302,10 @@ def generate_observations(self, filepath):
284302
try:
285303
# Convert timestamp to UTC time.
286304
utc_datetime = delocalize_date(date_)
287-
utc_datestamp = utc_datetime.strftime('%Y-%m-%d.%H:%M:%S')
305+
utc_datestamp = utc_datetime.strftime(DATETIME_RFC3339_FMT)
288306
# Generate an ID now that all fields are known.
289307
observation_id = f'{station}.{utc_datestamp}'
290308

291-
utc_datestamp = utc_datestamp.replace('.', 'T')
292309
except Exception as err:
293310
LOGGER.error(f'Cannot interpret datetime value {date_} in {filepath}' # noqa
294311
f' due to: {err} (skipping)')

0 commit comments

Comments
 (0)