Skip to content

Commit 8662e7e

Browse files
Gal Topper, talIguaz, Gal Topper
authored
Aggregation optimizations. (#144)
* split aggregateStoreElement to query and aggregate, where the latter is optimized * lint * typo + minor performance improvement * AggregationValue optimizations. * Replace min and max with if. * Further optimizations. * Optimization. * Rename method. * raise NotImplementedError * Reuse AggregationValues to avoid allocations. * More use of reset. Fix reset. * More use of reset. * More AggregationValue reuse. * Remove dict key redundancy. * Optimization. * Optimization. * Simplification/optimization * Optimize _set_value. * Remove max_value from reset(). Co-authored-by: Tal Neiman <Taln@iguazio.com> Co-authored-by: Gal Topper <galt@iguazio.com>
1 parent d4ccee0 commit 8662e7e

File tree

4 files changed

+222
-116
lines changed

4 files changed

+222
-116
lines changed

storey/aggregations.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import copy
32
from datetime import datetime
43
import time
54
import re
@@ -174,10 +173,9 @@ async def _emit_event(self, key, event):
174173
emitted_attr_name = self._aliases.get(col, None) or col
175174
if col in self._table._get_static_attrs(key):
176175
features[emitted_attr_name] = self._table._get_static_attrs(key)[col]
177-
new_event = copy.copy(event)
178-
new_event.key = key
179-
new_event.body = features
180-
await self._do_downstream(new_event)
176+
event.key = key
177+
event.body = features
178+
await self._do_downstream(event)
181179

182180
# Emit multiple events for every key in the store with the current time
183181
async def _emit_all_events(self, timestamp):

storey/dataframe.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import copy
2-
31
import pandas as pd
42

53
from .flow import _termination_obj, Flow
@@ -78,6 +76,5 @@ async def _do(self, event):
7876
df = pd.DataFrame(event.body, columns=self._columns)
7977
if self._index:
8078
df.set_index(self._index, inplace=True)
81-
new_event = copy.copy(event)
82-
new_event.body = df
83-
return await self._do_downstream(new_event)
79+
event.body = df
80+
return await self._do_downstream(event)

storey/drivers.py

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -252,21 +252,21 @@ def _build_conditioned_feature_store_request(self, aggregation_element, pending=
252252
if not initialized_attributes.get(array_attribute_name, 0) == expected_time:
253253
initialized_attributes[array_attribute_name] = expected_time
254254
init_expression = f'{array_attribute_name}=if_else(({get_array_time_expr}<{expected_time_expr}),' \
255-
f"init_array({bucket.total_number_of_buckets},'double'," \
256-
f'{aggregation_value.get_default_value()}),{array_attribute_name})'
255+
f"init_array({bucket.total_number_of_buckets},'double'," \
256+
f'{aggregation_value.default_value}),{array_attribute_name})'
257257
expressions.append(init_expression)
258258

259259
arr_at_index = f'{array_attribute_name}[{index_to_update}]'
260260
update_array_expression = f'{arr_at_index}=if_else(({get_array_time_expr}>{expected_time_expr}),{arr_at_index},' \
261-
f'{self._get_update_expression_by_aggregation(arr_at_index, aggregation_value)})'
261+
f'{aggregation_value.get_update_expression(arr_at_index)})'
262262

263263
expressions.append(update_array_expression)
264264

265265
# Separating time attribute updates, so that they will be executed in the end and only once per feature name.
266266
if array_time_attribute_name not in times_update_expressions:
267267
times_update_expressions[array_time_attribute_name] = f'{array_time_attribute_name}=' \
268-
f'if_else(({get_array_time_expr}<{expected_time_expr}),' \
269-
f'{expected_time_expr},{array_time_attribute_name})'
268+
f'if_else(({get_array_time_expr}<{expected_time_expr}),' \
269+
f'{expected_time_expr},{array_time_attribute_name})'
270270

271271
expressions.extend(times_update_expressions.values())
272272

@@ -304,7 +304,7 @@ def _build_simplified_feature_store_request(self, aggregation_element):
304304
if not initialized_attributes.get(array_attribute_name, 0) == expected_time:
305305
initialized_attributes[array_attribute_name] = expected_time
306306
expressions.append(f"{array_attribute_name}=init_array({bucket.total_number_of_buckets},'double',"
307-
f'{aggregation_value.get_default_value()})')
307+
f'{aggregation_value.default_value})')
308308
if array_time_attribute_name not in times_update_expressions:
309309
times_update_expressions[array_time_attribute_name] = \
310310
f'{array_time_attribute_name}={expected_time_expr}'
@@ -313,8 +313,7 @@ def _build_simplified_feature_store_request(self, aggregation_element):
313313
# Updating the specific cells
314314
if cached_time <= expected_time:
315315
arr_at_index = f'{array_attribute_name}[{index_to_update}]'
316-
expressions.append(
317-
f'{arr_at_index}={self._get_update_expression_by_aggregation(arr_at_index, aggregation_value)}')
316+
expressions.append(f'{arr_at_index}={aggregation_value.get_update_expression(arr_at_index)}')
318317

319318
# Separating time attribute updates, so that they will be executed in the end and only once per feature name.
320319
expressions.extend(times_update_expressions.values())
@@ -325,20 +324,6 @@ def _build_simplified_feature_store_request(self, aggregation_element):
325324
aggregation_element.aggregation_buckets[name].storage_specific_cache[attribute_name] = new_time
326325
return expressions, pending_updates
327326

328-
@staticmethod
329-
def _get_update_expression_by_aggregation(old, aggregation):
330-
value = aggregation.get_value()[1]
331-
if aggregation.aggregation == 'max':
332-
return f'max({old}, {value})'
333-
elif aggregation.aggregation == 'min':
334-
return f'min({old}, {value})'
335-
elif aggregation.aggregation == 'last':
336-
return f'{value}'
337-
elif aggregation.aggregation == 'first':
338-
return f'if_else(({old} == {aggregation.get_default_value()}), {value}, {old})'
339-
else:
340-
return f'{old}+{value}'
341-
342327
@staticmethod
343328
def _convert_python_obj_to_expression_value(value):
344329
if isinstance(value, str):

0 commit comments

Comments
 (0)