-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathminio_upload.py
More file actions
137 lines (123 loc) · 4.7 KB
/
minio_upload.py
File metadata and controls
137 lines (123 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from typing import List
import datetime
from controller.task_master import manager as task_master_manager
from submodules.model import enums, etl_utils
from submodules.model.business_objects import general
from submodules.model.global_objects import etl_task as etl_task_bo
from submodules.model.cognition_objects import (
file_reference as file_reference_db_co,
markdown_file as markdown_file_db_co,
markdown_dataset as markdown_dataset_db_co,
)
def handle_cognition_file_upload(path_parts: List[str]):
    """Process a MinIO upload event for a cognition file.

    Expects ``path_parts`` of the form
    ``[org_id, "_cognition", "files", "<hash>_<size>", "file_original..."]``
    and silently ignores anything else. Marks the matching file reference as
    COMPLETED and queues an ETL task: high priority for tmp-doc retrieval,
    low priority for markdown dataset files, depending on the file
    reference's ``file_caching_initiator`` metadata.
    """
    if path_parts[1] != "_cognition" or len(path_parts) < 5:
        return
    ##tmp doc retrieval => need to understand how .info file is an indicator for cognition gateway to pick it up
    if not (path_parts[2] == "files" and path_parts[4].startswith("file_original")):
        return
    org_id = path_parts[0]
    # Segment is "<hash>_<size>"; rsplit guards against a hash that itself
    # contains an underscore — the size is always the last component.
    file_hash, file_size = path_parts[3].rsplit("_", 1)
    file_reference = file_reference_db_co.get(org_id, file_hash, int(file_size))
    if not file_reference or file_reference.state in (
        enums.FileCachingState.RUNNING.value,
        enums.FileCachingState.COMPLETED.value,
    ):
        # file_reference is None or already processed in queue
        print(
            f"WARNING: {__name__} - file reference duplication error, file is already processed",
            flush=True,
        )
        if file_reference:
            print(
                f"INFO: {__name__} - file reference id: {str(file_reference.id)}",
                flush=True,
            )
            print(
                f"INFO: {__name__} - file name: {file_reference.original_file_name}",
                flush=True,
            )
        return
    file_reference.state = enums.FileCachingState.COMPLETED.value
    general.commit()
    if (
        file_reference.meta_data.get("file_caching_initiator")
        == enums.FileCachingInitiator.TMP_DOC_RETRIEVAL.value
    ):
        _process_tmp_doc_retrieval(org_id, file_reference)
    else:
        _process_markdown_file(org_id, file_reference)


def _process_tmp_doc_retrieval(org_id: str, file_reference) -> None:
    """Create and queue a high-priority ETL task for tmp-doc retrieval."""
    project_id = file_reference.meta_data.get("project_id")
    conversation_id = file_reference.meta_data.get("conversation_id")
    full_config, tokenizer = etl_utils.get_full_config_and_tokenizer_from_config_id(
        file_reference,
        project_id=project_id,
        conversation_id=conversation_id,
    )
    etl_task = etl_task_bo.create(
        org_id,
        file_reference.created_by,
        file_reference.original_file_name,
        file_reference.file_size_bytes,
        full_config=full_config,
        tokenizer=tokenizer,
        meta_data={
            "file_reference_id": str(file_reference.id),
            "tmp_doc_metadata": {
                "project_id": project_id,
                "conversation_id": conversation_id,
            },
        },
        priority=1,
    )
    _queue_etl_execution(org_id, file_reference, etl_task, high_priority=True)


def _process_markdown_file(org_id: str, file_reference) -> None:
    """Create and queue a low-priority ETL task for a markdown dataset file,
    then stamp the markdown file record with the ETL task and start time."""
    priority = -1  # markdown dataset files run at low priority
    markdown_file = markdown_file_db_co.get(
        org_id, file_reference.meta_data.get("markdown_file_id")
    )
    full_config, tokenizer = etl_utils.get_full_config_and_tokenizer_from_config_id(
        file_reference,
        etl_config_id=markdown_dataset_db_co.get_default_etl_config_id(
            str(markdown_file.organization_id), markdown_file.dataset_id
        ),
        markdown_file_id=str(markdown_file.id),
    )
    etl_task = etl_task_bo.create(
        org_id,
        file_reference.created_by,
        file_reference.original_file_name,
        file_reference.file_size_bytes,
        full_config=full_config,
        tokenizer=tokenizer,
        meta_data={
            "file_reference_id": str(file_reference.id),
            "markdown_file_id": str(markdown_file.id),
        },
        priority=priority,
    )
    markdown_file_db_co.update(
        org_id=org_id,
        markdown_file_id=markdown_file.id,
        etl_task_id=etl_task.id,
        meta_data={"file_reference_id": str(file_reference.id)},
        started_at=datetime.datetime.now(datetime.UTC),
        overwrite_meta_data=False,
    )
    _queue_etl_execution(
        org_id, file_reference, etl_task, high_priority=priority != -1
    )


def _queue_etl_execution(
    org_id: str, file_reference, etl_task, high_priority: bool
) -> None:
    """Queue the EXECUTE_ETL task for the given ETL task on the task master."""
    task_master_manager.queue_task(
        org_id,
        str(file_reference.created_by),
        enums.TaskType.EXECUTE_ETL,
        {
            "etl_task_id": str(etl_task.id),
            **etl_task.meta_data,
        },
        priority=high_priority,
    )