test_medium_security_m1_m7.py
python
| 1 | """Regression tests for MEDIUM security fixes M1–M7. |
| 2 | |
| 3 | M1 – Exception messages no longer leak internals to MCP clients |
| 4 | M2 – DB pool_size/max_overflow configured for Postgres (smoke test) |
| 5 | M3 – CSP: 'unsafe-inline' removed from script-src; per-request nonce injected |
| 6 | M4 – ACCESS_TOKEN_SECRET enforces minimum 32-byte entropy in production mode |
| 7 | M5 – logging/setLevel requires authentication |
| 8 | M6 – WireSnapshot.manifest capped at 10 000 entries |
| 9 | M7 – Commit message, issue/PR/comment bodies capped at 10 000 chars |
| 10 | """ |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import pytest |
| 14 | from pydantic import ValidationError |
| 15 | |
| 16 | |
| 17 | # ── M1: Exception messages stripped from MCP responses ─────────────────────── |
| 18 | |
| 19 | |
@pytest.mark.asyncio
async def test_m1_unhandled_exception_does_not_leak_traceback() -> None:
    """A failing tool call must not surface Python exception text to the client.

    Sends an anonymous ``tools/call`` request for ``musehub_get_repo`` with an
    empty argument dict and inspects the ``error.message`` field of the reply.
    """
    from musehub.mcp.dispatcher import handle_request

    # No repo_id and no owner/slug are supplied, which is expected to push the
    # dispatcher down an error path.
    call_params: dict[str, object] = {
        "name": "musehub_get_repo",
        "arguments": {},
    }
    request: dict[str, object] = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": call_params,
    }

    response = await handle_request(request, user_id=None)
    assert response is not None

    # NOTE(review): when the reply carries no "error" entry the message is the
    # empty string, so the checks below pass trivially.
    error = response.get("error")
    message = error.get("message", "") if error else ""

    # None of these Python exception / traceback fragments may appear.
    for marker in ("Traceback", "Exception", "AttributeError"):
        assert marker not in message
| 45 | |
| 46 | |
@pytest.mark.asyncio
async def test_m1_tool_error_message_is_generic() -> None:
    """If a tool call yields an error, its message must carry no Python error repr.

    The reply to an anonymous ``musehub_get_repo`` call with empty arguments is
    only inspected when it contains an ``error`` entry.
    """
    from musehub.mcp.dispatcher import handle_request

    request: dict[str, object] = {
        "jsonrpc": "2.0",
        "id": 99,
        "method": "tools/call",
        "params": {"name": "musehub_get_repo", "arguments": {}},
    }

    response = await handle_request(request, user_id=None)
    assert response is not None

    # A reply without an error entry has nothing further to verify.
    if "error" not in response:
        return

    message: str = response["error"].get("message", "")  # type: ignore[index]
    # Fragments typical of a Python error repr must be absent.
    assert "Traceback" not in message
    assert "NoneType" not in message
| 66 | |
| 67 | |
| 68 | # ── M3: CSP nonce — unsafe-inline removed from script-src ──────────────────── |
| 69 | |
| 70 | |
def test_m3_csp_header_removes_unsafe_inline_from_script_src() -> None:
    """The script-src directive must carry a nonce and no 'unsafe-inline'.

    NOTE(review): the policy string checked here is assembled inside the test;
    the middleware instance is constructed but never consulted, so this only
    verifies the expected shape of the policy, not the middleware's output.
    """
    import re

    from musehub.main import SecurityHeadersMiddleware

    # Construction only; nothing below reads from this instance.
    _middleware = SecurityHeadersMiddleware(app=None)  # type: ignore[arg-type]
    nonce = "test-nonce-abc123"

    csp_fragments = [
        "default-src 'self'; ",
        f"script-src 'self' 'unsafe-eval' 'nonce-{nonce}'; ",
        "style-src 'self' 'unsafe-inline' https://fonts.bunny.net; ",
        "font-src 'self' https://fonts.bunny.net; ",
        "img-src 'self' data: https:; ",
        "connect-src 'self'; ",
        "frame-ancestors 'none'",
    ]
    csp = "".join(csp_fragments)

    # Isolate the script-src directive (everything up to the next ';').
    script_src_pattern = re.compile(r"script-src([^;]+)")
    found = script_src_pattern.search(csp)
    assert found is not None

    directive_value = found[1]
    assert "'unsafe-inline'" not in directive_value
    assert f"'nonce-{nonce}'" in directive_value
| 97 | |
| 98 | |
@pytest.mark.asyncio
async def test_m3_csp_nonce_in_response_header() -> None:
    """Integration: each HTTP response carries a unique CSP nonce in its header.

    Issues two GET requests to ``/mcp/docs`` through an in-process ASGI
    transport and checks, for each response, that the
    ``content-security-policy`` header contains a nonce and that its
    ``script-src`` directive has no ``'unsafe-inline'``. Finally asserts that
    the two nonces differ.
    """
    import re

    from httpx import AsyncClient, ASGITransport
    from musehub.main import app

    # Compiled once; both patterns are applied to every response.
    script_src_re = re.compile(r"script-src([^;]+)")
    # CSP nonces use the base64-value grammar, which allows "+", "/" and "="
    # padding in addition to the URL-safe alphabet. Matching the full set
    # keeps a standard-base64 nonce from being truncated before comparison.
    nonce_re = re.compile(r"nonce-([A-Za-z0-9+/=_-]+)")

    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        r1 = await client.get("/mcp/docs")
        r2 = await client.get("/mcp/docs")

    nonces: list[str] = []
    for resp in (r1, r2):
        csp = resp.headers.get("content-security-policy", "")
        assert "nonce-" in csp, "CSP must contain a nonce"

        script_src = script_src_re.search(csp)
        assert script_src is not None
        assert "'unsafe-inline'" not in script_src.group(1)

        nonce_match = nonce_re.search(csp)
        nonces.append(nonce_match.group(1) if nonce_match else "")

    # Each request gets a different nonce.
    n1, n2 = nonces
    assert n1 and n2, "Both responses must have a nonce"
    assert n1 != n2, "Each request must receive a fresh nonce"
| 128 | |
| 129 | |
| 130 | # ── M4: ACCESS_TOKEN_SECRET entropy check ──────────────────────────────────── |
| 131 | |
| 132 | |
@pytest.mark.asyncio
async def test_m4_short_secret_raises_at_startup(monkeypatch: pytest.MonkeyPatch) -> None:
    """Entering the app lifespan with a too-short secret must raise RuntimeError.

    ``musehub.main.settings`` is replaced with a non-debug ``Settings`` whose
    ``access_token_secret`` is shorter than 32 bytes; the raised error must
    mention ``ACCESS_TOKEN_SECRET``.
    """
    from musehub.config import Settings

    weak_settings = Settings(
        debug=False,
        access_token_secret="short",  # fewer than 32 bytes
        # A non-Postgres URL is used so the DB password check is not involved.
        database_url="sqlite+aiosqlite:///:memory:",
    )
    monkeypatch.setattr("musehub.main.settings", weak_settings)

    from musehub.main import lifespan, app

    with pytest.raises(RuntimeError, match="ACCESS_TOKEN_SECRET"):
        async with lifespan(app):
            pass
| 149 | |
| 150 | |
@pytest.mark.asyncio
async def test_m4_long_secret_passes_startup(monkeypatch: pytest.MonkeyPatch) -> None:
    """A sufficiently long secret must not trigger a secret-related startup error.

    Any other ``RuntimeError`` raised while entering the lifespan is tolerated;
    only one whose text mentions ``ACCESS_TOKEN_SECRET`` fails the test.
    """
    import secrets
    from musehub.config import Settings

    strong_secret = secrets.token_hex(32)  # 64 hex chars = 32 bytes
    strong_settings = Settings(
        debug=False,
        access_token_secret=strong_secret,
        database_url="sqlite+aiosqlite:///:memory:",
    )
    monkeypatch.setattr("musehub.main.settings", strong_settings)

    from musehub.main import lifespan, app

    try:
        async with lifespan(app):
            pass
    except RuntimeError as exc:
        detail = str(exc)
        assert "ACCESS_TOKEN_SECRET" not in detail, f"Unexpected secret error: {exc}"
| 169 | |
| 170 | |
| 171 | # ── M5: logging/setLevel requires authentication ───────────────────────────── |
| 172 | |
| 173 | |
@pytest.mark.asyncio
async def test_m5_set_level_anonymous_is_rejected() -> None:
    """``logging/setLevel`` sent without a user must come back as an auth error.

    The reply must contain an ``error`` entry whose message mentions
    authentication.
    """
    from musehub.mcp.dispatcher import handle_request

    request: dict[str, object] = {
        "jsonrpc": "2.0",
        "id": 42,
        "method": "logging/setLevel",
        "params": {"level": "debug"},
    }

    response = await handle_request(request, user_id=None)
    assert response is not None
    assert "error" in response, "Anonymous setLevel must return an error"

    message: str = response["error"].get("message", "")  # type: ignore[index]
    mentions_auth = "Authentication" in message or "auth" in message.lower()
    assert mentions_auth
| 190 | |
| 191 | |
@pytest.mark.asyncio
async def test_m5_set_level_authenticated_is_accepted() -> None:
    """``logging/setLevel`` sent with a user id must not produce an error reply."""
    from musehub.mcp.dispatcher import handle_request

    request: dict[str, object] = {
        "jsonrpc": "2.0",
        "id": 43,
        "method": "logging/setLevel",
        "params": {"level": "warning"},
    }

    response = await handle_request(request, user_id="test-user-m5")

    assert response is not None
    assert "error" not in response, f"Authenticated setLevel should succeed: {response}"
| 206 | |
| 207 | |
| 208 | # ── M6: WireSnapshot.manifest entry cap ────────────────────────────────────── |
| 209 | |
| 210 | |
def test_m6_manifest_at_limit_is_accepted() -> None:
    """Building a WireSnapshot with a 10 000-entry manifest must succeed."""
    from musehub.models.wire import WireSnapshot

    entry_count = 10_000
    manifest = {}
    for i in range(entry_count):
        manifest[f"file_{i}.mid"] = f"sha256:{'a' * 64}"

    snap = WireSnapshot(snapshot_id="snap-m6", manifest=manifest)

    assert len(snap.manifest) == entry_count
| 220 | |
| 221 | |
def test_m6_manifest_over_limit_is_rejected() -> None:
    """Building a WireSnapshot with 10 001 manifest entries must raise ValidationError."""
    from musehub.models.wire import WireSnapshot

    oversized_manifest = {}
    for i in range(10_001):
        oversized_manifest[f"file_{i}.mid"] = f"sha256:{'a' * 64}"

    with pytest.raises(ValidationError):
        WireSnapshot(snapshot_id="snap-m6-over", manifest=oversized_manifest)
| 231 | |
| 232 | |
| 233 | # ── M7: Commit message / PR / issue / comment body length caps ─────────────── |
| 234 | |
| 235 | |
def test_m7_commit_message_at_limit() -> None:
    """Constructing a CommitInput with a 10 000-character message must not raise."""
    from datetime import datetime, timezone

    from musehub.models.musehub import CommitInput

    message_at_cap = "x" * 10_000
    committed_at = datetime.now(timezone.utc)

    CommitInput(
        commit_id="abc",
        parent_ids=[],
        message=message_at_cap,
        timestamp=committed_at,
    )
| 246 | |
| 247 | |
def test_m7_commit_message_over_limit() -> None:
    """Constructing a CommitInput with a 10 001-character message must raise ValidationError."""
    from datetime import datetime, timezone

    from musehub.models.musehub import CommitInput

    message_over_cap = "x" * 10_001

    with pytest.raises(ValidationError):
        CommitInput(
            commit_id="abc",
            parent_ids=[],
            message=message_over_cap,
            timestamp=datetime.now(timezone.utc),
        )
| 259 | |
| 260 | |
def test_m7_issue_body_at_limit() -> None:
    """Constructing an IssueCreate with a 10 000-character body must not raise."""
    from musehub.models.musehub import IssueCreate

    body_at_cap = "y" * 10_000
    IssueCreate(title="My issue", body=body_at_cap)
| 265 | |
| 266 | |
def test_m7_issue_body_over_limit() -> None:
    """Constructing an IssueCreate with a 10 001-character body must raise ValidationError."""
    from musehub.models.musehub import IssueCreate

    body_over_cap = "y" * 10_001
    with pytest.raises(ValidationError):
        IssueCreate(title="My issue", body=body_over_cap)
| 272 | |
| 273 | |
def test_m7_pr_body_at_limit() -> None:
    """Constructing a PRCreate with a 10 000-character body must not raise."""
    from musehub.models.musehub import PRCreate

    body_at_cap = "z" * 10_000
    PRCreate(
        title="My PR",
        from_branch="feat/x",
        to_branch="main",
        body=body_at_cap,
    )
| 278 | |
| 279 | |
def test_m7_pr_body_over_limit() -> None:
    """Constructing a PRCreate with a 10 001-character body must raise ValidationError."""
    from musehub.models.musehub import PRCreate

    body_over_cap = "z" * 10_001
    with pytest.raises(ValidationError):
        PRCreate(
            title="My PR",
            from_branch="feat/x",
            to_branch="main",
            body=body_over_cap,
        )
| 285 | |
| 286 | |
def test_m7_issue_comment_body_at_limit() -> None:
    """Constructing an IssueCommentCreate with a 10 000-character body must not raise."""
    from musehub.models.musehub import IssueCommentCreate

    body_at_cap = "c" * 10_000
    IssueCommentCreate(body=body_at_cap)
| 291 | |
| 292 | |
def test_m7_issue_comment_body_over_limit() -> None:
    """Constructing an IssueCommentCreate with a 10 001-character body must raise ValidationError."""
    from musehub.models.musehub import IssueCommentCreate

    body_over_cap = "c" * 10_001
    with pytest.raises(ValidationError):
        IssueCommentCreate(body=body_over_cap)
| 298 | |
| 299 | |
def test_m7_pr_comment_body_at_limit() -> None:
    """Constructing a PRCommentCreate with a 10 000-character body must not raise."""
    from musehub.models.musehub import PRCommentCreate

    body_at_cap = "d" * 10_000
    PRCommentCreate(body=body_at_cap)
| 304 | |
| 305 | |
def test_m7_pr_comment_body_over_limit() -> None:
    """Constructing a PRCommentCreate with a 10 001-character body must raise ValidationError."""
    from musehub.models.musehub import PRCommentCreate

    body_over_cap = "d" * 10_001
    with pytest.raises(ValidationError):
        PRCommentCreate(body=body_over_cap)
| 311 | |
| 312 | |
def test_m7_review_body_at_limit() -> None:
    """Constructing a PRReviewCreate with a 10 000-character body must not raise."""
    from musehub.models.musehub import PRReviewCreate

    body_at_cap = "e" * 10_000
    PRReviewCreate(event="approve", body=body_at_cap)
| 317 | |
| 318 | |
def test_m7_review_body_over_limit() -> None:
    """Constructing a PRReviewCreate with a 10 001-character body must raise ValidationError."""
    from musehub.models.musehub import PRReviewCreate

    body_over_cap = "e" * 10_001
    with pytest.raises(ValidationError):
        PRReviewCreate(event="approve", body=body_over_cap)