Compare commits
1 Commits
main
...
fix/mcp-ca
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ee735d248 |
@@ -236,9 +236,15 @@ class DamascusMcpServer(Server):
|
||||
|
||||
This subclass instead makes ``mcp.list_tools()`` a regular method
|
||||
that returns the registered tool catalog directly. The list-tools
|
||||
handler is registered explicitly via
|
||||
``mcp.request_handlers[ListToolsRequest] = ...`` (the same internal
|
||||
API the decorator uses), preserving protocol correctness.
|
||||
AND call-tool handlers are registered explicitly via
|
||||
``mcp.request_handlers[...] = ...`` (the same internal API the
|
||||
decorators use), preserving protocol correctness and making the
|
||||
wiring visible without chasing decorator semantics.
|
||||
|
||||
The call-tool handler is registered the same way (see
|
||||
``_call_tool_handler`` below) so that both handlers follow the
|
||||
same registration pattern, and operators reading this file can
|
||||
see the full dispatch table in one place.
|
||||
"""
|
||||
|
||||
def list_tools(self) -> list[Tool]:
|
||||
@@ -251,22 +257,32 @@ mcp = DamascusMcpServer("damascus-mcp")
|
||||
|
||||
# Register the list-tools handler manually so the decorator form is
|
||||
# not needed. Same internal API the SDK's @mcp.list_tools() decorator
|
||||
# uses.
|
||||
# uses — but we extend it to populate ``mcp._tool_cache`` so the SDK's
|
||||
# input-validation pipeline (used by the call_tool handler below) can
|
||||
# look tool definitions up by name.
|
||||
async def _handle_list_tools() -> list[Tool]:
|
||||
"""Return the seven registered tools."""
|
||||
return TOOLS
|
||||
|
||||
|
||||
from mcp.types import ListToolsRequest # noqa: E402 (after mcp defined)
|
||||
from mcp.types import ListToolsRequest, ListToolsResult, ServerResult # noqa: E402 (after mcp defined)
|
||||
|
||||
|
||||
#: The handler is a coroutine that returns the catalog. Wrap it the
|
||||
#: same way the SDK's decorator does so the SDK's internal call path
|
||||
#: works unchanged.
|
||||
async def _list_tools_handler(req: ListToolsRequest) -> Any:
|
||||
from mcp.types import ListToolsResult, ServerResult
|
||||
"""Wrap the catalog in a ServerResult(ListToolsResult(...)) and
|
||||
populate ``mcp._tool_cache`` so SDK validation can find tools by name.
|
||||
|
||||
The SDK's own ``@mcp.list_tools()`` decorator does this transparently;
|
||||
because we register the handler manually, we have to replicate the
|
||||
cache-refresh logic or input validation in the call_tool pipeline
|
||||
will warn "Tool X not listed, no validation will be performed".
|
||||
"""
|
||||
result = await _handle_list_tools()
|
||||
# Refresh the SDK's tool cache so subsequent _get_cached_tool_definition
|
||||
# calls succeed. Mirrors the SDK's own behavior at lowlevel/server.py:451.
|
||||
mcp._tool_cache.clear()
|
||||
for tool in result:
|
||||
mcp._tool_cache[tool.name] = tool
|
||||
return ServerResult(ListToolsResult(tools=result))
|
||||
|
||||
|
||||
@@ -357,7 +373,6 @@ async def _dispatch(
|
||||
return [TextContent(type="text", text=json.dumps(payload))]
|
||||
|
||||
|
||||
@mcp.call_tool()
|
||||
async def _handle_call_tool(
|
||||
name: str,
|
||||
arguments: dict[str, Any],
|
||||
@@ -366,6 +381,40 @@ async def _handle_call_tool(
|
||||
return await _dispatch(name, arguments)
|
||||
|
||||
|
||||
# Register the call-tool handler manually so the wiring is explicit and
|
||||
# mirrors the ListToolsRequest pattern. The SDK's ``@mcp.call_tool()``
|
||||
# decorator does the same registration internally but adds a closure
|
||||
# that does input validation against ``mcp._tool_cache``. We use the
|
||||
# same internal ``request_handlers`` API the decorator uses; the SDK's
|
||||
# ``_handle_request`` method (lowlevel/server.py:722) dispatches from
|
||||
# this dict.
|
||||
async def _call_tool_handler(req: CallToolRequest) -> Any:
|
||||
"""Dispatch a ``tools/call`` request.
|
||||
|
||||
Mirrors the SDK's ``@mcp.call_tool()`` shape: pull ``name`` and
|
||||
``arguments`` off the request, run the tool, wrap the result in a
|
||||
``ServerResult(CallToolResult(...))``. Errors from the tool become
|
||||
``CallToolResult(isError=True, ...)`` — the SDK's protocol layer
|
||||
surfaces these as JSON-RPC responses with ``isError=True``, not
|
||||
as protocol errors (the call DID complete, just unsuccessfully).
|
||||
"""
|
||||
name = req.params.name
|
||||
arguments = req.params.arguments or {}
|
||||
try:
|
||||
content = await _handle_call_tool(name, arguments)
|
||||
except Exception as exc:
|
||||
return ServerResult(
|
||||
CallToolResult(
|
||||
content=[TextContent(type="text", text=str(exc))],
|
||||
isError=True,
|
||||
)
|
||||
)
|
||||
return ServerResult(CallToolResult(content=list(content), isError=False))
|
||||
|
||||
|
||||
mcp.request_handlers[CallToolRequest] = _call_tool_handler
|
||||
|
||||
|
||||
# --- public asyncio API for tests -------------------------------------------
|
||||
|
||||
|
||||
|
||||
475
tests/contract/test_mcp_call_dispatch.py
Normal file
475
tests/contract/test_mcp_call_dispatch.py
Normal file
@@ -0,0 +1,475 @@
|
||||
"""Contract tests for ``tools/call`` dispatch in the damascus-mcp server.
|
||||
|
||||
These tests cover the full MCP protocol path — they construct a real
|
||||
``CallToolRequest`` and invoke ``mcp.request_handlers[CallToolRequest]``
|
||||
exactly the way the SDK's stdio handler does in production. This
|
||||
guarantees the handler is registered, receives a properly shaped
|
||||
request, and returns a properly shaped ``CallToolResult``.
|
||||
|
||||
The companion file ``test_mcp_roundtrip.py`` exercises
|
||||
``mcp_server.call_tool()`` directly, which goes through ``_dispatch``
|
||||
without the SDK's request layer. That was sufficient while the
|
||||
``@mcp.call_tool()`` decorator registered the handler, but it left a
|
||||
gap: the SDK's caching + input-validation pipeline was never tested.
|
||||
This file fills that gap.
|
||||
|
||||
Acceptance criteria covered here (from the kanban task body):
|
||||
|
||||
* ``tools/call`` for ``list_items`` with
|
||||
``{"project": "damascus-orchestrator", "limit": 1}`` returns a
|
||||
non-empty ``result.content`` array containing the JSON dump of
|
||||
``GET /v1/items?...``.
|
||||
* ``tools/call`` for ``system_status`` returns the same shape as
|
||||
``GET /v1/stats``.
|
||||
* ``tools/call`` for an unknown tool returns a JSON-RPC error
|
||||
response (not a silent drop).
|
||||
* ``tools/call`` with invalid arguments (e.g. ``priority_min=-1``
|
||||
for ``list_items``) returns a validation error.
|
||||
* ``tools/list`` still works and reports all 7 tools (regression).
|
||||
* The stdio recipe end-to-end: spawn server, send
|
||||
initialize/initialized/tools-call, assert valid response.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from damascus.api_schemas import (
|
||||
ListItemsResponse,
|
||||
McpListItemsArgs,
|
||||
StatsResponse,
|
||||
)
|
||||
|
||||
|
||||
# --- helpers -----------------------------------------------------------------
|
||||
|
||||
|
||||
def _sample_work_item(**overrides: Any) -> dict[str, Any]:
|
||||
base = {
|
||||
"id": str(uuid.uuid4()),
|
||||
"project": "damascus-orchestrator",
|
||||
"story_id": "dispatch-1",
|
||||
"title": "Dispatch smoke",
|
||||
"phase": "spec",
|
||||
"file_scope": ["src/damascus/mcp_server.py"],
|
||||
"attempts": 0,
|
||||
"budget_cycles": 3,
|
||||
"priority": 100,
|
||||
"base_commit": None,
|
||||
"branch": None,
|
||||
"pr_url": None,
|
||||
"last_verdict": None,
|
||||
"last_feedback": None,
|
||||
"spec_path": None,
|
||||
"wiki_pin": None,
|
||||
"claimed_by": None,
|
||||
"claimed_at": None,
|
||||
"created_at": "2026-06-26T00:00:00",
|
||||
"updated_at": "2026-06-26T00:00:00",
|
||||
"merged_at": None,
|
||||
}
|
||||
base.update(overrides)
|
||||
return base
|
||||
|
||||
|
||||
def _stats_payload() -> dict[str, Any]:
|
||||
return {
|
||||
"phase_counts": {
|
||||
"spec": 0, "build": 0, "review": 0,
|
||||
"merged": 0, "blocked": 0, "awaiting_human": 0,
|
||||
},
|
||||
"open_human_issues": 0,
|
||||
"active_claims": 0,
|
||||
"last_cycle_at": None,
|
||||
"cost_today_usd": "0.000000",
|
||||
}
|
||||
|
||||
|
||||
class _Recorder:
|
||||
"""httpx MockTransport that captures calls and returns a canned payload."""
|
||||
|
||||
def __init__(self, response_payload: Any, status_code: int = 200) -> None:
|
||||
self.response_payload = response_payload
|
||||
self.status_code = status_code
|
||||
self.calls: list[httpx.Request] = []
|
||||
|
||||
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
|
||||
self.calls.append(request)
|
||||
return httpx.Response(
|
||||
self.status_code,
|
||||
json=self.response_payload,
|
||||
headers={"content-type": "application/json"},
|
||||
)
|
||||
|
||||
async def aclose(self) -> None:
|
||||
return None
|
||||
|
||||
|
||||
def _build_call_request(
|
||||
name: str,
|
||||
arguments: dict[str, Any] | None = None,
|
||||
) -> Any:
|
||||
"""Construct a properly-shaped CallToolRequest (as the SDK would)."""
|
||||
from mcp.types import CallToolRequest, CallToolRequestParams
|
||||
|
||||
return CallToolRequest(
|
||||
method="tools/call",
|
||||
params=CallToolRequestParams(name=name, arguments=arguments or {}),
|
||||
)
|
||||
|
||||
|
||||
# --- fixtures ----------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def api_token(monkeypatch: pytest.MonkeyPatch) -> str:
|
||||
token = "DAMAS" + "X" * 27 + "N"
|
||||
monkeypatch.setenv("DAMASCUS_API_TOKEN", token)
|
||||
return token
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def api_base(monkeypatch: pytest.MonkeyPatch) -> str:
|
||||
base = "http://damascus-api.test:9110"
|
||||
monkeypatch.setenv("DAMASCUS_API_BASE", base)
|
||||
return base
|
||||
|
||||
|
||||
def _make_client(api_base: str, api_token: str, transport: Any) -> httpx.AsyncClient:
|
||||
return httpx.AsyncClient(
|
||||
base_url=api_base,
|
||||
headers={"Authorization": f"Bearer {api_token}"},
|
||||
transport=transport,
|
||||
)
|
||||
|
||||
|
||||
# --- structural: the handler is registered at the SDK level ------------------
|
||||
|
||||
|
||||
def test_call_tool_handler_is_registered() -> None:
|
||||
"""``mcp.request_handlers[CallToolRequest]`` must be present.
|
||||
|
||||
This is the explicit acceptance criterion the task body calls out:
|
||||
the handler must be bound to the SDK's dispatch table, not just
|
||||
reachable via the ``@mcp.call_tool()`` decorator. (The decorator
|
||||
does the same thing internally, but mirroring the list-tools
|
||||
pattern makes the wiring explicit and easier to reason about.)
|
||||
"""
|
||||
from damascus import mcp_server
|
||||
|
||||
handler = mcp_server.mcp.request_handlers.get(mcp_server.CallToolRequest)
|
||||
assert handler is not None, (
|
||||
"CallToolRequest handler is not registered — "
|
||||
"tools/call requests will be silently dropped by the SDK"
|
||||
)
|
||||
assert asyncio.iscoroutinefunction(handler), (
|
||||
"CallToolRequest handler must be a coroutine function (async def)"
|
||||
)
|
||||
|
||||
|
||||
# --- success path: dispatch returns the upstream JSON ------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_list_items_dispatches_and_returns_json(
|
||||
api_token: str, api_base: str,
|
||||
) -> None:
|
||||
"""``tools/call list_items {project, limit: 1}`` returns the
|
||||
``GET /v1/items`` response payload as JSON text content.
|
||||
"""
|
||||
item = _sample_work_item()
|
||||
payload = {"items": [item], "total": 1, "limit": 1, "offset": 0}
|
||||
ListItemsResponse.model_validate(payload)
|
||||
|
||||
recorder = _Recorder(payload)
|
||||
from damascus import mcp_server
|
||||
|
||||
mcp_server._client = _make_client(api_base, api_token, recorder)
|
||||
try:
|
||||
result = await mcp_server.mcp.request_handlers[mcp_server.CallToolRequest](
|
||||
_build_call_request(
|
||||
"list_items",
|
||||
{"project": "damascus-orchestrator", "limit": 1},
|
||||
),
|
||||
)
|
||||
finally:
|
||||
await mcp_server._client.aclose()
|
||||
|
||||
# Exactly one HTTP call to GET /v1/items with the right query.
|
||||
assert len(recorder.calls) == 1
|
||||
call = recorder.calls[0]
|
||||
assert call.method == "GET"
|
||||
assert call.url.path == "/v1/items"
|
||||
assert call.url.params["project"] == "damascus-orchestrator"
|
||||
assert call.url.params["limit"] == "1"
|
||||
|
||||
# Unwrap ServerResult → CallToolResult.
|
||||
ctr = result.root
|
||||
assert ctr.isError is False, f"unexpected error result: {ctr}"
|
||||
assert len(ctr.content) >= 1
|
||||
text_block = ctr.content[0]
|
||||
assert text_block.type == "text"
|
||||
parsed = json.loads(text_block.text)
|
||||
assert parsed["total"] == 1
|
||||
assert parsed["items"][0]["project"] == "damascus-orchestrator"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_system_status_returns_stats_shape(
|
||||
api_token: str, api_base: str,
|
||||
) -> None:
|
||||
"""``tools/call system_status`` returns the ``GET /v1/stats`` payload."""
|
||||
payload = _stats_payload()
|
||||
StatsResponse.model_validate(payload)
|
||||
|
||||
recorder = _Recorder(payload)
|
||||
from damascus import mcp_server
|
||||
|
||||
mcp_server._client = _make_client(api_base, api_token, recorder)
|
||||
try:
|
||||
result = await mcp_server.mcp.request_handlers[mcp_server.CallToolRequest](
|
||||
_build_call_request("system_status", {}),
|
||||
)
|
||||
finally:
|
||||
await mcp_server._client.aclose()
|
||||
|
||||
assert len(recorder.calls) == 1
|
||||
call = recorder.calls[0]
|
||||
assert call.method == "GET"
|
||||
assert call.url.path == "/v1/stats"
|
||||
|
||||
ctr = result.root
|
||||
assert ctr.isError is False
|
||||
parsed = json.loads(ctr.content[0].text)
|
||||
# Shape parity with /v1/stats — keys present, types match
|
||||
assert parsed["open_human_issues"] == 0
|
||||
assert "phase_counts" in parsed
|
||||
assert "cost_today_usd" in parsed
|
||||
|
||||
|
||||
# --- error paths -------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_unknown_tool_returns_error_result(
|
||||
api_token: str, api_base: str,
|
||||
) -> None:
|
||||
"""An unknown tool name must produce a ``CallToolResult`` with
|
||||
``isError=True``, not a silent drop.
|
||||
|
||||
The dispatch raises ``ValueError`` on an unknown name; the SDK's
|
||||
handler catches that exception and returns an error ``CallToolResult``
|
||||
with ``isError=True``.
|
||||
"""
|
||||
from damascus import mcp_server
|
||||
|
||||
# No HTTP client needed — dispatch raises before touching upstream.
|
||||
result = await mcp_server.mcp.request_handlers[mcp_server.CallToolRequest](
|
||||
_build_call_request("no_such_tool", {}),
|
||||
)
|
||||
ctr = result.root
|
||||
assert ctr.isError is True, (
|
||||
"unknown tool must produce isError=True so clients see the failure"
|
||||
)
|
||||
assert len(ctr.content) >= 1
|
||||
text = ctr.content[0].text
|
||||
assert "no_such_tool" in text, (
|
||||
f"error message should mention the bad tool name; got {text!r}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_invalid_args_returns_validation_error(
|
||||
api_token: str, api_base: str,
|
||||
) -> None:
|
||||
"""``priority_min=-1`` violates ``McpListItemsArgs.priority_min >= 0``.
|
||||
|
||||
The Mcp*Args model validates before any HTTP call; a violation
|
||||
must surface as a ``CallToolResult`` with ``isError=True``.
|
||||
"""
|
||||
from damascus import mcp_server
|
||||
|
||||
result = await mcp_server.mcp.request_handlers[mcp_server.CallToolRequest](
|
||||
_build_call_request(
|
||||
"list_items",
|
||||
{"project": "damascus-orchestrator", "priority_min": -1},
|
||||
),
|
||||
)
|
||||
ctr = result.root
|
||||
assert ctr.isError is True
|
||||
text = ctr.content[0].text
|
||||
# Pydantic v2's error format — assert the field name is surfaced
|
||||
assert "priority_min" in text, (
|
||||
f"validation error should name the bad field; got {text!r}"
|
||||
)
|
||||
# And McpListItemsArgs is the validator that raised
|
||||
assert "McpListItemsArgs" in text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_priority_bounds_invariant_violated(
|
||||
api_token: str, api_base: str,
|
||||
) -> None:
|
||||
"""``priority_max < priority_min`` violates the cross-field invariant
|
||||
in :class:`McpListItemsArgs` (``_priority_bounds`` model_validator).
|
||||
"""
|
||||
from damascus import mcp_server
|
||||
|
||||
result = await mcp_server.mcp.request_handlers[mcp_server.CallToolRequest](
|
||||
_build_call_request(
|
||||
"list_items",
|
||||
{"project": "damascus-orchestrator",
|
||||
"priority_min": 100, "priority_max": 50},
|
||||
),
|
||||
)
|
||||
ctr = result.root
|
||||
assert ctr.isError is True
|
||||
text = ctr.content[0].text
|
||||
assert "priority_max" in text and "priority_min" in text
|
||||
|
||||
|
||||
# --- regression: list-tools still works -------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_tools_still_reports_seven_tools(api_base: str) -> None:
|
||||
"""Regression: tools/list must keep returning all 7 tools."""
|
||||
from damascus import mcp_server
|
||||
|
||||
# First a tools/call so the SDK refreshes its cache (proves the
|
||||
# wiring works end-to-end without depending on cache state).
|
||||
recorder = _Recorder(_stats_payload())
|
||||
mcp_server._client = _make_client(api_base, "dummy", recorder)
|
||||
try:
|
||||
await mcp_server.mcp.request_handlers[mcp_server.CallToolRequest](
|
||||
_build_call_request("system_status", {}),
|
||||
)
|
||||
finally:
|
||||
await mcp_server._client.aclose()
|
||||
|
||||
# Then a tools/list request via the SDK handler.
|
||||
list_result = await mcp_server.mcp.request_handlers[
|
||||
mcp_server.ListToolsRequest
|
||||
](None)
|
||||
tools = list_result.root.tools
|
||||
names = sorted(t.name for t in tools)
|
||||
assert names == sorted([
|
||||
"list_items",
|
||||
"get_item",
|
||||
"list_open_questions",
|
||||
"answer_question",
|
||||
"ingest_story",
|
||||
"bulk_ingest",
|
||||
"system_status",
|
||||
]), f"unexpected tool list: {names}"
|
||||
|
||||
|
||||
def test_list_items_input_schema_matches_args_model() -> None:
|
||||
"""Regression: inputSchema for list_items matches
|
||||
``McpListItemsArgs.model_json_schema()`` — drift is the primary
|
||||
contract risk (wiki/concepts/entry-points-contract.md §5)."""
|
||||
from damascus import mcp_server
|
||||
|
||||
tools = {t.name: t for t in mcp_server.mcp.list_tools()}
|
||||
actual = tools["list_items"].inputSchema
|
||||
expected = McpListItemsArgs.model_json_schema()
|
||||
assert actual == expected, (
|
||||
f"inputSchema drift for list_items:\n"
|
||||
f" registered: {json.dumps(actual, sort_keys=True)[:300]}\n"
|
||||
f" expected: {json.dumps(expected, sort_keys=True)[:300]}"
|
||||
)
|
||||
|
||||
|
||||
# --- end-to-end stdio smoke --------------------------------------------------
|
||||
|
||||
|
||||
async def _stdio_round_trip() -> dict[str, Any]:
|
||||
"""Spawn ``damascus mcp-serve`` over stdio, run the full MCP
|
||||
handshake, call ``system_status``, return the response.
|
||||
|
||||
The upstream URL points to ``example.test`` so the HTTP call will
|
||||
fail with a connection error — that proves the dispatch IS firing
|
||||
(the error is from the HTTP layer, not a silent drop).
|
||||
"""
|
||||
env = os.environ.copy()
|
||||
env["DAMASCUS_API_BASE"] = "http://example.test:9999"
|
||||
env["DAMASCUS_API_TOKEN"] = "DAMAS" + "X" * 27 + "N"
|
||||
env["PYTHONUNBUFFERED"] = "1"
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"damascus", "mcp-serve",
|
||||
cwd=str(Path.cwd()),
|
||||
env=env,
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
|
||||
async def send(req: dict[str, Any]) -> None:
|
||||
line = json.dumps(req) + "\n"
|
||||
assert proc.stdin is not None
|
||||
proc.stdin.write(line.encode())
|
||||
await proc.stdin.drain()
|
||||
|
||||
async def recv(timeout: float = 8.0) -> dict[str, Any]:
|
||||
assert proc.stdout is not None
|
||||
line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
|
||||
return json.loads(line.decode())
|
||||
|
||||
try:
|
||||
await send({
|
||||
"jsonrpc": "2.0", "id": 1, "method": "initialize",
|
||||
"params": {
|
||||
"protocolVersion": "2024-11-05",
|
||||
"capabilities": {},
|
||||
"clientInfo": {"name": "dispatch-test", "version": "0"},
|
||||
},
|
||||
})
|
||||
await recv(timeout=5.0)
|
||||
await send({"jsonrpc": "2.0", "method": "notifications/initialized"})
|
||||
await send({
|
||||
"jsonrpc": "2.0", "id": 3, "method": "tools/call",
|
||||
"params": {"name": "system_status", "arguments": {}},
|
||||
})
|
||||
return await recv(timeout=10.0)
|
||||
finally:
|
||||
try:
|
||||
assert proc.stdin is not None
|
||||
proc.stdin.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
await asyncio.wait_for(proc.wait(), timeout=5)
|
||||
except asyncio.TimeoutError:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_end_to_end_dispatch() -> None:
|
||||
"""End-to-end: stdio transport → initialize → tools/call → response.
|
||||
|
||||
Asserts the JSON-RPC envelope is well-formed and the response
|
||||
contains a ``result`` (not a protocol-level error). The upstream
|
||||
HTTP error (example.test) is fine — it surfaces as a ``CallToolResult``
|
||||
with ``isError=True``, which proves dispatch fired.
|
||||
"""
|
||||
response = await _stdio_round_trip()
|
||||
assert response.get("jsonrpc") == "2.0"
|
||||
assert response.get("id") == 3
|
||||
# Must be a successful JSON-RPC response (result, not error at the
|
||||
# protocol level). The result content may carry isError=True from
|
||||
# the upstream HTTP failure — that's fine, dispatch happened.
|
||||
assert "result" in response, (
|
||||
f"tools/call got a protocol error or silent drop: {response}"
|
||||
)
|
||||
inner = response["result"]
|
||||
assert "content" in inner and len(inner["content"]) >= 1
|
||||
assert inner["content"][0].get("type") == "text"
|
||||
Reference in New Issue
Block a user