test_porcelain_security.py
python
| 1 | """Security-focused regression tests for all porcelain hardening fixes. |
| 2 | |
| 3 | These tests verify the specific security improvements made during the |
| 4 | porcelain hardening pass: |
| 5 | |
| 6 | - ReDoS guard in content-grep (pattern length limit) |
| 7 | - Zip-slip prevention in archive and snapshot export |
| 8 | - validate_branch_name added to checkout and rebase |
| 9 | - sanitize_display applied to all user-sourced echoed strings |
| 10 | - Atomic stash writes (no temp file corruption) |
| 11 | - Snapshot ID glob prefix sanitisation |
| 12 | """ |
| 13 | |
| 14 | from __future__ import annotations |
| 15 | |
| 16 | import datetime |
| 17 | import hashlib |
| 18 | import json |
| 19 | import pathlib |
| 20 | import uuid |
| 21 | |
| 22 | import pytest |
| 23 | from tests.cli_test_helper import CliRunner |
| 24 | |
| 25 | cli = None # argparse migration — CliRunner ignores this arg |
| 26 | |
| 27 | runner = CliRunner() |
| 28 | |
| 29 | |
| 30 | # --------------------------------------------------------------------------- |
| 31 | # Shared repo setup helper |
| 32 | # --------------------------------------------------------------------------- |
| 33 | |
| 34 | def _env(root: pathlib.Path) -> dict[str, str]: |
| 35 | return {"MUSE_REPO_ROOT": str(root)} |
| 36 | |
| 37 | |
| 38 | def _init_repo(tmp_path: pathlib.Path, domain: str = "midi") -> tuple[pathlib.Path, str]: |
| 39 | muse_dir = tmp_path / ".muse" |
| 40 | muse_dir.mkdir() |
| 41 | repo_id = str(uuid.uuid4()) |
| 42 | (muse_dir / "repo.json").write_text(json.dumps({ |
| 43 | "repo_id": repo_id, |
| 44 | "domain": domain, |
| 45 | "default_branch": "main", |
| 46 | "created_at": "2025-01-01T00:00:00+00:00", |
| 47 | }), encoding="utf-8") |
| 48 | (muse_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 49 | (muse_dir / "refs" / "heads").mkdir(parents=True) |
| 50 | (muse_dir / "snapshots").mkdir() |
| 51 | (muse_dir / "commits").mkdir() |
| 52 | (muse_dir / "objects").mkdir() |
| 53 | return tmp_path, repo_id |
| 54 | |
| 55 | |
| 56 | def _make_commit(root: pathlib.Path, repo_id: str, message: str = "test") -> str: |
| 57 | from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot |
| 58 | from muse.core.snapshot import compute_snapshot_id, compute_commit_id |
| 59 | |
| 60 | ref_file = root / ".muse" / "refs" / "heads" / "main" |
| 61 | parent_id = ref_file.read_text().strip() if ref_file.exists() else None |
| 62 | manifest: dict[str, str] = {} |
| 63 | snap_id = compute_snapshot_id(manifest) |
| 64 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 65 | commit_id = compute_commit_id( |
| 66 | parent_ids=[parent_id] if parent_id else [], |
| 67 | snapshot_id=snap_id, message=message, |
| 68 | committed_at_iso=committed_at.isoformat(), |
| 69 | ) |
| 70 | write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) |
| 71 | write_commit(root, CommitRecord( |
| 72 | commit_id=commit_id, repo_id=repo_id, branch="main", |
| 73 | snapshot_id=snap_id, message=message, committed_at=committed_at, |
| 74 | parent_commit_id=parent_id, |
| 75 | )) |
| 76 | ref_file.parent.mkdir(parents=True, exist_ok=True) |
| 77 | ref_file.write_text(commit_id, encoding="utf-8") |
| 78 | return commit_id |
| 79 | |
| 80 | |
| 81 | # --------------------------------------------------------------------------- |
| 82 | # content-grep: ReDoS guard |
| 83 | # --------------------------------------------------------------------------- |
| 84 | |
| 85 | class TestContentGrepSecurity: |
| 86 | def test_pattern_too_long_rejected(self, tmp_path: pathlib.Path) -> None: |
| 87 | root, repo_id = _init_repo(tmp_path) |
| 88 | _make_commit(root, repo_id) |
| 89 | long_pattern = "a" * 501 # > 500 char limit |
| 90 | result = runner.invoke(cli, ["content-grep", "--pattern", long_pattern], env=_env(root)) |
| 91 | assert result.exit_code != 0 |
| 92 | assert "too long" in result.output or "Pattern" in result.output |
| 93 | |
| 94 | def test_pattern_exactly_500_chars_accepted(self, tmp_path: pathlib.Path) -> None: |
| 95 | root, repo_id = _init_repo(tmp_path) |
| 96 | _make_commit(root, repo_id) |
| 97 | pattern_500 = "a" * 500 |
| 98 | result = runner.invoke(cli, ["content-grep", "--pattern", pattern_500], env=_env(root)) |
| 99 | # No match → exit 1, but not a ReDoS validation failure |
| 100 | assert result.exit_code in (0, 1) |
| 101 | |
| 102 | def test_invalid_regex_rejected(self, tmp_path: pathlib.Path) -> None: |
| 103 | root, repo_id = _init_repo(tmp_path) |
| 104 | _make_commit(root, repo_id) |
| 105 | result = runner.invoke(cli, ["content-grep", "--pattern", "[invalid regex"], env=_env(root)) |
| 106 | assert result.exit_code != 0 |
| 107 | assert "regex" in result.output.lower() or "invalid" in result.output.lower() |
| 108 | |
| 109 | def test_output_sanitized_no_ansi_injection(self, tmp_path: pathlib.Path) -> None: |
| 110 | root, repo_id = _init_repo(tmp_path) |
| 111 | content = b"normal line\n\x1b[31mRED\x1b[0m line\nanother\n" |
| 112 | obj_id = hashlib.sha256(content).hexdigest() |
| 113 | obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:] |
| 114 | obj_path.parent.mkdir(parents=True, exist_ok=True) |
| 115 | obj_path.write_bytes(content) |
| 116 | |
| 117 | from muse.core.store import SnapshotRecord, CommitRecord, write_snapshot, write_commit |
| 118 | from muse.core.snapshot import compute_snapshot_id, compute_commit_id |
| 119 | |
| 120 | manifest = {"file.txt": obj_id} |
| 121 | snap_id = compute_snapshot_id(manifest) |
| 122 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 123 | commit_id = compute_commit_id([], snap_id, "test", committed_at.isoformat()) |
| 124 | write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) |
| 125 | write_commit(root, CommitRecord( |
| 126 | commit_id=commit_id, repo_id=repo_id, branch="main", |
| 127 | snapshot_id=snap_id, message="test", committed_at=committed_at, |
| 128 | parent_commit_id=None, |
| 129 | )) |
| 130 | (root / ".muse" / "refs" / "heads" / "main").write_text(commit_id) |
| 131 | |
| 132 | result = runner.invoke(cli, ["content-grep", "--pattern", "RED"], env=_env(root)) |
| 133 | if result.exit_code == 0: |
| 134 | assert "\x1b" not in result.output |
| 135 | |
| 136 | |
| 137 | # --------------------------------------------------------------------------- |
| 138 | # archive: zip-slip guard |
| 139 | # --------------------------------------------------------------------------- |
| 140 | |
| 141 | class TestArchiveSecurity: |
| 142 | def test_archive_prefix_with_dotdot_rejected(self, tmp_path: pathlib.Path) -> None: |
| 143 | root, repo_id = _init_repo(tmp_path) |
| 144 | _make_commit(root, repo_id) |
| 145 | result = runner.invoke(cli, ["archive", "--prefix", "../../evil"], env=_env(root)) |
| 146 | assert result.exit_code != 0 |
| 147 | |
| 148 | def test_zip_slip_guard_in_safe_arcname(self) -> None: |
| 149 | from muse.cli.commands.archive import _safe_arcname |
| 150 | assert _safe_arcname("safe", "../../../etc/passwd") is None |
| 151 | assert _safe_arcname("safe", "/etc/passwd") is None |
| 152 | assert _safe_arcname("safe", "normal/path.txt") == "safe/normal/path.txt" |
| 153 | |
| 154 | |
| 155 | # --------------------------------------------------------------------------- |
| 156 | # snapshot: glob prefix sanitisation |
| 157 | # --------------------------------------------------------------------------- |
| 158 | |
| 159 | class TestSnapshotSecurity: |
| 160 | def test_validate_snapshot_id_prefix_strips_metacharacters(self) -> None: |
| 161 | from muse.cli.commands.snapshot_cmd import _validate_snapshot_id_prefix |
| 162 | prefix = _validate_snapshot_id_prefix("*bad[0-9]?glob*") |
| 163 | assert "*" not in prefix |
| 164 | assert "[" not in prefix |
| 165 | assert "?" not in prefix |
| 166 | assert all(c in "0123456789abcdef" for c in prefix) |
| 167 | |
| 168 | def test_snapshot_show_with_glob_meta_no_injection( |
| 169 | self, tmp_path: pathlib.Path |
| 170 | ) -> None: |
| 171 | """Glob metacharacters in the snapshot ID prefix must be sanitised.""" |
| 172 | root, repo_id = _init_repo(tmp_path) |
| 173 | _make_commit(root, repo_id) |
| 174 | # The '*' prefix is sanitised to empty string (no hex chars), so the |
| 175 | # command finds nothing but must not raise an exception or expose paths. |
| 176 | result = runner.invoke(cli, ["snapshot", "show", "*"], env=_env(root)) |
| 177 | # Should not crash; may exit 0 (empty match) or non-zero (not found) |
| 178 | assert "\x1b" not in result.output |
| 179 | assert result.exception is None |
| 180 | |
| 181 | def test_safe_arcname_in_snapshot(self) -> None: |
| 182 | from muse.cli.commands.snapshot_cmd import _safe_arcname |
| 183 | assert _safe_arcname("", "../../../etc/passwd") is None |
| 184 | assert _safe_arcname("prefix", "safe.txt") == "prefix/safe.txt" |
| 185 | |
| 186 | |
| 187 | # --------------------------------------------------------------------------- |
| 188 | # checkout: validate_branch_name on switch |
| 189 | # --------------------------------------------------------------------------- |
| 190 | |
| 191 | class TestCheckoutSecurity: |
| 192 | def test_checkout_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None: |
| 193 | root, repo_id = _init_repo(tmp_path) |
| 194 | _make_commit(root, repo_id) |
| 195 | result = runner.invoke(cli, ["checkout", "../evil"], env=_env(root)) |
| 196 | assert result.exit_code != 0 |
| 197 | |
| 198 | def test_checkout_double_dot_rejected(self, tmp_path: pathlib.Path) -> None: |
| 199 | root, repo_id = _init_repo(tmp_path) |
| 200 | _make_commit(root, repo_id) |
| 201 | result = runner.invoke(cli, ["checkout", ".."], env=_env(root)) |
| 202 | assert result.exit_code != 0 |
| 203 | |
| 204 | def test_checkout_valid_existing_branch_works(self, tmp_path: pathlib.Path) -> None: |
| 205 | root, repo_id = _init_repo(tmp_path) |
| 206 | _make_commit(root, repo_id) |
| 207 | # Create a second branch and switch to it |
| 208 | (root / ".muse" / "refs" / "heads" / "dev").write_text( |
| 209 | (root / ".muse" / "refs" / "heads" / "main").read_text() |
| 210 | ) |
| 211 | result = runner.invoke(cli, ["checkout", "dev"], env=_env(root), catch_exceptions=False) |
| 212 | assert result.exit_code == 0 |
| 213 | |
| 214 | |
| 215 | # --------------------------------------------------------------------------- |
| 216 | # rebase: validate_branch_name on upstream/onto |
| 217 | # --------------------------------------------------------------------------- |
| 218 | |
| 219 | class TestRebaseSecurity: |
| 220 | def test_rebase_invalid_upstream_fails(self, tmp_path: pathlib.Path) -> None: |
| 221 | root, repo_id = _init_repo(tmp_path) |
| 222 | _make_commit(root, repo_id) |
| 223 | result = runner.invoke(cli, ["rebase", "../../../etc/passwd"], env=_env(root)) |
| 224 | assert result.exit_code != 0 |
| 225 | |
| 226 | |
| 227 | # --------------------------------------------------------------------------- |
| 228 | # stash: atomic write regression |
| 229 | # --------------------------------------------------------------------------- |
| 230 | |
| 231 | class TestStashAtomicWrite: |
| 232 | def test_no_temp_files_after_save(self, tmp_path: pathlib.Path) -> None: |
| 233 | root, _ = _init_repo(tmp_path) |
| 234 | from muse.cli.commands.stash import _save_stash, StashEntry |
| 235 | entries = [ |
| 236 | StashEntry( |
| 237 | snapshot_id="a" * 64, manifest={}, |
| 238 | branch="main", stashed_at="2025-01-01T00:00:00+00:00" |
| 239 | ) |
| 240 | ] |
| 241 | _save_stash(root, entries) |
| 242 | assert list((root / ".muse").glob(".stash_tmp_*")) == [] |
| 243 | assert (root / ".muse" / "stash.json").exists() |
| 244 | |
| 245 | def test_stash_file_contents_after_atomic_write(self, tmp_path: pathlib.Path) -> None: |
| 246 | root, _ = _init_repo(tmp_path) |
| 247 | from muse.cli.commands.stash import _save_stash, _load_stash, StashEntry |
| 248 | _save_stash(root, [ |
| 249 | StashEntry(snapshot_id="b" * 64, manifest={}, |
| 250 | branch="main", stashed_at="2025-06-01T12:00:00+00:00") |
| 251 | ]) |
| 252 | loaded = _load_stash(root) |
| 253 | assert len(loaded) == 1 |
| 254 | assert loaded[0]["snapshot_id"] == "b" * 64 |
| 255 | |
| 256 | |
| 257 | # --------------------------------------------------------------------------- |
| 258 | # show: sanitize_display regression |
| 259 | # --------------------------------------------------------------------------- |
| 260 | |
| 261 | class TestShowDisplaySanitize: |
| 262 | def test_commit_message_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None: |
| 263 | root, repo_id = _init_repo(tmp_path) |
| 264 | from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot |
| 265 | from muse.core.snapshot import compute_snapshot_id, compute_commit_id |
| 266 | |
| 267 | snap_id = compute_snapshot_id({}) |
| 268 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 269 | commit_id = compute_commit_id([], snap_id, "clean", committed_at.isoformat()) |
| 270 | write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={})) |
| 271 | write_commit(root, CommitRecord( |
| 272 | commit_id=commit_id, repo_id=repo_id, branch="main", |
| 273 | snapshot_id=snap_id, |
| 274 | message="evil\x1b[31mRED\x1b[0m message", |
| 275 | committed_at=committed_at, parent_commit_id=None, |
| 276 | author="Alice\x1b[0m", |
| 277 | )) |
| 278 | (root / ".muse" / "refs" / "heads" / "main").write_text(commit_id) |
| 279 | |
| 280 | result = runner.invoke(cli, ["show"], env=_env(root), catch_exceptions=False) |
| 281 | assert result.exit_code == 0 |
| 282 | assert "\x1b" not in result.output |
| 283 | |
| 284 | |
| 285 | # --------------------------------------------------------------------------- |
| 286 | # reflog: operation sanitization regression |
| 287 | # --------------------------------------------------------------------------- |
| 288 | |
| 289 | class TestReflogSanitize: |
| 290 | def test_operation_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None: |
| 291 | root, repo_id = _init_repo(tmp_path) |
| 292 | from muse.core.reflog import append_reflog |
| 293 | _make_commit(root, repo_id) |
| 294 | append_reflog( |
| 295 | root, "main", |
| 296 | old_id="0" * 64, new_id="a" * 64, |
| 297 | author="user", operation="evil\x1b[31mRED\x1b[0m", |
| 298 | ) |
| 299 | result = runner.invoke(cli, ["reflog"], env=_env(root), catch_exceptions=False) |
| 300 | assert "\x1b" not in result.output |