From ed89a6ae59348330ae0ffd492b3dfca392a3db8d Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 20 Apr 2026 14:06:21 +0530 Subject: [PATCH] fix: auto-send notifications/cancelled on anyio task cancellation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #1410. Users of high-level APIs like client.call_tool() have no access to the internal equest_id, making it impossible to manually send a cancel notification when a task is aborted. The server would keep processing a request that nobody is waiting for. This fix intercepts nyio task/scope cancellation inside send_request and automatically dispatches otifications/cancelled to the peer before re-raising. The send is shielded from the cancellation scope and guarded by a 2-second timeout to prevent deadlock if the transport buffer is full. A equest_sent flag ensures we only notify the server if the request actually hit the wire — avoiding spurious cancels for requests that were never received. This mirrors the pattern used by the TypeScript SDK with AbortSignal and applies to both client→server and server→client request directions since the change lives in BaseSession. --- src/mcp/shared/session.py | 23 +++++++++++ tests/server/test_cancel_handling.py | 61 ++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 243eef5ae..cbd1265c1 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -23,6 +23,7 @@ INVALID_PARAMS, REQUEST_TIMEOUT, CancelledNotification, + CancelledNotificationParams, ClientNotification, ClientRequest, ClientResult, @@ -269,6 +270,7 @@ async def send_request( # Store the callback for this request self._progress_callbacks[request_id] = progress_callback + request_sent = False try: target = request_data.get("params", {}).get("name") span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}" @@ -284,6 +286,7 @@ async def send_request( jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data) await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata)) + request_sent = True # request read timeout takes precedence over session read timeout timeout = request_read_timeout_seconds or self._session_read_timeout_seconds @@ -301,6 +304,26 @@ async def send_request( else: return result_type.model_validate(response_or_error.result, by_name=False) + except anyio.get_cancelled_exc_class(): + # Automatically notify the other side when a task/scope is cancelled, + # so the peer can abort work for a request nobody is waiting for. + if request_sent: + with anyio.CancelScope(shield=True): + try: + # Add a short timeout to prevent deadlock if the transport buffer is full + with anyio.move_on_after(2.0): + await self.send_notification( + CancelledNotification( # type: ignore[arg-type] + params=CancelledNotificationParams( + request_id=request_id, + reason="Task cancelled", + ) + ) + ) + except Exception: + pass # Transport may already be closed + raise + finally: self._response_streams.pop(request_id, None) self._progress_callbacks.pop(request_id, None) diff --git a/tests/server/test_cancel_handling.py b/tests/server/test_cancel_handling.py index cff5a37c1..4b4584f0d 100644 --- a/tests/server/test_cancel_handling.py +++ b/tests/server/test_cancel_handling.py @@ -248,3 +248,64 @@ async def run_server(): # Without the fixes: RuntimeError (dict mutation) or ClosedResourceError # (respond after write-stream close) escapes run_server and this hangs. await server_run_returned.wait() + + +@pytest.mark.anyio +async def test_anyio_cancel_scope_sends_cancelled_notification() -> None: + """Cancelling a call_tool via anyio cancel scope should automatically + send notifications/cancelled to the server, causing it to abort the handler.""" + + tool_started = anyio.Event() + handler_cancelled = anyio.Event() + + async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult: + return ListToolsResult( + tools=[ + Tool( + name="slow_tool", + description="A slow tool for testing cancellation", + input_schema={}, + ) + ] + ) + + async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult: + if params.name == "slow_tool": + tool_started.set() + try: + await anyio.sleep_forever() + except anyio.get_cancelled_exc_class(): + handler_cancelled.set() + raise + raise ValueError(f"Unknown tool: {params.name}") # pragma: no cover + + server = Server( + "test-server", + on_list_tools=handle_list_tools, + on_call_tool=handle_call_tool, + ) + + async with Client(server) as client: + # Cancel the call_tool via anyio scope cancellation. + # send_request should automatically send notifications/cancelled. + async with anyio.create_task_group() as tg: + + async def do_call() -> None: + with anyio.CancelScope() as scope: + # Store scope so the outer task can cancel it + do_call.scope = scope # type: ignore[attr-defined] + await client.call_tool("slow_tool", {}) + + tg.start_soon(do_call) + + # Wait for the server handler to start + await tool_started.wait() + + # Cancel the client-side scope — this should trigger auto-notification + do_call.scope.cancel() # type: ignore[attr-defined] + + # Give the server a moment to process the cancellation + await anyio.sleep(0.1) + + # The server handler should have been cancelled via the notification + assert handler_cancelled.is_set(), "Server handler was not cancelled — notifications/cancelled was not sent"