Files

216 lines
7.0 KiB
Python

"""Lore Engine POC — MCP server (slice 2.6+).
This is the in-process JSON-RPC 2.0 dispatcher used by the stdio
entry point in ``scripts/05_mcp_server.py``. The class is split
out of the stdio script so the dispatcher logic can be unit-tested
without spawning a subprocess — see ``tests/test_mcp/test_server.py``.
The MCP protocol (https://modelcontextprotocol.io) we implement:
* ``initialize`` — returns ``protocolVersion`` (we hard-code
``2024-11-05``), ``serverInfo``, and an empty ``capabilities``
object.
* ``tools/list`` — returns the schema for every registered tool
(name + description + inputSchema).
* ``tools/call`` — dispatches to the registered tool function
by name, with ``arguments`` unpacked as kwargs. Tool failures
come back as ``isError: True`` in the ``result``; transport
errors (unknown tool, missing ``id``) come back as JSON-RPC
``error`` envelopes with negative codes from the spec.
Notifications (no ``id``) get no reply, per JSON-RPC 2.0.
"""
from __future__ import annotations
import dataclasses
import json
from dataclasses import dataclass
from typing import Callable, Optional
# JSON-RPC 2.0 standard error codes we emit.
ERR_PARSE = -32700
ERR_INVALID_REQUEST = -32600
ERR_METHOD_NOT_FOUND = -32601
ERR_INVALID_PARAMS = -32602
ERR_INTERNAL = -32603
# MCP protocolVersion we announce. Pinned at the slice-2.6+
# version; bump if/when the spec adds a major capability.
MCP_PROTOCOL_VERSION = "2024-11-05"
def _json_default(obj):
"""``json.dumps(..., default=...)`` fallback.
Lore Engine tools return:
* dataclasses (ConsistencyRun, Violation subclasses) — serialise
via ``dataclasses.asdict``.
* sets — serialise as sorted lists.
* LoreSource namedtuples / Edge dataclasses — covered by asdict.
"""
if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
return dataclasses.asdict(obj)
if isinstance(obj, set):
return sorted(obj)
if isinstance(obj, frozenset):
return sorted(obj)
return str(obj)
@dataclass
class ToolEntry:
"""One MCP tool: name + description + JSON Schema + Python fn.
The ``fn`` is expected to take ``(graph, **kwargs)``; the
dispatcher calls it with ``tool.fn(graph, **arguments)``.
Type coercion (e.g. ``at_time`` string → time atom) is the
tool's job, not the dispatcher's.
``underlying_fn`` is the *original* (un-adapted) function
that ``fn`` wraps. It's optional and only set when ``fn`` is
an adapter (e.g. the registry's ``_make_adapter``-built
closures). Tests use it to assert that the registry binds
the expected source function; the dispatcher does not read
it.
"""
name: str
description: str
input_schema: dict
fn: Callable
underlying_fn: Optional[Callable] = None
class MCPServer:
"""JSON-RPC 2.0 dispatcher over an in-memory graph + tool registry.
The class is **stateless across requests** (every call to
``handle_message`` is independent) except for the tool
registry itself, which is fixed at construction time.
"""
def __init__(self, graph, tool_registry):
self.graph = graph
# Build a name → entry lookup once.
self.tools = {t.name: t for t in tool_registry}
# ------------------------------------------------------------------
# Public dispatch
# ------------------------------------------------------------------
def handle_message(self, msg: dict) -> Optional[dict]:
"""Dispatch one JSON-RPC 2.0 message.
Returns the response dict, or ``None`` for notifications
(methods named ``notifications/*`` with no ``id``).
"""
msg_id = msg.get("id")
method = msg.get("method")
# Notifications — must not produce a reply.
if method and method.startswith("notifications/") and msg_id is None:
return None
if method == "initialize":
return self._initialize(msg)
if method == "tools/list":
return self._tools_list(msg)
if method == "tools/call":
return self._tools_call(msg)
return self._error(msg_id, ERR_METHOD_NOT_FOUND, f"Method not found: {method!r}")
# ------------------------------------------------------------------
# initialize
# ------------------------------------------------------------------
def _initialize(self, msg: dict) -> dict:
return {
"jsonrpc": "2.0",
"id": msg.get("id"),
"result": {
"protocolVersion": MCP_PROTOCOL_VERSION,
"serverInfo": {
"name": "lore-engine-poc",
"version": "0.1.0",
},
"capabilities": {},
},
}
# ------------------------------------------------------------------
# tools/list
# ------------------------------------------------------------------
def _tools_list(self, msg: dict) -> dict:
tools = [
{
"name": t.name,
"description": t.description,
"inputSchema": t.input_schema,
}
for t in self.tools.values()
]
return {
"jsonrpc": "2.0",
"id": msg.get("id"),
"result": {"tools": tools},
}
# ------------------------------------------------------------------
# tools/call
# ------------------------------------------------------------------
def _tools_call(self, msg: dict) -> dict:
msg_id = msg.get("id")
params = msg.get("params", {}) or {}
name = params.get("name")
arguments = params.get("arguments", {}) or {}
tool = self.tools.get(name)
if tool is None:
return self._error(msg_id, ERR_INVALID_PARAMS, f"Unknown tool: {name!r}")
try:
result = tool.fn(self.graph, **arguments)
except Exception as exc:
# Tool-body errors come back as isError=True so callers
# can distinguish "the tool ran and failed" from
# "the dispatcher couldn't reach it".
return {
"jsonrpc": "2.0",
"id": msg_id,
"result": {
"content": [{"type": "text", "text": str(exc)}],
"isError": True,
},
}
return {
"jsonrpc": "2.0",
"id": msg_id,
"result": {
"content": [
{"type": "text", "text": json.dumps(result, default=_json_default)}
],
"isError": False,
},
}
# ------------------------------------------------------------------
# Error envelope
# ------------------------------------------------------------------
@staticmethod
def _error(msg_id, code: int, message: str) -> dict:
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {"code": code, "message": message},
}
__all__ = ["MCPServer", "ToolEntry", "MCP_PROTOCOL_VERSION"]