Skip to content

Commit be59c33

Browse files
authored
Unprotoype Makros (#216)
macros
1 parent cb6373f commit be59c33

File tree

8 files changed

+259
-7
lines changed

8 files changed

+259
-7
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""Macro tables
2+
3+
Revision ID: 194838aa0431
4+
Revises: a14f1a9b12b0
5+
Create Date: 2024-06-05 11:42:56.258816
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
from sqlalchemy.dialects import postgresql
11+
12+
# revision identifiers, used by Alembic.
13+
revision = '194838aa0431'
14+
down_revision = 'a14f1a9b12b0'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
    """Create the macro tables in the ``cognition`` schema and extend ``project``.

    Creates, in this order, ``macro``, ``macro_execution``, ``macro_node``,
    ``macro_edge`` and ``macro_execution_link`` (each followed by its
    non-unique single-column indexes), so every foreign-key target exists
    before the table referencing it. Finally adds the nullable JSON column
    ``macro_config`` to ``cognition.project``.
    """
    schema = "cognition"

    def uuid_column(name, nullable=True):
        # Every id / reference column in this migration uses the same UUID type.
        return sa.Column(name, postgresql.UUID(as_uuid=True), nullable=nullable)

    def foreign_key(column, target, ondelete="CASCADE"):
        # Single-column foreign key; CASCADE unless stated otherwise.
        return sa.ForeignKeyConstraint([column], [target], ondelete=ondelete)

    def create_indexes(table_name, *columns):
        # One non-unique index per column, named ix_<schema>_<table>_<column>.
        for column in columns:
            op.create_index(
                op.f(f"ix_{schema}_{table_name}_{column}"),
                table_name,
                [column],
                unique=False,
                schema=schema,
            )

    op.create_table(
        "macro",
        uuid_column("id", nullable=False),
        sa.Column("macro_type", sa.String(), nullable=True),
        sa.Column("scope", sa.String(), nullable=True),
        uuid_column("organization_id"),
        uuid_column("project_id"),
        uuid_column("created_by"),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column("state", sa.String(), nullable=True),
        sa.Column("name", sa.String(), nullable=True),
        sa.Column("description", sa.String(), nullable=True),
        foreign_key("created_by", "user.id", ondelete="SET NULL"),
        foreign_key("organization_id", "organization.id"),
        foreign_key("project_id", "cognition.project.id"),
        sa.PrimaryKeyConstraint("id"),
        schema=schema,
    )
    create_indexes("macro", "created_by", "organization_id", "project_id", "scope")

    op.create_table(
        "macro_execution",
        uuid_column("id", nullable=False),
        uuid_column("organization_id"),
        uuid_column("macro_id"),
        uuid_column("created_by"),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column("state", sa.String(), nullable=True),
        uuid_column("execution_group_id"),
        sa.Column("meta_info", sa.JSON(), nullable=True),
        foreign_key("created_by", "user.id", ondelete="SET NULL"),
        foreign_key("organization_id", "organization.id"),
        foreign_key("macro_id", "cognition.macro.id"),
        sa.PrimaryKeyConstraint("id"),
        schema=schema,
    )
    create_indexes("macro_execution", "created_by", "execution_group_id", "macro_id")

    op.create_table(
        "macro_node",
        uuid_column("id", nullable=False),
        uuid_column("macro_id"),
        uuid_column("created_by"),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column("is_root", sa.Boolean(), nullable=True),
        sa.Column("config", sa.JSON(), nullable=True),
        foreign_key("created_by", "user.id", ondelete="SET NULL"),
        foreign_key("macro_id", "cognition.macro.id"),
        sa.PrimaryKeyConstraint("id"),
        schema=schema,
    )
    create_indexes("macro_node", "created_by", "macro_id")

    op.create_table(
        "macro_edge",
        uuid_column("id", nullable=False),
        uuid_column("macro_id"),
        uuid_column("from_node_id"),
        uuid_column("to_node_id"),
        uuid_column("created_by"),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column("config", sa.JSON(), nullable=True),
        foreign_key("created_by", "user.id", ondelete="SET NULL"),
        foreign_key("from_node_id", "cognition.macro_node.id"),
        foreign_key("macro_id", "cognition.macro.id"),
        foreign_key("to_node_id", "cognition.macro_node.id"),
        sa.PrimaryKeyConstraint("id"),
        schema=schema,
    )
    create_indexes(
        "macro_edge", "created_by", "from_node_id", "macro_id", "to_node_id"
    )

    op.create_table(
        "macro_execution_link",
        uuid_column("id", nullable=False),
        uuid_column("organization_id"),
        uuid_column("execution_id"),
        uuid_column("execution_node_id"),
        sa.Column("action", sa.String(), nullable=True),
        sa.Column("other_id_target", sa.String(), nullable=True),
        uuid_column("other_id"),
        foreign_key("execution_id", "cognition.macro_execution.id"),
        foreign_key("organization_id", "organization.id"),
        foreign_key("execution_node_id", "cognition.macro_node.id"),
        sa.PrimaryKeyConstraint("id"),
        schema=schema,
    )
    create_indexes(
        "macro_execution_link", "execution_id", "execution_node_id", "other_id"
    )

    op.add_column(
        "project",
        sa.Column("macro_config", sa.JSON(), nullable=True),
        schema=schema,
    )
112+
113+
114+
def downgrade():
    """Revert ``upgrade``: drop ``project.macro_config`` and all macro tables.

    The column is removed first; afterwards each table's indexes are dropped
    followed by the table itself, walking the tables in the reverse of their
    creation order so referencing tables go before the tables they reference.
    """
    schema = "cognition"

    op.drop_column("project", "macro_config", schema=schema)

    # (table, indexed columns) in the exact order the drops must run.
    drop_plan = (
        ("macro_execution_link", ("other_id", "execution_node_id", "execution_id")),
        ("macro_edge", ("to_node_id", "macro_id", "from_node_id", "created_by")),
        ("macro_node", ("macro_id", "created_by")),
        ("macro_execution", ("macro_id", "execution_group_id", "created_by")),
        ("macro", ("scope", "project_id", "organization_id", "created_by")),
    )
    for table_name, indexed_columns in drop_plan:
        for column in indexed_columns:
            # Index names follow ix_<schema>_<table>_<column>.
            op.drop_index(
                op.f(f"ix_{schema}_{table_name}_{column}"),
                table_name=table_name,
                schema=schema,
            )
        op.drop_table(table_name, schema=schema)

api/transfer.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import traceback
33
import time
44
from typing import Optional, Dict
5-
5+
from time import sleep
66
from starlette.endpoints import HTTPEndpoint
77
from starlette.responses import PlainTextResponse, JSONResponse
88
from controller.embedding.manager import recreate_embeddings
@@ -22,7 +22,10 @@
2222
project as refinery_project,
2323
)
2424

25-
from submodules.model.cognition_objects import project as cognition_project
25+
from submodules.model.cognition_objects import (
26+
project as cognition_project,
27+
macro as macro_db_bo,
28+
)
2629

2730
from controller.transfer import manager as transfer_manager
2831
from controller.upload_task import manager as upload_task_manager
@@ -279,6 +282,45 @@ def __add_parse_markdown_file_thread(
279282
general.remove_and_refresh_session(ctx_token, False)
280283

281284

285+
class CognitionStartMacroExecutionGroup(HTTPEndpoint):
    """Endpoint that queues every execution of a macro execution group."""

    def put(self, request) -> PlainTextResponse:
        """Queue one ``RUN_COGNITION_MACRO`` task per execution in the group.

        Reads ``macro_id`` and ``group_id`` from the path parameters, resolves
        the cognition project from the first execution's ``meta_info`` and
        adds the tasks from a background daemon so the request returns
        immediately.

        Returns:
            ``"OK"`` once queuing has been handed to the daemon, or a 400
            plain-text response when there are no executions, no project id
            in ``meta_info``, or no matching cognition project.
        """
        macro_id = request.path_params["macro_id"]
        group_id = request.path_params["group_id"]

        execution_entries = macro_db_bo.get_all_macro_executions(macro_id, group_id)
        if not execution_entries:
            return PlainTextResponse("No executions found", status_code=400)

        # meta_info is a nullable column, so it can be None rather than a dict.
        meta_info = execution_entries[0].meta_info or {}
        cognition_prj_id = meta_info.get("project_id")
        if not cognition_prj_id:
            return PlainTextResponse("No project id found", status_code=400)

        cognition_prj = cognition_project.get(cognition_prj_id)
        if cognition_prj is None:
            # NOTE(review): assumes the lookup yields None for an unknown id
            # (e.g. project deleted in the meantime) — confirm against the getter.
            return PlainTextResponse("No project found", status_code=400)

        refinery_prj_id = str(
            refinery_project.get_or_create_queue_project(
                cognition_prj.organization_id, cognition_prj.created_by, True
            ).id
        )

        def queue_tasks():
            token = general.get_ctx_token()
            try:
                for entry in execution_entries:
                    task_queue_manager.add_task(
                        refinery_prj_id,
                        TaskType.RUN_COGNITION_MACRO,
                        entry.created_by,
                        {
                            "macro_id": macro_id,
                            "execution_id": str(entry.id),
                            "execution_group_id": group_id,
                        },
                    )
                general.commit()
            finally:
                # Release the session even if queuing or the commit fails.
                general.remove_and_refresh_session(token, False)

        daemon.run(queue_tasks)

        return PlainTextResponse("OK")
322+
323+
282324
class AssociationsImport(HTTPEndpoint):
283325
async def post(self, request) -> JSONResponse:
284326
project_id = request.path_params["project_id"]

app.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
CognitionImport,
1616
CognitionPrepareProject,
1717
CognitionParseMarkdownFile,
18+
CognitionStartMacroExecutionGroup,
1819
)
1920
from fast_api.routes.organization import router as org_router
2021
from fast_api.routes.project import router as project_router
@@ -138,6 +139,10 @@
138139
),
139140
Route("/project/{project_id:str}/import/task/{task_id:str}", UploadTaskInfo),
140141
Route("/project", ProjectCreationFromWorkflow),
142+
Route(
143+
"/macro/{macro_id:str}/execution-group/{group_id:str}/queue",
144+
CognitionStartMacroExecutionGroup,
145+
),
141146
Route("/is_managed", IsManagedRest),
142147
Route("/is_demo", IsDemoRest),
143148
Mount("/api", app=fastapi_app, name="REST API"),

controller/organization/manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from util import notification
99
from controller.auth import kratos
1010
from submodules.model.util import sql_alchemy_to_dict
11+
from submodules.s3 import controller as s3
1112

1213
USER_INFO_WHITELIST = {"id", "role"}
1314
ORGANIZATION_WHITELIST = {
@@ -94,6 +95,7 @@ def create_organization(name: str) -> Organization:
9495
f"Organization with name {name} already exists"
9596
)
9697
organization_item = organization.create(name, with_commit=True)
98+
s3.create_bucket(str(organization_item.id))
9799
return organization_item
98100

99101

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from typing import Any, Dict, Tuple, Callable
2+
import os
3+
4+
import requests
5+
from submodules.model.business_objects import (
6+
task_queue as task_queue_db_bo,
7+
general,
8+
)
9+
10+
from submodules.model.cognition_objects import macro as macro_db_bo
11+
12+
BASE_URI = os.getenv("COGNITION_GATEWAY")
13+
14+
15+
def get_task_functions() -> Tuple[Callable, Callable, int]:
    """Return the start function, the finished-check function and the check interval."""
    # Unit of the interval is not visible here — presumably seconds; TODO confirm.
    check_every = 1
    return (__start_task, __check_finished, check_every)
17+
18+
19+
def __start_task(task: Dict[str, Any]) -> bool:
    """Mark the queued task active and ask the cognition gateway to start it.

    Args:
        task: Queue entry with the keys ``"id"`` and ``"task_info"``; the
            latter holds ``"macro_id"``, ``"execution_id"`` and optionally
            ``"execution_group_id"``.

    Returns:
        ``False`` when the task no longer exists or is already active,
        ``True`` after the start request has been sent.
    """
    # check task still relevant
    task_db_obj = task_queue_db_bo.get(task["id"])
    if task_db_obj is None or task_db_obj.is_active:
        return False

    task_db_obj.is_active = True
    general.commit()

    action = task["task_info"]
    macro_id = action["macro_id"]
    execution_id = action["execution_id"]
    group_id = action.get("execution_group_id")

    requests.put(
        f"{BASE_URI}/api/v1/converters/internal/macros/{macro_id}/execution/{execution_id}/start",
        # requests omits None values, so a missing group id is no longer sent
        # as the literal string "None".
        params={"group_execution_id": group_id},
        # Without a timeout an unresponsive gateway would block the queue
        # worker forever. NOTE(review): 60s is a conservative guess — confirm
        # the start endpoint responds promptly.
        timeout=60,
    )
    return True
37+
38+
39+
def __check_finished(task: Dict[str, Any]) -> bool:
    """Report whether the macro execution referenced by the task has finished.

    Delegates to ``macro_db_bo.macro_execution_finished`` using the macro id,
    execution id and execution group id stored under ``task["task_info"]``.
    """
    info = task["task_info"]
    macro_id, execution_id, group_id = (
        info["macro_id"],
        info["execution_id"],
        info["execution_group_id"],
    )
    return macro_db_bo.macro_execution_finished(macro_id, execution_id, group_id)

controller/task_queue/manager.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
attribute_calculation as attribute_calculation_handler,
1818
task_queue as task_queue_handler,
1919
markdown_file as markdown_file_handler,
20-
parse_cognition_tmp_file as cognition_tmp_file,
20+
parse_cognition_tmp_file as cognition_tmp_file_handler,
21+
macro as macro_handler,
2122
)
2223
from .util import if_task_queue_send_websocket
2324

@@ -92,14 +93,20 @@ def get_task_function_by_type(task_type: str) -> Tuple[Callable, Callable, int]:
9293
if task_type == enums.TaskType.PARSE_MARKDOWN_FILE.value:
9394
return markdown_file_handler.get_task_functions()
9495
if task_type == enums.TaskType.PARSE_COGNITION_TMP_FILE.value:
95-
return cognition_tmp_file.get_task_functions()
96+
return cognition_tmp_file_handler.get_task_functions()
97+
if task_type == enums.TaskType.RUN_COGNITION_MACRO.value:
98+
return macro_handler.get_task_functions()
9699
raise ValueError(f"Task type {task_type} not supported yet")
97100

98101

99102
def add_task_to_task_queue(task: TaskQueueDBObj) -> int:
100103
start_func, check_func, check_every = get_task_function_by_type(task.task_type)
101104
queue = None
102-
if task.task_type == enums.TaskType.TASK_QUEUE.value:
105+
if (
106+
task.task_type == enums.TaskType.TASK_QUEUE.value
107+
or task.task_type == enums.TaskType.RUN_COGNITION_MACRO.value
108+
):
109+
# macros have tasks (e.g. etl parsing) inside so the execution shouldn't block own queue items
103110
queue = task_queue.get_task_queue_queue()
104111
else:
105112
queue = task_queue.get_task_queue()

controller/task_queue/task_queue.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
from submodules.model.models import TaskQueue as TaskQueueDBObj
66
from . import manager
77
from submodules.model.business_objects import general, task_queue as task_queue_db_bo
8+
from submodules.model.cognition_objects import macro as macro_db_bo
89
import traceback
9-
from submodules.model.enums import TaskType
10+
from submodules.model.enums import TaskType, MacroExecutionState
1011

1112

1213
# custom class wrapping a list in order to make it thread safe
@@ -207,8 +208,21 @@ def init_task_queues() -> CustomTaskQueue:
207208
)
208209
if task.task_type == TaskType.TASK_QUEUE.value:
209210
task_queue_queue.add_task(task, start_func, check_func, check_every)
211+
elif task.task_type == TaskType.RUN_COGNITION_MACRO.value:
212+
# macros that were running when the server was restarted are set to failed since we don't have pointers to the running process
213+
item = macro_db_bo.get_macro_execution(
214+
task.task_info["execution_id"],
215+
task.task_info["execution_group_id"],
216+
MacroExecutionState.RUNNING,
217+
)
218+
if item is not None:
219+
item.state = MacroExecutionState.FAILED.value
220+
task_queue_db_bo.remove_task_from_queue(task.project_id, task.id)
221+
continue
222+
task_queue_queue.add_task(task, start_func, check_func, check_every)
210223
else:
211224
task_queue.add_task(task, start_func, check_func, check_every)
225+
general.commit()
212226
return task_queue
213227

214228

0 commit comments

Comments
 (0)