gabriel / muse public
test_porcelain_security.py python
300 lines 12.8 KB
86000da9 fix: replace typer CliRunner with argparse-compatible test helper Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Security-focused regression tests for all porcelain hardening fixes.
2
3 These tests verify the specific security improvements made during the
4 porcelain hardening pass:
5
6 - ReDoS guard in content-grep (pattern length limit)
7 - Zip-slip prevention in archive and snapshot export
8 - validate_branch_name added to checkout and rebase
9 - sanitize_display applied to all user-sourced echoed strings
10 - Atomic stash writes (no temp file corruption)
11 - Snapshot ID glob prefix sanitisation
12 """
13
14 from __future__ import annotations
15
16 import datetime
17 import hashlib
18 import json
19 import pathlib
20 import uuid
21
22 import pytest
23 from tests.cli_test_helper import CliRunner
24
25 cli = None # argparse migration — CliRunner ignores this arg
26
27 runner = CliRunner()
28
29
30 # ---------------------------------------------------------------------------
31 # Shared repo setup helper
32 # ---------------------------------------------------------------------------
33
34 def _env(root: pathlib.Path) -> dict[str, str]:
35 return {"MUSE_REPO_ROOT": str(root)}
36
37
38 def _init_repo(tmp_path: pathlib.Path, domain: str = "midi") -> tuple[pathlib.Path, str]:
39 muse_dir = tmp_path / ".muse"
40 muse_dir.mkdir()
41 repo_id = str(uuid.uuid4())
42 (muse_dir / "repo.json").write_text(json.dumps({
43 "repo_id": repo_id,
44 "domain": domain,
45 "default_branch": "main",
46 "created_at": "2025-01-01T00:00:00+00:00",
47 }), encoding="utf-8")
48 (muse_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
49 (muse_dir / "refs" / "heads").mkdir(parents=True)
50 (muse_dir / "snapshots").mkdir()
51 (muse_dir / "commits").mkdir()
52 (muse_dir / "objects").mkdir()
53 return tmp_path, repo_id
54
55
56 def _make_commit(root: pathlib.Path, repo_id: str, message: str = "test") -> str:
57 from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
58 from muse.core.snapshot import compute_snapshot_id, compute_commit_id
59
60 ref_file = root / ".muse" / "refs" / "heads" / "main"
61 parent_id = ref_file.read_text().strip() if ref_file.exists() else None
62 manifest: dict[str, str] = {}
63 snap_id = compute_snapshot_id(manifest)
64 committed_at = datetime.datetime.now(datetime.timezone.utc)
65 commit_id = compute_commit_id(
66 parent_ids=[parent_id] if parent_id else [],
67 snapshot_id=snap_id, message=message,
68 committed_at_iso=committed_at.isoformat(),
69 )
70 write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
71 write_commit(root, CommitRecord(
72 commit_id=commit_id, repo_id=repo_id, branch="main",
73 snapshot_id=snap_id, message=message, committed_at=committed_at,
74 parent_commit_id=parent_id,
75 ))
76 ref_file.parent.mkdir(parents=True, exist_ok=True)
77 ref_file.write_text(commit_id, encoding="utf-8")
78 return commit_id
79
80
81 # ---------------------------------------------------------------------------
82 # content-grep: ReDoS guard
83 # ---------------------------------------------------------------------------
84
85 class TestContentGrepSecurity:
86 def test_pattern_too_long_rejected(self, tmp_path: pathlib.Path) -> None:
87 root, repo_id = _init_repo(tmp_path)
88 _make_commit(root, repo_id)
89 long_pattern = "a" * 501 # > 500 char limit
90 result = runner.invoke(cli, ["content-grep", "--pattern", long_pattern], env=_env(root))
91 assert result.exit_code != 0
92 assert "too long" in result.output or "Pattern" in result.output
93
94 def test_pattern_exactly_500_chars_accepted(self, tmp_path: pathlib.Path) -> None:
95 root, repo_id = _init_repo(tmp_path)
96 _make_commit(root, repo_id)
97 pattern_500 = "a" * 500
98 result = runner.invoke(cli, ["content-grep", "--pattern", pattern_500], env=_env(root))
99 # No match → exit 1, but not a ReDoS validation failure
100 assert result.exit_code in (0, 1)
101
102 def test_invalid_regex_rejected(self, tmp_path: pathlib.Path) -> None:
103 root, repo_id = _init_repo(tmp_path)
104 _make_commit(root, repo_id)
105 result = runner.invoke(cli, ["content-grep", "--pattern", "[invalid regex"], env=_env(root))
106 assert result.exit_code != 0
107 assert "regex" in result.output.lower() or "invalid" in result.output.lower()
108
109 def test_output_sanitized_no_ansi_injection(self, tmp_path: pathlib.Path) -> None:
110 root, repo_id = _init_repo(tmp_path)
111 content = b"normal line\n\x1b[31mRED\x1b[0m line\nanother\n"
112 obj_id = hashlib.sha256(content).hexdigest()
113 obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:]
114 obj_path.parent.mkdir(parents=True, exist_ok=True)
115 obj_path.write_bytes(content)
116
117 from muse.core.store import SnapshotRecord, CommitRecord, write_snapshot, write_commit
118 from muse.core.snapshot import compute_snapshot_id, compute_commit_id
119
120 manifest = {"file.txt": obj_id}
121 snap_id = compute_snapshot_id(manifest)
122 committed_at = datetime.datetime.now(datetime.timezone.utc)
123 commit_id = compute_commit_id([], snap_id, "test", committed_at.isoformat())
124 write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
125 write_commit(root, CommitRecord(
126 commit_id=commit_id, repo_id=repo_id, branch="main",
127 snapshot_id=snap_id, message="test", committed_at=committed_at,
128 parent_commit_id=None,
129 ))
130 (root / ".muse" / "refs" / "heads" / "main").write_text(commit_id)
131
132 result = runner.invoke(cli, ["content-grep", "--pattern", "RED"], env=_env(root))
133 if result.exit_code == 0:
134 assert "\x1b" not in result.output
135
136
137 # ---------------------------------------------------------------------------
138 # archive: zip-slip guard
139 # ---------------------------------------------------------------------------
140
141 class TestArchiveSecurity:
142 def test_archive_prefix_with_dotdot_rejected(self, tmp_path: pathlib.Path) -> None:
143 root, repo_id = _init_repo(tmp_path)
144 _make_commit(root, repo_id)
145 result = runner.invoke(cli, ["archive", "--prefix", "../../evil"], env=_env(root))
146 assert result.exit_code != 0
147
148 def test_zip_slip_guard_in_safe_arcname(self) -> None:
149 from muse.cli.commands.archive import _safe_arcname
150 assert _safe_arcname("safe", "../../../etc/passwd") is None
151 assert _safe_arcname("safe", "/etc/passwd") is None
152 assert _safe_arcname("safe", "normal/path.txt") == "safe/normal/path.txt"
153
154
155 # ---------------------------------------------------------------------------
156 # snapshot: glob prefix sanitisation
157 # ---------------------------------------------------------------------------
158
159 class TestSnapshotSecurity:
160 def test_validate_snapshot_id_prefix_strips_metacharacters(self) -> None:
161 from muse.cli.commands.snapshot_cmd import _validate_snapshot_id_prefix
162 prefix = _validate_snapshot_id_prefix("*bad[0-9]?glob*")
163 assert "*" not in prefix
164 assert "[" not in prefix
165 assert "?" not in prefix
166 assert all(c in "0123456789abcdef" for c in prefix)
167
168 def test_snapshot_show_with_glob_meta_no_injection(
169 self, tmp_path: pathlib.Path
170 ) -> None:
171 """Glob metacharacters in the snapshot ID prefix must be sanitised."""
172 root, repo_id = _init_repo(tmp_path)
173 _make_commit(root, repo_id)
174 # The '*' prefix is sanitised to empty string (no hex chars), so the
175 # command finds nothing but must not raise an exception or expose paths.
176 result = runner.invoke(cli, ["snapshot", "show", "*"], env=_env(root))
177 # Should not crash; may exit 0 (empty match) or non-zero (not found)
178 assert "\x1b" not in result.output
179 assert result.exception is None
180
181 def test_safe_arcname_in_snapshot(self) -> None:
182 from muse.cli.commands.snapshot_cmd import _safe_arcname
183 assert _safe_arcname("", "../../../etc/passwd") is None
184 assert _safe_arcname("prefix", "safe.txt") == "prefix/safe.txt"
185
186
187 # ---------------------------------------------------------------------------
188 # checkout: validate_branch_name on switch
189 # ---------------------------------------------------------------------------
190
191 class TestCheckoutSecurity:
192 def test_checkout_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
193 root, repo_id = _init_repo(tmp_path)
194 _make_commit(root, repo_id)
195 result = runner.invoke(cli, ["checkout", "../evil"], env=_env(root))
196 assert result.exit_code != 0
197
198 def test_checkout_double_dot_rejected(self, tmp_path: pathlib.Path) -> None:
199 root, repo_id = _init_repo(tmp_path)
200 _make_commit(root, repo_id)
201 result = runner.invoke(cli, ["checkout", ".."], env=_env(root))
202 assert result.exit_code != 0
203
204 def test_checkout_valid_existing_branch_works(self, tmp_path: pathlib.Path) -> None:
205 root, repo_id = _init_repo(tmp_path)
206 _make_commit(root, repo_id)
207 # Create a second branch and switch to it
208 (root / ".muse" / "refs" / "heads" / "dev").write_text(
209 (root / ".muse" / "refs" / "heads" / "main").read_text()
210 )
211 result = runner.invoke(cli, ["checkout", "dev"], env=_env(root), catch_exceptions=False)
212 assert result.exit_code == 0
213
214
215 # ---------------------------------------------------------------------------
216 # rebase: validate_branch_name on upstream/onto
217 # ---------------------------------------------------------------------------
218
219 class TestRebaseSecurity:
220 def test_rebase_invalid_upstream_fails(self, tmp_path: pathlib.Path) -> None:
221 root, repo_id = _init_repo(tmp_path)
222 _make_commit(root, repo_id)
223 result = runner.invoke(cli, ["rebase", "../../../etc/passwd"], env=_env(root))
224 assert result.exit_code != 0
225
226
227 # ---------------------------------------------------------------------------
228 # stash: atomic write regression
229 # ---------------------------------------------------------------------------
230
231 class TestStashAtomicWrite:
232 def test_no_temp_files_after_save(self, tmp_path: pathlib.Path) -> None:
233 root, _ = _init_repo(tmp_path)
234 from muse.cli.commands.stash import _save_stash, StashEntry
235 entries = [
236 StashEntry(
237 snapshot_id="a" * 64, manifest={},
238 branch="main", stashed_at="2025-01-01T00:00:00+00:00"
239 )
240 ]
241 _save_stash(root, entries)
242 assert list((root / ".muse").glob(".stash_tmp_*")) == []
243 assert (root / ".muse" / "stash.json").exists()
244
245 def test_stash_file_contents_after_atomic_write(self, tmp_path: pathlib.Path) -> None:
246 root, _ = _init_repo(tmp_path)
247 from muse.cli.commands.stash import _save_stash, _load_stash, StashEntry
248 _save_stash(root, [
249 StashEntry(snapshot_id="b" * 64, manifest={},
250 branch="main", stashed_at="2025-06-01T12:00:00+00:00")
251 ])
252 loaded = _load_stash(root)
253 assert len(loaded) == 1
254 assert loaded[0]["snapshot_id"] == "b" * 64
255
256
257 # ---------------------------------------------------------------------------
258 # show: sanitize_display regression
259 # ---------------------------------------------------------------------------
260
261 class TestShowDisplaySanitize:
262 def test_commit_message_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None:
263 root, repo_id = _init_repo(tmp_path)
264 from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
265 from muse.core.snapshot import compute_snapshot_id, compute_commit_id
266
267 snap_id = compute_snapshot_id({})
268 committed_at = datetime.datetime.now(datetime.timezone.utc)
269 commit_id = compute_commit_id([], snap_id, "clean", committed_at.isoformat())
270 write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={}))
271 write_commit(root, CommitRecord(
272 commit_id=commit_id, repo_id=repo_id, branch="main",
273 snapshot_id=snap_id,
274 message="evil\x1b[31mRED\x1b[0m message",
275 committed_at=committed_at, parent_commit_id=None,
276 author="Alice\x1b[0m",
277 ))
278 (root / ".muse" / "refs" / "heads" / "main").write_text(commit_id)
279
280 result = runner.invoke(cli, ["show"], env=_env(root), catch_exceptions=False)
281 assert result.exit_code == 0
282 assert "\x1b" not in result.output
283
284
285 # ---------------------------------------------------------------------------
286 # reflog: operation sanitization regression
287 # ---------------------------------------------------------------------------
288
289 class TestReflogSanitize:
290 def test_operation_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None:
291 root, repo_id = _init_repo(tmp_path)
292 from muse.core.reflog import append_reflog
293 _make_commit(root, repo_id)
294 append_reflog(
295 root, "main",
296 old_id="0" * 64, new_id="a" * 64,
297 author="user", operation="evil\x1b[31mRED\x1b[0m",
298 )
299 result = runner.invoke(cli, ["reflog"], env=_env(root), catch_exceptions=False)
300 assert "\x1b" not in result.output