1717from .. import TIMESERIES_DB
1818from .index import MetricIndex , Point , find_metric
1919from .queries import default_chart_query , math_map , operator_lookup
20+ from .retention_policies import _make_policy , default_rp_policy
2021
2122logger = logging .getLogger (__name__ )
2223
@@ -70,7 +71,10 @@ def __init__(self, db_name='metric'):
7071 def create_database (self ):
7172 """ creates connection to elasticsearch """
7273 connections .create_connection (hosts = [TIMESERIES_DB ['HOST' ]])
73- self .get_db
74+ db = self .get_db
75+ # Skip if support for Index Lifecycle Management is disabled or no privileges
76+ self .ilm_enabled = db .ilm .start ()['acknowledged' ]
77+ self .create_or_alter_retention_policy (name = 'default' )
7478
7579 def drop_database (self ):
7680 """ deletes all indices """
@@ -81,27 +85,48 @@ def drop_database(self):
8185 @cached_property
8286 def get_db (self ):
8387 """ Returns an ``Elasticsearch Client`` instance """
84- # TODO: AUTHENTICATION remains see `SecurityClient`
8588 return Elasticsearch (
8689 [f"{ TIMESERIES_DB ['HOST' ]} :{ TIMESERIES_DB ['PORT' ]} " ],
8790 http_auth = (TIMESERIES_DB ['USER' ], TIMESERIES_DB ['PASSWORD' ]),
8891 retry_on_timeout = True ,
8992 )
9093
91- def create_or_alter_retention_policy (self , name , duration ):
92- """ creates or alters existing retention policy if necessary """
93- # TODO
94- pass
94+ def create_or_alter_retention_policy (self , name , duration = None ):
95+ """
96+ creates or alters existing retention policy if necessary
97+
98+ Note: default retention policy can't be altered with this function
99+ """
100+ if not self .ilm_enabled :
101+ return
102+ ilm = self .get_db .ilm
103+ if not duration :
104+ ilm .put_lifecycle (policy = name , body = default_rp_policy )
105+ return
106+ days = f'{ int (duration .split ("h" )[0 ]) / 24 } d'
107+ duration_changed = False
108+ try :
109+ policy = ilm .get_lifecycle ('default' )
110+ exists = True
111+ current_duration = policy ['default' ]['policy' ]['phases' ]['hot' ]['actions' ][
112+ 'rollover'
113+ ]['max_age' ]
114+ duration_changed = current_duration != days
115+ except NotFoundError :
116+ exists = False
117+ if not exists or duration_changed :
118+ policy = _make_policy (days )
119+ ilm .put_lifecycle (policy = name , body = policy )
95120
96121 def query (self , query , precision = None ):
97122 index = query .pop ('key' )
98123 return Search (index = index ).from_dict (query ).execute ().to_dict ()
99124
100125 def write (self , name , values , ** kwargs ):
101- # TODO: Add support for retention policy
126+ rp = kwargs . get ( 'retention_policy' )
102127 tags = kwargs .get ('tags' )
103128 timestamp = kwargs .get ('timestamp' )
104- metric_id = find_metric (name , tags , add = True )
129+ metric_id = find_metric (self . get_db , name , tags , rp , add = True )
105130 metric_index = MetricIndex ().get (metric_id , index = name )
106131 point = Point (fields = values , time = timestamp or datetime .now ())
107132 metric_index .points .append (point )
@@ -110,14 +135,12 @@ def write(self, name, values, **kwargs):
110135 def read (self , key , fields , tags , limit = 1 , order = '-time' , ** kwargs ):
111136 extra_fields = kwargs .get ('extra_fields' )
112137 time_format = kwargs .get ('time_format' )
138+ # TODO: It will be of the form 'now() - <int>s'
113139 # since = kwargs.get('since')
114- metric_id = find_metric (key , tags )
140+ metric_id = find_metric (self . get_db , key , tags )
115141 if not metric_id :
116- return list ()
117- try :
118- metric_index = MetricIndex ().get (metric_id , index = key )
119- except NotFoundError :
120142 return []
143+ metric_index = self .get_db .get (index = key , id = metric_id )
121144 if order == 'time' :
122145 points = list (metric_index .points [0 :limit ])
123146 elif order == '-time' :
@@ -127,33 +150,28 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
127150 f'Invalid order "{ order } " passed.\n You may pass "time" / "-time" to get '
128151 'result sorted in ascending /descending order respectively.'
129152 )
130- if not points :
131- return list ()
132153 # distinguish between traffic and clients
133154 for point in list (points ):
134155 if fields not in point .fields .to_dict ():
135156 points .remove (point )
136157 if extra_fields and extra_fields != '*' :
137158 assert isinstance (extra_fields , list )
138- _points = []
139- for point in points :
140- point = point .to_dict ()
141- _point = {
159+ for count , point in enumerate (points ):
160+ fields_dict = point .to_dict ()['fields' ]
161+ point = {
142162 'time' : self ._format_time (point ['time' ], time_format ),
143- fields : point [ 'fields' ] [fields ],
163+ fields : fields_dict [fields ],
144164 }
145165 for extra_field in extra_fields :
146- if point ['fields' ].get (extra_field ) is not None :
147- _point .update ({extra_field : point ['fields' ][extra_field ]})
148- _points .append (_point )
149- points = _points
166+ if fields_dict .get (extra_field ) is not None :
167+ point .update ({extra_field : fields_dict [extra_field ]})
168+ points [count ] = point
150169 elif extra_fields == '*' :
151- points = [
152- deep_merge_dicts (
153- p .fields .to_dict (), {'time' : self ._format_time (p .time , time_format )}
170+ for count , point in enumerate (points ):
171+ points [count ] = deep_merge_dicts (
172+ point .fields .to_dict (),
173+ {'time' : self ._format_time (point .time , time_format )},
154174 )
155- for p in points
156- ]
157175 else :
158176 points = [
159177 deep_merge_dicts (
@@ -210,12 +228,14 @@ def _fill_points(self, query, points):
210228
211229 def delete_metric_data (self , key = None , tags = None ):
212230 """
213- deletes a specific metric based on the key and tags
214- provided, you may also choose to delete all metrics
231+ deletes a specific metric based on given key and tags;
232+ deletes all metrics if neither provided
215233 """
216234 if key and tags :
217- metric_id = find_metric (key , tags )
235+ metric_id = find_metric (self . get_db , key , tags )
218236 self .get_db .delete (index = key , id = metric_id )
237+ elif key :
238+ self .get_db .indices .delete (index = key , ignore = [400 , 404 ])
219239 else :
220240 self .get_db .indices .delete (index = '*' , ignore = [400 , 404 ])
221241
@@ -317,7 +337,7 @@ def default_chart_query(self, tags):
317337 return q
318338
319339
320- # Old data - delete by query (inefficient) / retention policy - Index lifecycle management
# TODO:
# Fix average aggregation — it currently computes the average over all fields
# Fix the time range handling for the time-interval filter
# Add the device query