- MAX_BODY_BYTES = 1 MiB; reject with HTTP 413 + -32600 envelope before the JSON parser allocates a Python object. Closes the OOM-by-giant-body DoS vector. - Drop dead try/except ImportError fallback for ERR_* constants — always import from mcp_server (same package). - stream() typing: AsyncIterator[bytes] (was Iterable[bytes]). - build_app(graph: Graph, tool_registry: list) parameter types. - Drop unused CONTENT_JSON constant. - New test: test_post_oversized_body_rejected (HTTP 413). - New test: test_post_unknown_method_returns_32601 — symmetric with the stdio server.py coverage of the same path. Co-Authored-By: Claude <noreply@anthropic.com>
128 lines
4.6 KiB
Python
128 lines
4.6 KiB
Python
"""Streamable HTTP transport for the Lore Engine POC MCP server (slice 11).
|
|
|
|
This module is a thin transport adapter over the dispatcher in
|
|
``lore_engine_poc.mcp_server.MCPServer``. It exposes a Starlette ASGI
|
|
app that:
|
|
|
|
1. Accepts ``POST /mcp`` with a JSON-RPC 2.0 body.
|
|
2. Dispatches via ``MCPServer.handle_message(msg)``.
|
|
3. Returns the response either as ``application/json`` (one-shot) or
|
|
as ``text/event-stream`` (one SSE frame) depending on the
|
|
client's ``Accept`` header.
|
|
|
|
Errors at the transport layer (empty bodies, parse failures, non-object
bodies) return HTTP 400 with a JSON-RPC error envelope; bodies larger
than ``MAX_BODY_BYTES`` return HTTP 413 with an invalid-request
envelope. Notifications (methods named ``notifications/*`` with no
``id``) return HTTP 202 with an empty body, since the dispatcher itself
produces no reply.
|
|
|
|
This module deliberately does NOT use the ``mcp`` PyPI package — the
|
|
goal is to mirror the stdlib-only discipline of
|
|
``scripts/05_mcp_server.py`` while gaining HTTP reachability.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any, AsyncIterator, Optional
|
|
|
|
from starlette.applications import Starlette
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse, Response, StreamingResponse
|
|
from starlette.routing import Route
|
|
|
|
from .mcp_server import ERR_INVALID_REQUEST, ERR_PARSE, MCPServer, _json_default
|
|
from .tools import Graph
|
|
|
|
# The JSON-RPC error codes used below are imported from the dispatcher
# module so the wire format matches the stdio server exactly. They are
# package-internal constants: the dispatcher is the public surface and
# this module is the internal seam.

# Media type used for Server-Sent Events responses.
CONTENT_SSE = "text/event-stream"

# Upper bound on the request body, in bytes (1 MiB). Every legitimate
# tool call is well under this; the largest is add_lore_source with raw
# prose. Anything bigger is almost certainly a DoS attempt and is
# rejected with HTTP 413 before the JSON parser allocates a Python
# object for it.
MAX_BODY_BYTES = 1024 * 1024
|
|
|
|
|
|
def _wants_sse(accept: Optional[str]) -> bool:
|
|
"""Streamable HTTP: SSE upgrade is opted into by ``Accept``."""
|
|
if not accept:
|
|
return False
|
|
return "text/event-stream" in accept.lower()
|
|
|
|
|
|
def _jsonrpc_error(msg_id: Any, code: int, message: str) -> dict:
|
|
return {"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": message}}
|
|
|
|
|
|
def _sse_frame(payload: dict) -> bytes:
    """Serialize one JSON-RPC response into a single SSE ``message`` event.

    The payload is dumped to JSON (using the dispatcher's
    ``_json_default`` fallback for non-native values), placed on one
    ``data:`` line, and terminated by the blank line that ends an SSE
    event. The result is UTF-8 encoded.
    """
    serialized = json.dumps(payload, default=_json_default)
    frame = f"event: message\ndata: {serialized}\n\n"
    return frame.encode("utf-8")
|
|
|
|
|
|
async def _read_capped_body(request: Request, limit: int) -> Optional[bytes]:
    """Read the request body, giving up as soon as it exceeds ``limit`` bytes.

    Returns the full body as ``bytes``, or ``None`` when the body is
    (or is declared to be) larger than ``limit``. Unlike
    ``await request.body()``, this never buffers more than roughly
    ``limit`` bytes plus one chunk.
    """
    # Fast path: trust an oversized Content-Length and read nothing.
    declared = request.headers.get("content-length")
    if declared is not None:
        try:
            if int(declared) > limit:
                return None
        except ValueError:
            # Unparseable header: fall through to the streaming cap below.
            pass

    # Slow path: chunked uploads or a missing/incorrect Content-Length.
    buffered = bytearray()
    async for chunk in request.stream():
        buffered += chunk
        if len(buffered) > limit:
            return None
    return bytes(buffered)


async def mcp_endpoint(request: Request) -> Response:
    """ASGI handler for ``POST /mcp``.

    Reads a JSON-RPC 2.0 message from the request body, dispatches it
    through the ``MCPServer`` stored on ``request.app.state.mcp_server``
    and returns the reply.

    Responses:
        * 413 + invalid-request envelope: body larger than
          ``MAX_BODY_BYTES``.
        * 400 + invalid-request envelope: empty body, or JSON that is
          not an object.
        * 400 + parse-error envelope: body is not valid UTF-8 JSON, or
          is nested too deeply to parse.
        * 202, empty body: the dispatcher produced no reply
          (notification or non-replying method).
        * 200: the dispatcher's reply, as one SSE frame when the client
          opted in via ``Accept``, otherwise as ``application/json``.
    """
    # Body-size cap: enforced while reading, so an oversized upload is
    # never fully buffered in memory, let alone handed to the JSON parser.
    raw = await _read_capped_body(request, MAX_BODY_BYTES)
    if raw is None:
        err = _jsonrpc_error(
            None, ERR_INVALID_REQUEST,
            f"Invalid Request: body exceeds {MAX_BODY_BYTES} bytes",
        )
        return JSONResponse(err, status_code=413)

    if not raw:
        err = _jsonrpc_error(
            None, ERR_INVALID_REQUEST,
            "Invalid Request: expected non-empty JSON body",
        )
        return JSONResponse(err, status_code=400)

    try:
        msg = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError: json.loads decodes bytes itself, and invalid
        # UTF-8 is not reported as a JSONDecodeError.
        err = _jsonrpc_error(None, ERR_PARSE, f"Parse error: {exc}")
        return JSONResponse(err, status_code=400)
    except RecursionError:
        # Pathologically nested input (e.g. thousands of "[") must not
        # surface as an HTTP 500.
        err = _jsonrpc_error(
            None, ERR_PARSE, "Parse error: JSON nesting too deep",
        )
        return JSONResponse(err, status_code=400)

    if not isinstance(msg, dict):
        err = _jsonrpc_error(
            None, ERR_INVALID_REQUEST,
            "Invalid Request: expected JSON object",
        )
        return JSONResponse(err, status_code=400)

    server: MCPServer = request.app.state.mcp_server
    response = server.handle_message(msg)

    # Notification (no id) or non-replying method: 202 with empty body.
    if response is None:
        return Response(status_code=202)

    if _wants_sse(request.headers.get("accept")):
        # Serialize before the response starts so an encoding failure
        # cannot abort a stream whose headers were already sent.
        frame = _sse_frame(response)

        async def stream() -> AsyncIterator[bytes]:
            yield frame

        return StreamingResponse(
            stream(),
            media_type=CONTENT_SSE,
            status_code=200,
        )

    # Serialize with the dispatcher's ``_json_default`` so the one-shot
    # JSON path accepts exactly the same payloads as the SSE path.
    body = json.dumps(response, default=_json_default).encode("utf-8")
    return Response(body, status_code=200, media_type="application/json")
|
|
|
|
|
|
def build_app(graph: Graph, tool_registry: list) -> Starlette:
    """Create a new Starlette app serving ``POST /mcp`` for one graph.

    An ``MCPServer`` is constructed from ``graph`` and ``tool_registry``
    and attached as ``app.state.mcp_server``, which is where the ASGI
    handler looks it up, so no module-level globals are needed.

    Used by both tests (in-process via ``ASGITransport``) and by
    ``scripts/06_mcp_http_server.py`` (real socket via uvicorn).
    """
    mcp_route = Route("/mcp", mcp_endpoint, methods=["POST"])
    application = Starlette(routes=[mcp_route])
    application.state.mcp_server = MCPServer(graph, tool_registry)
    return application
|