test_mcp_elicitation.py
python
"""Tests for MCP 2025-11-25 Elicitation: ToolCallContext, session, and tools.

Covers:
  ToolCallContext:
    - elicit_form: accept, decline, cancel, timeout, no-session fallback,
      missing form capability
    - elicit_url: accept, decline, no-session fallback
    - progress: session push, no-session no-op

  Session elicitation helpers:
    - create_pending_elicitation, resolve_elicitation, cancel_elicitation

  Elicitation schemas:
    - SCHEMAS contains all expected keys
    - build_form_elicitation returns correct mode/requestedSchema
    - build_url_elicitation returns correct mode/url/elicitationId
    - oauth_connect_url / daw_cloud_connect_url embed the service name and
      elicitation_id

  Tool routing (unit):
    - musehub_compose_with_preferences: no session → elicitation_unavailable
    - musehub_review_pr_interactive: no session → elicitation_unavailable
    - musehub_connect_streaming_platform: no session → elicitation_unavailable
    - musehub_connect_daw_cloud: no session → elicitation_unavailable
    - musehub_create_release_interactive: no session → elicitation_unavailable

  New prompts:
    - musehub/onboard assembles correctly
    - musehub/release_to_world assembles correctly with repo_id interpolation

  SSE formatting:
    - sse_event produces correct format
    - sse_notification produces correct JSON-RPC notification
    - sse_request produces correct JSON-RPC request with id
    - sse_response produces correct JSON-RPC response

  Dispatcher routing (2025-11-25 notifications):
    - notifications/cancelled and notifications/elicitation/complete return None
    - notifications/cancelled with a session cancels the pending Future
"""
| 34 | from __future__ import annotations |
| 35 | |
| 36 | import asyncio |
| 37 | import json |
| 38 | from unittest.mock import AsyncMock, MagicMock, patch |
| 39 | |
| 40 | import pytest |
| 41 | |
| 42 | from musehub.mcp.context import ToolCallContext |
| 43 | from musehub.mcp.elicitation import ( |
| 44 | SCHEMAS, |
| 45 | build_form_elicitation, |
| 46 | build_url_elicitation, |
| 47 | oauth_connect_url, |
| 48 | daw_cloud_connect_url, |
| 49 | ) |
| 50 | from musehub.mcp.prompts import PROMPT_CATALOGUE, get_prompt |
| 51 | from musehub.mcp.session import ( |
| 52 | MCPSession, |
| 53 | create_session, |
| 54 | create_pending_elicitation, |
| 55 | delete_session, |
| 56 | resolve_elicitation, |
| 57 | cancel_elicitation, |
| 58 | push_to_session, |
| 59 | ) |
| 60 | from musehub.mcp.sse import ( |
| 61 | sse_event, |
| 62 | sse_notification, |
| 63 | sse_request, |
| 64 | sse_response, |
| 65 | ) |
| 66 | |
| 67 | |
| 68 | # ── SSE formatting ──────────────────────────────────────────────────────────── |
| 69 | |
| 70 | |
def test_sse_event_basic_format() -> None:
    """A bare sse_event frame starts with ``data:`` and ends with a blank line."""
    frame = sse_event({"jsonrpc": "2.0", "method": "ping"})
    assert frame.startswith("data:")
    assert frame.endswith("\n\n")
    # Take the first data line and decode the JSON that follows the prefix.
    data_lines = [line for line in frame.split("\n") if line.startswith("data:")]
    decoded = json.loads(data_lines[0][len("data: "):])
    assert decoded["method"] == "ping"
| 80 | |
| 81 | |
def test_sse_event_with_id_and_type() -> None:
    """Passing event_id and event_type adds ``id:`` and ``event:`` lines to the frame."""
    frame = sse_event({"a": 1}, event_id="42", event_type="notification")
    for expected_line in ("id: 42\n", "event: notification\n"):
        assert expected_line in frame
| 87 | |
| 88 | |
def test_sse_notification_format() -> None:
    """sse_notification emits a JSON-RPC 2.0 notification with method and params."""
    frame = sse_notification("notifications/progress", {"progress": 50})
    data_lines = [line for line in frame.split("\n") if line.startswith("data:")]
    message = json.loads(data_lines[0][len("data: "):])
    assert message["jsonrpc"] == "2.0"
    assert message["method"] == "notifications/progress"
    assert message["params"]["progress"] == 50
    # A notification carries no id member.
    assert "id" not in message
| 98 | |
| 99 | |
def test_sse_request_format() -> None:
    """sse_request emits a JSON-RPC 2.0 request that carries the given id."""
    frame = sse_request("elicit-1", "elicitation/create", {"mode": "form"})
    data_lines = [line for line in frame.split("\n") if line.startswith("data:")]
    message = json.loads(data_lines[0][len("data: "):])
    assert message["jsonrpc"] == "2.0"
    assert message["id"] == "elicit-1"
    assert message["method"] == "elicitation/create"
    assert message["params"]["mode"] == "form"
| 109 | |
| 110 | |
def test_sse_response_format() -> None:
    """sse_response emits a JSON-RPC 2.0 success response (``result``, no ``error``)."""
    frame = sse_response(42, {"content": [{"type": "text", "text": "ok"}]})
    data_lines = [line for line in frame.split("\n") if line.startswith("data:")]
    message = json.loads(data_lines[0][len("data: "):])
    assert message["jsonrpc"] == "2.0"
    assert message["id"] == 42
    assert "result" in message
    assert "error" not in message
| 120 | |
| 121 | |
| 122 | # ── Elicitation schemas ─────────────────────────────────────────────────────── |
| 123 | |
| 124 | |
def test_schemas_has_all_expected_keys() -> None:
    """SCHEMAS exposes exactly the five expected elicitation schema names."""
    wanted = {
        "compose_preferences",
        "repo_creation",
        "pr_review_focus",
        "release_metadata",
        "platform_connect_confirm",
    }
    actual = set(SCHEMAS.keys())
    assert wanted == actual
| 135 | |
| 136 | |
def test_compose_preferences_schema_required_fields() -> None:
    """The compose_preferences schema is an object requiring key, tempo_bpm, mood, genre."""
    schema = SCHEMAS["compose_preferences"]
    assert schema["type"] == "object"
    required = schema["required"]
    for field_name in ("key", "tempo_bpm", "mood", "genre"):
        assert field_name in required
| 146 | |
| 147 | |
def test_build_form_elicitation() -> None:
    """build_form_elicitation returns form mode, the message, and the registered schema."""
    schema_key = "compose_preferences"
    prompt = "Pick your vibe"
    built = build_form_elicitation(schema_key, prompt)
    assert built["mode"] == "form"
    assert built["message"] == "Pick your vibe"
    assert "requestedSchema" in built
    # Identity check: the very object stored in SCHEMAS is expected back.
    assert built["requestedSchema"] is SCHEMAS["compose_preferences"]
| 155 | |
| 156 | |
def test_build_form_elicitation_unknown_key_raises() -> None:
    """An unregistered schema key makes build_form_elicitation raise KeyError."""
    unknown_key = "nonexistent_schema"
    with pytest.raises(KeyError):
        build_form_elicitation(unknown_key, "message")
| 161 | |
| 162 | |
def test_build_url_elicitation() -> None:
    """build_url_elicitation returns url-mode params plus a matching elicitation id."""
    target = "https://example.com/oauth"
    prompt = "Connect Spotify"
    built, elicitation_id = build_url_elicitation(target, prompt)
    assert built["mode"] == "url"
    assert built["url"] == "https://example.com/oauth"
    assert built["message"] == "Connect Spotify"
    assert built["elicitationId"] == elicitation_id
    # The returned id is expected to be longer than 10 characters.
    assert len(elicitation_id) > 10
| 171 | |
| 172 | |
def test_build_url_elicitation_stable_id() -> None:
    """A caller-supplied elicitation_id is returned and embedded in the params unchanged."""
    fixed_id = "my-stable-id"
    built, returned_id = build_url_elicitation(
        "https://example.com/oauth",
        "msg",
        elicitation_id=fixed_id,
    )
    assert returned_id == "my-stable-id"
    assert built["elicitationId"] == "my-stable-id"
| 180 | |
| 181 | |
def test_oauth_connect_url_format() -> None:
    """oauth_connect_url lowercases the platform, embeds the id, and keeps the base URL."""
    base = "https://musehub.app"
    connect_url = oauth_connect_url("Spotify", "abc123", base_url=base)
    assert "spotify" in connect_url
    assert "elicitation_id=abc123" in connect_url
    assert connect_url.startswith("https://musehub.app")
| 188 | |
| 189 | |
def test_daw_cloud_connect_url_format() -> None:
    """daw_cloud_connect_url contains the lowercased service name and the elicitation id."""
    connect_url = daw_cloud_connect_url("LANDR", "xyz789", base_url="https://musehub.app")
    for fragment in ("landr", "elicitation_id=xyz789"):
        assert fragment in connect_url
| 195 | |
| 196 | |
| 197 | # ── ToolCallContext — elicit_form ───────────────────────────────────────────── |
| 198 | |
| 199 | |
@pytest.mark.anyio
async def test_elicit_form_no_session_returns_none() -> None:
    """With no session attached, elicit_form yields None."""
    sessionless_ctx = ToolCallContext(user_id=None, session=None)
    schema = SCHEMAS["compose_preferences"]
    answer = await sessionless_ctx.elicit_form(schema, "msg")
    assert answer is None
| 206 | |
| 207 | |
@pytest.mark.anyio
async def test_elicit_form_accepted_returns_content() -> None:
    """elicit_form should return the content dict when the user accepts.

    A background task answers every pending elicitation with an ``accept``
    action while ``elicit_form`` is waiting for the reply.
    """
    session = create_session("user-1", {"elicitation": {"form": {}}})
    # try/finally so the session is deleted even when the await or an
    # assertion below fails.
    try:
        ctx = ToolCallContext(user_id="user-1", session=session)
        content = {"key": "C major", "tempo_bpm": 120, "mood": "peaceful", "genre": "ambient"}

        async def _resolve_after_push() -> None:
            # Yield once so elicit_form gets a chance to register its request.
            await asyncio.sleep(0)
            # Snapshot the ids: resolving presumably mutates session.pending.
            for req_id in list(session.pending):
                resolve_elicitation(session, req_id, {"action": "accept", "content": content})

        task = asyncio.create_task(_resolve_after_push())
        result = await ctx.elicit_form(SCHEMAS["compose_preferences"], "Pick your vibe")
        await task

        assert result == content
    finally:
        delete_session(session.session_id)
| 228 | |
| 229 | |
@pytest.mark.anyio
async def test_elicit_form_declined_returns_none() -> None:
    """elicit_form should return None when the user declines.

    A background task answers every pending elicitation with a ``decline``
    action while ``elicit_form`` is waiting for the reply.
    """
    session = create_session("user-1", {"elicitation": {"form": {}}})
    # try/finally so the session is deleted even when the await or the
    # assertion below fails.
    try:
        ctx = ToolCallContext(user_id="user-1", session=session)

        async def _decline_after_push() -> None:
            # Yield once so elicit_form gets a chance to register its request.
            await asyncio.sleep(0)
            # Snapshot the ids: resolving presumably mutates session.pending.
            for req_id in list(session.pending):
                resolve_elicitation(session, req_id, {"action": "decline"})

        task = asyncio.create_task(_decline_after_push())
        result = await ctx.elicit_form(SCHEMAS["compose_preferences"], "Pick your vibe")
        await task

        assert result is None
    finally:
        delete_session(session.session_id)
| 247 | |
| 248 | |
@pytest.mark.anyio
async def test_elicit_form_no_form_capability_returns_none() -> None:
    """elicit_form should return None if the client didn't declare form support."""
    # The client advertises only the url elicitation mode, not form.
    session = create_session("user-1", {"elicitation": {"url": {}}})
    # try/finally so the session is deleted even when the assertion fails.
    try:
        ctx = ToolCallContext(user_id="user-1", session=session)
        result = await ctx.elicit_form(SCHEMAS["compose_preferences"], "msg")
        assert result is None
    finally:
        delete_session(session.session_id)
| 257 | |
| 258 | |
| 259 | # ── ToolCallContext — elicit_url ────────────────────────────────────────────── |
| 260 | |
| 261 | |
@pytest.mark.anyio
async def test_elicit_url_no_session_returns_false() -> None:
    """With no session attached, elicit_url yields False."""
    sessionless_ctx = ToolCallContext(user_id=None, session=None)
    accepted = await sessionless_ctx.elicit_url("https://example.com/oauth", "msg")
    assert accepted is False
| 268 | |
| 269 | |
@pytest.mark.anyio
async def test_elicit_url_accepted_returns_true() -> None:
    """elicit_url should return True when the user accepts the URL flow.

    A background task answers every pending elicitation with an ``accept``
    action while ``elicit_url`` is waiting for the reply.
    """
    session = create_session("user-1", {"elicitation": {"form": {}, "url": {}}})
    # try/finally so the session is deleted even when the await or the
    # assertion below fails.
    try:
        ctx = ToolCallContext(user_id="user-1", session=session)

        async def _accept_after_push() -> None:
            # Yield once so elicit_url gets a chance to register its request.
            await asyncio.sleep(0)
            # Snapshot the ids: resolving presumably mutates session.pending.
            for req_id in list(session.pending):
                resolve_elicitation(session, req_id, {"action": "accept"})

        task = asyncio.create_task(_accept_after_push())
        result = await ctx.elicit_url("https://example.com/oauth", "msg")
        await task

        assert result is True
    finally:
        delete_session(session.session_id)
| 287 | |
| 288 | |
| 289 | # ── ToolCallContext — progress ──────────────────────────────────────────────── |
| 290 | |
| 291 | |
@pytest.mark.anyio
async def test_progress_no_session_is_noop() -> None:
    """Reporting progress on a session-less context completes without raising."""
    sessionless_ctx = ToolCallContext(user_id=None, session=None)
    # The test passes as long as this await does not raise.
    await sessionless_ctx.progress("token", 1, 10, "working…")
| 297 | |
| 298 | |
@pytest.mark.anyio
async def test_progress_with_session_pushes_sse_event() -> None:
    """progress with an active session should push a notifications/progress SSE event.

    A queue is attached to the session's ``sse_queues`` and the frame found on
    it after the call is decoded and checked for method, progress, and total.
    """
    session = create_session("user-1", {})
    # try/finally so the session is deleted even when an assertion fails.
    try:
        queue: asyncio.Queue[str | None] = asyncio.Queue()
        session.sse_queues.append(queue)

        ctx = ToolCallContext(user_id="user-1", session=session)
        await ctx.progress("compose-token", 2, 5, "generating…")

        assert not queue.empty()
        event_text = queue.get_nowait()
        assert event_text is not None
        # Decode the JSON carried on the first data line of the SSE frame.
        data_lines = [line for line in event_text.split("\n") if line.startswith("data:")]
        payload = json.loads(data_lines[0][len("data: "):])
        assert payload["method"] == "notifications/progress"
        assert payload["params"]["progress"] == 2
        assert payload["params"]["total"] == 5
    finally:
        delete_session(session.session_id)
| 319 | |
| 320 | |
| 321 | # ── Elicitation tool executors — no session graceful degradation ────────────── |
| 322 | |
| 323 | |
@pytest.mark.anyio
async def test_compose_with_preferences_no_session() -> None:
    """Without a session the compose executor reports ``elicitation_unavailable``."""
    from musehub.mcp.write_tools.elicitation_tools import execute_compose_with_preferences

    sessionless_ctx = ToolCallContext(user_id=None, session=None)
    outcome = await execute_compose_with_preferences(repo_id=None, ctx=sessionless_ctx)
    assert outcome.ok is False
    assert outcome.error_code == "elicitation_unavailable"
| 333 | |
| 334 | |
@pytest.mark.anyio
async def test_review_pr_interactive_no_session() -> None:
    """Without a session the PR-review executor reports ``elicitation_unavailable``."""
    from musehub.mcp.write_tools.elicitation_tools import execute_review_pr_interactive

    sessionless_ctx = ToolCallContext(user_id=None, session=None)
    outcome = await execute_review_pr_interactive("repo-1", "pr-1", ctx=sessionless_ctx)
    assert outcome.ok is False
    assert outcome.error_code == "elicitation_unavailable"
| 344 | |
| 345 | |
@pytest.mark.anyio
async def test_connect_streaming_platform_no_session() -> None:
    """Without a session the streaming-platform executor reports ``elicitation_unavailable``."""
    from musehub.mcp.write_tools.elicitation_tools import execute_connect_streaming_platform

    sessionless_ctx = ToolCallContext(user_id=None, session=None)
    outcome = await execute_connect_streaming_platform("Spotify", None, ctx=sessionless_ctx)
    assert outcome.ok is False
    assert outcome.error_code == "elicitation_unavailable"
| 355 | |
| 356 | |
@pytest.mark.anyio
async def test_connect_daw_cloud_no_session() -> None:
    """Without a session the DAW-cloud executor reports ``elicitation_unavailable``."""
    from musehub.mcp.write_tools.elicitation_tools import execute_connect_daw_cloud

    sessionless_ctx = ToolCallContext(user_id=None, session=None)
    outcome = await execute_connect_daw_cloud("LANDR", ctx=sessionless_ctx)
    assert outcome.ok is False
    assert outcome.error_code == "elicitation_unavailable"
| 366 | |
| 367 | |
@pytest.mark.anyio
async def test_create_release_interactive_no_session() -> None:
    """Without a session the release executor reports ``elicitation_unavailable``."""
    from musehub.mcp.write_tools.elicitation_tools import execute_create_release_interactive

    sessionless_ctx = ToolCallContext(user_id=None, session=None)
    outcome = await execute_create_release_interactive("repo-1", ctx=sessionless_ctx)
    assert outcome.ok is False
    assert outcome.error_code == "elicitation_unavailable"
| 377 | |
| 378 | |
| 379 | # ── New prompts ─────────────────────────────────────────────────────────────── |
| 380 | |
| 381 | |
def test_onboard_prompt_assembles() -> None:
    """musehub/onboard should assemble with 2 messages.

    The first message must have the ``user`` role; the second message's text
    must contain the interpolated username and mention elicitation or compose.
    """
    result = get_prompt("musehub/onboard", {"username": "alice"})
    assert result is not None
    assert "messages" in result
    messages = result["messages"]
    assert len(messages) == 2
    assert messages[0]["role"] == "user"
    text = messages[1]["content"]["text"]
    assert "alice" in text
    # "elicit" also matches "elicitation", so a separate check for the longer
    # word is redundant; lowercase once instead of per comparison.
    lowered = text.lower()
    assert "elicit" in lowered or "compose" in lowered
| 392 | |
| 393 | |
def test_release_to_world_prompt_assembles() -> None:
    """musehub/release_to_world yields two messages, the second mentioning the repo_id."""
    prompt = get_prompt("musehub/release_to_world", {"repo_id": "abc-123"})
    assert prompt is not None
    assert "messages" in prompt
    assert len(prompt["messages"]) == 2
    second_text = prompt["messages"][1]["content"]["text"]
    assert "abc-123" in second_text
| 402 | |
| 403 | |
def test_onboard_prompt_in_catalogue() -> None:
    """The prompt catalogue lists an entry named musehub/onboard."""
    catalogue_names = {entry["name"] for entry in PROMPT_CATALOGUE}
    assert "musehub/onboard" in catalogue_names
| 408 | |
| 409 | |
def test_release_to_world_in_catalogue() -> None:
    """The prompt catalogue lists an entry named musehub/release_to_world."""
    catalogue_names = {entry["name"] for entry in PROMPT_CATALOGUE}
    assert "musehub/release_to_world" in catalogue_names
| 414 | |
| 415 | |
| 416 | # ── Dispatcher routing — new notifications (2025-11-25) ────────────────────── |
| 417 | |
| 418 | |
@pytest.mark.anyio
async def test_notifications_cancelled_handled() -> None:
    """Dispatching notifications/cancelled produces no response (None)."""
    from musehub.mcp.dispatcher import handle_request

    message = {
        "jsonrpc": "2.0",
        "method": "notifications/cancelled",
        "params": {"requestId": "elicit-1", "reason": "user navigated away"},
    }
    reply = await handle_request(message)
    # The dispatcher is expected to return nothing for a notification.
    assert reply is None
| 430 | |
| 431 | |
@pytest.mark.anyio
async def test_notifications_elicitation_complete_handled() -> None:
    """Dispatching notifications/elicitation/complete produces no response (None)."""
    from musehub.mcp.dispatcher import handle_request

    message = {
        "jsonrpc": "2.0",
        "method": "notifications/elicitation/complete",
        "params": {"elicitationId": "abc-xyz"},
    }
    reply = await handle_request(message)
    assert reply is None
| 443 | |
| 444 | |
@pytest.mark.anyio
async def test_notifications_cancelled_resolves_future() -> None:
    """notifications/cancelled with a session should cancel the pending Future.

    A pending elicitation is registered under ``elicit-99``; dispatching a
    cancellation for that request id must leave its Future cancelled.
    """
    from musehub.mcp.dispatcher import handle_request

    session = create_session(None, {"elicitation": {"form": {}}})
    # try/finally so the session is deleted even when dispatch or the
    # assertion below fails.
    try:
        fut = create_pending_elicitation(session, "elicit-99")

        await handle_request(
            {
                "jsonrpc": "2.0",
                "method": "notifications/cancelled",
                "params": {"requestId": "elicit-99"},
            },
            session=session,
        )

        assert fut.cancelled()
    finally:
        delete_session(session.session_id)
| 462 | assert fut.cancelled() |
| 463 | delete_session(session.session_id) |