Commit fb2f636

Add tool to gather details (#54)
* Add tool to gather details
* Bump changelog
1 parent a74e7e2 commit fb2f636

File tree

4 files changed (+156, -2 lines)

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
@@ -1,3 +1,11 @@
+## 0.1.5
+
+### Enhancements
+- Add a new tool `list_workflows_with_finished_jobs` that lists all workflows that have any completed job, together
+  with information about source and destination details
+
+### Fixes
+
 ## 0.1.4

 ### Enhancements
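For context, once the uns_mcp server is running, the new tool is invoked like any other MCP tool. Below is a minimal client sketch using the MCP Python SDK over stdio; the launch command, the `--stdio` argument, and the `"s3"` filter value are assumptions for illustration, not taken from this commit:

    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client


    async def main() -> None:
        # Assumed launch command for the local server; adjust to your setup.
        server = StdioServerParameters(command="uv", args=["run", "uns_mcp/server.py", "--stdio"])
        async with stdio_client(server) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # source_type / destination_type are the tool's optional filters.
                result = await session.call_tool(
                    "list_workflows_with_finished_jobs",
                    {"source_type": "s3"},  # hypothetical filter value
                )
                print(result)


    asyncio.run(main())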

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [project]
 name = "uns_mcp"
-version = "0.1.4" # Set only non-dev versions to release
+version = "0.1.5" # Set only non-dev versions to release
 description = "MCP server implementation providing structured tools for interacting with the Unstructured API, managing sources, destinations, workflows, and jobs"
 requires-python = ">=3.12"
 readme = "README.md"

uns_mcp/server.py

Lines changed: 146 additions & 0 deletions
@@ -1,15 +1,19 @@
+import asyncio
 import json
 import os
 import sys
 from contextlib import asynccontextmanager
+from copy import deepcopy
 from dataclasses import dataclass
+from itertools import groupby
 from typing import AsyncIterator, Optional

 import uvicorn
 from dotenv import load_dotenv
 from mcp.server import Server
 from mcp.server.fastmcp import Context, FastMCP
 from mcp.server.sse import SseServerTransport
+from pydantic import BaseModel
 from starlette.applications import Starlette
 from starlette.requests import Request
 from starlette.routing import Mount, Route
@@ -31,8 +35,11 @@
 )
 from unstructured_client.models.shared import (
     CreateWorkflow,
+    DestinationConnectorInformation,
     DestinationConnectorType,
+    JobInformation,
     JobStatus,
+    SourceConnectorInformation,
     SourceConnectorType,
     UpdateWorkflow,
     WorkflowInformation,
@@ -551,6 +558,93 @@ async def cancel_job(ctx: Context, job_id: str) -> str:
         return f"Error canceling job: {str(e)}"


+@mcp.tool()
+async def list_workflows_with_finished_jobs(
+    ctx: Context,
+    source_type: Optional[SourceConnectorType | str] = None,
+    destination_type: Optional[DestinationConnectorType | str] = None,
+) -> str:
+    """
+    List workflows with finished jobs via the Unstructured API.
+    Args:
+        source_type: Optional source connector type to filter by
+        destination_type: Optional destination connector type to filter by
+    Returns:
+        String containing the list of workflows with finished jobs and source and destination
+        details
+    """
+    if source_type:
+        try:
+            source_type = (
+                SourceConnectorType(source_type) if isinstance(source_type, str) else source_type
+            )
+        except KeyError:
+            return f"Invalid source type: {source_type}"
+    if destination_type:
+        try:
+            destination_type = (
+                DestinationConnectorType(destination_type)
+                if isinstance(destination_type, str)
+                else destination_type
+            )
+        except KeyError:
+            return f"Invalid destination type: {destination_type}"
+
+    client = ctx.request_context.lifespan_context.client
+    try:
+        workflows_details = await gather_workflows_details(client=client)
+    except Exception as e:
+        return f"Error retrieving workflows: {str(e)}"
+
+    filtered_workflows_details = []
+
+    for workflow_details in workflows_details:
+        updated_workflow_details = deepcopy(workflow_details)
+
+        if source_type:
+            updated_workflow_details.sources = [
+                source for source in workflow_details.sources if source.type == source_type
+            ]
+
+        if destination_type:
+            updated_workflow_details.destinations = [
+                destination
+                for destination in workflow_details.destinations
+                if destination.type == destination_type
+            ]
+
+        updated_workflow_details.jobs = [
+            job for job in workflow_details.jobs if job.status == JobStatus.COMPLETED
+        ]
+
+        if (
+            updated_workflow_details.sources
+            and updated_workflow_details.destinations
+            and updated_workflow_details.jobs
+        ):
+            filtered_workflows_details.append(updated_workflow_details)
+
+    if not filtered_workflows_details:
+        return "No workflows found with finished jobs"
+
+    result = ["Workflows:"]
+    for workflow_details in filtered_workflows_details:
+        result.append(f"- Workflow ID: {workflow_details.workflow.id}")
+        result.append(" Sources:")
+        for source in workflow_details.sources:
+            result.append(f" - {source.name} (ID: {source.id})")
+            for key, value in source.config:
+                result.append(f" {key}: {value}")
+
+        result.append(" Destinations:")
+        for destination in workflow_details.destinations:
+            result.append(f" - {destination.name} (ID: {destination.id})")
+            for key, value in destination.config:
+                result.append(f" {key}: {value}")
+
+    return "\n".join(result)
+
+
 def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
     """Create a Starlette application that can server the provied mcp server with SSE."""
     sse = SseServerTransport("/messages/")
@@ -576,6 +670,58 @@ async def handle_sse(request: Request) -> None:
     )


+class WorkflowDetails(BaseModel):
+    workflow: WorkflowInformation
+    jobs: list[JobInformation]
+    sources: list[SourceConnectorInformation]
+    destinations: list[DestinationConnectorInformation]
+
+
+async def gather_workflows_details(client: UnstructuredClient) -> list[WorkflowDetails]:
+    workflows, jobs, sources, destinations = await asyncio.gather(
+        client.workflows.list_workflows_async(request=ListWorkflowsRequest()),
+        client.jobs.list_jobs_async(request=ListJobsRequest()),
+        client.sources.list_sources_async(request=ListSourcesRequest()),
+        client.destinations.list_destinations_async(request=ListDestinationsRequest()),
+    )
+    workflows: list[WorkflowInformation] = workflows.response_list_workflows
+    jobs: list[JobInformation] = jobs.response_list_jobs
+    sources: list[SourceConnectorInformation] = sources.response_list_sources
+    destinations: list[DestinationConnectorInformation] = destinations.response_list_destinations
+
+    workflow_id_to_jobs = {
+        workflow_id: list(grouped_jobs)
+        for workflow_id, grouped_jobs in groupby(jobs, lambda x: x.workflow_id)
+    }
+    source_id_to_source_info = {source.id: source for source in sources}
+    destination_id_to_destination_info = {
+        destination.id: destination for destination in destinations
+    }
+
+    sorted_workflows = sorted(workflows, key=lambda x: x.updated_at, reverse=True)
+
+    workflows_details = []
+
+    for workflow in sorted_workflows:
+        workflow_details = WorkflowDetails(
+            workflow=workflow,
+            jobs=list(workflow_id_to_jobs.get(workflow.id, [])),
+            sources=[
+                source_id_to_source_info[source_id]
+                for source_id in workflow.sources
+                if source_id in source_id_to_source_info
+            ],
+            destinations=[
+                destination_id_to_destination_info[destination_id]
+                for destination_id in workflow.destinations
+                if destination_id in destination_id_to_destination_info
+            ],
+        )
+        workflows_details.append(workflow_details)
+
+    return workflows_details
+
+
 def main():
     load_environment_variables()
     if len(sys.argv) < 2:
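On the grouping step in `gather_workflows_details`: `itertools.groupby` only merges consecutive items with equal keys, so the grouping depends on the order in which jobs come back from the API. For reference, an order-independent sketch of the same job-to-workflow grouping with a plain dict, using hypothetical stand-in job records:

    from collections import defaultdict
    from dataclasses import dataclass


    # Hypothetical stand-in for JobInformation with only the field used for grouping.
    @dataclass
    class Job:
        id: str
        workflow_id: str


    def group_jobs_by_workflow(jobs: list[Job]) -> dict[str, list[Job]]:
        grouped: defaultdict[str, list[Job]] = defaultdict(list)
        for job in jobs:
            grouped[job.workflow_id].append(job)  # no pre-sorting required
        return dict(grouped)


    jobs = [Job("j1", "wf-a"), Job("j2", "wf-b"), Job("j3", "wf-a")]
    print(group_jobs_by_workflow(jobs))  # 'wf-a' -> [j1, j3], 'wf-b' -> [j2]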

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.
