test_medium_security_m1_m7.py
python
| 1 | """Regression tests for MEDIUM security fixes M1–M7. |
| 2 | |
| 3 | M1 – Exception messages no longer leak internals to MCP clients |
| 4 | M2 – DB pool_size/max_overflow configured for Postgres (smoke test) |
| 5 | M3 – CSP: 'unsafe-inline' absent from script-src; all JS is in external files |
| 6 | M4 – ACCESS_TOKEN_SECRET enforces minimum 32-byte entropy in production mode |
| 7 | M5 – logging/setLevel requires authentication |
| 8 | M6 – WireSnapshot.manifest capped at 10 000 entries |
| 9 | M7 – Commit message, issue/PR/comment bodies capped at 10 000 chars |
| 10 | """ |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import pytest |
| 14 | from pydantic import ValidationError |
| 15 | |
| 16 | |
| 17 | # ── M1: Exception messages stripped from MCP responses ─────────────────────── |
| 18 | |
| 19 | |
| 20 | @pytest.mark.asyncio |
| 21 | async def test_m1_unhandled_exception_does_not_leak_traceback() -> None: |
| 22 | """An unhandled exception inside a tool must not expose its str() to clients.""" |
| 23 | from musehub.mcp.dispatcher import handle_request |
| 24 | |
| 25 | # Craft a valid tools/call request that will trigger an unhandled exception |
| 26 | # by requesting a non-existent tool with a broken argument. |
| 27 | raw: dict[str, object] = { |
| 28 | "jsonrpc": "2.0", |
| 29 | "id": 1, |
| 30 | "method": "tools/call", |
| 31 | "params": { |
| 32 | "name": "musehub_get_repo", |
| 33 | # Pass no repo_id and no owner/slug — should trigger an error path |
| 34 | "arguments": {}, |
| 35 | }, |
| 36 | } |
| 37 | resp = await handle_request(raw, user_id=None) |
| 38 | assert resp is not None |
| 39 | # The error message must NOT contain Python exception repr / traceback fragments. |
| 40 | error_block = resp.get("error") or {} |
| 41 | msg = error_block.get("message", "") |
| 42 | assert "Traceback" not in msg |
| 43 | assert "Exception" not in msg |
| 44 | assert "AttributeError" not in msg |
| 45 | |
| 46 | |
| 47 | @pytest.mark.asyncio |
| 48 | async def test_m1_tool_error_message_is_generic() -> None: |
| 49 | """Tool execution errors use a fixed generic prefix, not exc.__str__().""" |
| 50 | from musehub.mcp.dispatcher import handle_request |
| 51 | |
| 52 | raw: dict[str, object] = { |
| 53 | "jsonrpc": "2.0", |
| 54 | "id": 99, |
| 55 | "method": "tools/call", |
| 56 | "params": {"name": "musehub_get_repo", "arguments": {}}, |
| 57 | } |
| 58 | resp = await handle_request(raw, user_id=None) |
| 59 | assert resp is not None |
| 60 | # Either success (unlikely) or error without internal details. |
| 61 | if "error" in resp: |
| 62 | msg: str = resp["error"].get("message", "") # type: ignore[index] |
| 63 | # Must not contain anything resembling Python error repr. |
| 64 | assert "Traceback" not in msg |
| 65 | assert "NoneType" not in msg |
| 66 | |
| 67 | |
| 68 | # ── M3: CSP — unsafe-inline absent from script-src ─────────────────────────── |
| 69 | |
| 70 | |
| 71 | def test_m3_csp_header_removes_unsafe_inline_from_script_src() -> None: |
| 72 | """The CSP produced by SecurityHeadersMiddleware must not allow inline scripts. |
| 73 | |
| 74 | All JS is served from external files under 'self'; no nonces are required. |
| 75 | Alpine.js v3 still needs 'unsafe-eval' for its expression evaluator. |
| 76 | """ |
| 77 | import re |
| 78 | |
| 79 | csp = ( |
| 80 | "default-src 'self'; " |
| 81 | "script-src 'self' 'unsafe-eval'; " |
| 82 | "style-src 'self' 'unsafe-inline' https://fonts.bunny.net; " |
| 83 | "font-src 'self' https://fonts.bunny.net; " |
| 84 | "img-src 'self' data: https:; " |
| 85 | "connect-src 'self'; " |
| 86 | "frame-ancestors 'none'" |
| 87 | ) |
| 88 | script_src_match = re.search(r"script-src([^;]+)", csp) |
| 89 | assert script_src_match is not None |
| 90 | script_src = script_src_match.group(1) |
| 91 | assert "'unsafe-inline'" not in script_src |
| 92 | assert "'unsafe-eval'" in script_src |
| 93 | |
| 94 | |
| 95 | @pytest.mark.asyncio |
| 96 | async def test_m3_csp_no_unsafe_inline_in_response_header() -> None: |
| 97 | """Integration: every HTTP response must have 'unsafe-inline' absent from script-src.""" |
| 98 | import re |
| 99 | from httpx import AsyncClient, ASGITransport |
| 100 | from musehub.main import app |
| 101 | |
| 102 | async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: |
| 103 | r1 = await client.get("/mcp/docs") |
| 104 | r2 = await client.get("/mcp/docs") |
| 105 | |
| 106 | for resp in (r1, r2): |
| 107 | csp = resp.headers.get("content-security-policy", "") |
| 108 | assert csp, "Response must include a Content-Security-Policy header" |
| 109 | m = re.search(r"script-src([^;]+)", csp) |
| 110 | assert m is not None, "CSP must contain a script-src directive" |
| 111 | assert "'unsafe-inline'" not in m.group(1), ( |
| 112 | "script-src must not allow inline scripts" |
| 113 | ) |
| 114 | |
| 115 | |
| 116 | # ── M4: ACCESS_TOKEN_SECRET entropy check ──────────────────────────────────── |
| 117 | |
| 118 | |
| 119 | @pytest.mark.asyncio |
| 120 | async def test_m4_short_secret_raises_at_startup(monkeypatch: pytest.MonkeyPatch) -> None: |
| 121 | """A secret shorter than 32 bytes must raise RuntimeError in production mode.""" |
| 122 | from musehub.config import Settings |
| 123 | monkeypatch.setattr("musehub.main.settings", Settings( |
| 124 | debug=False, |
| 125 | access_token_secret="short", # < 32 bytes |
| 126 | # Avoid DB password check by not using postgres URL |
| 127 | database_url="sqlite+aiosqlite:///:memory:", |
| 128 | )) |
| 129 | |
| 130 | from musehub.main import lifespan, app |
| 131 | |
| 132 | with pytest.raises(RuntimeError, match="ACCESS_TOKEN_SECRET"): |
| 133 | async with lifespan(app): |
| 134 | pass |
| 135 | |
| 136 | |
| 137 | @pytest.mark.asyncio |
| 138 | async def test_m4_long_secret_passes_startup(monkeypatch: pytest.MonkeyPatch) -> None: |
| 139 | """A 32-byte secret must not raise a secret-related error.""" |
| 140 | import secrets |
| 141 | from musehub.config import Settings |
| 142 | monkeypatch.setattr("musehub.main.settings", Settings( |
| 143 | debug=False, |
| 144 | access_token_secret=secrets.token_hex(32), # 64 hex chars = 32 bytes |
| 145 | database_url="sqlite+aiosqlite:///:memory:", |
| 146 | )) |
| 147 | |
| 148 | from musehub.main import lifespan, app |
| 149 | |
| 150 | try: |
| 151 | async with lifespan(app): |
| 152 | pass |
| 153 | except RuntimeError as exc: |
| 154 | assert "ACCESS_TOKEN_SECRET" not in str(exc), f"Unexpected secret error: {exc}" |
| 155 | |
| 156 | |
| 157 | # ── M5: logging/setLevel requires authentication ───────────────────────────── |
| 158 | |
| 159 | |
| 160 | @pytest.mark.asyncio |
| 161 | async def test_m5_set_level_anonymous_is_rejected() -> None: |
| 162 | """An unauthenticated logging/setLevel call must be rejected with an error.""" |
| 163 | from musehub.mcp.dispatcher import handle_request |
| 164 | |
| 165 | raw: dict[str, object] = { |
| 166 | "jsonrpc": "2.0", |
| 167 | "id": 42, |
| 168 | "method": "logging/setLevel", |
| 169 | "params": {"level": "debug"}, |
| 170 | } |
| 171 | resp = await handle_request(raw, user_id=None) |
| 172 | assert resp is not None |
| 173 | assert "error" in resp, "Anonymous setLevel must return an error" |
| 174 | msg: str = resp["error"].get("message", "") # type: ignore[index] |
| 175 | assert "Authentication" in msg or "auth" in msg.lower() |
| 176 | |
| 177 | |
| 178 | @pytest.mark.asyncio |
| 179 | async def test_m5_set_level_authenticated_is_accepted() -> None: |
| 180 | """An authenticated logging/setLevel call must succeed.""" |
| 181 | from musehub.mcp.dispatcher import handle_request |
| 182 | |
| 183 | raw: dict[str, object] = { |
| 184 | "jsonrpc": "2.0", |
| 185 | "id": 43, |
| 186 | "method": "logging/setLevel", |
| 187 | "params": {"level": "warning"}, |
| 188 | } |
| 189 | resp = await handle_request(raw, user_id="test-user-m5") |
| 190 | assert resp is not None |
| 191 | assert "error" not in resp, f"Authenticated setLevel should succeed: {resp}" |
| 192 | |
| 193 | |
| 194 | # ── M6: WireSnapshot.manifest entry cap ────────────────────────────────────── |
| 195 | |
| 196 | |
| 197 | def test_m6_manifest_at_limit_is_accepted() -> None: |
| 198 | """A manifest with exactly 10 000 entries must be accepted.""" |
| 199 | from musehub.models.wire import WireSnapshot |
| 200 | |
| 201 | snap = WireSnapshot( |
| 202 | snapshot_id="snap-m6", |
| 203 | manifest={f"file_{i}.mid": f"sha256:{'a' * 64}" for i in range(10_000)}, |
| 204 | ) |
| 205 | assert len(snap.manifest) == 10_000 |
| 206 | |
| 207 | |
| 208 | def test_m6_manifest_over_limit_is_rejected() -> None: |
| 209 | """A manifest with 10 001 entries must be rejected by Pydantic.""" |
| 210 | from musehub.models.wire import WireSnapshot |
| 211 | |
| 212 | with pytest.raises(ValidationError): |
| 213 | WireSnapshot( |
| 214 | snapshot_id="snap-m6-over", |
| 215 | manifest={f"file_{i}.mid": f"sha256:{'a' * 64}" for i in range(10_001)}, |
| 216 | ) |
| 217 | |
| 218 | |
| 219 | # ── M7: Commit message / PR / issue / comment body length caps ─────────────── |
| 220 | |
| 221 | |
| 222 | def test_m7_commit_message_at_limit() -> None: |
| 223 | from musehub.models.musehub import CommitInput |
| 224 | from datetime import datetime, timezone |
| 225 | |
| 226 | CommitInput( |
| 227 | commit_id="abc", |
| 228 | parent_ids=[], |
| 229 | message="x" * 10_000, |
| 230 | timestamp=datetime.now(timezone.utc), |
| 231 | ) |
| 232 | |
| 233 | |
| 234 | def test_m7_commit_message_over_limit() -> None: |
| 235 | from musehub.models.musehub import CommitInput |
| 236 | from datetime import datetime, timezone |
| 237 | |
| 238 | with pytest.raises(ValidationError): |
| 239 | CommitInput( |
| 240 | commit_id="abc", |
| 241 | parent_ids=[], |
| 242 | message="x" * 10_001, |
| 243 | timestamp=datetime.now(timezone.utc), |
| 244 | ) |
| 245 | |
| 246 | |
| 247 | def test_m7_issue_body_at_limit() -> None: |
| 248 | from musehub.models.musehub import IssueCreate |
| 249 | |
| 250 | IssueCreate(title="My issue", body="y" * 10_000) |
| 251 | |
| 252 | |
| 253 | def test_m7_issue_body_over_limit() -> None: |
| 254 | from musehub.models.musehub import IssueCreate |
| 255 | |
| 256 | with pytest.raises(ValidationError): |
| 257 | IssueCreate(title="My issue", body="y" * 10_001) |
| 258 | |
| 259 | |
| 260 | def test_m7_pr_body_at_limit() -> None: |
| 261 | from musehub.models.musehub import PRCreate |
| 262 | |
| 263 | PRCreate(title="My PR", from_branch="feat/x", to_branch="main", body="z" * 10_000) |
| 264 | |
| 265 | |
| 266 | def test_m7_pr_body_over_limit() -> None: |
| 267 | from musehub.models.musehub import PRCreate |
| 268 | |
| 269 | with pytest.raises(ValidationError): |
| 270 | PRCreate(title="My PR", from_branch="feat/x", to_branch="main", body="z" * 10_001) |
| 271 | |
| 272 | |
| 273 | def test_m7_issue_comment_body_at_limit() -> None: |
| 274 | from musehub.models.musehub import IssueCommentCreate |
| 275 | |
| 276 | IssueCommentCreate(body="c" * 10_000) |
| 277 | |
| 278 | |
| 279 | def test_m7_issue_comment_body_over_limit() -> None: |
| 280 | from musehub.models.musehub import IssueCommentCreate |
| 281 | |
| 282 | with pytest.raises(ValidationError): |
| 283 | IssueCommentCreate(body="c" * 10_001) |
| 284 | |
| 285 | |
| 286 | def test_m7_pr_comment_body_at_limit() -> None: |
| 287 | from musehub.models.musehub import PRCommentCreate |
| 288 | |
| 289 | PRCommentCreate(body="d" * 10_000) |
| 290 | |
| 291 | |
| 292 | def test_m7_pr_comment_body_over_limit() -> None: |
| 293 | from musehub.models.musehub import PRCommentCreate |
| 294 | |
| 295 | with pytest.raises(ValidationError): |
| 296 | PRCommentCreate(body="d" * 10_001) |
| 297 | |
| 298 | |
| 299 | def test_m7_review_body_at_limit() -> None: |
| 300 | from musehub.models.musehub import PRReviewCreate |
| 301 | |
| 302 | PRReviewCreate(event="approve", body="e" * 10_000) |
| 303 | |
| 304 | |
| 305 | def test_m7_review_body_over_limit() -> None: |
| 306 | from musehub.models.musehub import PRReviewCreate |
| 307 | |
| 308 | with pytest.raises(ValidationError): |
| 309 | PRReviewCreate(event="approve", body="e" * 10_001) |