generated from oracle/template-repo
-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathstart_mcp_server.py
More file actions
299 lines (245 loc) · 10 KB
/
start_mcp_server.py
File metadata and controls
299 lines (245 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# Copyright © 2025 Oracle and/or its affiliates.
#
# This software is under the Apache License 2.0
# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.
import argparse
from contextvars import ContextVar
from os import PathLike
from typing import Annotated, AsyncGenerator, Dict, List, Literal, Optional, Union
import anyio
from mcp.server.fastmcp import Context
from mcp.server.fastmcp import FastMCP as BaseFastMCP
from mcp.types import EmbeddedResource, TextResourceContents
from pydantic import AnyUrl, BaseModel, Field, RootModel
from starlette.applications import Starlette
from typing_extensions import TypedDict
from wayflowcore.mcp.mcphelpers import mcp_streaming_tool
UvicornExtraConfig = TypedDict(
"UvicornExtraConfig",
{
"ssl_keyfile": str | PathLike[str] | None,
"ssl_certfile": str | PathLike[str] | None,
"ssl_ca_certs": str | None,
"ssl_cert_reqs": int,
},
total=False,
)
_EXTRA_CONFIG: ContextVar[Optional[UvicornExtraConfig]] = ContextVar("_EXTRA_CONFIG", default=None)
class FastMCP(BaseFastMCP):
    """``FastMCP`` subclass whose uvicorn server also receives ``_EXTRA_CONFIG``.

    Both HTTP-based transports (SSE and StreamableHTTP) build their Starlette
    application and then delegate to ``_start_server``, which merges the extra
    settings stored in the ``_EXTRA_CONFIG`` context variable into the
    ``uvicorn.Config``.
    """

    async def _start_server(self, starlette_app: Starlette) -> None:
        """Serve ``starlette_app`` with uvicorn until the server stops.

        Args:
            starlette_app: The ASGI application to serve.
        """
        # Imported lazily so uvicorn is only needed when a server is started.
        import uvicorn

        # The context variable defaults to None when nothing has been stored in
        # it; ``**None`` would raise TypeError, so fall back to "no extras".
        extra_config = _EXTRA_CONFIG.get() or {}
        config = uvicorn.Config(
            starlette_app,
            host=self.settings.host,
            port=self.settings.port,
            log_level=self.settings.log_level.lower(),
            **extra_config,
        )
        server = uvicorn.Server(config)
        await server.serve()

    async def run_sse_async(self, mount_path: str | None = None) -> None:
        """Run the server using SSE transport."""
        starlette_app = self.sse_app(mount_path)
        await self._start_server(starlette_app)

    async def run_streamable_http_async(self) -> None:
        """Run the server using StreamableHTTP transport."""
        starlette_app = self.streamable_http_app()
        await self._start_server(starlette_app)
# Output model of the `generate_tuple` tool: a (str, bool) tuple stored under
# `result`, declared with the model title "tool_output".
# NOTE: documented with `#` comments rather than a docstring on purpose; a
# docstring on a pydantic model is picked up as the schema description.
class GenerateTupleOut(BaseModel, title="tool_output"):
    # Each tuple item carries its own title ("str_output", "bool_output").
    result: tuple[
        Annotated[str, Field(title="str_output")], Annotated[bool, Field(title="bool_output")]
    ]
    # /!\ this needs to be named `result`
# Output model of the `streaming_tool_tuple` tool: an (int, str) tuple stored
# under `result`, declared with the model title "tool_output".
# NOTE: documented with `#` comments rather than a docstring on purpose; a
# docstring on a pydantic model is picked up as the schema description.
class GenerateTupleOut2(BaseModel, title="tool_output"):
    # Each tuple item carries its own title ("int_output", "str_output").
    result: tuple[
        Annotated[int, Field(title="int_output")], Annotated[str, Field(title="str_output")]
    ]
    # /!\ this needs to be named `result`
# Output model of the `generate_list` tool: a list of strings stored under
# `result`, declared with the model title "tool_output".
# NOTE: documented with `#` comments rather than a docstring on purpose; a
# docstring on a pydantic model is picked up as the schema description.
class GenerateListOut(BaseModel, title="tool_output"):
    result: list[str]  # /!\ this needs to be named `result`
# Output model of the `generate_dict` tool: a root model whose value is itself
# a `dict[str, str]` (no wrapping `result` field), titled "tool_output".
# NOTE: documented with `#` comments rather than a docstring on purpose; a
# docstring on a pydantic model is picked up as the schema description.
class GenerateDictOut(RootModel[dict[str, str]], title="tool_output"):
    pass
# Output model of the `generate_optional` tool, titled "tool_output".
# NOTE: documented with `#` comments rather than a docstring on purpose; a
# docstring on a pydantic model is picked up as the schema description.
class GenerateOptionalOut(BaseModel, title="tool_output"):
    # Optional output to validate anyOf handling in WayFlow
    # (`result` may be a string or None; it has no default, so it must be provided.)
    result: Optional[str]
# Output model of the `generate_union` tool, titled "tool_output".
# NOTE: documented with `#` comments rather than a docstring on purpose; a
# docstring on a pydantic model is picked up as the schema description.
class GenerateUnionOut(BaseModel, title="tool_output"):
    # True union output (non-null) to validate anyOf handling in WayFlow
    # (`result` is either a string or an integer, never None.)
    result: str | int
def create_server(host: str, port: int, server_id: str = ""):
    """Create the example MCP server and register all of its tools.

    The tools cover plain arithmetic, a wide range of input annotations,
    structured outputs (list, dict, tuple, optional, union), an embedded
    resource and several streaming variants.

    Args:
        host: Host address handed to ``FastMCP``.
        port: Port number handed to ``FastMCP``.
        server_id: Value returned by ``server_id_tool``.

    Returns:
        The configured ``FastMCP`` instance; the caller is responsible for
        running it.
    """
    server = FastMCP(
        name="Example MCP Server",
        instructions="A MCP Server.",
        host=host,
        port=port,
    )

    # --- simple arithmetic tools with made-up operation names ---

    @server.tool(
        description="Return the result of the fooza operation between numbers a and b. Do not use for anything else than computing a fooza operation."
    )
    def fooza_tool(a: int, b: int) -> int:
        return a * 2 + b * 3 - 1

    @server.tool(description="Return a server identifier")
    def server_id_tool() -> str:
        # Closes over the `server_id` given to create_server.
        return server_id

    @server.tool(
        description="Return the result of the bwip operation between numbers a and b. Do not use for anything else than computing a bwip operation."
    )
    def bwip_tool(a: int, b: int) -> int:
        return a - b + 1

    @server.tool(
        description="Return the result of the zbuk operation between numbers a and b. Do not use for anything else than computing a zbuk operation."
    )
    def zbuk_tool(a: int, b: int) -> int:
        return a + b * 2

    @server.tool(
        description="Return the result of the ggwp operation between numbers a and b. Do not use for anything else than computing a ggwp operation."
    )
    def ggwp_tool(a: int, b: int) -> int:
        return a + b // 2

    # --- input-annotation coverage ---

    @server.tool(
        description="This tool is not useful."
    )
    def all_input_types_tool(
        # basic types
        a: int,
        b: float,
        c: str,
        d: bool,
        # complex types (both `typing` and builtin-generic spellings)
        e: List[int],
        f: list[bool],
        g: Dict[str, int],
        h: dict[str, int],
        i: Optional[str],
        j: int | None,
        k: Union[str, int, float],
        l: str | int | float,
        # complex compositions
        m: List[Dict[str | float, Optional[int]]],
    ) -> float:
        # Only `a` and `b` contribute to the result; the other parameters
        # exist for their annotations.
        return a + b / 2

    # --- output-type coverage ---

    @server.tool(description="Tool to return a random string")
    def generate_random_string() -> str:
        import random

        return f"random_string_{random.randint(100, 999)}"

    @server.tool(description="Tool that returns a complex type", structured_output=True)
    def generate_complex_type() -> list[str]:
        return ["value1", "value2"]

    @server.tool(description="Tool that returns a dict", structured_output=True)
    def generate_dict() -> GenerateDictOut:
        # ^ the pydantic models should be used when users want to have
        # fine control over the output schema (e.g. root schema title)
        return GenerateDictOut({"key": "value"})

    @server.tool(description="Tool that returns a list", structured_output=True)
    def generate_list() -> GenerateListOut:
        return GenerateListOut(result=["value1", "value2"])

    @server.tool(description="Tool that returns a tuple", structured_output=True)
    def generate_tuple() -> GenerateTupleOut:
        return GenerateTupleOut(result=("value", True))

    @server.tool(description="Tool that returns an optional string", structured_output=True)
    def generate_optional() -> GenerateOptionalOut:
        # Deterministic value for testing
        return GenerateOptionalOut(result="maybe")

    @server.tool(description="Tool that returns a union value", structured_output=True)
    def generate_union() -> GenerateUnionOut:
        # Deterministic value for testing
        return GenerateUnionOut(result="maybe")

    @server.tool(description="Tool that consumes a list and a dict")
    def consumes_list_and_dict(vals: list[str], props: dict[str, str]) -> str:
        return f"vals={vals!r}, props={props!r}"

    @server.tool(description="Returns the resource associated with a user")
    def get_resource(user: str):  # return type left unannotated on purpose, to check it is handled
        return EmbeddedResource(
            resource=TextResourceContents(
                text=f"{user}_response",
                # NOTE(review): this is not an f-string, so the URI contains the
                # literal text "{user}" - confirm that this is intended.
                uri=AnyUrl("users://{user}/profile"),
                mimeType="text/plain",
            ),
            type="resource",
        )

    # --- streaming tools: every yield but the last is a streamed chunk ---

    @server.tool(description="Streaming tool")
    @mcp_streaming_tool
    async def streaming_tool() -> AsyncGenerator[str, None]:
        contents = [f"This is the sentence N°{i}" for i in range(5)]
        for chunk in contents:
            yield chunk  # streamed chunks
            await anyio.sleep(0.2)
        yield ". ".join(contents)  # final result

    @server.tool(description="Streaming tool")
    @mcp_streaming_tool
    async def streaming_tool_with_ctx(ctx: Context) -> AsyncGenerator[str, None]:
        # `Context.info` is a coroutine function: without `await` the log
        # message is never sent and Python warns about an un-awaited coroutine.
        await ctx.info("Hello")
        contents = [f"This is the sentence N°{i}" for i in range(5)]
        for chunk in contents:
            yield chunk  # streamed chunks
            await anyio.sleep(0.2)
        yield ". ".join(contents)  # final result

    @server.tool(description="Streaming tool", structured_output=True)
    @mcp_streaming_tool
    async def streaming_tool_tuple() -> AsyncGenerator[GenerateTupleOut2, None]:
        contents = [f"This is the sentence N°{i}" for i in range(5)]
        for idx, chunk in enumerate(contents):
            yield (idx, chunk)  # streamed chunks (plain tuples)
            await anyio.sleep(0.2)
        yield GenerateTupleOut2(result=(5, ". ".join(contents)))  # final result

    return server
def main(
    host: str,
    port: int,
    mode: Literal["sse", "streamable-http"],
    server_id: str,
    ssl_keyfile: str | None,
    ssl_certfile: str | None,
    ssl_ca_certs: str | None,
    ssl_cert_reqs: int | None,
) -> None:
    """Store the TLS options, build the example server and run it.

    Args:
        host: Host address the server binds to.
        port: Port the server listens on.
        mode: Transport passed to ``server.run``.
        server_id: Identifier exposed by ``server_id_tool``.
        ssl_keyfile: Path to the server private key file, or ``None``.
        ssl_certfile: Path to the server certificate chain file, or ``None``.
        ssl_ca_certs: Path to the trusted CA certificate file, or ``None``.
        ssl_cert_reqs: Certificate verify mode, or ``None`` to keep uvicorn's
            default.
    """
    ssl_options = dict(
        ssl_keyfile=ssl_keyfile,
        ssl_certfile=ssl_certfile,
        ssl_ca_certs=ssl_ca_certs,
        ssl_cert_reqs=ssl_cert_reqs,
    )
    # Options that were not supplied arrive as None. Forwarding them verbatim
    # would override uvicorn's own defaults (e.g. ``ssl_cert_reqs=None`` is not
    # a valid verify mode once TLS is enabled), so only pass what was set.
    _EXTRA_CONFIG.set(
        {option: value for option, value in ssl_options.items() if value is not None}
    )
    server = create_server(host=host, port=port, server_id=server_id)
    server.run(transport=mode)
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    cli = argparse.ArgumentParser(description="Process host, port, and mode.")
    cli.add_argument(
        "--host",
        type=str,
        help='The host address (e.g., "localhost" or "127.0.0.1")',
    )
    cli.add_argument("--port", type=int, help="The port number (e.g., 8080)")
    cli.add_argument(
        "--mode",
        type=str,
        choices=["sse", "streamable-http"],
        help="The mode for the application",
    )
    # The three TLS path options only differ by name and help text.
    for flag, help_text in (
        ("--ssl_keyfile", "Path to the server private key file (PEM format)."),
        ("--ssl_certfile", "Path to the server certificate chain file (PEM format)."),
        ("--ssl_ca_certs", "Path to the trusted CA certificate file (PEM format)."),
    ):
        cli.add_argument(flag, type=str, help=help_text)
    cli.add_argument(
        "--ssl_cert_reqs",
        type=int,
        help="Server certificate verify mode (0=None or 2=Required).",
    )
    cli.add_argument(
        "--server_id",
        type=str,
        default="",
        help="Identifier exposed by the test server.",
    )
    # Each parsed option is named exactly like the matching keyword parameter
    # of main(), so the namespace can be expanded directly.
    main(**vars(cli.parse_args()))