Skip to content

Commit c72cc90

Browse files
Cache Files for ETL Processing (#257)
* file caching draft * model * remove tokens * clean up project * update enum values, cancel update * alembic join * improve cancel * s3 * state change * model * alembic submodules * model * model * Remove and condition * Change revision order * Remove token from invalidation thread * Submodule merge --------- Co-authored-by: JWittmeyer <[email protected]>
1 parent 4638c1f commit c72cc90

File tree

9 files changed

+239
-68
lines changed

9 files changed

+239
-68
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""add file caching
2+
3+
Revision ID: 11675e102ac4
4+
Revises: 1118c7327b96
5+
Create Date: 2024-10-09 15:37:46.744638
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
from sqlalchemy.dialects import postgresql
11+
12+
# revision identifiers, used by Alembic.
13+
revision = '11675e102ac4'
14+
down_revision = '1118c7327b96'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
op.create_table('file_reference',
22+
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
23+
sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True),
24+
sa.Column('hash', sa.String(), nullable=True),
25+
sa.Column('minio_path', sa.String(), nullable=True),
26+
sa.Column('bucket', sa.String(), nullable=True),
27+
sa.Column('created_at', sa.DateTime(), nullable=True),
28+
sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True),
29+
sa.Column('file_size_bytes', sa.BigInteger(), nullable=True),
30+
sa.Column('content_type', sa.String(), nullable=True),
31+
sa.Column('original_file_name', sa.String(), nullable=True),
32+
sa.Column('state', sa.String(), nullable=True),
33+
sa.Column('meta_data', sa.JSON(), nullable=True),
34+
sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='SET NULL'),
35+
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
36+
sa.PrimaryKeyConstraint('id'),
37+
sa.UniqueConstraint('organization_id', 'hash', 'file_size_bytes', name='unique_file_reference'),
38+
schema='cognition'
39+
)
40+
op.create_index(op.f('ix_cognition_file_reference_created_by'), 'file_reference', ['created_by'], unique=False, schema='cognition')
41+
op.create_index(op.f('ix_cognition_file_reference_hash'), 'file_reference', ['hash'], unique=False, schema='cognition')
42+
op.create_index(op.f('ix_cognition_file_reference_organization_id'), 'file_reference', ['organization_id'], unique=False, schema='cognition')
43+
op.create_table('file_extraction',
44+
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
45+
sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True),
46+
sa.Column('file_reference_id', postgresql.UUID(as_uuid=True), nullable=True),
47+
sa.Column('extraction_key', sa.String(), nullable=True),
48+
sa.Column('minio_path', sa.String(), nullable=True),
49+
sa.Column('bucket', sa.String(), nullable=True),
50+
sa.Column('created_at', sa.DateTime(), nullable=True),
51+
sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True),
52+
sa.Column('state', sa.String(), nullable=True),
53+
sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='SET NULL'),
54+
sa.ForeignKeyConstraint(['file_reference_id'], ['cognition.file_reference.id'], ondelete='CASCADE'),
55+
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
56+
sa.PrimaryKeyConstraint('id'),
57+
sa.UniqueConstraint('organization_id', 'file_reference_id', 'extraction_key', name='unique_file_extraction'),
58+
schema='cognition'
59+
)
60+
op.create_index(op.f('ix_cognition_file_extraction_created_by'), 'file_extraction', ['created_by'], unique=False, schema='cognition')
61+
op.create_index(op.f('ix_cognition_file_extraction_file_reference_id'), 'file_extraction', ['file_reference_id'], unique=False, schema='cognition')
62+
op.create_index(op.f('ix_cognition_file_extraction_organization_id'), 'file_extraction', ['organization_id'], unique=False, schema='cognition')
63+
op.create_table('file_transformation',
64+
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
65+
sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True),
66+
sa.Column('file_extraction_id', postgresql.UUID(as_uuid=True), nullable=True),
67+
sa.Column('transformation_key', sa.String(), nullable=True),
68+
sa.Column('minio_path', sa.String(), nullable=True),
69+
sa.Column('bucket', sa.String(), nullable=True),
70+
sa.Column('created_at', sa.DateTime(), nullable=True),
71+
sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True),
72+
sa.Column('state', sa.String(), nullable=True),
73+
sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='SET NULL'),
74+
sa.ForeignKeyConstraint(['file_extraction_id'], ['cognition.file_extraction.id'], ondelete='CASCADE'),
75+
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
76+
sa.PrimaryKeyConstraint('id'),
77+
sa.UniqueConstraint('organization_id', 'file_extraction_id', 'transformation_key', name='unique_file_transformation'),
78+
schema='cognition'
79+
)
80+
op.create_index(op.f('ix_cognition_file_transformation_created_by'), 'file_transformation', ['created_by'], unique=False, schema='cognition')
81+
op.create_index(op.f('ix_cognition_file_transformation_file_extraction_id'), 'file_transformation', ['file_extraction_id'], unique=False, schema='cognition')
82+
op.create_index(op.f('ix_cognition_file_transformation_organization_id'), 'file_transformation', ['organization_id'], unique=False, schema='cognition')
83+
# ### end Alembic commands ###
84+
85+
86+
def downgrade():
87+
# ### commands auto generated by Alembic - please adjust! ###
88+
op.drop_index(op.f('ix_cognition_file_transformation_organization_id'), table_name='file_transformation', schema='cognition')
89+
op.drop_index(op.f('ix_cognition_file_transformation_file_extraction_id'), table_name='file_transformation', schema='cognition')
90+
op.drop_index(op.f('ix_cognition_file_transformation_created_by'), table_name='file_transformation', schema='cognition')
91+
op.drop_table('file_transformation', schema='cognition')
92+
op.drop_index(op.f('ix_cognition_file_extraction_organization_id'), table_name='file_extraction', schema='cognition')
93+
op.drop_index(op.f('ix_cognition_file_extraction_file_reference_id'), table_name='file_extraction', schema='cognition')
94+
op.drop_index(op.f('ix_cognition_file_extraction_created_by'), table_name='file_extraction', schema='cognition')
95+
op.drop_table('file_extraction', schema='cognition')
96+
op.drop_index(op.f('ix_cognition_file_reference_organization_id'), table_name='file_reference', schema='cognition')
97+
op.drop_index(op.f('ix_cognition_file_reference_hash'), table_name='file_reference', schema='cognition')
98+
op.drop_index(op.f('ix_cognition_file_reference_created_by'), table_name='file_reference', schema='cognition')
99+
op.drop_table('file_reference', schema='cognition')
100+
# ### end Alembic commands ###

alembic/versions/c626887031f6_add.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""add
2+
3+
Revision ID: c626887031f6
4+
Revises: 11675e102ac4
5+
Create Date: 2024-10-15 13:53:26.632068
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
11+
12+
# revision identifiers, used by Alembic.
13+
revision = 'c626887031f6'
14+
down_revision = '11675e102ac4'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
op.add_column('file_reference', sa.Column('last_used', sa.DateTime(), nullable=True), schema='cognition')
22+
# ### end Alembic commands ###
23+
24+
25+
def downgrade():
26+
# ### commands auto generated by Alembic - please adjust! ###
27+
op.drop_column('file_reference', 'last_used', schema='cognition')
28+
# ### end Alembic commands ###
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
"""rename llm logs
2+
3+
Revision ID: f8c313f63a36
4+
Revises: c626887031f6
5+
Create Date: 2024-10-15 16:01:26.391244
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
from sqlalchemy.dialects import postgresql
11+
12+
# revision identifiers, used by Alembic.
13+
revision = 'f8c313f63a36'
14+
down_revision = 'c626887031f6'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
op.create_table('file_transformation_llm_logs',
22+
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
23+
sa.Column('file_transformation_id', postgresql.UUID(as_uuid=True), nullable=True),
24+
sa.Column('created_at', sa.DateTime(), nullable=True),
25+
sa.Column('finished_at', sa.DateTime(), nullable=True),
26+
sa.Column('model_used', sa.String(), nullable=True),
27+
sa.Column('input', sa.String(), nullable=True),
28+
sa.Column('output', sa.String(), nullable=True),
29+
sa.Column('error', sa.String(), nullable=True),
30+
sa.ForeignKeyConstraint(['file_transformation_id'], ['cognition.file_transformation.id'], ondelete='CASCADE'),
31+
sa.PrimaryKeyConstraint('id'),
32+
schema='cognition'
33+
)
34+
op.create_index(op.f('ix_cognition_file_transformation_llm_logs_file_transformation_id'), 'file_transformation_llm_logs', ['file_transformation_id'], unique=False, schema='cognition')
35+
op.drop_index('ix_cognition_markdown_llm_logs_markdown_file_id', table_name='markdown_llm_logs', schema='cognition')
36+
op.drop_table('markdown_llm_logs', schema='cognition')
37+
# ### end Alembic commands ###
38+
39+
40+
def downgrade():
41+
# ### commands auto generated by Alembic - please adjust! ###
42+
op.create_table('markdown_llm_logs',
43+
sa.Column('id', postgresql.UUID(), autoincrement=False, nullable=False),
44+
sa.Column('markdown_file_id', postgresql.UUID(), autoincrement=False, nullable=True),
45+
sa.Column('created_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True),
46+
sa.Column('finished_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True),
47+
sa.Column('model_used', sa.VARCHAR(), autoincrement=False, nullable=True),
48+
sa.Column('input', sa.VARCHAR(), autoincrement=False, nullable=True),
49+
sa.Column('output', sa.VARCHAR(), autoincrement=False, nullable=True),
50+
sa.Column('error', sa.VARCHAR(), autoincrement=False, nullable=True),
51+
sa.ForeignKeyConstraint(['markdown_file_id'], ['cognition.markdown_file.id'], name='markdown_llm_logs_markdown_file_id_fkey', ondelete='CASCADE'),
52+
sa.PrimaryKeyConstraint('id', name='markdown_llm_logs_pkey'),
53+
schema='cognition'
54+
)
55+
op.create_index('ix_cognition_markdown_llm_logs_markdown_file_id', 'markdown_llm_logs', ['markdown_file_id'], unique=False, schema='cognition')
56+
op.drop_index(op.f('ix_cognition_file_transformation_llm_logs_file_transformation_id'), table_name='file_transformation_llm_logs', schema='cognition')
57+
op.drop_table('file_transformation_llm_logs', schema='cognition')
58+
# ### end Alembic commands ###

controller/misc/config_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def refresh_config():
2727
)
2828
global __config
2929
__config = response.json()
30-
daemon.run_with_db_token(invalidate_after, 3600) # one hour as failsave
30+
daemon.run_without_db_token(invalidate_after, 3600) # one hour as failsafe
3131

3232

3333
def get_config_value(

controller/monitor/manager.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from submodules.model.business_objects import monitor as task_monitor
33
from controller.auth import kratos
44
from submodules.model.util import sql_alchemy_to_dict
5-
from submodules.s3 import controller as s3
65

76

87
def monitor_all_tasks(page: int, limit: int) -> List[Any]:
@@ -100,19 +99,19 @@ def cancel_macro_execution_task(
10099
)
101100

102101

103-
def cancel_markdown_file_task(
102+
def cancel_parse_cognition_file_task(
103+
org_id: str,
104104
task_info: Dict[str, Any],
105105
) -> None:
106-
markdown_file_id = task_info.get("fileId")
107-
org_id = task_info.get("orgId")
108-
task_monitor.set_markdown_file_task_to_failed(
109-
markdown_file_id, org_id, with_commit=True
110-
)
111106

107+
file_reference_id = task_info.get("fileReferenceId")
108+
extraction_key = task_info.get("extractionKey")
109+
transformation_key = task_info.get("transformationKey")
112110

113-
def cancel_tmp_doc_retrieval_task(
114-
task_info: Dict[str, Any],
115-
) -> None:
116-
bucket = task_info.get("bucket")
117-
minio_path = task_info.get("minioPath")
118-
s3.delete_object(bucket, minio_path)
111+
task_monitor.set_parse_cognition_file_task_to_failed(
112+
org_id,
113+
file_reference_id,
114+
extraction_key,
115+
transformation_key,
116+
with_commit=True,
117+
)
Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,49 @@
11
from typing import List
2-
from submodules.model.cognition_objects import project as cognition_project
3-
from submodules.model.cognition_objects import conversation
4-
from submodules.model.enums import TaskType
2+
from submodules.model.cognition_objects import file_reference as file_reference_db_bo
3+
from submodules.model.enums import TaskType, FileCachingProcessingScope
54
from controller.task_master import manager as task_master_manager
5+
from submodules.model import enums
6+
from submodules.model.business_objects import general
67

78

89
def handle_cognition_file_upload(path_parts: List[str]):
910

10-
if path_parts[1] != "_cognition":
11+
if path_parts[1] != "_cognition" or len(path_parts) < 5:
1112
return
13+
if path_parts[2] == "files" and path_parts[4].startswith("file_original"):
14+
org_id = path_parts[0]
15+
file_hash, file_size = path_parts[3].split("_")
16+
file_reference = file_reference_db_bo.get(org_id, file_hash, int(file_size))
1217

13-
if path_parts[3] == "chat_tmp_files" and path_parts[5] == "queued":
14-
cognition_project_id = path_parts[2]
15-
conversation_id = path_parts[4]
16-
cognition_prj = cognition_project.get(cognition_project_id)
17-
if not cognition_prj:
18+
if (
19+
not file_reference
20+
or file_reference.state == enums.FileCachingState.RUNNING.value
21+
or file_reference.state == enums.FileCachingState.COMPLETED.value
22+
):
1823
return
24+
file_reference.state = enums.FileCachingState.COMPLETED.value
25+
general.commit()
1926

20-
conversation_item = conversation.get(cognition_project_id, conversation_id)
21-
if not conversation_item:
22-
return
27+
prio = (
28+
file_reference.meta_data.get("transformation_initiator")
29+
== enums.FileCachingInitiator.TMP_DOC_RETRIEVAL.value
30+
)
31+
extraction_method = file_reference.meta_data.get("extraction_method")
2332

2433
task_master_manager.queue_task(
25-
str(cognition_prj.organization_id),
26-
str(conversation_item.created_by),
27-
TaskType.PARSE_COGNITION_TMP_FILE,
34+
str(file_reference.organization_id),
35+
str(file_reference.created_by),
36+
TaskType.PARSE_COGNITION_FILE,
2837
{
29-
"cognition_project_id": str(cognition_project_id),
30-
"conversation_id": str(conversation_id),
31-
"minio_path": "/".join(path_parts[1:]),
32-
"bucket": path_parts[0],
38+
"parse_scope": FileCachingProcessingScope.EXTRACT_TRANSFORM.value,
39+
"file_reference_id": str(file_reference.id),
40+
"extraction_method": extraction_method,
41+
"meta_data": file_reference.meta_data,
42+
"extraction_key": file_reference.meta_data.get("extraction_key"),
43+
"transformation_key": file_reference.meta_data.get(
44+
"transformation_key"
45+
),
46+
"file_name": file_reference.original_file_name,
3347
},
34-
True, # not sure if prio is right here as the prio tasks should only take < 1 min but waiting for the normal queue will take ages depending on the queue
48+
prio, # not sure if prio is right here as the prio tasks should only take < 1 min but waiting for the normal queue will take ages depending on the queue
3549
)

fast_api/routes/misc.py

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,10 @@ def cancel_task(
138138
controller_manager.cancel_weak_supervision(task_info)
139139
elif task_type == enums.TaskType.RUN_COGNITION_MACRO.value:
140140
controller_manager.cancel_macro_execution_task(task_info)
141-
elif task_type == enums.TaskType.PARSE_MARKDOWN_FILE.value:
142-
controller_manager.cancel_markdown_file_task(task_info)
143-
elif task_type == enums.TaskType.PARSE_COGNITION_TMP_FILE.value:
144-
controller_manager.cancel_tmp_doc_retrieval_task(task_info)
141+
elif task_type == enums.TaskType.PARSE_COGNITION_FILE.value:
142+
controller_manager.cancel_parse_cognition_file_task(
143+
task_entity.organization_id, task_info
144+
)
145145
else:
146146
raise ValueError(f"{task_type} is no valid task type")
147147

@@ -314,31 +314,3 @@ def update_customer_buttons(
314314
update_request.visible,
315315
)
316316
)
317-
318-
319-
@router.get("/dummy/create/wrong/session")
320-
def dummy():
321-
322-
def something():
323-
from submodules.model.business_objects import general
324-
325-
# general.get_ctx_token()
326-
from submodules.model.business_objects import organization
327-
328-
print("organization", organization.get_all(), flush=True)
329-
import json
330-
331-
print(
332-
json.dumps(
333-
general.get_session_lookup(exclude_last_x_seconds=-1),
334-
indent=4,
335-
default=str,
336-
),
337-
flush=True,
338-
)
339-
340-
from submodules.model import daemon
341-
342-
daemon.run_with_db_token(something)
343-
344-
return SILENT_SUCCESS_RESPONSE

0 commit comments

Comments
 (0)