gabriel / muse public
test_porcelain_security.py python
300 lines 12.8 KB
95b86799 feat: add --format json to all porcelain commands for agent-first output Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """Security-focused regression tests for all porcelain hardening fixes.
2
3 These tests verify the specific security improvements made during the
4 porcelain hardening pass:
5
6 - ReDoS guard in content-grep (pattern length limit)
7 - Zip-slip prevention in archive and snapshot export
8 - validate_branch_name added to checkout and rebase
9 - sanitize_display applied to all user-sourced echoed strings
10 - Atomic stash writes (no temp file corruption)
11 - Snapshot ID glob prefix sanitisation
12 """
13
14 from __future__ import annotations
15
16 import datetime
17 import hashlib
18 import json
19 import pathlib
20 import uuid
21
22 import pytest
23 from typer.testing import CliRunner
24
25 from muse.cli.app import cli
26
27 runner = CliRunner()
28
29
30 # ---------------------------------------------------------------------------
31 # Shared repo setup helper
32 # ---------------------------------------------------------------------------
33
def _env(root: pathlib.Path) -> dict[str, str]:
    """Build the environment mapping that points the CLI at *root*.

    The returned dict has a single key, ``MUSE_REPO_ROOT``, whose value is
    the string form of *root*.
    """
    repo_root = str(root)
    return {"MUSE_REPO_ROOT": repo_root}
36
37
def _init_repo(tmp_path: pathlib.Path, domain: str = "midi") -> tuple[pathlib.Path, str]:
    """Lay out a minimal ``.muse`` directory under *tmp_path*.

    Writes ``repo.json`` (with a freshly generated repo id and the given
    *domain*) and a ``HEAD`` file pointing at ``refs/heads/main``, then
    creates the ``refs/heads``, ``snapshots``, ``commits`` and ``objects``
    directories.

    Returns:
        A ``(tmp_path, repo_id)`` tuple.
    """
    dot_muse = tmp_path / ".muse"
    dot_muse.mkdir()

    repo_id = str(uuid.uuid4())
    metadata = {
        "repo_id": repo_id,
        "domain": domain,
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(metadata), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    (dot_muse / "refs" / "heads").mkdir(parents=True)
    for store_dir in ("snapshots", "commits", "objects"):
        (dot_muse / store_dir).mkdir()

    return tmp_path, repo_id
54
55
def _make_commit(root: pathlib.Path, repo_id: str, message: str = "test") -> str:
    """Write an empty-manifest snapshot and a commit on ``main``; return the commit id.

    If ``refs/heads/main`` already exists, its (stripped) contents are used
    as the parent commit id; otherwise the commit has no parent.  After the
    snapshot and commit records are written, ``refs/heads/main`` is updated
    to the new commit id.

    Args:
        root: Repository root containing the ``.muse`` directory.
        repo_id: Repository id stored on the commit record.
        message: Commit message.

    Returns:
        The id of the newly written commit.
    """
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
    from muse.core.snapshot import compute_snapshot_id, compute_commit_id

    ref_file = root / ".muse" / "refs" / "heads" / "main"
    # Read with the same explicit encoding the ref is written with below,
    # rather than relying on the locale default.
    parent_id = ref_file.read_text(encoding="utf-8").strip() if ref_file.exists() else None
    manifest: dict[str, str] = {}
    snap_id = compute_snapshot_id(manifest)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
    write_commit(root, CommitRecord(
        commit_id=commit_id, repo_id=repo_id, branch="main",
        snapshot_id=snap_id, message=message, committed_at=committed_at,
        parent_commit_id=parent_id,
    ))
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
79
80
81 # ---------------------------------------------------------------------------
82 # content-grep: ReDoS guard
83 # ---------------------------------------------------------------------------
84
class TestContentGrepSecurity:
    """Regression tests for ``content-grep`` pattern validation and output sanitising."""

    def test_pattern_too_long_rejected(self, tmp_path: pathlib.Path) -> None:
        """A 501-character pattern must make the command exit non-zero."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        long_pattern = "a" * 501  # > 500 char limit
        result = runner.invoke(cli, ["content-grep", "--pattern", long_pattern], env=_env(root))
        assert result.exit_code != 0
        assert "too long" in result.output or "Pattern" in result.output

    def test_pattern_exactly_500_chars_accepted(self, tmp_path: pathlib.Path) -> None:
        """A pattern of exactly 500 characters must exit with 0 or 1."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        pattern_500 = "a" * 500
        result = runner.invoke(cli, ["content-grep", "--pattern", pattern_500], env=_env(root))
        # No match → exit 1, but not a ReDoS validation failure
        assert result.exit_code in (0, 1)

    def test_invalid_regex_rejected(self, tmp_path: pathlib.Path) -> None:
        """A syntactically invalid regex must fail with a message naming the problem."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["content-grep", "--pattern", "[invalid regex"], env=_env(root))
        assert result.exit_code != 0
        assert "regex" in result.output.lower() or "invalid" in result.output.lower()

    def test_output_sanitized_no_ansi_injection(self, tmp_path: pathlib.Path) -> None:
        """Escape bytes stored in an object must never reach the command output."""
        root, repo_id = _init_repo(tmp_path)
        content = b"normal line\n\x1b[31mRED\x1b[0m line\nanother\n"
        obj_id = hashlib.sha256(content).hexdigest()
        # Object is placed at objects/<first two hex chars>/<remaining chars>.
        obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:]
        obj_path.parent.mkdir(parents=True, exist_ok=True)
        obj_path.write_bytes(content)

        from muse.core.store import SnapshotRecord, CommitRecord, write_snapshot, write_commit
        from muse.core.snapshot import compute_snapshot_id, compute_commit_id

        manifest = {"file.txt": obj_id}
        snap_id = compute_snapshot_id(manifest)
        committed_at = datetime.datetime.now(datetime.timezone.utc)
        commit_id = compute_commit_id([], snap_id, "test", committed_at.isoformat())
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
        write_commit(root, CommitRecord(
            commit_id=commit_id, repo_id=repo_id, branch="main",
            snapshot_id=snap_id, message="test", committed_at=committed_at,
            parent_commit_id=None,
        ))
        (root / ".muse" / "refs" / "heads" / "main").write_text(commit_id, encoding="utf-8")

        result = runner.invoke(cli, ["content-grep", "--pattern", "RED"], env=_env(root))
        # Checked unconditionally: guarding this on ``exit_code == 0`` let the
        # test pass without asserting anything whenever the command failed.
        assert "\x1b" not in result.output
135
136
137 # ---------------------------------------------------------------------------
138 # archive: zip-slip guard
139 # ---------------------------------------------------------------------------
140
class TestArchiveSecurity:
    """Regression tests for path-traversal handling in ``archive``."""

    def test_archive_prefix_with_dotdot_rejected(self, tmp_path: pathlib.Path) -> None:
        """An ``--prefix`` containing ``..`` segments must exit non-zero."""
        repo_root, rid = _init_repo(tmp_path)
        _make_commit(repo_root, rid)
        argv = ["archive", "--prefix", "../../evil"]
        outcome = runner.invoke(cli, argv, env=_env(repo_root))
        assert outcome.exit_code != 0

    def test_zip_slip_guard_in_safe_arcname(self) -> None:
        """``_safe_arcname`` returns ``None`` for escaping paths and joins safe ones."""
        from muse.cli.commands.archive import _safe_arcname

        for hostile in ("../../../etc/passwd", "/etc/passwd"):
            assert _safe_arcname("safe", hostile) is None
        joined = _safe_arcname("safe", "normal/path.txt")
        assert joined == "safe/normal/path.txt"
153
154
155 # ---------------------------------------------------------------------------
156 # snapshot: glob prefix sanitisation
157 # ---------------------------------------------------------------------------
158
class TestSnapshotSecurity:
    """Regression tests for snapshot ID prefix handling and archive names."""

    def test_validate_snapshot_id_prefix_strips_metacharacters(self) -> None:
        """The validated prefix contains no glob metacharacters, only lowercase hex."""
        from muse.cli.commands.snapshot_cmd import _validate_snapshot_id_prefix

        cleaned = _validate_snapshot_id_prefix("*bad[0-9]?glob*")
        for meta in ("*", "[", "?"):
            assert meta not in cleaned
        hex_digits = "0123456789abcdef"
        assert all(ch in hex_digits for ch in cleaned)

    def test_snapshot_show_with_glob_meta_no_injection(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Glob metacharacters in the snapshot ID prefix must be sanitised."""
        repo_root, rid = _init_repo(tmp_path)
        _make_commit(repo_root, rid)
        # A bare '*' has no hex characters, so it is expected to sanitise to
        # an empty prefix: the command may find nothing, but it must not
        # raise or leak escape sequences.
        outcome = runner.invoke(cli, ["snapshot", "show", "*"], env=_env(repo_root))
        # Exit code is deliberately unchecked: 0 (empty match) and non-zero
        # (not found) are both acceptable.
        assert "\x1b" not in outcome.output
        assert outcome.exception is None

    def test_safe_arcname_in_snapshot(self) -> None:
        """The snapshot module's ``_safe_arcname`` rejects traversal and joins safe names."""
        from muse.cli.commands.snapshot_cmd import _safe_arcname

        escaped = _safe_arcname("", "../../../etc/passwd")
        assert escaped is None
        joined = _safe_arcname("prefix", "safe.txt")
        assert joined == "prefix/safe.txt"
185
186
187 # ---------------------------------------------------------------------------
188 # checkout: validate_branch_name on switch
189 # ---------------------------------------------------------------------------
190
class TestCheckoutSecurity:
    """Regression tests for branch-name validation in ``checkout``."""

    def test_checkout_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
        """A branch name with a leading ``../`` must exit non-zero."""
        repo_root, rid = _init_repo(tmp_path)
        _make_commit(repo_root, rid)
        outcome = runner.invoke(cli, ["checkout", "../evil"], env=_env(repo_root))
        assert outcome.exit_code != 0

    def test_checkout_double_dot_rejected(self, tmp_path: pathlib.Path) -> None:
        """The bare name ``..`` must exit non-zero."""
        repo_root, rid = _init_repo(tmp_path)
        _make_commit(repo_root, rid)
        outcome = runner.invoke(cli, ["checkout", ".."], env=_env(repo_root))
        assert outcome.exit_code != 0

    def test_checkout_valid_existing_branch_works(self, tmp_path: pathlib.Path) -> None:
        """Switching to an existing, validly named branch exits 0."""
        repo_root, rid = _init_repo(tmp_path)
        _make_commit(repo_root, rid)
        # Create a ``dev`` ref pointing at the same commit as ``main``.
        heads_dir = repo_root / ".muse" / "refs" / "heads"
        main_tip = (heads_dir / "main").read_text()
        (heads_dir / "dev").write_text(main_tip)
        outcome = runner.invoke(
            cli,
            ["checkout", "dev"],
            env=_env(repo_root),
            catch_exceptions=False,
        )
        assert outcome.exit_code == 0
213
214
215 # ---------------------------------------------------------------------------
216 # rebase: validate_branch_name on upstream/onto
217 # ---------------------------------------------------------------------------
218
class TestRebaseSecurity:
    """Regression test for upstream validation in ``rebase``."""

    def test_rebase_invalid_upstream_fails(self, tmp_path: pathlib.Path) -> None:
        """A path-traversal string given as the upstream must exit non-zero."""
        repo_root, rid = _init_repo(tmp_path)
        _make_commit(repo_root, rid)
        hostile_upstream = "../../../etc/passwd"
        outcome = runner.invoke(cli, ["rebase", hostile_upstream], env=_env(repo_root))
        assert outcome.exit_code != 0
225
226
227 # ---------------------------------------------------------------------------
228 # stash: atomic write regression
229 # ---------------------------------------------------------------------------
230
class TestStashAtomicWrite:
    """Regression tests for how ``_save_stash`` leaves the ``.muse`` directory."""

    def test_no_temp_files_after_save(self, tmp_path: pathlib.Path) -> None:
        """After saving, ``stash.json`` exists and no ``.stash_tmp_*`` file remains."""
        repo_root, _ = _init_repo(tmp_path)
        from muse.cli.commands.stash import _save_stash, StashEntry

        entry = StashEntry(
            snapshot_id="a" * 64,
            manifest={},
            branch="main",
            stashed_at="2025-01-01T00:00:00+00:00",
        )
        _save_stash(repo_root, [entry])

        muse_dir = repo_root / ".muse"
        leftovers = list(muse_dir.glob(".stash_tmp_*"))
        assert leftovers == []
        assert (muse_dir / "stash.json").exists()

    def test_stash_file_contents_after_atomic_write(self, tmp_path: pathlib.Path) -> None:
        """An entry written by ``_save_stash`` is read back by ``_load_stash``."""
        repo_root, _ = _init_repo(tmp_path)
        from muse.cli.commands.stash import _save_stash, _load_stash, StashEntry

        expected_id = "b" * 64
        entry = StashEntry(
            snapshot_id=expected_id,
            manifest={},
            branch="main",
            stashed_at="2025-06-01T12:00:00+00:00",
        )
        _save_stash(repo_root, [entry])

        round_tripped = _load_stash(repo_root)
        assert len(round_tripped) == 1
        assert round_tripped[0]["snapshot_id"] == expected_id
255
256
257 # ---------------------------------------------------------------------------
258 # show: sanitize_display regression
259 # ---------------------------------------------------------------------------
260
class TestShowDisplaySanitize:
    """Regression test for escape-sequence handling in ``show`` output."""

    def test_commit_message_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None:
        """Escape bytes in the commit message and author must not appear in the output."""
        repo_root, rid = _init_repo(tmp_path)
        from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
        from muse.core.snapshot import compute_snapshot_id, compute_commit_id

        snapshot_id = compute_snapshot_id({})
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        # The id is computed from the message "clean"; the stored record
        # carries the hostile message and author below.
        cid = compute_commit_id([], snapshot_id, "clean", now_utc.isoformat())

        write_snapshot(repo_root, SnapshotRecord(snapshot_id=snapshot_id, manifest={}))
        hostile_record = CommitRecord(
            commit_id=cid,
            repo_id=rid,
            branch="main",
            snapshot_id=snapshot_id,
            message="evil\x1b[31mRED\x1b[0m message",
            committed_at=now_utc,
            parent_commit_id=None,
            author="Alice\x1b[0m",
        )
        write_commit(repo_root, hostile_record)
        main_ref = repo_root / ".muse" / "refs" / "heads" / "main"
        main_ref.write_text(cid)

        outcome = runner.invoke(cli, ["show"], env=_env(repo_root), catch_exceptions=False)
        assert outcome.exit_code == 0
        assert "\x1b" not in outcome.output
283
284
285 # ---------------------------------------------------------------------------
286 # reflog: operation sanitization regression
287 # ---------------------------------------------------------------------------
288
class TestReflogSanitize:
    """Regression test for escape-sequence handling in ``reflog`` output."""

    def test_operation_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None:
        """Escape bytes in a reflog entry's operation must not appear in the output."""
        repo_root, rid = _init_repo(tmp_path)
        from muse.core.reflog import append_reflog

        _make_commit(repo_root, rid)
        hostile_operation = "evil\x1b[31mRED\x1b[0m"
        append_reflog(
            repo_root,
            "main",
            old_id="0" * 64,
            new_id="a" * 64,
            author="user",
            operation=hostile_operation,
        )

        outcome = runner.invoke(cli, ["reflog"], env=_env(repo_root), catch_exceptions=False)
        assert "\x1b" not in outcome.output