Skip to content

Commit 68e3d9d

Browse files
ETL Postrelease Improvements (#370)
* fix default etl config id * chore: update submodules * perf(alembic): stale etl * perf(alembic): add new monitoring attrs * chore: update submodules * perf(alembic): new etl attrs * chore: update submodules * fix: default etl config id * chore: update submodules * perf(alembic): set default on etl deletion * chore: update submodules * perf: add file_reference_id to markdown_file.meta_data * chore: update submodules --------- Co-authored-by: andhreljaKern <andrea.hrelja@kern.ai>
1 parent d35079c commit 68e3d9d

File tree

4 files changed

+119
-1
lines changed

4 files changed

+119
-1
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""adds etl new attributes
2+
3+
Revision ID: 15f133dd208b
4+
Revises: 04cd434ed6eb
5+
Create Date: 2025-12-11 22:34:07.966633
6+
7+
"""
8+
9+
from alembic import op
10+
import sqlalchemy as sa
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "15f133dd208b"
15+
down_revision = "04cd434ed6eb"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade():
21+
# ### commands auto generated by Alembic - please adjust! ###
22+
op.add_column(
23+
"etl_task",
24+
sa.Column("full_config_hash", sa.String(), nullable=True),
25+
schema="global",
26+
)
27+
op.add_column(
28+
"etl_task", sa.Column("is_stale", sa.Boolean(), nullable=True), schema="global"
29+
)
30+
op.add_column(
31+
"etl_task", sa.Column("llm_ops", sa.JSON(), nullable=True), schema="global"
32+
)
33+
op.add_column(
34+
"etl_task",
35+
sa.Column("updated_at", sa.DateTime(), nullable=True),
36+
schema="global",
37+
)
38+
op.create_index(
39+
op.f("ix_global_etl_task_full_config_hash"),
40+
"etl_task",
41+
["full_config_hash"],
42+
unique=False,
43+
schema="global",
44+
)
45+
# ### end Alembic commands ###
46+
47+
48+
def downgrade():
49+
# ### commands auto generated by Alembic - please adjust! ###
50+
op.drop_index(
51+
op.f("ix_global_etl_task_full_config_hash"),
52+
table_name="etl_task",
53+
schema="global",
54+
)
55+
op.drop_column("etl_task", "updated_at", schema="global")
56+
op.drop_column("etl_task", "llm_ops", schema="global")
57+
op.drop_column("etl_task", "is_stale", schema="global")
58+
op.drop_column("etl_task", "full_config_hash", schema="global")
59+
# ### end Alembic commands ###
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""sets md file etl task id null on deletion
2+
3+
Revision ID: 5186966ea4db
4+
Revises: 15f133dd208b
5+
Create Date: 2025-12-15 07:59:30.702200
6+
7+
"""
8+
9+
from alembic import op
10+
import sqlalchemy as sa
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "5186966ea4db"
15+
down_revision = "15f133dd208b"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade():
21+
# ### commands auto generated by Alembic - please adjust! ###
22+
op.drop_constraint(
23+
"markdown_file_etl_task_id_fkey",
24+
"markdown_file",
25+
schema="cognition",
26+
type_="foreignkey",
27+
)
28+
op.create_foreign_key(
29+
None,
30+
"markdown_file",
31+
"etl_task",
32+
["etl_task_id"],
33+
["id"],
34+
source_schema="cognition",
35+
referent_schema="global",
36+
ondelete="SET NULL",
37+
)
38+
# ### end Alembic commands ###
39+
40+
41+
def downgrade():
42+
# ### commands auto generated by Alembic - please adjust! ###
43+
op.drop_constraint(None, "markdown_file", schema="cognition", type_="foreignkey")
44+
op.create_foreign_key(
45+
"markdown_file_etl_task_id_fkey",
46+
"markdown_file",
47+
"etl_task",
48+
["etl_task_id"],
49+
["id"],
50+
source_schema="cognition",
51+
referent_schema="global",
52+
ondelete="CASCADE",
53+
)
54+
# ### end Alembic commands ###

controller/transfer/cognition/minio_upload.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import List
22

3+
import datetime
4+
35
from controller.task_master import manager as task_master_manager
46
from submodules.model import enums, etl_utils
57
from submodules.model.business_objects import general
@@ -120,6 +122,9 @@ def handle_cognition_file_upload(path_parts: List[str]):
120122
org_id=org_id,
121123
markdown_file_id=markdown_file.id,
122124
etl_task_id=etl_task.id,
125+
meta_data={"file_reference_id": str(file_reference.id)},
126+
started_at=datetime.datetime.now(datetime.UTC),
127+
overwrite_meta_data=False,
123128
)
124129

125130
task_master_manager.queue_task(

0 commit comments

Comments
 (0)