Files
Lore Engine Dev d1285eea60 mcp_http: 1 MiB body-size cap + polish (review-1/5)
- MAX_BODY_BYTES = 1 MiB; reject with HTTP 413 + -32600 envelope
  before the JSON parser allocates a Python object. Closes the
  OOM-by-giant-body DoS vector.
- Drop dead try/except ImportError fallback for ERR_* constants —
  always import from mcp_server (same package).
- stream() typing: AsyncIterator[bytes] (was Iterable[bytes]).
- build_app(graph: Graph, tool_registry: list) parameter types.
- Drop unused CONTENT_JSON constant.
- New test: test_post_oversized_body_rejected (HTTP 413).
- New test: test_post_unknown_method_returns_32601 — symmetric
  with the stdio server.py coverage of the same path.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-18 19:43:35 -04:00

128 lines
4.6 KiB
Python

"""Streamable HTTP transport for the Lore Engine POC MCP server (slice 11).
This module is a thin transport adapter over the dispatcher in
``lore_engine_poc.mcp_server.MCPServer``. It exposes a Starlette ASGI
app that:
1. Accepts ``POST /mcp`` with a JSON-RPC 2.0 body.
2. Dispatches via ``MCPServer.handle_message(msg)``.
3. Returns the response either as ``application/json`` (one-shot) or
as ``text/event-stream`` (one SSE frame) depending on the
client's ``Accept`` header.
Errors at the transport layer return a JSON-RPC error envelope with
``id: null``: HTTP 400 for malformed JSON (parse error) and for empty
or non-object bodies (invalid request), and HTTP 413 for bodies larger
than ``MAX_BODY_BYTES``. Notifications (methods named
``notifications/*`` with no ``id``) return HTTP 202 with an empty
body, since the dispatcher itself produces no reply.
This module deliberately does NOT use the ``mcp`` PyPI package — the
goal is to mirror the stdlib-only discipline of
``scripts/05_mcp_server.py`` while gaining HTTP reachability.
"""
from __future__ import annotations
import json
from typing import Any, AsyncIterator, Optional
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response, StreamingResponse
from starlette.routing import Route
from .mcp_server import ERR_INVALID_REQUEST, ERR_PARSE, MCPServer, _json_default
from .tools import Graph
# The ERR_* JSON-RPC error codes (and the _json_default serializer) are
# imported from ``.mcp_server`` rather than redefined here, so error
# envelopes on the wire match the stdio server exactly. They are
# package-internal names: ``MCPServer`` is the public surface, and this
# module is only a transport seam over it.
# Media type used for Server-Sent Events replies.
CONTENT_SSE = "text/event-stream"

# Upper bound on an accepted request body, in bytes (1 MiB). Legitimate
# tool calls are expected to stay far below this -- the largest payloads
# are add_lore_source calls carrying raw prose -- so anything larger is
# treated as abuse and answered with HTTP 413 before JSON parsing.
MAX_BODY_BYTES = 1 << 20  # 1 MiB
def _wants_sse(accept: Optional[str]) -> bool:
"""Streamable HTTP: SSE upgrade is opted into by ``Accept``."""
if not accept:
return False
return "text/event-stream" in accept.lower()
def _jsonrpc_error(msg_id: Any, code: int, message: str) -> dict:
return {"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": message}}
def _sse_frame(payload: dict) -> bytes:
    """Serialize one JSON-RPC message as a single SSE ``message`` event.

    The payload is JSON-encoded without ``indent``, so it contains no raw
    newlines and fits in one ``data:`` field; the trailing blank line
    terminates the event. Values json cannot encode natively are handed to
    the shared ``_json_default`` hook.
    """
    encoded = json.dumps(payload, default=_json_default)
    frame = "event: message\n" + "data: " + encoded + "\n\n"
    return frame.encode("utf-8")
async def _read_body_capped(request: Request, limit: int) -> Optional[bytes]:
    """Read the request body, giving up once it exceeds ``limit`` bytes.

    Returns the complete body, or ``None`` when the body is larger than
    ``limit``. Unlike ``await request.body()``, which buffers the entire
    payload before any size check can run, this streams the body so an
    oversized upload is refused after holding at most ``limit`` bytes plus
    one received chunk.
    """
    declared = request.headers.get("content-length")
    if declared is not None:
        try:
            declared_len = int(declared)
        except ValueError:
            declared_len = None  # Malformed header: rely on the byte count below.
        if declared_len is not None and declared_len > limit:
            # Honest oversized declaration: reject without reading the body.
            return None
    buf = bytearray()
    async for chunk in request.stream():
        buf += chunk
        # Count real bytes too: Content-Length may be absent (e.g. chunked
        # transfer encoding) or malformed.
        if len(buf) > limit:
            return None
    return bytes(buf)


async def mcp_endpoint(request: Request) -> Response:
    """ASGI handler for ``POST /mcp``.

    Reads the body (capped at ``MAX_BODY_BYTES``), parses it as a single
    JSON-RPC object, dispatches it through the ``MCPServer`` stored on
    ``app.state.mcp_server`` and encodes the reply.

    Responses:
      * 413 -- body larger than ``MAX_BODY_BYTES`` (``ERR_INVALID_REQUEST``).
      * 400 -- empty body or non-object JSON (``ERR_INVALID_REQUEST``), or a
        body that cannot be decoded as JSON (``ERR_PARSE``).
      * 202 -- the dispatcher returned no reply (e.g. a notification);
        empty body.
      * 200 -- the reply, as one SSE frame when ``Accept`` opts into
        ``text/event-stream``, otherwise as ``application/json``.
    """
    raw = await _read_body_capped(request, MAX_BODY_BYTES)
    if raw is None:
        err = _jsonrpc_error(
            None, ERR_INVALID_REQUEST,
            f"Invalid Request: body exceeds {MAX_BODY_BYTES} bytes",
        )
        return JSONResponse(err, status_code=413)
    if not raw:
        err = _jsonrpc_error(
            None, ERR_INVALID_REQUEST,
            "Invalid Request: expected non-empty JSON body",
        )
        return JSONResponse(err, status_code=400)
    try:
        msg = json.loads(raw)
    except (ValueError, RecursionError) as exc:
        # ValueError covers json.JSONDecodeError and also UnicodeDecodeError,
        # which json.loads raises for bytes that are not valid UTF-8/16/32.
        # RecursionError comes from pathologically deep nesting ("[[[[..."),
        # which fits easily under the size cap. All are client errors, so
        # answer 400 instead of letting them surface as a server fault.
        err = _jsonrpc_error(None, ERR_PARSE, f"Parse error: {exc}")
        return JSONResponse(err, status_code=400)
    if not isinstance(msg, dict):
        err = _jsonrpc_error(
            None, ERR_INVALID_REQUEST,
            "Invalid Request: expected JSON object",
        )
        return JSONResponse(err, status_code=400)
    server: MCPServer = request.app.state.mcp_server
    # NOTE(review): handle_message is called synchronously on the event
    # loop, so a slow tool call stalls other requests on this worker. If
    # MCPServer is safe to call from worker threads, consider
    # starlette.concurrency.run_in_threadpool -- confirm thread-safety first.
    response = server.handle_message(msg)
    # Notification (no id) or non-replying method: 202 with empty body.
    if response is None:
        return Response(status_code=202)
    if _wants_sse(request.headers.get("accept")):
        async def stream() -> AsyncIterator[bytes]:
            yield _sse_frame(response)
        return StreamingResponse(
            stream(),
            media_type=CONTENT_SSE,
            status_code=200,
        )
    # Serialize with the same _json_default hook the SSE path uses.
    # Starlette's JSONResponse calls json.dumps without a default hook, so a
    # reply containing values only _json_default can encode would succeed
    # over SSE but raise an unhandled TypeError here.
    body = json.dumps(response, default=_json_default).encode("utf-8")
    return Response(body, status_code=200, media_type="application/json")
def build_app(graph: Graph, tool_registry: list) -> Starlette:
    """Return a new Starlette app serving the MCP dispatcher at ``POST /mcp``.

    Each call constructs its own ``MCPServer(graph, tool_registry)`` and
    parks it on ``app.state.mcp_server``, which is where ``mcp_endpoint``
    looks it up, so the handler needs no module-level globals.

    Used by both tests (in-process via ``ASGITransport``) and by
    ``scripts/06_mcp_http_server.py`` (real socket via uvicorn).
    """
    mcp_route = Route("/mcp", mcp_endpoint, methods=["POST"])
    app = Starlette(routes=[mcp_route])
    app.state.mcp_server = MCPServer(graph, tool_registry)
    return app