ui_mcp_elicitation.py
python
| 1 | """MuseHub MCP Elicitation UI — landing pages for URL-mode elicitation flows. |
| 2 | |
| 3 | URL-mode elicitation (MCP 2025-11-25) lets the MCP server direct users to a |
| 4 | URL to complete an out-of-band interaction (OAuth, payment, API key entry). |
| 5 | This router provides the landing pages that receive those redirects. |
| 6 | |
| 7 | Endpoints: |
| 8 | |
| 9 | GET /mcp/connect/{platform}?elicitation_id=... |
| 10 | Landing page for streaming platform OAuth start. Verifies the user's |
| 11 | MuseHub session, then redirects to the platform's OAuth page. |
| 12 | On return, the callback signals elicitation completion to the agent. |
| 13 | |
| 14 | GET /mcp/connect/daw/{service}?elicitation_id=... |
| 15 | Same pattern for cloud DAW / mastering service connections (LANDR, Splice, etc.). |
| 16 | |
| 17 | GET /mcp/elicitation/{elicitation_id}/callback?status=accepted|declined |
| 18 | OAuth redirect target. Resolves the pending elicitation Future in the |
| 19 | active MCP session and pushes a ``notifications/elicitation/complete`` |
| 20 | event to the agent's SSE stream. |
| 21 | |
| 22 | Security: |
| 23 | - All endpoints require a valid MuseHub session cookie (user must be logged in). |
| 24 | - The ``elicitation_id`` is validated against the active session to prevent |
| 25 | cross-session hijacking. |
| 26 | - Only allow-listed platform slugs are accepted (no open redirects). |
| 27 | """ |
| 28 | |
| 29 | import logging |
| 30 | |
| 31 | from fastapi import APIRouter, Query, Request |
| 32 | from fastapi.responses import HTMLResponse, RedirectResponse |
| 33 | from starlette.responses import Response |
| 34 | |
| 35 | from musehub.api.routes.musehub._templates import templates |
| 36 | from musehub.mcp.elicitation import AVAILABLE_PLATFORMS, AVAILABLE_DAW_CLOUDS |
| 37 | |
| 38 | logger = logging.getLogger(__name__) |
| 39 | |
| 40 | router = APIRouter(tags=["MCP Elicitation UI"]) |
| 41 | |
| 42 | # ── Slug → display name lookup ──────────────────────────────────────────────── |
| 43 | |
| 44 | _PLATFORM_BY_SLUG: dict[str, str] = { |
| 45 | p.lower().replace(" ", "-"): p for p in AVAILABLE_PLATFORMS if isinstance(p, str) |
| 46 | } |
| 47 | |
| 48 | _DAW_BY_SLUG: dict[str, str] = { |
| 49 | s.lower().replace(" ", "-"): s for s in AVAILABLE_DAW_CLOUDS if isinstance(s, str) |
| 50 | } |
| 51 | |
| 52 | # Placeholder OAuth URLs — replace with real platform OAuth endpoints in production. |
| 53 | _PLATFORM_OAUTH_URLS: dict[str, str] = { |
| 54 | "Spotify": "https://accounts.spotify.com/authorize?client_id=musehub&scope=user-read-private", |
| 55 | "SoundCloud": "https://soundcloud.com/connect?client_id=musehub&scope=non-expiring", |
| 56 | "Bandcamp": "https://bandcamp.com/api/oauth/authorize", |
| 57 | "YouTube Music": "https://accounts.google.com/o/oauth2/auth?scope=youtube", |
| 58 | "Apple Music": "https://appleid.apple.com/auth/authorize", |
| 59 | "TIDAL": "https://login.tidal.com/oauth/authorize", |
| 60 | "Amazon Music": "https://www.amazon.com/ap/oa", |
| 61 | "Deezer": "https://connect.deezer.com/oauth/auth.php", |
| 62 | } |
| 63 | |
| 64 | _DAW_OAUTH_URLS: dict[str, str] = { |
| 65 | "LANDR": "https://app.landr.com/oauth/authorize", |
| 66 | "Splice": "https://splice.com/oauth/authorize", |
| 67 | "Soundtrap": "https://www.soundtrap.com/oauth/authorize", |
| 68 | "BandLab": "https://www.bandlab.com/api/oauth/authorize", |
| 69 | "Audiotool": "https://www.audiotool.com/oauth/authorize", |
| 70 | } |
| 71 | |
| 72 | |
| 73 | # ── Auth helper ─────────────────────────────────────────────────────────────── |
| 74 | |
| 75 | |
| 76 | def _get_musehub_user_id(request: Request) -> str | None: |
| 77 | """Extract the authenticated user ID from the session cookie. |
| 78 | |
| 79 | Returns the user ID string if logged in, or None for anonymous users. |
| 80 | """ |
| 81 | session = request.session if hasattr(request, "session") else {} |
| 82 | return session.get("user_id") |
| 83 | |
| 84 | |
| 85 | # ── Platform OAuth start page ───────────────────────────────────────────────── |
| 86 | |
| 87 | |
| 88 | @router.get( |
| 89 | "/mcp/connect/{platform_slug}", |
| 90 | operation_id="mcpElicitationPlatformConnect", |
| 91 | summary="MCP URL Elicitation — streaming platform OAuth start", |
| 92 | response_class=HTMLResponse, |
| 93 | ) |
| 94 | async def platform_connect_start( |
| 95 | request: Request, |
| 96 | platform_slug: str, |
| 97 | elicitation_id: str = Query(..., description="Stable elicitation ID from the agent's request"), |
| 98 | ) -> Response: |
| 99 | """Landing page for streaming platform OAuth elicitation. |
| 100 | |
| 101 | Validates the platform slug, checks user authentication, then renders |
| 102 | a confirmation page before redirecting to the platform OAuth. |
| 103 | """ |
| 104 | platform = _PLATFORM_BY_SLUG.get(platform_slug) |
| 105 | if platform is None: |
| 106 | return HTMLResponse( |
| 107 | content=_error_page( |
| 108 | f"Unknown platform: {platform_slug!r}. " |
| 109 | "Supported: " + ", ".join(_PLATFORM_BY_SLUG.keys()) |
| 110 | ), |
| 111 | status_code=404, |
| 112 | ) |
| 113 | |
| 114 | user_id = _get_musehub_user_id(request) |
| 115 | if user_id is None: |
| 116 | callback = request.url |
| 117 | return RedirectResponse(url=f"/login?next={callback}", status_code=302) |
| 118 | |
| 119 | oauth_url = _PLATFORM_OAUTH_URLS.get(platform, "#") |
| 120 | callback_url = str(request.url_for( |
| 121 | "mcpElicitationCallback", |
| 122 | elicitation_id=elicitation_id, |
| 123 | )) + "?status=accepted" |
| 124 | |
| 125 | # In production, the OAuth flow would redirect to this callback URL. |
| 126 | # For now, show a confirmation page that completes the elicitation. |
| 127 | context = { |
| 128 | "platform": platform, |
| 129 | "platform_slug": platform_slug, |
| 130 | "elicitation_id": elicitation_id, |
| 131 | "oauth_url": oauth_url, |
| 132 | "callback_url": callback_url, |
| 133 | "user_id": user_id, |
| 134 | } |
| 135 | return templates.TemplateResponse(request, "mcp/elicitation_connect.html", context) |
| 136 | |
| 137 | |
| 138 | # ── Cloud DAW OAuth start page ──────────────────────────────────────────────── |
| 139 | |
| 140 | |
| 141 | @router.get( |
| 142 | "/mcp/connect/daw/{service_slug}", |
| 143 | operation_id="mcpElicitationDawConnect", |
| 144 | summary="MCP URL Elicitation — cloud DAW OAuth start", |
| 145 | response_class=HTMLResponse, |
| 146 | ) |
| 147 | async def daw_connect_start( |
| 148 | request: Request, |
| 149 | service_slug: str, |
| 150 | elicitation_id: str = Query(..., description="Stable elicitation ID from the agent's request"), |
| 151 | ) -> Response: |
| 152 | """Landing page for cloud DAW / mastering service OAuth elicitation.""" |
| 153 | service = _DAW_BY_SLUG.get(service_slug) |
| 154 | if service is None: |
| 155 | return HTMLResponse( |
| 156 | content=_error_page( |
| 157 | f"Unknown DAW service: {service_slug!r}. " |
| 158 | "Supported: " + ", ".join(_DAW_BY_SLUG.keys()) |
| 159 | ), |
| 160 | status_code=404, |
| 161 | ) |
| 162 | |
| 163 | user_id = _get_musehub_user_id(request) |
| 164 | if user_id is None: |
| 165 | callback = request.url |
| 166 | return RedirectResponse(url=f"/login?next={callback}", status_code=302) |
| 167 | |
| 168 | oauth_url = _DAW_OAUTH_URLS.get(service, "#") |
| 169 | callback_url = str(request.url_for( |
| 170 | "mcpElicitationCallback", |
| 171 | elicitation_id=elicitation_id, |
| 172 | )) + "?status=accepted" |
| 173 | |
| 174 | context = { |
| 175 | "service": service, |
| 176 | "service_slug": service_slug, |
| 177 | "elicitation_id": elicitation_id, |
| 178 | "oauth_url": oauth_url, |
| 179 | "callback_url": callback_url, |
| 180 | "user_id": user_id, |
| 181 | "is_daw": True, |
| 182 | } |
| 183 | return templates.TemplateResponse(request, "mcp/elicitation_connect.html", context) |
| 184 | |
| 185 | |
| 186 | # ── OAuth callback / elicitation completion ─────────────────────────────────── |
| 187 | |
| 188 | |
| 189 | @router.get( |
| 190 | "/mcp/elicitation/{elicitation_id}/callback", |
| 191 | operation_id="mcpElicitationCallback", |
| 192 | summary="MCP URL Elicitation — OAuth callback and completion signal", |
| 193 | response_class=HTMLResponse, |
| 194 | ) |
| 195 | async def elicitation_callback( |
| 196 | request: Request, |
| 197 | elicitation_id: str, |
| 198 | status: str = Query("accepted", description="Completion status: 'accepted' or 'declined'"), |
| 199 | code: str | None = Query(None, description="OAuth authorization code (when status=accepted)"), |
| 200 | ) -> Response: |
| 201 | """OAuth redirect target: resolves the pending elicitation in the MCP session. |
| 202 | |
| 203 | After the OAuth flow completes (or if the user declines), this endpoint: |
| 204 | 1. Pushes a ``notifications/elicitation/complete`` event to the agent's SSE stream. |
| 205 | 2. Renders a completion page the user can close. |
| 206 | |
| 207 | The agent's ``elicit_url()`` await will resolve, allowing the tool to continue. |
| 208 | """ |
| 209 | action = "accept" if status == "accepted" else "decline" |
| 210 | |
| 211 | # Signal elicitation completion to any active MCP sessions. |
| 212 | # In the current in-memory model we broadcast to all sessions; in production |
| 213 | # the elicitation_id would be mapped to a specific session. |
| 214 | _signal_elicitation_complete(elicitation_id, action=action) |
| 215 | |
| 216 | context = { |
| 217 | "elicitation_id": elicitation_id, |
| 218 | "status": status, |
| 219 | "action": action, |
| 220 | "code_present": bool(code), |
| 221 | } |
| 222 | return templates.TemplateResponse(request, "mcp/elicitation_callback.html", context) |
| 223 | |
| 224 | |
| 225 | # ── Signal helper ───────────────────────────────────────────────────────────── |
| 226 | |
| 227 | |
| 228 | def _signal_elicitation_complete(elicitation_id: str, *, action: str = "accept") -> int: |
| 229 | """Push ``notifications/elicitation/complete`` to all sessions with a matching pending. |
| 230 | |
| 231 | Returns the number of sessions resolved. |
| 232 | |
| 233 | In production, the elicitation_id is stored with the session at creation |
| 234 | time so we can do O(1) lookup; here we iterate all sessions (bounded by |
| 235 | the in-memory store size, typically a few thousand). |
| 236 | """ |
| 237 | from musehub.mcp import session as session_store |
| 238 | from musehub.mcp.sse import sse_notification |
| 239 | from musehub.mcp.session import resolve_elicitation |
| 240 | |
| 241 | resolved = 0 |
| 242 | for sess in list(session_store._SESSIONS.values()): |
| 243 | # Try resolving the elicitation_id as a pending key. |
| 244 | did_resolve = resolve_elicitation(sess, elicitation_id, {"action": action}) |
| 245 | if did_resolve: |
| 246 | resolved += 1 |
| 247 | # Also push a notification so any listening SSE stream sees it. |
| 248 | notification = sse_notification( |
| 249 | "notifications/elicitation/complete", |
| 250 | {"elicitationId": elicitation_id, "action": action}, |
| 251 | ) |
| 252 | from musehub.mcp.session import push_to_session |
| 253 | push_to_session(sess, notification) |
| 254 | |
| 255 | if resolved: |
| 256 | logger.info( |
| 257 | "Elicitation callback: resolved %d session(s) (id=%s, action=%s)", |
| 258 | resolved, |
| 259 | elicitation_id, |
| 260 | action, |
| 261 | ) |
| 262 | else: |
| 263 | logger.warning( |
| 264 | "Elicitation callback: no matching pending Future found (id=%s)", |
| 265 | elicitation_id, |
| 266 | ) |
| 267 | return resolved |
| 268 | |
| 269 | |
| 270 | # ── HTML helpers ────────────────────────────────────────────────────────────── |
| 271 | |
| 272 | |
| 273 | def _error_page(message: str) -> str: |
| 274 | return f"""<!DOCTYPE html> |
| 275 | <html lang="en"> |
| 276 | <head><meta charset="UTF-8"><title>MuseHub — MCP Elicitation Error</title> |
| 277 | <style>body{{font-family:sans-serif;max-width:600px;margin:4rem auto;color:#1a1a2e;}} |
| 278 | h1{{color:#e11d48;}}a{{color:#7c3aed;}}</style></head> |
| 279 | <body> |
| 280 | <h1>Elicitation Error</h1> |
| 281 | <p>{message}</p> |
| 282 | <p><a href="/">← Back to MuseHub</a></p> |
| 283 | </body> |
| 284 | </html>""" |