test_cmd_archive.py
python
| 1 | """Comprehensive tests for ``muse archive``. |
| 2 | |
| 3 | Covers: |
| 4 | - Unit: _safe_arcname zip-slip guard |
| 5 | - Integration: archive a commit to tar.gz and zip |
| 6 | - E2E: full CLI via CliRunner with output path |
| 7 | - Security: --prefix validation, zip-slip prevention in manifest paths |
| 8 | - Stress: archive with many tracked files |
| 9 | """ |
| 10 | |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import datetime |
| 14 | import hashlib |
| 15 | import json |
| 16 | import pathlib |
| 17 | import tarfile |
| 18 | import uuid |
| 19 | import zipfile |
| 20 | |
| 21 | import pytest |
| 22 | from typer.testing import CliRunner |
| 23 | |
| 24 | from muse.cli.app import cli |
| 25 | |
| 26 | runner = CliRunner() |
| 27 | |
| 28 | |
| 29 | # --------------------------------------------------------------------------- |
| 30 | # Helpers |
| 31 | # --------------------------------------------------------------------------- |
| 32 | |
| 33 | def _env(root: pathlib.Path) -> dict[str, str]: |
| 34 | return {"MUSE_REPO_ROOT": str(root)} |
| 35 | |
| 36 | |
| 37 | def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: |
| 38 | muse_dir = tmp_path / ".muse" |
| 39 | muse_dir.mkdir() |
| 40 | repo_id = str(uuid.uuid4()) |
| 41 | (muse_dir / "repo.json").write_text(json.dumps({ |
| 42 | "repo_id": repo_id, |
| 43 | "domain": "midi", |
| 44 | "default_branch": "main", |
| 45 | "created_at": "2025-01-01T00:00:00+00:00", |
| 46 | }), encoding="utf-8") |
| 47 | (muse_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 48 | (muse_dir / "refs" / "heads").mkdir(parents=True) |
| 49 | (muse_dir / "snapshots").mkdir() |
| 50 | (muse_dir / "commits").mkdir() |
| 51 | (muse_dir / "objects").mkdir() |
| 52 | return tmp_path, repo_id |
| 53 | |
| 54 | |
| 55 | def _make_commit_with_files( |
| 56 | root: pathlib.Path, repo_id: str, files: dict[str, bytes] | None = None |
| 57 | ) -> str: |
| 58 | from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot |
| 59 | from muse.core.snapshot import compute_snapshot_id, compute_commit_id |
| 60 | |
| 61 | ref_file = root / ".muse" / "refs" / "heads" / "main" |
| 62 | parent_id = ref_file.read_text().strip() if ref_file.exists() else None |
| 63 | |
| 64 | manifest: dict[str, str] = {} |
| 65 | if files: |
| 66 | for rel_path, content in files.items(): |
| 67 | obj_id = hashlib.sha256(content).hexdigest() |
| 68 | obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:] |
| 69 | obj_path.parent.mkdir(parents=True, exist_ok=True) |
| 70 | obj_path.write_bytes(content) |
| 71 | manifest[rel_path] = obj_id |
| 72 | |
| 73 | snap_id = compute_snapshot_id(manifest) |
| 74 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 75 | commit_id = compute_commit_id( |
| 76 | parent_ids=[parent_id] if parent_id else [], |
| 77 | snapshot_id=snap_id, message="archive test", |
| 78 | committed_at_iso=committed_at.isoformat(), |
| 79 | ) |
| 80 | write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) |
| 81 | write_commit(root, CommitRecord( |
| 82 | commit_id=commit_id, repo_id=repo_id, branch="main", |
| 83 | snapshot_id=snap_id, message="archive test", |
| 84 | committed_at=committed_at, parent_commit_id=parent_id, |
| 85 | )) |
| 86 | ref_file.parent.mkdir(parents=True, exist_ok=True) |
| 87 | ref_file.write_text(commit_id, encoding="utf-8") |
| 88 | return commit_id |
| 89 | |
| 90 | |
| 91 | # --------------------------------------------------------------------------- |
| 92 | # Unit tests |
| 93 | # --------------------------------------------------------------------------- |
| 94 | |
| 95 | class TestArchiveUnit: |
| 96 | def test_safe_arcname_normal_path(self) -> None: |
| 97 | from muse.cli.commands.archive import _safe_arcname |
| 98 | assert _safe_arcname("myproject", "state/song.mid") == "myproject/state/song.mid" |
| 99 | |
| 100 | def test_safe_arcname_no_prefix(self) -> None: |
| 101 | from muse.cli.commands.archive import _safe_arcname |
| 102 | assert _safe_arcname("", "state/song.mid") == "state/song.mid" |
| 103 | |
| 104 | def test_safe_arcname_traversal_in_rel_path_rejected(self) -> None: |
| 105 | from muse.cli.commands.archive import _safe_arcname |
| 106 | assert _safe_arcname("prefix", "../../../etc/passwd") is None |
| 107 | |
| 108 | def test_safe_arcname_absolute_rel_path_rejected(self) -> None: |
| 109 | from muse.cli.commands.archive import _safe_arcname |
| 110 | assert _safe_arcname("prefix", "/etc/passwd") is None |
| 111 | |
| 112 | def test_safe_arcname_traversal_in_prefix_rejected(self) -> None: |
| 113 | from muse.cli.commands.archive import _safe_arcname |
| 114 | assert _safe_arcname("../evil", "file.txt") is None |
| 115 | |
| 116 | def test_safe_arcname_trailing_slash_normalised(self) -> None: |
| 117 | from muse.cli.commands.archive import _safe_arcname |
| 118 | assert _safe_arcname("myproject/", "file.txt") == "myproject/file.txt" |
| 119 | |
| 120 | |
| 121 | # --------------------------------------------------------------------------- |
| 122 | # Integration tests |
| 123 | # --------------------------------------------------------------------------- |
| 124 | |
| 125 | class TestArchiveIntegration: |
| 126 | def test_archive_empty_commit(self, tmp_path: pathlib.Path) -> None: |
| 127 | root, repo_id = _init_repo(tmp_path) |
| 128 | _make_commit_with_files(root, repo_id, files={}) |
| 129 | out = tmp_path / "out.tar.gz" |
| 130 | result = runner.invoke(cli, ["archive", "--output", str(out)], env=_env(root), catch_exceptions=False) |
| 131 | assert result.exit_code == 0 |
| 132 | assert out.exists() |
| 133 | |
| 134 | def test_archive_tar_gz_contains_files(self, tmp_path: pathlib.Path) -> None: |
| 135 | root, repo_id = _init_repo(tmp_path) |
| 136 | _make_commit_with_files(root, repo_id, files={"state/song.mid": b"\x00\x00MIDI"}) |
| 137 | out = tmp_path / "archive.tar.gz" |
| 138 | result = runner.invoke(cli, ["archive", "--output", str(out)], env=_env(root), catch_exceptions=False) |
| 139 | assert result.exit_code == 0 |
| 140 | with tarfile.open(out, "r:gz") as tf: |
| 141 | names = tf.getnames() |
| 142 | assert any("song.mid" in n for n in names) |
| 143 | |
| 144 | def test_archive_zip_contains_files(self, tmp_path: pathlib.Path) -> None: |
| 145 | root, repo_id = _init_repo(tmp_path) |
| 146 | _make_commit_with_files(root, repo_id, files={"track.mid": b"MIDIdata"}) |
| 147 | out = tmp_path / "archive.zip" |
| 148 | result = runner.invoke( |
| 149 | cli, ["archive", "--format", "zip", "--output", str(out)], |
| 150 | env=_env(root), catch_exceptions=False, |
| 151 | ) |
| 152 | assert result.exit_code == 0 |
| 153 | with zipfile.ZipFile(out, "r") as zf: |
| 154 | names = zf.namelist() |
| 155 | assert any("track.mid" in n for n in names) |
| 156 | |
| 157 | def test_archive_with_prefix(self, tmp_path: pathlib.Path) -> None: |
| 158 | root, repo_id = _init_repo(tmp_path) |
| 159 | _make_commit_with_files(root, repo_id, files={"song.mid": b"data"}) |
| 160 | out = tmp_path / "prefixed.tar.gz" |
| 161 | result = runner.invoke( |
| 162 | cli, ["archive", "--output", str(out), "--prefix", "myband-v1.0/"], |
| 163 | env=_env(root), catch_exceptions=False, |
| 164 | ) |
| 165 | assert result.exit_code == 0 |
| 166 | with tarfile.open(out, "r:gz") as tf: |
| 167 | names = tf.getnames() |
| 168 | assert any("myband-v1.0" in n for n in names) |
| 169 | |
| 170 | def test_archive_unknown_format_fails(self, tmp_path: pathlib.Path) -> None: |
| 171 | root, repo_id = _init_repo(tmp_path) |
| 172 | _make_commit_with_files(root, repo_id) |
| 173 | result = runner.invoke(cli, ["archive", "--format", "rar"], env=_env(root)) |
| 174 | assert result.exit_code != 0 |
| 175 | |
| 176 | def test_archive_no_commits_fails(self, tmp_path: pathlib.Path) -> None: |
| 177 | root, repo_id = _init_repo(tmp_path) |
| 178 | result = runner.invoke(cli, ["archive"], env=_env(root)) |
| 179 | assert result.exit_code != 0 |
| 180 | |
| 181 | def test_archive_short_flags(self, tmp_path: pathlib.Path) -> None: |
| 182 | root, repo_id = _init_repo(tmp_path) |
| 183 | _make_commit_with_files(root, repo_id, files={"test.mid": b"data"}) |
| 184 | out = tmp_path / "short.tar.gz" |
| 185 | result = runner.invoke( |
| 186 | cli, ["archive", "-f", "tar.gz", "-o", str(out)], |
| 187 | env=_env(root), catch_exceptions=False, |
| 188 | ) |
| 189 | assert result.exit_code == 0 |
| 190 | |
| 191 | |
| 192 | # --------------------------------------------------------------------------- |
| 193 | # Security tests |
| 194 | # --------------------------------------------------------------------------- |
| 195 | |
| 196 | class TestArchiveSecurity: |
| 197 | def test_prefix_traversal_rejected(self, tmp_path: pathlib.Path) -> None: |
| 198 | root, repo_id = _init_repo(tmp_path) |
| 199 | _make_commit_with_files(root, repo_id, files={"song.mid": b"data"}) |
| 200 | out = tmp_path / "evil.tar.gz" |
| 201 | result = runner.invoke( |
| 202 | cli, ["archive", "--output", str(out), "--prefix", "../evil/"], |
| 203 | env=_env(root), |
| 204 | ) |
| 205 | assert result.exit_code != 0 |
| 206 | |
| 207 | def test_zip_slip_manifest_path_skipped(self, tmp_path: pathlib.Path) -> None: |
| 208 | """A manifest entry with '../' is skipped, not written to archive.""" |
| 209 | root, repo_id = _init_repo(tmp_path) |
| 210 | from muse.cli.commands.archive import _build_tar |
| 211 | content = b"evil content" |
| 212 | obj_id = hashlib.sha256(content).hexdigest() |
| 213 | obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:] |
| 214 | obj_path.parent.mkdir(parents=True, exist_ok=True) |
| 215 | obj_path.write_bytes(content) |
| 216 | |
| 217 | out = tmp_path / "safe.tar.gz" |
| 218 | manifest = {"../../../etc/passwd": obj_id, "safe.txt": obj_id} |
| 219 | count = _build_tar(root, manifest, out, prefix="") |
| 220 | assert count == 1 # only safe.txt |
| 221 | with tarfile.open(out, "r:gz") as tf: |
| 222 | names = tf.getnames() |
| 223 | assert all("etc" not in n for n in names) |
| 224 | |
| 225 | |
| 226 | # --------------------------------------------------------------------------- |
| 227 | # Stress tests |
| 228 | # --------------------------------------------------------------------------- |
| 229 | |
| 230 | class TestArchiveStress: |
| 231 | def test_archive_many_files(self, tmp_path: pathlib.Path) -> None: |
| 232 | root, repo_id = _init_repo(tmp_path) |
| 233 | files = {f"track_{i:03d}.mid": f"MIDI{i}".encode() for i in range(50)} |
| 234 | _make_commit_with_files(root, repo_id, files=files) |
| 235 | out = tmp_path / "many.tar.gz" |
| 236 | result = runner.invoke(cli, ["archive", "--output", str(out)], env=_env(root), catch_exceptions=False) |
| 237 | assert result.exit_code == 0 |
| 238 | with tarfile.open(out, "r:gz") as tf: |
| 239 | names = tf.getnames() |
| 240 | assert len(names) == 50 |