gabriel / muse public
test_cmd_archive.py python
240 lines 9.6 KB
86000da9 fix: replace typer CliRunner with argparse-compatible test helper Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Comprehensive tests for ``muse archive``.
2
3 Covers:
4 - Unit: _safe_arcname zip-slip guard
5 - Integration: archive a commit to tar.gz and zip
6 - E2E: full CLI via CliRunner with output path
7 - Security: --prefix validation, zip-slip prevention in manifest paths
8 - Stress: archive with many tracked files
9 """
10
11 from __future__ import annotations
12
13 import datetime
14 import hashlib
15 import json
16 import pathlib
17 import tarfile
18 import uuid
19 import zipfile
20
21 import pytest
22 from tests.cli_test_helper import CliRunner
23
# Every ``runner.invoke(cli, ...)`` call in this module passes ``cli`` as its
# first argument; it is a placeholder only.
cli = None  # argparse migration — CliRunner ignores this arg

# Single runner instance shared by all CLI-level tests below.
runner = CliRunner()
27
28
29 # ---------------------------------------------------------------------------
30 # Helpers
31 # ---------------------------------------------------------------------------
32
def _env(root: pathlib.Path) -> dict[str, str]:
    """Build the ``env`` mapping handed to ``runner.invoke``.

    The mapping has a single key, ``MUSE_REPO_ROOT``, set to ``str(root)``.
    """
    overrides: dict[str, str] = {}
    overrides["MUSE_REPO_ROOT"] = str(root)
    return overrides
35
36
def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out a bare ``.muse`` directory under *tmp_path*.

    Writes ``repo.json`` (with a freshly generated UUID as ``repo_id``) and a
    ``HEAD`` file containing ``ref: refs/heads/main``, then creates the empty
    ``refs/heads``, ``snapshots``, ``commits`` and ``objects`` directories.

    Returns:
        ``(tmp_path, repo_id)``.

    Raises:
        FileExistsError: if ``.muse`` already exists under *tmp_path*.
    """
    dot_muse = tmp_path / ".muse"
    dot_muse.mkdir()

    new_repo_id = str(uuid.uuid4())
    repo_meta = {
        "repo_id": new_repo_id,
        "domain": "midi",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    (dot_muse / "refs" / "heads").mkdir(parents=True)
    for store_name in ("snapshots", "commits", "objects"):
        (dot_muse / store_name).mkdir()

    return tmp_path, new_repo_id
53
54
def _make_commit_with_files(
    root: pathlib.Path, repo_id: str, files: dict[str, bytes] | None = None
) -> str:
    """Write a commit on ``main`` whose snapshot holds *files*; return its ID.

    Each ``relative path -> content`` entry of *files* is stored as an object
    at ``.muse/objects/<first two hex chars>/<remaining hex chars>`` of the
    content's SHA-256 digest and recorded in the snapshot manifest.  When
    *files* is ``None`` or empty the manifest is empty.

    The parent commit is whatever ``refs/heads/main`` currently contains, if
    that ref file exists; afterwards the ref is overwritten with the new
    commit ID.

    Args:
        root: Repository root (the directory containing ``.muse``).
        repo_id: Repository ID stored on the commit record.
        files: Optional mapping of manifest path to file content.

    Returns:
        The ID of the commit that was written.
    """
    # Imported lazily so that collecting this module does not import muse.
    from muse.core.snapshot import compute_commit_id, compute_snapshot_id
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot

    message = "archive test"
    ref_file = root / ".muse" / "refs" / "heads" / "main"
    # Read with the same explicit encoding the ref is written with below, so
    # the helper never depends on the platform's locale encoding.
    parent_id = (
        ref_file.read_text(encoding="utf-8").strip() if ref_file.exists() else None
    )

    manifest: dict[str, str] = {}
    for rel_path, content in (files or {}).items():
        obj_id = hashlib.sha256(content).hexdigest()
        obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:]
        obj_path.parent.mkdir(parents=True, exist_ok=True)
        obj_path.write_bytes(content)
        manifest[rel_path] = obj_id

    snap_id = compute_snapshot_id(manifest)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        repo_id=repo_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))

    # Advance the branch ref to the commit just written.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
89
90
91 # ---------------------------------------------------------------------------
92 # Unit tests
93 # ---------------------------------------------------------------------------
94
class TestArchiveUnit:
    """Unit checks for ``_safe_arcname``, the zip-slip guard of ``muse archive``."""

    def test_safe_arcname_normal_path(self) -> None:
        """A prefix and a relative path are joined with ``/``."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("myproject", "state/song.mid")
        assert arcname == "myproject/state/song.mid"

    def test_safe_arcname_no_prefix(self) -> None:
        """An empty prefix leaves the relative path unchanged."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("", "state/song.mid")
        assert arcname == "state/song.mid"

    def test_safe_arcname_traversal_in_rel_path_rejected(self) -> None:
        """``..`` components in the relative path yield ``None``."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("prefix", "../../../etc/passwd")
        assert arcname is None

    def test_safe_arcname_absolute_rel_path_rejected(self) -> None:
        """An absolute path in place of a relative one yields ``None``."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("prefix", "/etc/passwd")
        assert arcname is None

    def test_safe_arcname_traversal_in_prefix_rejected(self) -> None:
        """``..`` components in the prefix yield ``None``."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("../evil", "file.txt")
        assert arcname is None

    def test_safe_arcname_trailing_slash_normalised(self) -> None:
        """A trailing slash on the prefix does not produce a doubled ``/``."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("myproject/", "file.txt")
        assert arcname == "myproject/file.txt"
119
120
121 # ---------------------------------------------------------------------------
122 # Integration tests
123 # ---------------------------------------------------------------------------
124
class TestArchiveIntegration:
    """CLI-level tests that run ``archive`` against a scratch repository."""

    def test_archive_empty_commit(self, tmp_path: pathlib.Path) -> None:
        """A commit with no files still exits 0 and writes the output file."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit_with_files(root, repo_id, files={})
        target = tmp_path / "out.tar.gz"

        argv = ["archive", "--output", str(target)]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)

        assert outcome.exit_code == 0
        assert target.exists()

    def test_archive_tar_gz_contains_files(self, tmp_path: pathlib.Path) -> None:
        """The default tar.gz archive lists the committed file."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit_with_files(root, repo_id, files={"state/song.mid": b"\x00\x00MIDI"})
        target = tmp_path / "archive.tar.gz"

        argv = ["archive", "--output", str(target)]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)

        assert outcome.exit_code == 0
        with tarfile.open(target, "r:gz") as archive:
            members = archive.getnames()
        assert any("song.mid" in member for member in members)

    def test_archive_zip_contains_files(self, tmp_path: pathlib.Path) -> None:
        """``--format zip`` produces a zip archive listing the committed file."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit_with_files(root, repo_id, files={"track.mid": b"MIDIdata"})
        target = tmp_path / "archive.zip"

        argv = ["archive", "--format", "zip", "--output", str(target)]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)

        assert outcome.exit_code == 0
        with zipfile.ZipFile(target, "r") as archive:
            members = archive.namelist()
        assert any("track.mid" in member for member in members)

    def test_archive_with_prefix(self, tmp_path: pathlib.Path) -> None:
        """``--prefix`` shows up in at least one archived member name."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit_with_files(root, repo_id, files={"song.mid": b"data"})
        target = tmp_path / "prefixed.tar.gz"

        argv = ["archive", "--output", str(target), "--prefix", "myband-v1.0/"]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)

        assert outcome.exit_code == 0
        with tarfile.open(target, "r:gz") as archive:
            members = archive.getnames()
        assert any("myband-v1.0" in member for member in members)

    def test_archive_unknown_format_fails(self, tmp_path: pathlib.Path) -> None:
        """An unsupported ``--format`` value yields a non-zero exit code."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit_with_files(root, repo_id)

        argv = ["archive", "--format", "rar"]
        outcome = runner.invoke(cli, argv, env=_env(root))

        assert outcome.exit_code != 0

    def test_archive_no_commits_fails(self, tmp_path: pathlib.Path) -> None:
        """Archiving a repository without any commit yields a non-zero exit code."""
        root, _repo_id = _init_repo(tmp_path)

        outcome = runner.invoke(cli, ["archive"], env=_env(root))

        assert outcome.exit_code != 0

    def test_archive_short_flags(self, tmp_path: pathlib.Path) -> None:
        """The short flags ``-f`` and ``-o`` are accepted."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit_with_files(root, repo_id, files={"test.mid": b"data"})
        target = tmp_path / "short.tar.gz"

        argv = ["archive", "-f", "tar.gz", "-o", str(target)]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)

        assert outcome.exit_code == 0
190
191
192 # ---------------------------------------------------------------------------
193 # Security tests
194 # ---------------------------------------------------------------------------
195
class TestArchiveSecurity:
    """Path-traversal checks for ``--prefix`` and for manifest entries."""

    def test_prefix_traversal_rejected(self, tmp_path: pathlib.Path) -> None:
        """A ``--prefix`` containing ``../`` yields a non-zero exit code."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit_with_files(root, repo_id, files={"song.mid": b"data"})
        target = tmp_path / "evil.tar.gz"

        argv = ["archive", "--output", str(target), "--prefix", "../evil/"]
        outcome = runner.invoke(cli, argv, env=_env(root))

        assert outcome.exit_code != 0

    def test_zip_slip_manifest_path_skipped(self, tmp_path: pathlib.Path) -> None:
        """A manifest entry with '../' is skipped, not written to archive."""
        from muse.cli.commands.archive import _build_tar

        root, _repo_id = _init_repo(tmp_path)

        # Store one object by hand; both manifest entries point at it.
        payload = b"evil content"
        digest = hashlib.sha256(payload).hexdigest()
        blob = root / ".muse" / "objects" / digest[:2] / digest[2:]
        blob.parent.mkdir(parents=True, exist_ok=True)
        blob.write_bytes(payload)

        target = tmp_path / "safe.tar.gz"
        manifest = {"../../../etc/passwd": digest, "safe.txt": digest}
        written = _build_tar(root, manifest, target, prefix="")

        assert written == 1  # only safe.txt
        with tarfile.open(target, "r:gz") as archive:
            members = archive.getnames()
        assert all("etc" not in member for member in members)
224
225
226 # ---------------------------------------------------------------------------
227 # Stress tests
228 # ---------------------------------------------------------------------------
229
class TestArchiveStress:
    """Archive a commit that tracks many files."""

    def test_archive_many_files(self, tmp_path: pathlib.Path) -> None:
        """Fifty committed files produce a tar.gz with exactly fifty member names."""
        root, repo_id = _init_repo(tmp_path)

        files: dict[str, bytes] = {}
        for index in range(50):
            files[f"track_{index:03d}.mid"] = f"MIDI{index}".encode()
        _make_commit_with_files(root, repo_id, files=files)

        target = tmp_path / "many.tar.gz"
        argv = ["archive", "--output", str(target)]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)

        assert outcome.exit_code == 0
        with tarfile.open(target, "r:gz") as archive:
            members = archive.getnames()
        assert len(members) == 50