-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwizard_node.py
More file actions
155 lines (125 loc) · 6.2 KB
/
wizard_node.py
File metadata and controls
155 lines (125 loc) · 6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
WizardAgent -- routable node that guides users through the postulación flow.
Wraps the wizard sub-graph (wizard_workflow/wizard_graph) behind the
standard AgentNode interface so the supervisor can route to it by
description.
"""
import logging
import uuid
from pathlib import Path
import yaml
from .base import AgentNode
from ..graph.state import ConversationState
from .wizard_workflow.wizard_graph import wizard_graph
from ..db.config.database import SessionLocal
from ..services import conversation_service, postulation_service
logger = logging.getLogger(__name__)

# Agent metadata, loaded once at import time from config/wizard.yaml located
# next to this module. The encoding is pinned to UTF-8 so that non-ASCII text
# in the YAML is decoded identically regardless of the platform's locale
# (Path.read_text() otherwise falls back to the locale's preferred encoding).
_config = yaml.safe_load(
    (Path(__file__).parent / "config" / "wizard.yaml").read_text(encoding="utf-8")
)
class WizardAgent(AgentNode):
    """Agent node that guides the user through the Ithaka postulation wizard.

    Each call runs the ``wizard_graph`` sub-graph once with the current
    conversation messages, optionally submits the collected answers through
    ``postulation_service`` when the sub-graph reports completion, and stores
    the turn through ``conversation_service``.
    """

    name: str = _config["name"]
    description: str = _config["description"]

    async def __call__(self, state: ConversationState) -> ConversationState:
        """Run one wizard turn and return the updated conversation state.

        Args:
            state: Current conversation state. The keys read here are
                ``wizard_state``, ``messages`` and ``conversation_id``.

        Returns:
            A copy of ``state`` with ``wizard_state`` (the sub-graph result),
            ``messages``, ``agent_context`` (``{"response": ...}``) and
            ``conversation_id`` replaced.

        Raises:
            Whatever ``wizard_graph.ainvoke`` raises; that call is not guarded.
            Failures while submitting the postulation or persisting to the DB
            are logged and swallowed so the user still gets a response.
        """
        logger.debug("=" * 60)
        logger.debug("[WIZARD_NODE] __call__ invoked")

        wizard_state = self._prepare_wizard_state(state)
        self._log_wizard_state(wizard_state)

        result = await wizard_graph.ainvoke(wizard_state)

        # The reply shown to the user is the content of the last message
        # produced by the sub-graph (empty string when there is none).
        result_messages = result.get("messages") or []
        response_content = result_messages[-1].content if result_messages else ""

        logger.debug("[WIZARD_NODE] Sub-graph result:")
        logger.debug("[WIZARD_NODE] current_question=%s", result.get("current_question"))
        logger.debug("[WIZARD_NODE] completed=%s", result.get("completed"))
        logger.debug("[WIZARD_NODE] wizard_status=%s", result.get("wizard_status"))
        logger.debug(
            "[WIZARD_NODE] response (first 200 chars): %r", response_content[:200]
        )

        # `or {}` (rather than a .get default) also covers the key being
        # present with a None value, which would otherwise crash on .get below.
        wizard_responses = result.get("wizard_responses") or {}

        if result.get("completed"):
            await self._submit_postulation(wizard_responses)

        conv_id = await self._persist_turn(
            state, result, response_content, wizard_responses
        )

        return {
            **state,
            "wizard_state": result,
            "messages": result.get("messages", []),
            "agent_context": {
                "response": response_content
            },
            "conversation_id": conv_id,
        }

    @staticmethod
    def _prepare_wizard_state(state: ConversationState) -> dict:
        """Build the sub-graph input: a fresh wizard state or a copy of the stored one."""
        messages = state.get("messages", [])
        stored = state.get("wizard_state")
        if not stored:
            logger.debug("[WIZARD_NODE] No existing wizard_state, creating new one")
            return {
                "wizard_session_id": str(uuid.uuid4()),
                "current_question": 1,
                "answers": [],
                "wizard_responses": {},
                "wizard_status": "ACTIVE",
                "awaiting_answer": False,
                "messages": messages,
                "completed": False,
                "valid": False,
            }
        # Shallow copy so the stored wizard_state is not mutated in place.
        wizard_state = dict(stored)
        wizard_state["messages"] = messages
        return wizard_state

    @staticmethod
    def _log_wizard_state(wizard_state: dict) -> None:
        """Dump the wizard state at DEBUG level; does nothing when DEBUG is disabled."""
        # Guarded so the per-message loop and slicing are skipped in production.
        if not logger.isEnabledFor(logging.DEBUG):
            return
        logger.debug("[WIZARD_NODE] Wizard state before sub-graph invoke:")
        logger.debug("[WIZARD_NODE] session_id=%s", wizard_state.get("wizard_session_id"))
        logger.debug("[WIZARD_NODE] current_question=%s", wizard_state.get("current_question"))
        logger.debug("[WIZARD_NODE] wizard_status=%s", wizard_state.get("wizard_status"))
        logger.debug("[WIZARD_NODE] awaiting_answer=%s", wizard_state.get("awaiting_answer"))
        logger.debug("[WIZARD_NODE] completed=%s", wizard_state.get("completed"))
        logger.debug("[WIZARD_NODE] answers count=%s", len(wizard_state.get("answers", [])))
        logger.debug("[WIZARD_NODE] messages count=%s", len(wizard_state.get("messages", [])))
        for i, m in enumerate(wizard_state.get("messages", [])):
            logger.debug(
                "[WIZARD_NODE] msg[%s] type=%s content=%r...", i, m.type, m.content[:100]
            )

    @staticmethod
    async def _submit_postulation(wizard_responses: dict) -> None:
        """Send the collected answers to the external API; failures are logged, not raised."""
        # NOTE(review): this runs on every call whose result is marked completed.
        # Confirm the sub-graph does not keep reporting completed on later
        # turns, otherwise the postulation would be submitted more than once.
        try:
            submission = await postulation_service.submit_postulation(wizard_responses)
            logger.info(
                "[WIZARD_NODE] Postulation submitted to external API: %s", submission
            )
        except Exception as e:
            # Best-effort: the conversation continues even if the API call fails.
            logger.exception(
                "[WIZARD_NODE] Error submitting postulation to external API: %s", e
            )

    @staticmethod
    async def _persist_turn(
        state: ConversationState,
        result: dict,
        response_content,
        wizard_responses: dict,
    ):
        """Store the user message, the reply and the wizard progress.

        Returns the conversation id to carry forward in the state. Any DB
        error is rolled back, logged and swallowed.
        """
        conv_id = state.get("conversation_id")
        email = wizard_responses.get("email")
        try:
            async with SessionLocal() as db_session:
                try:
                    conv_id = await conversation_service.get_or_create_conversation(
                        db_session, conv_id, email=email
                    )

                    # Save the most recent human message, if there is one.
                    user_msgs = [
                        m for m in state.get("messages", []) if m.type == "human"
                    ]
                    if user_msgs:
                        await conversation_service.save_message(
                            db_session, conv_id, "user", user_msgs[-1].content
                        )

                    # Save the assistant reply (skipped when empty).
                    if response_content:
                        await conversation_service.save_message(
                            db_session, conv_id, "assistant", response_content
                        )

                    # Update the WizardSession with the data of the current step.
                    ws = await conversation_service.get_or_create_wizard_session(
                        db_session, conv_id
                    )
                    wizard_state_str = "COMPLETED" if result.get("completed") else "ACTIVE"
                    await conversation_service.update_wizard_session(
                        db_session,
                        ws,
                        result.get("current_question", 1),
                        wizard_responses,
                        wizard_state_str,
                    )
                    await db_session.commit()
                except Exception:
                    await db_session.rollback()
                    raise
        except Exception as e:
            # Best-effort persistence: the reply is still returned to the user.
            # NOTE(review): conv_id may already hold the id returned by
            # get_or_create_conversation even though the transaction was
            # rolled back; confirm downstream code tolerates that.
            logger.exception("[WIZARD_NODE] Error al persistir en DB: %s", e)
        return conv_id
# ------------------------------------------------------------------
# Module-level instance & wrapper for LangGraph
# ------------------------------------------------------------------
# Single WizardAgent instance created at import time; the handle_wizard_flow
# wrapper defined right after it delegates every call to this object.
wizard_agent = WizardAgent()
async def handle_wizard_flow(state: ConversationState) -> ConversationState:
    """Wrapper function for LangGraph: run the module-level ``wizard_agent`` on *state*.

    Returns whatever awaiting the agent call yields.
    """
    updated_state = await wizard_agent(state)
    return updated_state