
Commit 41a6933

Release updates (#124)
* Session handling for attribute calculation
* Upload records limit fix
* Message for duplicated composite key changed
* Adds session handling for attribute calculation thread

Co-authored-by: FelixKirschKern <[email protected]>
1 parent fce237d commit 41a6933

File tree

4 files changed, +84 -61 lines

api/transfer.py
controller/attribute/manager.py
controller/notification/notification_data.py
controller/transfer/checks.py
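The first and last bullets of the release notes revolve around the same idiom: a long-running background loop must not hold a single database session open forever, so it periodically swaps its context token. Below is a minimal sketch of that idiom in isolation; get_ctx_token, remove_and_refresh_session, and work_is_pending are hypothetical stand-ins modeled on the calls visible in the diffs, not the repository's actual module layout.

import time


# Hypothetical stand-ins for the session helpers and the polling probe used in
# the diffs below; names mirror the calls in api/transfer.py but are simplified.
def get_ctx_token() -> object:
    return object()  # placeholder for a real database session/context token


def remove_and_refresh_session(token: object, request_new: bool = False):
    # The real helper closes the stale session and, if requested, returns a
    # fresh token so a long-running poll never reuses an expired session.
    return object() if request_new else None


def work_is_pending() -> bool:
    return False  # placeholder for e.g. "is doc-bin creation still running?"


def poll_until_done(poll_seconds: int = 5, refresh_every: int = 60) -> None:
    token = get_ctx_token()
    i = 0
    try:
        while work_is_pending():
            i += 1
            if i >= refresh_every:
                # Swap the session token periodically so a long wait does not
                # hold one database session open the whole time.
                i = 0
                token = remove_and_refresh_session(token, True)
            time.sleep(poll_seconds)
    finally:
        remove_and_refresh_session(token, False)


poll_until_done()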

api/transfer.py

Lines changed: 65 additions & 47 deletions
@@ -304,57 +304,75 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
     notification.send_organization_update(
         project_id=project_id, message="calculate_attribute:started:all"
     )
-    # first check project tokenization completed
-    i = 0
-    while True:
-        i += 1
-        if i >= 60:
-            i = 0
-            ctx_token = general.remove_and_refresh_session(ctx_token, True)
-        if tokenization.is_doc_bin_creation_running(project_id):
-            time.sleep(5)
-            continue
-        else:
-            break
-    # next, ensure that the attributes are calculated and tokenized
-    i = 0
-    while True:
-        time.sleep(1)
-        i += 1
-        if len(attribute_ids) == 0:
-            notification.send_organization_update(
-                project_id=project_id,
-                message="calculate_attribute:finished:all",
-            )
-            break
-        if i >= 60:
-            i = 0
-            ctx_token = general.remove_and_refresh_session(ctx_token, True)
-
-        current_att_id = attribute_ids[0]
-        current_att = attribute.get(project_id, current_att_id)
-        if current_att.state == enums.AttributeState.RUNNING.value:
-            continue
-        elif current_att.state == enums.AttributeState.INITIAL.value:
-            attribute_manager.calculate_user_attribute_all_records(
-                project_id, user_id, current_att_id, True
-            )
-        else:
-            if tokenization.is_doc_bin_creation_running_for_attribute(
-                project_id, current_att.name
-            ):
+
+    try:
+        # first check project tokenization completed
+        i = 0
+        while True:
+            i += 1
+            if i >= 60:
+                i = 0
+                ctx_token = general.remove_and_refresh_session(ctx_token, True)
+            if tokenization.is_doc_bin_creation_running(project_id):
                 time.sleep(5)
                 continue
             else:
-                attribute_ids.pop(0)
-                notification.send_organization_update(
-                    project_id=project_id,
-                    message=f"calculate_attribute:finished:{current_att_id}",
-                )
-        time.sleep(5)
+                break
+        # next, ensure that the attributes are calculated and tokenized
+        i = 0
+        while True:
+            time.sleep(1)
+            i += 1
+            if len(attribute_ids) == 0:
+                break
+            if i >= 60:
+                i = 0
+                ctx_token = general.remove_and_refresh_session(ctx_token, True)
 
-    general.remove_and_refresh_session(ctx_token, False)
-    calculate_missing_embedding_tensors(project_id, user_id)
+            current_att_id = attribute_ids[0]
+            current_att = attribute.get(project_id, current_att_id)
+            if current_att.state == enums.AttributeState.RUNNING.value:
+                continue
+            elif current_att.state == enums.AttributeState.INITIAL.value:
+                attribute_manager.calculate_user_attribute_all_records(
+                    project_id, user_id, current_att_id, True
+                )
+            else:
+                if tokenization.is_doc_bin_creation_running_for_attribute(
+                    project_id, current_att.name
+                ):
+                    time.sleep(5)
+                    continue
+                else:
+                    attribute_ids.pop(0)
+                    notification.send_organization_update(
+                        project_id=project_id,
+                        message=f"calculate_attribute:finished:{current_att_id}",
+                    )
+            time.sleep(5)
+    except Exception as e:
+        print(
+            f"Error while recreating attribute calculation for {project_id} when new records are uploaded : {e}"
+        )
+        get_initial_attributes = attribute.get_all_ordered(
+            project_id,
+            True,
+            state_filter=[
+                enums.AttributeState.INITIAL.value,
+            ],
+        )
+        for attr in get_initial_attributes:
+            attribute.update(
+                project_id, attr.id, state=enums.AttributeState.FAILED.value
+            )
+        general.commit()
+    finally:
+        notification.send_organization_update(
+            project_id=project_id,
+            message="calculate_attribute:finished:all",
+        )
+        general.remove_and_refresh_session(ctx_token, False)
+        calculate_missing_embedding_tensors(project_id, user_id)
 
 
 def calculate_missing_embedding_tensors(project_id: str, user_id: str) -> None:
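Read as a whole, the new flow wraps the waiting and per-attribute work in try, marks anything still in INITIAL state as FAILED if the loop raises, and moves the closing "calculate_attribute:finished:all" notification plus the session cleanup into finally so they always run. A compact sketch of that shape, using hypothetical stubs in place of the repository's attribute, notification, and general modules:

from typing import List


# Hypothetical stubs standing in for the repository's attribute, notification
# and session modules; only the control flow is the point here.
def send_organization_update(project_id: str, message: str) -> None:
    print(f"{project_id}: {message}")


def get_initial_attribute_ids(project_id: str) -> List[str]:
    return []  # attributes still waiting to be calculated


def mark_attribute_failed(project_id: str, attribute_id: str) -> None:
    print(f"{attribute_id} -> FAILED")


def process_attributes(project_id: str, attribute_ids: List[str]) -> None:
    pass  # wait for tokenization, then calculate and tokenize each attribute


def calculate_missing_attributes(project_id: str, attribute_ids: List[str]) -> None:
    send_organization_update(project_id, "calculate_attribute:started:all")
    try:
        process_attributes(project_id, attribute_ids)
    except Exception as exc:
        # Attributes left in INITIAL state will never finish now, so flag them
        # as FAILED instead of leaving them stuck.
        print(f"attribute calculation failed for {project_id}: {exc}")
        for attribute_id in get_initial_attribute_ids(project_id):
            mark_attribute_failed(project_id, attribute_id)
    finally:
        # The closing notification (and, in the real code, the session cleanup)
        # now runs no matter how the calculation loop ended.
        send_organization_update(project_id, "calculate_attribute:finished:all")


calculate_missing_attributes("project-1", ["attr-1"])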

controller/attribute/manager.py

Lines changed: 8 additions & 4 deletions
@@ -1,11 +1,10 @@
-import time
-from typing import List, Optional, Tuple
+from typing import List, Tuple
 from controller.tokenization.tokenization_service import (
     request_tokenize_calculated_attribute,
     request_tokenize_project,
     request_reupload_docbins,
 )
-from submodules.model.business_objects import attribute, record, tokenization
+from submodules.model.business_objects import attribute, record, tokenization, general
 from submodules.model.models import Attribute
 from submodules.model.enums import AttributeState, DataTypes
 from util import daemon, notification
@@ -207,6 +206,7 @@ def calculate_user_attribute_all_records(
 def __calculate_user_attribute_all_records(
     project_id: str, user_id: str, attribute_id: str, include_rats: bool
 ) -> None:
+    session_token = general.get_ctx_token()
     try:
         calculated_attributes = util.run_attribute_calculation_exec_env(
             attribute_id=attribute_id,
@@ -226,6 +226,7 @@ def __calculate_user_attribute_all_records(
                 attribute_id=attribute_id,
                 log="Attribute calculation failed",
             )
+            general.remove_and_refresh_session(session_token)
             return
 
         util.add_log_to_attribute_logs(
@@ -250,6 +251,7 @@ def __calculate_user_attribute_all_records(
                 attribute_id=attribute_id,
                 log="Writing to the database failed.",
             )
+            general.remove_and_refresh_session(session_token)
             return
         util.add_log_to_attribute_logs(project_id, attribute_id, "Finished writing.")
 
@@ -262,7 +264,7 @@ def __calculate_user_attribute_all_records(
             request_tokenize_calculated_attribute(
                 project_id, user_id, attribute_item.id, include_rats
             )
-    except:
+    except Exception:
         record.delete_user_created_attribute(
             project_id=project_id,
             attribute_id=attribute_id,
@@ -273,6 +275,7 @@ def __calculate_user_attribute_all_records(
             attribute_id=attribute_id,
             log="Writing to the database failed.",
         )
+        general.remove_and_refresh_session(session_token)
         return
 
     else:
@@ -292,6 +295,7 @@ def __calculate_user_attribute_all_records(
         notification.send_organization_update(
             project_id, f"calculate_attribute:finished:{attribute_id}"
         )
+        general.remove_and_refresh_session(session_token)
 
 
 def __notify_attribute_calculation_failed(
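The additions here give the daemon-thread worker its own session token via general.get_ctx_token() and release it with general.remove_and_refresh_session(session_token) before every early return and at the normal end. Because each new return has to remember the release call, a try/finally is a common alternative shape for the same idea; here is a minimal sketch with hypothetical stand-ins for the session helpers:

# Hypothetical stand-ins for general.get_ctx_token / general.remove_and_refresh_session.
def get_ctx_token() -> object:
    return object()


def remove_and_refresh_session(token: object) -> None:
    pass  # close the thread's database session


def calculate_in_thread(project_id: str, attribute_id: str) -> None:
    session_token = get_ctx_token()  # one session per worker thread
    try:
        # ... run the calculation, write results, request tokenization ...
        if not attribute_id:
            return  # early exits stay safe: finally still releases the session
    finally:
        remove_and_refresh_session(session_token)


calculate_in_thread("project-1", "attr-1")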

controller/notification/notification_data.py

Lines changed: 1 addition & 1 deletion
@@ -115,7 +115,7 @@
         "docs": enums.DOCS.UPLOADING_DATA.value,
     },
     enums.NotificationType.DUPLICATED_COMPOSITE_KEY.value: {
-        "message_template": "Please upload a file with to the selected composite primary key.",
+        "message_template": "Please upload a file with your projects primary key(s).",
         "title": "Data import",
         "level": enums.Notification.ERROR.value,
         "page": enums.Pages.SETTINGS.value,

controller/transfer/checks.py

Lines changed: 10 additions & 9 deletions
@@ -1,4 +1,5 @@
 from typing import Tuple, List, Union, Dict
+from controller.auth import manager as auth_manager
 from controller.misc.config_service import get_config_value
 
 from controller.transfer import util as transfer_util
@@ -128,42 +129,42 @@ def run_checks(df: pd.DataFrame, project_id, user_id) -> None:
 
 
 def run_limit_checks(df: pd.DataFrame, project_id, user_id) -> None:
-    limits = get_config_value("limit_checks")
+    org = auth_manager.get_organization_by_user_id(user_id)
     guard = False
     errors = {}
-    if df.shape[0] > limits["max_rows"]:
+    if df.shape[0] > org.max_rows:
         guard = True
         notification = create_notification(
             NotificationType.NEW_ROWS_EXCEED_MAXIMUM_LIMIT,
             user_id,
             project_id,
             df.shape[0],
-            limits["max_rows"],
+            org.max_rows,
         )
        errors["MaxRows"] = notification.message
     else:
         count_current_records = record.count(project_id)
         if count_current_records:
             updating = get_update_amount(df, project_id)
-            if count_current_records - updating + df.shape[0] > limits["max_rows"]:
+            if count_current_records - updating + df.shape[0] > org.max_rows:
                 guard = True
                 notification = create_notification(
                     NotificationType.TOTAL_ROWS_EXCEED_MAXIMUM_LIMIT,
                     user_id,
                     project_id,
                     count_current_records - updating + df.shape[0],
-                    limits["max_rows"],
+                    org.max_rows,
                 )
                 errors["MaxRows"] = notification.message
 
-    if df.shape[1] > limits["max_cols"]:
+    if df.shape[1] > org.max_cols:
         guard = True
         notification = create_notification(
             NotificationType.COLS_EXCEED_MAXIMUM_LIMIT,
             user_id,
             project_id,
             df.shape[1],
-            limits["max_cols"],
+            org.max_cols,
         )
         errors["MaxCols"] = notification.message
     max_length_dict = dict(
@@ -174,15 +175,15 @@ def run_limit_checks(df: pd.DataFrame, project_id, user_id) -> None:
     )
 
     for key in max_length_dict:
-        if max_length_dict[key] > limits["max_char_count"]:
+        if max_length_dict[key] > org.max_char_count:
             guard = True
             notification = create_notification(
                 NotificationType.COL_EXCEED_MAXIMUM_LIMIT,
                 user_id,
                 project_id,
                 key,
                 max_length_dict[key],
-                limits["max_char_count"],
+                org.max_char_count,
             )
             errors["MaxLength"] = notification.message
 
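After this change the upload limits come from the uploading user's organization (org.max_rows, org.max_cols, org.max_char_count) instead of the global limit_checks config block. A minimal sketch of the resulting check, with a hypothetical Organization dataclass standing in for auth_manager.get_organization_by_user_id and the error messages simplified:

from dataclasses import dataclass

import pandas as pd


@dataclass
class Organization:
    # Hypothetical mirror of the per-organization limits read in checks.py.
    max_rows: int
    max_cols: int
    max_char_count: int


def run_limit_checks(df: pd.DataFrame, org: Organization) -> dict:
    errors = {}
    if df.shape[0] > org.max_rows:
        errors["MaxRows"] = f"more than {org.max_rows} rows"
    if df.shape[1] > org.max_cols:
        errors["MaxCols"] = f"more than {org.max_cols} columns"
    # Longest cell per column, compared against the organization's character limit.
    max_length = df.astype(str).apply(lambda col: col.str.len().max())
    for column, length in max_length.items():
        if length > org.max_char_count:
            errors["MaxLength"] = f"column {column} exceeds {org.max_char_count} characters"
    return errors


org = Organization(max_rows=50_000, max_cols=100, max_char_count=10_000)
print(run_limit_checks(pd.DataFrame({"text": ["hello", "world"]}), org))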
