diff --git a/src/planet_mcp/clients.py b/src/planet_mcp/clients.py new file mode 100644 index 0000000..99a1ea2 --- /dev/null +++ b/src/planet_mcp/clients.py @@ -0,0 +1,20 @@ +from planet import Session, __version__ + +_session: Session | None = None + + +def _init_session(cls: type[Session]) -> Session: + _session = cls() + # todo add our version? + _session._client.headers["User-Agent"] = f"planet-mcp {__version__}" + return _session + + +def session() -> Session: + """session returns a global Session. dependencies should only call this + in their tool/resource calls or within a callable `mcp` initializer + """ + global _session + if _session is None: + _session = _init_session(Session) + return _session diff --git a/src/planet_mcp/main.py b/src/planet_mcp/main.py index 819dbd5..892c4db 100644 --- a/src/planet_mcp/main.py +++ b/src/planet_mcp/main.py @@ -3,18 +3,43 @@ is installed, this is installed as an executable named planet-mcp. """ -from fastmcp import FastMCP -import planet -from planet_mcp.server import MCPService - # note - the mcp dev tooling (e.g. uv run fastmcp dev src/main.py) # wants to find a server object named `mcp` (or it won't work) -mcp = FastMCP("Planet local/stdio MCP") -sess = planet.Session() -svc = MCPService(mcp=mcp, session=sess) +import argparse +from planet_mcp.server import init + + +def parse_args() -> argparse.Namespace: + def csv(value): + return set(t.strip() for t in (value or "").split(",")) + + parser = argparse.ArgumentParser( + description="Planet MCP Server", + ) + # when using fastmcp inspector and other tools, we handle + # extra args here (or else the parser barfs) + parser.add_argument("args", nargs="*") + parser.add_argument("--include-tags", type=csv, default=None) + parser.add_argument("--exclude-tags", type=csv, default=None) + parser.add_argument("--servers", type=csv, default=None) + # similar to extra args, inspector adds this + parser.add_argument("--no-banner", action="store_true") + return parser.parse_args() + + +args = parse_args() +mcp = init( + enabled_servers=args.servers, + include_tags=args.include_tags, + exclude_tags=args.exclude_tags, +) # this is the entry point for the executable script installed via package # and also supports execution via `uv run fastmcp run src/main.py` def main(): - svc.mcp.run(transport="stdio") + mcp.run(transport="stdio", show_banner=not args.no_banner) + + +if __name__ == "__main__": + main() diff --git a/src/planet_mcp/server.py b/src/planet_mcp/server.py index c06f1a5..0b3a18d 100644 --- a/src/planet_mcp/server.py +++ b/src/planet_mcp/server.py @@ -1,150 +1,56 @@ -import functools -import inspect -from planet_mcp import descriptions, tiles +from contextlib import asynccontextmanager +from typing import AsyncIterator from fastmcp import FastMCP -import planet -from typing import Union, Optional +from planet_mcp import servers -from pydantic import PydanticSchemaGenerationError +_instructions = """ +Instructions for using Planet's official MCP server. +""" -DEFAULT_IGNORE = { - "data_wait_asset", - "orders_wait", - "data_get_stats", - "data_update_search", - "orders_aggregated_order_stats", - "subscriptions_get_results_csv", - "subscriptions_patch_subscription", - "subscriptions_update_subscription", - "mosaics_get_quad_contributions", - "destinations_patch_destination", -} +class PlanetContext: + def __init__(self): + pass -class MCPService: - """MCP service for the Planet API +def _lifespan(context: PlanetContext): - member `mcp` contains the FastMCP app instance. - """ + @asynccontextmanager + async def lifespan(server: FastMCP) -> AsyncIterator[PlanetContext]: + yield context - def __init__( - self, - mcp: FastMCP, - session: planet.Session, - ignore: set[str] | None = None, - include: set[str] | None = None, - ): - self.mcp = mcp - self.session = session + return lifespan - if ignore and include: - raise ValueError("Cannot specify both ignore and include sets.") - self.include = include - self.ignore = ignore +def init( + enabled_servers: set[str] | None = None, + include_tags: set[str] | None = None, + exclude_tags: set[str] | None = None, +) -> FastMCP: - if self.ignore is None and self.include is None: - self.ignore = DEFAULT_IGNORE - self.ignore = ignore if ignore is not None else DEFAULT_IGNORE + context = PlanetContext() - self.make_tools(planet.FeaturesClient, "features") - self.make_tools(planet.DataClient, "data") - self.make_tools(planet.OrdersClient, "orders") - self.make_tools(planet.SubscriptionsClient, "subscriptions") - self.make_tools(planet.MosaicsClient, "mosaics") - self.make_tools(planet.DestinationsClient, "destinations") + mcp = FastMCP( + "Planet MCP Server", + lifespan=_lifespan(context), + instructions=_instructions, + include_tags=include_tags, + exclude_tags=exclude_tags, + ) - # add tiles tools - tiles.init(self.mcp, self.session) + for server in servers.all: + try: + # server protocol is either a variable or callable named mcp + entry = getattr(server, "mcp") + except AttributeError: + raise Exception(f"programmer error, mcp attribute not in {server}") + if callable(entry): + entry = entry() + if not isinstance(entry, FastMCP): + raise Exception( + f"programmer error, expected FastMCP type, got {type(entry)}" + ) + if enabled_servers is None or entry.name in enabled_servers: + mcp.mount(entry, entry.name) # type: ignore - def make_tools(self, client_class, prefix: str): - for name, func in inspect.getmembers(client_class(self.session)): - if inspect.ismethod(func) and name[0] != "_": - full_name = f"{prefix}_{name}" - - if self.ignore is not None and full_name in self.ignore: - continue - if self.include is not None and name not in self.include: - continue - - # extended tool options - opts = {} - - # check if there is a description override for this tool - if full_name in descriptions.overrides: - opts["description"] = descriptions.overrides[full_name] - - # async generator functions have an incompatible return type. - # ensure they are converted to a list[dict] return type. - if inspect.isasyncgenfunction(func): - func = _async_get_wrapper(func, prefix) - - # no return functions end up with a "self" parameter so this - # works around by adding a simple response - # @todo - upstream bug? - sig = inspect.signature(func) - if sig.return_annotation is None: - func = _return_wrapper(func) - - try: - self.mcp.tool(func, name=full_name, **opts) - except PydanticSchemaGenerationError: - # there's a few functions we need to modify again because of custom types. - modified_func = _create_param_modified_wrapper(func) - try: - self.mcp.tool(modified_func, name=full_name, **opts) - except Exception as e: - print("Unable to add tool", full_name, e) - - -def _async_get_wrapper(f, prefix): - """wrap an async generator to return a list[dict]""" - - @functools.wraps(f) - async def generate_async(*args, **kwargs) -> list[dict]: - return [i async for i in (f(*args, **kwargs))] - - # functool.wraps annotates using the original function return - generate_async.__annotations__["return"] = list[dict] - return generate_async - - -def _return_wrapper(func): - @functools.wraps(func) - async def wrapper(*args, **kwargs): - await func(*args, **kwargs) - return {"status": "ok"} - - wrapper.__annotations__["return"] = dict - return wrapper - - -def _create_param_modified_wrapper(original_func): - """ - Some functions that accept special types (typing.Protocol) fail during - FastMCP tool registration. This wrapper modifies the function's signature, - replacing the type hints with simple types that FastMCP can handle. - """ - - @functools.wraps(original_func) - async def wrapper(*args, **kwargs): - return await original_func(*args, **kwargs) - - try: - sig = inspect.signature(original_func) - - for param_name, param in sig.parameters.items(): - # Convert problematic types into useable ones. - # we want to override the GeojsonLike field - # and remove the union of the Feature which accepts a dict or a GeoInterface - if param_name in ("feature", "quad", "mosaic", "series"): - wrapper.__annotations__[param_name] = dict - elif param_name == "geometry" and "planet.models" in str(param.annotation): - wrapper.__annotations__[param_name] = Optional[Union[dict, str]] | None - - except Exception as e: - print(f"Error modifying signature: {e}") - wrapper.__annotations__ = {} - - return wrapper + return mcp diff --git a/src/planet_mcp/servers/__init__.py b/src/planet_mcp/servers/__init__.py new file mode 100644 index 0000000..784c54e --- /dev/null +++ b/src/planet_mcp/servers/__init__.py @@ -0,0 +1,7 @@ +from . import sdk +from . import tiles + +all = [ + sdk, + tiles, +] diff --git a/src/planet_mcp/descriptions.py b/src/planet_mcp/servers/descriptions.py similarity index 100% rename from src/planet_mcp/descriptions.py rename to src/planet_mcp/servers/descriptions.py diff --git a/src/planet_mcp/servers/sdk.py b/src/planet_mcp/servers/sdk.py new file mode 100644 index 0000000..b86d0b0 --- /dev/null +++ b/src/planet_mcp/servers/sdk.py @@ -0,0 +1,127 @@ +import functools +import inspect + +from planet_mcp.clients import session +from . import descriptions +from fastmcp import FastMCP +import planet +from typing import Union, Optional + +from pydantic import PydanticSchemaGenerationError + +_DEFAULT_IGNORE = { + "data_wait_asset", + "orders_wait", + "data_get_stats", + "data_update_search", + "orders_aggregated_order_stats", + "subscriptions_get_results_csv", + "subscriptions_patch_subscription", + "subscriptions_update_subscription", + "mosaics_get_quad_contributions", + "destinations_patch_destination", +} + + +def mcp() -> FastMCP: + mcp = FastMCP("sdk") + make_tools(mcp, planet.FeaturesClient, "features") + make_tools(mcp, planet.DataClient, "data") + make_tools(mcp, planet.OrdersClient, "orders") + make_tools(mcp, planet.SubscriptionsClient, "subscriptions") + make_tools(mcp, planet.MosaicsClient, "mosaics") + make_tools(mcp, planet.DestinationsClient, "destinations") + return mcp + + +def make_tools(mcp: FastMCP, client_class: type, prefix: str): + for name, func in inspect.getmembers(client_class(session())): + if inspect.ismethod(func) and name[0] != "_": + full_name = f"{prefix}_{name}" + + if full_name in _DEFAULT_IGNORE: + continue + + # extended tool options + opts = {} + + # check if there is a description override for this tool + if full_name in descriptions.overrides: + opts["description"] = descriptions.overrides[full_name] + + # async generator functions have an incompatible return type. + # ensure they are converted to a list[dict] return type. + if inspect.isasyncgenfunction(func): + func = _async_get_wrapper(func, prefix) + + # no return functions end up with a "self" parameter so this + # works around by adding a simple response + # @todo - upstream bug? + sig = inspect.signature(func) + if sig.return_annotation is None: + func = _return_wrapper(func) + + if "download" in name: + opts["tags"] = set("download") + + try: + mcp.tool(func, name=full_name, **opts) + except PydanticSchemaGenerationError: + # there's a few functions we need to modify again because of custom types. + modified_func = _create_param_modified_wrapper(func) + try: + mcp.tool(modified_func, name=full_name, **opts) + except Exception as e: + print("Unable to add tool", full_name, e) + + +def _async_get_wrapper(f, prefix): + """wrap an async generator to return a list[dict]""" + + @functools.wraps(f) + async def generate_async(*args, **kwargs) -> list[dict]: + return [i async for i in (f(*args, **kwargs))] + + # functool.wraps annotates using the original function return + generate_async.__annotations__["return"] = list[dict] + return generate_async + + +def _return_wrapper(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + await func(*args, **kwargs) + return {"status": "ok"} + + wrapper.__annotations__["return"] = dict + return wrapper + + +def _create_param_modified_wrapper(original_func): + """ + Some functions that accept special types (typing.Protocol) fail during + FastMCP tool registration. This wrapper modifies the function's signature, + replacing the type hints with simple types that FastMCP can handle. + """ + + @functools.wraps(original_func) + async def wrapper(*args, **kwargs): + return await original_func(*args, **kwargs) + + try: + sig = inspect.signature(original_func) + + for param_name, param in sig.parameters.items(): + # Convert problematic types into useable ones. + # we want to override the GeojsonLike field + # and remove the union of the Feature which accepts a dict or a GeoInterface + if param_name in ("feature", "quad", "mosaic", "series"): + wrapper.__annotations__[param_name] = dict + elif param_name == "geometry" and "planet.models" in str(param.annotation): + wrapper.__annotations__[param_name] = Optional[Union[dict, str]] | None + + except Exception as e: + print(f"Error modifying signature: {e}") + wrapper.__annotations__ = {} + + return wrapper diff --git a/src/planet_mcp/servers/tiles.py b/src/planet_mcp/servers/tiles.py new file mode 100644 index 0000000..b593957 --- /dev/null +++ b/src/planet_mcp/servers/tiles.py @@ -0,0 +1,74 @@ +from typing import Annotated +from fastmcp import FastMCP +from fastmcp.utilities.types import Image +import httpx +import mercantile +from pydantic import Field + +from planet_mcp.clients import session + +mcp = FastMCP("tiles") + + +@mcp.tool +async def get_scene_tile( + item_type: Annotated[str, Field(pattern=r"^\w+$")], + item_id: Annotated[str, Field(pattern=r"^\w+$")], + lat: Annotated[float, Field(ge=-90.0, le=90.0)], + long: Annotated[float, Field(ge=-180.0, le=180.0)], + zoom: Annotated[int, Field(ge=10, le=15)] = 14, +) -> Image: + """ + Get a tile image for a specific item at a given latitude and longitude. + + Latitude and longitude must be in decimal degrees. + + item_type and item_id are required. Suitable items can be found with the data_search tool. + """ + + # Convert latitude and longitude to tile xyz + tile = mercantile.tile(long, lat, zoom) + x, y = tile.x, tile.y + + req = httpx.Request( + "GET", + f"https://tiles3.planet.com/data/v1/{item_type}/{item_id}/{zoom}/{x}/{y}.png", + ) + data = await session()._send(req) + + if data.status_code != 200: + # we are using an external facing API, so we can relay the error message back + # to the user. It may indicate an invalid input. + if data.text: + raise ValueError(f"Failed to fetch tile: {data.text}") + raise ValueError("Failed to fetch tile. Please try again.") + return Image( + data=data.content, + format="png", + ) + + +@mcp.tool +async def get_scene_thumbnail( + item_type: Annotated[str, Field(pattern=r"^\w+$")], + item_id: Annotated[str, Field(pattern=r"^\w+$")], +) -> Image: + """ + Get a thumbnail image for a specific item. + + item_type and item_id are required. Suitable items can be found with the data_search tool. + item_id accepts alphanumeric characters and underscores. + """ + thumbnail = ( + f"https://tiles.planet.com/data/v1/item-types/{item_type}/items/{item_id}/thumb" + ) + data = await session()._send(httpx.Request("GET", thumbnail)) + + if data.status_code != 200: + if data.text: + raise ValueError(f"Failed to fetch tile: {data.text}") + raise ValueError("Failed to fetch thumbnail. Please try again.") + return Image( + data=data.content, + format="png", + ) diff --git a/src/planet_mcp/tiles.py b/src/planet_mcp/tiles.py deleted file mode 100644 index 3d67b85..0000000 --- a/src/planet_mcp/tiles.py +++ /dev/null @@ -1,71 +0,0 @@ -from typing import Annotated -from fastmcp import FastMCP -from fastmcp.utilities.types import Image -import httpx -import mercantile -from planet import Session -from pydantic import Field - - -def init(app: FastMCP, session: Session) -> None: - """Initialize Tiles tools with the FastMCP application.""" - - @app.tool - async def get_scene_tile( - item_type: Annotated[str, Field(pattern=r"^\w+$")], - item_id: Annotated[str, Field(pattern=r"^\w+$")], - lat: Annotated[float, Field(ge=-90.0, le=90.0)], - long: Annotated[float, Field(ge=-180.0, le=180.0)], - zoom: Annotated[int, Field(ge=10, le=15)] = 14, - ) -> Image: - """ - Get a tile image for a specific item at a given latitude and longitude. - - Latitude and longitude must be in decimal degrees. - - item_type and item_id are required. Suitable items can be found with the data_search tool. - """ - - # Convert latitude and longitude to tile xyz - tile = mercantile.tile(long, lat, zoom) - x, y = tile.x, tile.y - - req = httpx.Request( - "GET", - f"https://tiles3.planet.com/data/v1/{item_type}/{item_id}/{zoom}/{x}/{y}.png", - ) - data = await session._send(req) - - if data.status_code != 200: - # we are using an external facing API, so we can relay the error message back - # to the user. It may indicate an invalid input. - if data.text: - raise ValueError(f"Failed to fetch tile: {data.text}") - raise ValueError("Failed to fetch tile. Please try again.") - return Image( - data=data.content, - format="png", - ) - - @app.tool - async def get_scene_thumbnail( - item_type: Annotated[str, Field(pattern=r"^\w+$")], - item_id: Annotated[str, Field(pattern=r"^\w+$")], - ) -> Image: - """ - Get a thumbnail image for a specific item. - - item_type and item_id are required. Suitable items can be found with the data_search tool. - item_id accepts alphanumeric characters and underscores. - """ - thumbnail = f"https://tiles.planet.com/data/v1/item-types/{item_type}/items/{item_id}/thumb" - data = await session._send(httpx.Request("GET", thumbnail)) - - if data.status_code != 200: - if data.text: - raise ValueError(f"Failed to fetch tile: {data.text}") - raise ValueError("Failed to fetch thumbnail. Please try again.") - return Image( - data=data.content, - format="png", - ) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..cffd9f1 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,28 @@ +from http import HTTPStatus +import httpx +import pytest +import respx +from planet import specs + + +@pytest.fixture(autouse=True, scope="session") +@respx.mock +def mock_bundles(): + # The following bundles are not real, only used for tests... + # This force loads the bundles ahead of time so we avoid hitting the API + # or not mocking the route on first use. + resp = { + "bundles": { + "analytic_udm2": {"assets": {"PSScene": ["ortho_analytic_4b"]}}, + "analytic_3b_udm2": {"assets": {"PSScene": []}}, + "analytic_8b_udm2": {"assets": {"PSScene": []}}, + "analytic_sr": { + "assets": {"SkySatScene": [], "PSScene": [], "SkySatCollect": []} + }, + "analytic": {"assets": {"SkySatScene": []}}, + "visual": {"assets": {"PSScene": ["basic_udm2"]}}, + } + } + spec_url = "https://api.planet.com/compute/ops/bundles/spec" + respx.get(spec_url).return_value = httpx.Response(HTTPStatus.OK, json=resp) + specs.get_bundle_names() diff --git a/tests/test_server.py b/tests/test_server.py index f50a867..efd4b60 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -1,22 +1,23 @@ +from http import HTTPStatus +import httpx import pytest -from fastmcp import FastMCP, Client +from fastmcp import Client +import respx +from planet_mcp.server import init - -@pytest.fixture -def test_server(): - mcp = FastMCP("Planet MCP Test Server") - - ## a very simple "search" test - @mcp.tool - def search(item_type: str) -> dict: - items = {"PSScene": 123, "SkySat": 456, "Tanager": 789} - return {"item_type": item_type, "temp": items.get(item_type)} - - return mcp +client = Client(init()) @pytest.mark.asyncio -async def test_search_tool(test_server): - async with Client(test_server) as client: - result = await client.call_tool("search", {"item_type": "SkySat"}) - assert result.data == {"item_type": "SkySat", "temp": 456} +@respx.mock +async def test_search_tool(): + respx.request( + "POST", "https://api.planet.com/data/v1/quick-search" + ).return_value = httpx.Response( + HTTPStatus.OK, json={"features": [{"type": "Feature"}]} + ) + async with client: + result = await client.call_tool( + "sdk_data_search", {"item_types": ["SkySatScene"]} + ) + assert result.structured_content == {"result": [{"type": "Feature"}]}