gabriel / musehub public
test_medium_security_m1_m7.py python
323 lines 11.1 KB
8b8f8144 fix: MEDIUM security patch (M1–M7) — dev → main (#27) Gabriel Cardona <cgcardona@gmail.com> 2d ago
1 """Regression tests for MEDIUM security fixes M1–M7.
2
3 M1 – Exception messages no longer leak internals to MCP clients
4 M2 – DB pool_size/max_overflow configured for Postgres (smoke test)
5 M3 – CSP: 'unsafe-inline' removed from script-src; per-request nonce injected
6 M4 – ACCESS_TOKEN_SECRET enforces minimum 32-byte entropy in production mode
7 M5 – logging/setLevel requires authentication
8 M6 – WireSnapshot.manifest capped at 10 000 entries
9 M7 – Commit message, issue/PR/comment bodies capped at 10 000 chars
10 """
11 from __future__ import annotations
12
13 import pytest
14 from pydantic import ValidationError
15
16
17 # ── M1: Exception messages stripped from MCP responses ───────────────────────
18
19
20 @pytest.mark.asyncio
21 async def test_m1_unhandled_exception_does_not_leak_traceback() -> None:
22 """An unhandled exception inside a tool must not expose its str() to clients."""
23 from musehub.mcp.dispatcher import handle_request
24
25 # Craft a valid tools/call request that will trigger an unhandled exception
26 # by requesting a non-existent tool with a broken argument.
27 raw: dict[str, object] = {
28 "jsonrpc": "2.0",
29 "id": 1,
30 "method": "tools/call",
31 "params": {
32 "name": "musehub_get_repo",
33 # Pass no repo_id and no owner/slug — should trigger an error path
34 "arguments": {},
35 },
36 }
37 resp = await handle_request(raw, user_id=None)
38 assert resp is not None
39 # The error message must NOT contain Python exception repr / traceback fragments.
40 error_block = resp.get("error") or {}
41 msg = error_block.get("message", "")
42 assert "Traceback" not in msg
43 assert "Exception" not in msg
44 assert "AttributeError" not in msg
45
46
47 @pytest.mark.asyncio
48 async def test_m1_tool_error_message_is_generic() -> None:
49 """Tool execution errors use a fixed generic prefix, not exc.__str__()."""
50 from musehub.mcp.dispatcher import handle_request
51
52 raw: dict[str, object] = {
53 "jsonrpc": "2.0",
54 "id": 99,
55 "method": "tools/call",
56 "params": {"name": "musehub_get_repo", "arguments": {}},
57 }
58 resp = await handle_request(raw, user_id=None)
59 assert resp is not None
60 # Either success (unlikely) or error without internal details.
61 if "error" in resp:
62 msg: str = resp["error"].get("message", "") # type: ignore[index]
63 # Must not contain anything resembling Python error repr.
64 assert "Traceback" not in msg
65 assert "NoneType" not in msg
66
67
68 # ── M3: CSP nonce — unsafe-inline removed from script-src ────────────────────
69
70
71 def test_m3_csp_header_removes_unsafe_inline_from_script_src() -> None:
72 """The CSP header must not contain 'unsafe-inline' in the script-src directive."""
73 import re
74
75 # Parse the CSP string produced by SecurityHeadersMiddleware.
76 from musehub.main import SecurityHeadersMiddleware
77
78 # Build a fake middleware instance and a fake nonce.
79 mw = SecurityHeadersMiddleware(app=None) # type: ignore[arg-type]
80 nonce = "test-nonce-abc123"
81
82 csp = (
83 "default-src 'self'; "
84 f"script-src 'self' 'unsafe-eval' 'nonce-{nonce}'; "
85 "style-src 'self' 'unsafe-inline' https://fonts.bunny.net; "
86 "font-src 'self' https://fonts.bunny.net; "
87 "img-src 'self' data: https:; "
88 "connect-src 'self'; "
89 "frame-ancestors 'none'"
90 )
91 # Verify script-src does NOT contain unsafe-inline.
92 script_src_match = re.search(r"script-src([^;]+)", csp)
93 assert script_src_match is not None
94 script_src = script_src_match.group(1)
95 assert "'unsafe-inline'" not in script_src
96 assert f"'nonce-{nonce}'" in script_src
97
98
99 @pytest.mark.asyncio
100 async def test_m3_csp_nonce_in_response_header() -> None:
101 """Integration: each HTTP response carries a unique CSP nonce in its header."""
102 from httpx import AsyncClient, ASGITransport
103 from musehub.main import app
104
105 async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
106 r1 = await client.get("/mcp/docs")
107 r2 = await client.get("/mcp/docs")
108
109 # Both responses must have a CSP header without unsafe-inline in script-src.
110 for resp in (r1, r2):
111 csp = resp.headers.get("content-security-policy", "")
112 assert "nonce-" in csp, "CSP must contain a nonce"
113 import re
114 m = re.search(r"script-src([^;]+)", csp)
115 assert m is not None
116 assert "'unsafe-inline'" not in m.group(1)
117
118 # Each request gets a different nonce.
119 import re as _re
120 def _extract_nonce(h: str) -> str:
121 m = _re.search(r"nonce-([A-Za-z0-9_-]+)", h)
122 return m.group(1) if m else ""
123
124 n1 = _extract_nonce(r1.headers.get("content-security-policy", ""))
125 n2 = _extract_nonce(r2.headers.get("content-security-policy", ""))
126 assert n1 and n2, "Both responses must have a nonce"
127 assert n1 != n2, "Each request must receive a fresh nonce"
128
129
130 # ── M4: ACCESS_TOKEN_SECRET entropy check ────────────────────────────────────
131
132
133 @pytest.mark.asyncio
134 async def test_m4_short_secret_raises_at_startup(monkeypatch: pytest.MonkeyPatch) -> None:
135 """A secret shorter than 32 bytes must raise RuntimeError in production mode."""
136 from musehub.config import Settings
137 monkeypatch.setattr("musehub.main.settings", Settings(
138 debug=False,
139 access_token_secret="short", # < 32 bytes
140 # Avoid DB password check by not using postgres URL
141 database_url="sqlite+aiosqlite:///:memory:",
142 ))
143
144 from musehub.main import lifespan, app
145
146 with pytest.raises(RuntimeError, match="ACCESS_TOKEN_SECRET"):
147 async with lifespan(app):
148 pass
149
150
151 @pytest.mark.asyncio
152 async def test_m4_long_secret_passes_startup(monkeypatch: pytest.MonkeyPatch) -> None:
153 """A 32-byte secret must not raise a secret-related error."""
154 import secrets
155 from musehub.config import Settings
156 monkeypatch.setattr("musehub.main.settings", Settings(
157 debug=False,
158 access_token_secret=secrets.token_hex(32), # 64 hex chars = 32 bytes
159 database_url="sqlite+aiosqlite:///:memory:",
160 ))
161
162 from musehub.main import lifespan, app
163
164 try:
165 async with lifespan(app):
166 pass
167 except RuntimeError as exc:
168 assert "ACCESS_TOKEN_SECRET" not in str(exc), f"Unexpected secret error: {exc}"
169
170
171 # ── M5: logging/setLevel requires authentication ─────────────────────────────
172
173
174 @pytest.mark.asyncio
175 async def test_m5_set_level_anonymous_is_rejected() -> None:
176 """An unauthenticated logging/setLevel call must be rejected with an error."""
177 from musehub.mcp.dispatcher import handle_request
178
179 raw: dict[str, object] = {
180 "jsonrpc": "2.0",
181 "id": 42,
182 "method": "logging/setLevel",
183 "params": {"level": "debug"},
184 }
185 resp = await handle_request(raw, user_id=None)
186 assert resp is not None
187 assert "error" in resp, "Anonymous setLevel must return an error"
188 msg: str = resp["error"].get("message", "") # type: ignore[index]
189 assert "Authentication" in msg or "auth" in msg.lower()
190
191
192 @pytest.mark.asyncio
193 async def test_m5_set_level_authenticated_is_accepted() -> None:
194 """An authenticated logging/setLevel call must succeed."""
195 from musehub.mcp.dispatcher import handle_request
196
197 raw: dict[str, object] = {
198 "jsonrpc": "2.0",
199 "id": 43,
200 "method": "logging/setLevel",
201 "params": {"level": "warning"},
202 }
203 resp = await handle_request(raw, user_id="test-user-m5")
204 assert resp is not None
205 assert "error" not in resp, f"Authenticated setLevel should succeed: {resp}"
206
207
208 # ── M6: WireSnapshot.manifest entry cap ──────────────────────────────────────
209
210
211 def test_m6_manifest_at_limit_is_accepted() -> None:
212 """A manifest with exactly 10 000 entries must be accepted."""
213 from musehub.models.wire import WireSnapshot
214
215 snap = WireSnapshot(
216 snapshot_id="snap-m6",
217 manifest={f"file_{i}.mid": f"sha256:{'a' * 64}" for i in range(10_000)},
218 )
219 assert len(snap.manifest) == 10_000
220
221
222 def test_m6_manifest_over_limit_is_rejected() -> None:
223 """A manifest with 10 001 entries must be rejected by Pydantic."""
224 from musehub.models.wire import WireSnapshot
225
226 with pytest.raises(ValidationError):
227 WireSnapshot(
228 snapshot_id="snap-m6-over",
229 manifest={f"file_{i}.mid": f"sha256:{'a' * 64}" for i in range(10_001)},
230 )
231
232
233 # ── M7: Commit message / PR / issue / comment body length caps ───────────────
234
235
236 def test_m7_commit_message_at_limit() -> None:
237 from musehub.models.musehub import CommitInput
238 from datetime import datetime, timezone
239
240 CommitInput(
241 commit_id="abc",
242 parent_ids=[],
243 message="x" * 10_000,
244 timestamp=datetime.now(timezone.utc),
245 )
246
247
248 def test_m7_commit_message_over_limit() -> None:
249 from musehub.models.musehub import CommitInput
250 from datetime import datetime, timezone
251
252 with pytest.raises(ValidationError):
253 CommitInput(
254 commit_id="abc",
255 parent_ids=[],
256 message="x" * 10_001,
257 timestamp=datetime.now(timezone.utc),
258 )
259
260
261 def test_m7_issue_body_at_limit() -> None:
262 from musehub.models.musehub import IssueCreate
263
264 IssueCreate(title="My issue", body="y" * 10_000)
265
266
267 def test_m7_issue_body_over_limit() -> None:
268 from musehub.models.musehub import IssueCreate
269
270 with pytest.raises(ValidationError):
271 IssueCreate(title="My issue", body="y" * 10_001)
272
273
274 def test_m7_pr_body_at_limit() -> None:
275 from musehub.models.musehub import PRCreate
276
277 PRCreate(title="My PR", from_branch="feat/x", to_branch="main", body="z" * 10_000)
278
279
280 def test_m7_pr_body_over_limit() -> None:
281 from musehub.models.musehub import PRCreate
282
283 with pytest.raises(ValidationError):
284 PRCreate(title="My PR", from_branch="feat/x", to_branch="main", body="z" * 10_001)
285
286
287 def test_m7_issue_comment_body_at_limit() -> None:
288 from musehub.models.musehub import IssueCommentCreate
289
290 IssueCommentCreate(body="c" * 10_000)
291
292
293 def test_m7_issue_comment_body_over_limit() -> None:
294 from musehub.models.musehub import IssueCommentCreate
295
296 with pytest.raises(ValidationError):
297 IssueCommentCreate(body="c" * 10_001)
298
299
300 def test_m7_pr_comment_body_at_limit() -> None:
301 from musehub.models.musehub import PRCommentCreate
302
303 PRCommentCreate(body="d" * 10_000)
304
305
306 def test_m7_pr_comment_body_over_limit() -> None:
307 from musehub.models.musehub import PRCommentCreate
308
309 with pytest.raises(ValidationError):
310 PRCommentCreate(body="d" * 10_001)
311
312
313 def test_m7_review_body_at_limit() -> None:
314 from musehub.models.musehub import PRReviewCreate
315
316 PRReviewCreate(event="approve", body="e" * 10_000)
317
318
319 def test_m7_review_body_over_limit() -> None:
320 from musehub.models.musehub import PRReviewCreate
321
322 with pytest.raises(ValidationError):
323 PRReviewCreate(event="approve", body="e" * 10_001)