Commit 8ce3d3d

add aliasing functionality for climate archive loader
1 parent c2869f6 commit 8ce3d3d
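
This commit moves the climate archive loads to an index-per-load pattern: each full reindex writes into a date-suffixed index and, once the bulk load succeeds, a stable alias (e.g. climate_normals_data) is pointed at it. A minimal, illustrative sketch of that pattern with the elasticsearch-py client follows; the client setup and the bulk-load step are assumptions, only the alias/index naming mirrors the diff below.

# Illustrative sketch of the index-per-load + alias pattern (not code from
# this commit): load into a dated index, then repoint a stable alias at it.
from datetime import datetime

from elasticsearch import Elasticsearch, NotFoundError

es = Elasticsearch()  # assumes a cluster reachable at localhost:9200

ALIAS = 'climate_normals_data'
index_name = f'{ALIAS}.{datetime.now().strftime("%Y-%m-%d")}'

es.indices.create(index=index_name)  # mappings omitted in this sketch
# ... bulk-load documents into index_name here ...

# repoint the stable alias at the freshly loaded index, removing it from any
# previously loaded dated indices in a single call
try:
    old_indices = list(es.indices.get_alias(name=ALIAS).keys())
except NotFoundError:
    old_indices = []

actions = [{'remove': {'index': idx, 'alias': ALIAS}} for idx in old_indices]
actions.append({'add': {'index': index_name, 'alias': ALIAS}})
es.indices.update_aliases(body={'actions': actions})

Because the alias is repointed in one update_aliases call, queries against the alias never see a half-built index; incremental loads simply keep writing into whichever dated index the alias currently resolves to.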

File tree

2 files changed: +130 -32 lines changed

msc_pygeoapi/connector/elasticsearch_.py

+24 -1

@@ -30,7 +30,11 @@
 
 import logging
 
-from elasticsearch import Elasticsearch, logger as elastic_logger
+from elasticsearch import (
+    Elasticsearch,
+    NotFoundError,
+    logger as elastic_logger
+)
 from elasticsearch.helpers import streaming_bulk, BulkIndexError
 
 from msc_pygeoapi.connector.base import BaseConnector

@@ -234,6 +238,25 @@ def create_alias(self, alias, index, overwrite=False):
 
         return True
 
+    def get_alias_indices(self, alias):
+        """
+        get index(es) associated with an alias
+
+        :param alias: `str` alias name
+
+        :return: `list` of index names associated with alias
+        """
+
+        try:
+            index_list = list(
+                self.Elasticsearch.indices.get_alias(name=alias).keys()
+            )
+        except NotFoundError:
+            LOGGER.warning(f'Alias {alias} not found')
+            return None
+
+        return index_list
+
     def submit_elastic_package(
         self, package, request_size=10000, refresh=False
     ):

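The new get_alias_indices() helper resolves an alias to its backing index name(s) and returns None when the alias does not exist. A hedged usage sketch follows; the import path matches the file above, but the constructor argument is an assumption and may not match the real ElasticsearchConnector signature.

# Hedged usage sketch; the constructor argument below is assumed and may
# differ from the real ElasticsearchConnector signature.
from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector

conn = ElasticsearchConnector({'url': 'http://localhost:9200'})  # assumed args

indices = conn.get_alias_indices('climate_public_daily_data')

if indices is None:
    # alias does not exist yet: NotFoundError was caught and logged, so a
    # full reindex (which creates the dated index and alias) must run first
    print('alias climate_public_daily_data not found')
else:
    # incremental loads reuse whichever dated index the alias resolves to
    print(f'alias resolves to: {indices}')

Returning None instead of raising lets a caller distinguish "alias missing, full reindex needed first" from a normal lookup, which the climate_archive loader below uses for its incremental loads.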
msc_pygeoapi/loader/climate_archive.py

+106 -31

@@ -32,6 +32,7 @@
 # =================================================================
 
 import collections
+from datetime import datetime
 import logging
 
 import click

@@ -48,6 +49,7 @@
 HTTP_OK = 200
 POST_OK = 201
 HEADERS = {'Content-type': 'application/json'}
+TODAY = datetime.now().strftime('%Y-%m-%d')
 
 
 class ClimateArchiveLoader(BaseLoader):

@@ -77,6 +79,7 @@ def create_index(self, index):
         created.
 
         :param index: the index to be created.
+        :returns: the name of the index created.
         """
 
         if index == 'stations':

@@ -285,7 +288,7 @@ def create_index(self, index):
                 },
             }
 
-            index_name = 'climate_normals_data'
+            index_name = f'climate_normals_data.{TODAY}'
             self.conn.create(index_name, mapping, overwrite=True)
 
         if index == 'monthly_summary':

@@ -372,7 +375,7 @@ def create_index(self, index):
                 },
             }
 
-            index_name = 'climate_public_climate_summary'
+            index_name = f'climate_public_climate_summary.{TODAY}'
             self.conn.create(index_name, mapping, overwrite=True)
 
         if index == 'daily_summary':

@@ -487,7 +490,7 @@ def create_index(self, index):
                 },
             }
 
-            index_name = 'climate_public_daily_data'
+            index_name = f'climate_public_daily_data.{TODAY}'
             self.conn.create(index_name, mapping, overwrite=True)
 
         if index == 'hourly_summary':

@@ -598,18 +601,20 @@ def create_index(self, index):
                 },
             }
 
-            index_name = 'climate_public_hourly_data'
+            index_name = f'climate_public_hourly_data.{TODAY}'
             self.conn.create(index_name, mapping, overwrite=True)
 
-    def generate_stations(self):
+        return index_name
+
+    def generate_stations(self, index_name):
         """
         Queries stations data from the db, and reformats
         data so it can be inserted into Elasticsearch.
 
         Returns a generator of dictionaries that represent upsert actions
         into Elasticsearch's bulk API.
 
-        :param cur: oracle cursor to perform queries against.
+        :param index_name: name of the index to insert data into.
         :returns: generator of bulk API upsert actions.
         """
 

@@ -656,14 +661,16 @@ def generate_stations(self):
 
             action = {
                 '_id': climate_identifier,
-                '_index': 'climate_station_information',
+                '_index': index_name,
                 '_op_type': 'update',
                 'doc': wrapper,
                 'doc_as_upsert': True,
             }
             yield action
 
-    def generate_normals(self, stn_dict, normals_dict, periods_dict):
+    def generate_normals(
+        self, stn_dict, normals_dict, periods_dict, index_name
+    ):
         """
         Queries normals data from the db, and reformats
         data so it can be inserted into Elasticsearch.

@@ -676,6 +683,7 @@ def generate_normals(self, stn_dict, normals_dict, periods_dict):
         :param normals_dict: mapping of normal IDs to normals information.
         :param periods_dict: mapping of normal period IDs to
                              normal period information.
+        :param index_name: name of the index to insert data into.
         :returns: generator of bulk API upsert actions.
         """
 

@@ -738,7 +746,7 @@ def generate_normals(self, stn_dict, normals_dict, periods_dict):
             }
             action = {
                 '_id': insert_dict['ID'],
-                '_index': 'climate_normals_data',
+                '_index': index_name,
                 '_op_type': 'update',
                 'doc': wrapper,
                 'doc_as_upsert': True,

@@ -750,7 +758,7 @@
                     f" records for this station"
                 )
 
-    def generate_monthly_data(self, stn_dict, date=None):
+    def generate_monthly_data(self, stn_dict, index_name, date=None):
         """
         Queries monthly data from the db, and reformats
         data so it can be inserted into Elasticsearch.

@@ -760,6 +768,7 @@ def generate_monthly_data(self, stn_dict, date=None):
 
         :param cur: oracle cursor to perform queries against.
         :param stn_dict: mapping of station IDs to station information.
+        :param index_name: name of the index to insert data into.
         :param date: date to start fetching data from.
         :returns: generator of bulk API upsert actions.
         """

@@ -813,7 +822,7 @@
             }
             action = {
                 '_id': insert_dict['ID'],
-                '_index': 'climate_public_climate_summary',
+                '_index': index_name,
                 '_op_type': 'update',
                 'doc': wrapper,
                 'doc_as_upsert': True,

@@ -825,7 +834,7 @@
                     f" records for this station"
                 )
 
-    def generate_daily_data(self, stn_dict, date=None):
+    def generate_daily_data(self, stn_dict, index_name, date=None):
         """
         Queries daily data from the db, and reformats
         data so it can be inserted into Elasticsearch.

@@ -835,6 +844,7 @@ def generate_daily_data(self, stn_dict, date=None):
 
         :param cur: oracle cursor to perform queries against.
         :param stn_dict: mapping of station IDs to station information.
+        :param index_name: name of the index to insert data into.
         :param date: date to start fetching data from.
         :returns: generator of bulk API upsert actions.
         """

@@ -900,7 +910,7 @@
             }
             action = {
                 '_id': insert_dict['ID'],
-                '_index': 'climate_public_daily_data',
+                '_index': index_name,
                 '_op_type': 'update',
                 'doc': wrapper,
                 'doc_as_upsert': True,

@@ -912,7 +922,7 @@
                     f" records for this station"
                 )
 
-    def generate_hourly_data(self, stn_dict, date=None):
+    def generate_hourly_data(self, stn_dict, index_name, date=None):
         """
         Queries hourly data from the db, and reformats
         data so it can be inserted into Elasticsearch.

@@ -922,6 +932,7 @@ def generate_hourly_data(self, stn_dict, date=None):
 
         :param cur: oracle cursor to perform queries against.
         :param stn_dict: mapping of station IDs to station information.
+        :param index_name: name of the index to insert data into.
         :param date: date to start fetching data from.
         :returns: generator of bulk API upsert actions.
         """

@@ -987,7 +998,7 @@
             }
             action = {
                 '_id': insert_dict['ID'],
-                '_index': 'climate_public_hourly_data',
+                '_index': index_name,
                 '_op_type': 'update',
                 'doc': wrapper,
                 'doc_as_upsert': True,

@@ -1184,13 +1195,17 @@ def add(
     else:
         datasets_to_process = [dataset]
 
+    # if no date, station or starting_from is provided, then it is
+    # a full reindexing
+    full_reindex = not (date or station or starting_from)
+
     click.echo(f'Processing dataset(s): {datasets_to_process}')
 
     if 'stations' in datasets_to_process:
         try:
             click.echo('Populating stations index')
-            loader.create_index('stations')
-            stations = loader.generate_stations()
+            index_name = loader.create_index('stations')
+            stations = loader.generate_stations(index_name)
             loader.conn.submit_elastic_package(stations, batch_size)
         except Exception as err:
             msg = f'Could not populate stations index: {err}'

@@ -1202,11 +1217,20 @@ def add(
             stn_dict = loader.get_station_data(station, starting_from)
             normals_dict = loader.get_normals_data()
             periods_dict = loader.get_normals_periods()
-            loader.create_index('normals')
+
+            index_name = loader.create_index('normals')
+
             normals = loader.generate_normals(
-                stn_dict, normals_dict, periods_dict
+                stn_dict, normals_dict, periods_dict, index_name
+            )
+            indexing_succesful = loader.conn.submit_elastic_package(
+                normals, batch_size
             )
-            loader.conn.submit_elastic_package(normals, batch_size)
+
+            if indexing_succesful and full_reindex:
+                loader.conn.create_alias(
+                    'climate_normals_data', index_name, overwrite=True
+                )
         except Exception as err:
             msg = f'Could not populate normals index: {err}'
             raise click.ClickException(msg)

@@ -1215,10 +1239,29 @@ def add(
         try:
             click.echo('Populating monthly index')
             stn_dict = loader.get_station_data(station, starting_from)
-            if not (date or station or starting_from):
-                loader.create_index('monthly_summary')
-            monthlies = loader.generate_monthly_data(stn_dict, date)
-            loader.conn.submit_elastic_package(monthlies, batch_size)
+
+            if full_reindex:
+                index_name = loader.create_index('monthly_summary')
+            else:
+                index_name = loader.conn.get_alias_indices('climate_public_climate_summary')[0]  # noqa
+                if index_name is None:
+                    raise click.ClickException(
+                        'No associated index found for alias climate_public_climate_summary.'  # noqa
+                    )
+
+            monthlies = loader.generate_monthly_data(
+                stn_dict, index_name, date
+            )
+            indexing_succesful = loader.conn.submit_elastic_package(
+                monthlies, batch_size
+            )
+
+            if indexing_succesful and full_reindex:
+                loader.conn.create_alias(
+                    'climate_public_climate_summary',
+                    index_name,
+                    overwrite=True
+                )
         except Exception as err:
             msg = f'Could not populate montly index: {err}'
             raise click.ClickException(msg)

@@ -1227,10 +1270,26 @@ def add(
         try:
             click.echo('Populating daily index')
             stn_dict = loader.get_station_data(station, starting_from)
-            if not (date or station or starting_from):
-                loader.create_index('daily_summary')
-            dailies = loader.generate_daily_data(stn_dict, date)
-            loader.conn.submit_elastic_package(dailies, batch_size)
+
+            if full_reindex:
+                index_name = loader.create_index('daily_summary')
+            else:
+                index_name = loader.conn.get_alias_indices('climate_public_daily_data')[0]  # noqa
+                if index_name is None:
+                    raise click.ClickException(
+                        'No index found for alias climate_public_daily_data.'
+                    )
+
+            dailies = loader.generate_daily_data(stn_dict, index_name, date)
+            indexing_succesful = loader.conn.submit_elastic_package(
+                dailies, batch_size
+            )
+
+            if indexing_succesful and full_reindex:
+                loader.conn.create_alias(
+                    'climate_public_daily_data', index_name, overwrite=True
+                )
+
         except Exception as err:
             msg = f'Could not populate daily index: {err}'
             raise click.ClickException(msg)

@@ -1239,10 +1298,26 @@ def add(
         try:
             click.echo('Populating hourly index')
             stn_dict = loader.get_station_data(station, starting_from)
-            if not (date or station or starting_from):
+
+            if full_reindex:
                 loader.create_index('hourly_summary')
-            hourlies = loader.generate_hourly_data(stn_dict, date)
-            loader.conn.submit_elastic_package(hourlies, batch_size)
+            else:
+                index_name = loader.conn.get_alias_indices('climate_public_hourly_data')[0]  # noqa
+                if index_name is None:
+                    raise click.ClickException(
+                        'No index found for alias climate_public_hourly_data.'
+                    )
+
+            hourlies = loader.generate_hourly_data(stn_dict, index_name, date)
+            indexing_succesful = loader.conn.submit_elastic_package(
+                hourlies, batch_size
+            )
+
+            if indexing_succesful and full_reindex:
+                loader.conn.create_alias(
+                    'climate_public_hourly_data', index_name, overwrite=True
+                )
+
         except Exception as err:
             msg = f'Could not populate hourly index: {err}'
             raise click.ClickException(msg)

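Taken together, each summary dataset in add() now follows the same shape: decide whether the run is a full reindex, resolve the target index (a fresh dated index for full loads, the index behind the alias for incremental loads), bulk load, and only then swap the alias. A condensed, hypothetical sketch of the daily branch follows; load_daily is not a function in the commit, and the guard on the alias lookup is slightly stricter than in the diff (it checks for a missing alias before taking the first index).

import click


def load_daily(loader, stn_dict, batch_size, date=None, station=None,
               starting_from=None):
    """Condensed sketch of the daily branch of add() shown above.

    `loader` stands in for a ClimateArchiveLoader wired to the connector
    from this commit; names and defaults here are illustrative.
    """
    # a run with no date/station/starting_from filters is a full reindex
    full_reindex = not (date or station or starting_from)

    if full_reindex:
        # full load: build a fresh dated index via create_index()
        index_name = loader.create_index('daily_summary')
    else:
        # incremental load: reuse whichever index the alias points at
        indices = loader.conn.get_alias_indices('climate_public_daily_data')
        if not indices:
            raise click.ClickException(
                'No index found for alias climate_public_daily_data.'
            )
        index_name = indices[0]

    dailies = loader.generate_daily_data(stn_dict, index_name, date)
    indexing_successful = loader.conn.submit_elastic_package(
        dailies, batch_size
    )

    if indexing_successful and full_reindex:
        # repoint the alias only once the new index is fully populated
        loader.conn.create_alias(
            'climate_public_daily_data', index_name, overwrite=True
        )

Gating create_alias() on both the bulk-load result and full_reindex means a failed full load leaves the alias pointing at the previous good index, while incremental loads never touch the alias at all.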