@@ -58,389 +58,3 @@ def get_a2a_app() -> Starlette:
5858 http_handler = handler
5959 )
6060 return a2a_app .build ()
61-
62-
63- # def _extract_text_from_message(message_data: dict) -> str:
64- # """Extract text content from an A2A message."""
65- # text_parts = []
66- # for part in message_data.get("parts", []):
67- # if isinstance(part, dict) and "text" in part:
68- # text_parts.append(part["text"])
69- # return " ".join(text_parts)
70-
71-
72- # def _extract_response_from_chunk(chunk: str) -> str | None:
73- # """Extract the response content from a graph chunk."""
74- # try:
75- # chunk_data = json.loads(chunk)
76- # for node_name, node_output in chunk_data.items():
77- # if isinstance(node_output, dict) and "messages" in node_output:
78- # messages = node_output["messages"]
79- # for msg in messages:
80- # if isinstance(msg, dict) and "content" in msg:
81- # return str(msg["content"])
82- # elif hasattr(msg, "content"):
83- # return str(getattr(msg, "content"))
84- # except (json.JSONDecodeError, KeyError, TypeError):
85- # pass
86- # return None
87-
88-
89- # @router.get("/.well-known/agent.json", response_model=None)
90- # async def get_agent_card_endpoint(request: Request) -> JSONResponse:
91- # """
92- # Return the A2A Agent Card.
93-
94- # The Agent Card describes the capabilities and skills of Kyma Companion
95- # to other agents in the A2A ecosystem.
96- # """
97- # base_url = str(request.base_url).rstrip("/")
98- # card = get_agent_card(f"{base_url}{A2A_ROUTER_PREFIX}")
99- # return JSONResponse(content=card.model_dump(mode="json", exclude_none=True))
100-
101-
102- # @router.post("")
103- # @router.post("/")
104- # async def handle_a2a_request(
105- # request: Request,
106- # graph: Annotated[CompanionGraph, Depends(get_companion_graph)],
107- # ):
108- # """
109- # Main A2A JSON-RPC endpoint.
110-
111- # Handles all A2A protocol methods including:
112- # - message/send: Send a message and get a response
113- # - message/stream: Send a message and stream the response
114- # - tasks/get: Get task status
115- # - tasks/cancel: Cancel a task
116- # """
117- # body = await request.json()
118- # method = body.get("method", "")
119- # params = body.get("params", {})
120- # request_id = body.get("id")
121-
122- # logger.info(f"A2A request received: method={method}, id={request_id}")
123-
124- # # Route based on method
125- # if method in ("message/send", "tasks/send"):
126- # return await _handle_message_send(request, graph, params, request_id)
127- # elif method in ("message/stream", "tasks/sendSubscribe"):
128- # return await _handle_message_stream(request, graph, params, request_id)
129- # elif method == "tasks/get":
130- # task_id = params.get("id", "")
131- # return JSONResponse(content={
132- # "jsonrpc": "2.0",
133- # "id": request_id,
134- # "result": {
135- # "id": task_id,
136- # "status": {"state": "unknown"},
137- # }
138- # })
139- # elif method == "tasks/cancel":
140- # task_id = params.get("id", "")
141- # return JSONResponse(content={
142- # "jsonrpc": "2.0",
143- # "id": request_id,
144- # "result": {
145- # "id": task_id,
146- # "status": {"state": "canceled"},
147- # }
148- # })
149- # else:
150- # return JSONResponse(
151- # content={
152- # "jsonrpc": "2.0",
153- # "id": request_id,
154- # "error": {
155- # "code": -32601,
156- # "message": f"Method not found: {method}",
157- # }
158- # },
159- # status_code=400,
160- # )
161-
162-
163- # async def _handle_message_send(
164- # request: Request,
165- # graph: CompanionGraph,
166- # params: dict,
167- # request_id: str | None,
168- # ) -> JSONResponse:
169- # """Handle message/send method - synchronous response."""
170- # try:
171- # message_data = params.get("message", {})
172- # task_id = params.get("id") or str(uuid4())
173-
174- # # Extract text from A2A message
175- # query_text = _extract_text_from_message(message_data)
176- # if not query_text:
177- # return JSONResponse(
178- # content={
179- # "jsonrpc": "2.0",
180- # "id": request_id,
181- # "error": {
182- # "code": -32602,
183- # "message": "No message text provided",
184- # }
185- # },
186- # status_code=400,
187- # )
188-
189- # # Check for K8s credentials in headers
190- # headers = dict(request.headers)
191- # cluster_url = headers.get("x-cluster-url")
192- # cluster_ca = headers.get("x-cluster-certificate-authority-data")
193- # k8s_auth = headers.get("x-k8s-authorization")
194-
195- # if not (cluster_url and cluster_ca and k8s_auth):
196- # return JSONResponse(
197- # content={
198- # "jsonrpc": "2.0",
199- # "id": request_id,
200- # "error": {
201- # "code": -32602,
202- # "message": "Kubernetes cluster credentials required. "
203- # "Provide x-cluster-url, x-cluster-certificate-authority-data, "
204- # "and x-k8s-authorization headers.",
205- # }
206- # },
207- # status_code=400,
208- # )
209-
210- # # Create K8s client
211- # from services.k8s import K8sAuthHeaders, K8sClient
212- # k8s_headers = K8sAuthHeaders(
213- # x_cluster_url=cluster_url,
214- # x_cluster_certificate_authority_data=cluster_ca,
215- # x_k8s_authorization=k8s_auth,
216- # )
217- # k8s_client = K8sClient(k8s_headers)
218-
219- # # Create Companion message
220- # companion_message = CompanionMessage(
221- # query=query_text,
222- # resource_kind=None,
223- # resource_api_version=None,
224- # resource_name=None,
225- # namespace=None,
226- # )
227-
228- # # Process through the graph
229- # response_parts: list[str] = []
230- # async for chunk in graph.astream(
231- # conversation_id=task_id,
232- # message=companion_message,
233- # k8s_client=k8s_client,
234- # ):
235- # content = _extract_response_from_chunk(chunk)
236- # if content:
237- # response_parts.append(content)
238-
239- # # Get the final response
240- # final_response = response_parts[-1] if response_parts else "Unable to process request."
241-
242- # return JSONResponse(content={
243- # "jsonrpc": "2.0",
244- # "id": request_id,
245- # "result": {
246- # "id": task_id,
247- # "status": {"state": "completed"},
248- # "artifacts": [{
249- # "parts": [{"type": "text", "text": final_response}],
250- # }],
251- # }
252- # })
253-
254- # except Exception as e:
255- # logger.exception("Error processing A2A message/send")
256- # return JSONResponse(
257- # content={
258- # "jsonrpc": "2.0",
259- # "id": request_id,
260- # "error": {
261- # "code": -32000,
262- # "message": str(e),
263- # }
264- # },
265- # status_code=500,
266- # )
267-
268-
269- # async def _handle_message_stream(
270- # request: Request,
271- # graph: CompanionGraph,
272- # params: dict,
273- # request_id: str | None,
274- # ) -> StreamingResponse:
275- # """Handle message/stream method with SSE."""
276-
277- # async def event_generator():
278- # try:
279- # message_data = params.get("message", {})
280- # task_id = params.get("id") or str(uuid4())
281-
282- # # Extract text from A2A message
283- # query_text = _extract_text_from_message(message_data)
284- # if not query_text:
285- # error_event = {
286- # "jsonrpc": "2.0",
287- # "id": request_id,
288- # "error": {
289- # "code": -32602,
290- # "message": "No message text provided",
291- # }
292- # }
293- # yield f"data: {json.dumps(error_event)}\n\n"
294- # return
295-
296- # # Check for K8s credentials
297- # headers = dict(request.headers)
298- # cluster_url = headers.get("x-cluster-url")
299- # cluster_ca = headers.get("x-cluster-certificate-authority-data")
300- # k8s_auth = headers.get("x-k8s-authorization")
301-
302- # if not (cluster_url and cluster_ca and k8s_auth):
303- # error_event = {
304- # "jsonrpc": "2.0",
305- # "id": request_id,
306- # "error": {
307- # "code": -32602,
308- # "message": "Kubernetes cluster credentials required.",
309- # }
310- # }
311- # yield f"data: {json.dumps(error_event)}\n\n"
312- # return
313-
314- # # Create K8s client
315- # from services.k8s import K8sAuthHeaders, K8sClient
316- # k8s_headers = K8sAuthHeaders(
317- # x_cluster_url=cluster_url,
318- # x_cluster_certificate_authority_data=cluster_ca,
319- # x_k8s_authorization=k8s_auth,
320- # )
321- # k8s_client = K8sClient(k8s_headers)
322-
323- # # Create Companion message
324- # companion_message = CompanionMessage(
325- # query=query_text,
326- # resource_kind=None,
327- # resource_api_version=None,
328- # resource_name=None,
329- # namespace=None,
330- # )
331-
332- # # Send working status
333- # working_event = {
334- # "jsonrpc": "2.0",
335- # "id": request_id,
336- # "result": {
337- # "id": task_id,
338- # "status": {"state": "working"},
339- # }
340- # }
341- # yield f"data: {json.dumps(working_event)}\n\n"
342-
343- # # Stream responses
344- # final_response = ""
345- # async for chunk in graph.astream(
346- # conversation_id=task_id,
347- # message=companion_message,
348- # k8s_client=k8s_client,
349- # ):
350- # content = _extract_response_from_chunk(chunk)
351- # if content:
352- # final_response = content
353- # # Send incremental update
354- # update_event = {
355- # "jsonrpc": "2.0",
356- # "id": request_id,
357- # "result": {
358- # "id": task_id,
359- # "status": {"state": "working"},
360- # "artifacts": [{
361- # "parts": [{"type": "text", "text": content}],
362- # }],
363- # }
364- # }
365- # yield f"data: {json.dumps(update_event)}\n\n"
366-
367- # # Send completion
368- # complete_event = {
369- # "jsonrpc": "2.0",
370- # "id": request_id,
371- # "result": {
372- # "id": task_id,
373- # "status": {"state": "completed"},
374- # "artifacts": [{
375- # "parts": [{"type": "text", "text": final_response or "Request processed."}],
376- # }],
377- # }
378- # }
379- # yield f"data: {json.dumps(complete_event)}\n\n"
380-
381- # except Exception as e:
382- # logger.exception("Error in A2A SSE stream")
383- # error_event = {
384- # "jsonrpc": "2.0",
385- # "id": request_id,
386- # "error": {
387- # "code": -32000,
388- # "message": str(e),
389- # }
390- # }
391- # yield f"data: {json.dumps(error_event)}\n\n"
392-
393- # return StreamingResponse(
394- # event_generator(),
395- # media_type="text/event-stream",
396- # headers={
397- # "Cache-Control": "no-cache",
398- # "Connection": "keep-alive",
399- # "X-Accel-Buffering": "no",
400- # },
401- # )
402-
403-
404- # @router.post("/tasks/send")
405- # async def send_task(
406- # request: Request,
407- # graph: Annotated[CompanionGraph, Depends(get_companion_graph)],
408- # ) -> JSONResponse:
409- # """Handle synchronous A2A task requests (legacy endpoint)."""
410- # body = await request.json()
411- # return await _handle_message_send(request, graph, body, body.get("id"))
412-
413-
414- # @router.post("/tasks/sendSubscribe")
415- # async def send_subscribe(
416- # request: Request,
417- # graph: Annotated[CompanionGraph, Depends(get_companion_graph)],
418- # ):
419- # """Handle streaming A2A task requests (legacy endpoint)."""
420- # body = await request.json()
421- # return await _handle_message_stream(request, graph, body, body.get("id"))
422-
423-
424- # @router.get("/tasks/{task_id}")
425- # async def get_task_status(task_id: str) -> JSONResponse:
426- # """Get the status of a task (stateless - not persisted)."""
427- # return JSONResponse(
428- # content={
429- # "id": task_id,
430- # "status": {"state": "unknown"},
431- # "message": "Task status not available. Tasks are processed synchronously.",
432- # },
433- # status_code=404,
434- # )
435-
436-
437- # @router.post("/tasks/{task_id}/cancel")
438- # async def cancel_task(task_id: str) -> JSONResponse:
439- # """Cancel an ongoing task."""
440- # return JSONResponse(
441- # content={
442- # "id": task_id,
443- # "status": {"state": "canceled"},
444- # "message": "Cancellation requested.",
445- # }
446- # )
0 commit comments