Skip to content

Commit 30d358c

Browse files
committed
feat: upload flow
1 parent 1e6d28a commit 30d358c

File tree

4 files changed

+162
-3
lines changed

4 files changed

+162
-3
lines changed

app/agents/faq.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from ..db.config.database import get_async_session
1818
from ..graph.state import ConversationState
1919
from ..services.embedding_service import embedding_service
20+
from ..services import conversation_service
2021

2122
logger = logging.getLogger(__name__)
2223

@@ -106,13 +107,25 @@ async def __call__(self, state: ConversationState) -> ConversationState:
106107
"query_processed": True
107108
}
108109

110+
# --- Persistencia en DB ---
111+
conv_id = state.get("conversation_id")
112+
try:
113+
conv_id = await conversation_service.get_or_create_conversation(
114+
session, conv_id
115+
)
116+
await conversation_service.save_message(session, conv_id, "user", user_message)
117+
await conversation_service.save_message(session, conv_id, "assistant", response)
118+
except Exception as db_err:
119+
logger.error(f"[FAQ] Error al persistir mensajes en DB: {db_err}", exc_info=True)
120+
109121
# Devolver delta de messages para que add_messages lo agregue
110122
return {
111123
"agent_context": state["agent_context"],
112124
"faq_results": state.get("faq_results", []),
113125
"next_action": state["next_action"],
114126
"should_continue": state["should_continue"],
115-
"messages": [AIMessage(content=response)]
127+
"messages": [AIMessage(content=response)],
128+
"conversation_id": conv_id,
116129
}
117130

118131
except Exception as e:

app/agents/wizard_node.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from .base import AgentNode
1616
from ..graph.state import ConversationState
1717
from .wizard_workflow.wizard_graph import wizard_graph
18+
from ..db.config.database import get_async_session
19+
from ..services import conversation_service
1820

1921
logger = logging.getLogger(__name__)
2022

@@ -77,13 +79,51 @@ async def __call__(self, state: ConversationState) -> ConversationState:
7779
logger.debug(f"[WIZARD_NODE] wizard_status={result.get('wizard_status')}")
7880
logger.debug(f"[WIZARD_NODE] response (first 200 chars): {response_content[:200]!r}")
7981

82+
# --- Persistencia en DB ---
83+
conv_id = state.get("conversation_id")
84+
wizard_responses = result.get("wizard_responses", {})
85+
email = wizard_responses.get("email")
86+
87+
try:
88+
async for db_session in get_async_session():
89+
conv_id = await conversation_service.get_or_create_conversation(
90+
db_session, conv_id, email=email
91+
)
92+
93+
# Guardar mensaje del usuario
94+
user_msgs = [m for m in state.get("messages", []) if m.type == "human"]
95+
if user_msgs:
96+
await conversation_service.save_message(
97+
db_session, conv_id, "user", user_msgs[-1].content
98+
)
99+
100+
# Guardar respuesta del asistente
101+
if response_content:
102+
await conversation_service.save_message(
103+
db_session, conv_id, "assistant", response_content
104+
)
105+
106+
# Actualizar WizardSession con los datos del paso actual
107+
ws = await conversation_service.get_or_create_wizard_session(db_session, conv_id)
108+
status = "COMPLETED" if result.get("completed") else "ACTIVE"
109+
await conversation_service.update_wizard_session(
110+
db_session,
111+
ws,
112+
result.get("current_question", 1),
113+
wizard_responses,
114+
status,
115+
)
116+
except Exception as e:
117+
logger.error(f"[WIZARD_NODE] Error al persistir en DB: {e}", exc_info=True)
118+
80119
return {
81120
**state,
82121
"wizard_state": result,
83122
"messages": result.get("messages", []),
84123
"agent_context": {
85124
"response": response_content
86125
},
126+
"conversation_id": conv_id,
87127
}
88128

89129

app/agents/wizard_workflow/nodes.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,18 @@ def ask_question_node(state: WizardState):
2424
def store_answer_node(state: WizardState):
2525
user_message = [m.content for m in state["messages"] if m.type == "human"][-1]
2626

27-
new_index = state["current_question"] + 1
27+
current_q = state["current_question"]
28+
new_index = current_q + 1
2829
is_completed = new_index > max(WIZARD_QUESTIONS.keys())
2930

31+
q_config = WIZARD_QUESTIONS.get(current_q, {})
32+
field_name = q_config.get("field_name", str(current_q))
33+
34+
wizard_responses = dict(state.get("wizard_responses", {}))
35+
wizard_responses[field_name] = user_message
36+
3037
logger.debug("-" * 60)
31-
logger.debug(f"[WIZARD/store_answer] Storing answer for question #{state['current_question']}")
38+
logger.debug(f"[WIZARD/store_answer] Storing answer for question #{current_q} (field={field_name!r})")
3239
logger.debug(f"[WIZARD/store_answer] User answer: {user_message[:200]!r}")
3340
logger.debug(f"[WIZARD/store_answer] Next question index: {new_index}")
3441
logger.debug(f"[WIZARD/store_answer] Is completed: {is_completed}")
@@ -37,6 +44,7 @@ def store_answer_node(state: WizardState):
3744
return {
3845
**state,
3946
"answers": state.get("answers", []) + [user_message],
47+
"wizard_responses": wizard_responses,
4048
"current_question": new_index,
4149
"completed": is_completed,
4250
"awaiting_answer": False,
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""
2+
Servicio para persistir datos de conversaciones en la base de datos.
3+
4+
Centraliza las operaciones de DB relacionadas a conversaciones, mensajes
5+
y sesiones del wizard. Todos los métodos reciben una sesión activa de
6+
SQLAlchemy para poder participar de la misma transacción.
7+
"""
8+
9+
import logging
10+
11+
from sqlalchemy import select
12+
from sqlalchemy.ext.asyncio import AsyncSession
13+
14+
from ..db.models import Conversation, Message, WizardSession
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
async def get_or_create_conversation(
20+
session: AsyncSession,
21+
conv_id: int | None,
22+
email: str | None = None,
23+
) -> int:
24+
"""Devuelve el conv_id existente o crea una nueva Conversation.
25+
26+
Si se pasa email y la conversación no lo tiene guardado, lo actualiza.
27+
"""
28+
if conv_id is not None:
29+
if email:
30+
conv = await session.get(Conversation, conv_id)
31+
if conv and not conv.email:
32+
conv.email = email
33+
await session.commit()
34+
return conv_id
35+
36+
conv = Conversation(email=email)
37+
session.add(conv)
38+
await session.flush()
39+
await session.commit()
40+
logger.debug(f"[CONV_SERVICE] Nueva conversación creada id={conv.id}")
41+
return conv.id
42+
43+
44+
async def save_message(
45+
session: AsyncSession,
46+
conv_id: int,
47+
role: str,
48+
content: str,
49+
) -> None:
50+
"""Guarda un mensaje en la tabla messages."""
51+
msg = Message(conv_id=conv_id, role=role, content=content)
52+
session.add(msg)
53+
await session.commit()
54+
logger.debug(f"[CONV_SERVICE] Mensaje guardado role={role!r} conv_id={conv_id}")
55+
56+
57+
async def get_or_create_wizard_session(
58+
session: AsyncSession,
59+
conv_id: int,
60+
) -> WizardSession:
61+
"""Obtiene la WizardSession activa para la conversación o crea una nueva."""
62+
result = await session.execute(
63+
select(WizardSession)
64+
.where(WizardSession.conv_id == conv_id)
65+
.where(WizardSession.state == "ACTIVE")
66+
.order_by(WizardSession.created_at.desc())
67+
.limit(1)
68+
)
69+
ws = result.scalar_one_or_none()
70+
if ws is None:
71+
ws = WizardSession(
72+
conv_id=conv_id,
73+
current_question=1,
74+
responses={},
75+
state="ACTIVE",
76+
)
77+
session.add(ws)
78+
await session.flush()
79+
logger.debug(f"[CONV_SERVICE] Nueva WizardSession creada conv_id={conv_id}")
80+
return ws
81+
82+
83+
async def update_wizard_session(
84+
session: AsyncSession,
85+
wizard_session: WizardSession,
86+
current_question: int,
87+
responses: dict,
88+
status: str,
89+
) -> None:
90+
"""Actualiza current_question, responses y state de la WizardSession."""
91+
wizard_session.current_question = current_question
92+
wizard_session.responses = responses
93+
wizard_session.state = status
94+
await session.commit()
95+
logger.debug(
96+
f"[CONV_SERVICE] WizardSession actualizada id={wizard_session.id} "
97+
f"question={current_question} status={status!r}"
98+
)

0 commit comments

Comments
 (0)