gabriel / muse public
test_plumbing_pack_unpack.py python
325 lines 11.5 KB
99746394 feat(tests+docs): supercharge plumbing test suite and update reference doc Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """Tests for ``muse plumbing pack-objects`` and ``muse plumbing unpack-objects``.
2
Covers: single-commit pack, HEAD expansion, ``--have`` pruning, pack-unpack
round-trip (idempotent), invalid-JSON stdin rejection, empty-bundle unpack,
JSON output schema, counts reported by unpack-objects, and stress round-trips
with 50 commits and 50 objects.
7 """
8
9 from __future__ import annotations
10
11 import base64
12 import datetime
13 import hashlib
14 import json
15 import pathlib
16 import sys
17
18 import pytest
19 from typer.testing import CliRunner
20
21 from muse.cli.app import cli
22 from muse.core.errors import ExitCode
23 from muse.core.object_store import write_object
24 from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
25
26 runner = CliRunner()
27
28
29 # ---------------------------------------------------------------------------
30 # Helpers
31 # ---------------------------------------------------------------------------
32
33
34 def _sha(tag: str) -> str:
35 return hashlib.sha256(tag.encode()).hexdigest()
36
37
38 def _sha_bytes(data: bytes) -> str:
39 return hashlib.sha256(data).hexdigest()
40
41
42 def _init_repo(path: pathlib.Path) -> pathlib.Path:
43 muse = path / ".muse"
44 (muse / "commits").mkdir(parents=True)
45 (muse / "snapshots").mkdir(parents=True)
46 (muse / "objects").mkdir(parents=True)
47 (muse / "refs" / "heads").mkdir(parents=True)
48 (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
49 (muse / "repo.json").write_text(
50 json.dumps({"repo_id": "test-repo", "domain": "midi"}), encoding="utf-8"
51 )
52 return path
53
54
55 def _env(repo: pathlib.Path) -> dict[str, str]:
56 return {"MUSE_REPO_ROOT": str(repo)}
57
58
def _snap(repo: pathlib.Path, manifest: dict[str, str] | None = None, tag: str = "s") -> str:
    """Write a snapshot record into *repo* and return its snapshot ID.

    The ID is the SHA-256 of a string built from *tag* and the sorted manifest
    items, so the same ``(tag, manifest)`` pair always yields the same ID.
    A missing (or empty) *manifest* is stored as an empty dict.
    """
    entries = manifest if manifest else {}
    fingerprint = f"snap-{tag}-{sorted(entries.items())}"
    snapshot_id = _sha(fingerprint)
    # Fixed timestamp keeps the written record deterministic across runs.
    stamp = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    record = SnapshotRecord(
        snapshot_id=snapshot_id,
        manifest=entries,
        created_at=stamp,
    )
    write_snapshot(repo, record)
    return snapshot_id
71
72
def _commit(
    repo: pathlib.Path, tag: str, sid: str, branch: str = "main", parent: str | None = None
) -> str:
    """Write a commit record into *repo*, advance *branch* to it, return its ID.

    The commit ID is the SHA-256 of *tag*; *tag* also serves as the commit
    message. After the record is written, ``.muse/refs/heads/<branch>`` is
    overwritten with the new commit ID.
    """
    commit_id = _sha(tag)
    # Fixed timestamp keeps the written record deterministic across runs.
    stamp = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    record = CommitRecord(
        commit_id=commit_id,
        repo_id="test-repo",
        branch=branch,
        snapshot_id=sid,
        message=tag,
        committed_at=stamp,
        author="tester",
        parent_commit_id=parent,
    )
    write_commit(repo, record)

    branch_ref = repo / ".muse" / "refs" / "heads" / branch
    # Make sure the directory holding the ref file exists before writing it.
    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
94
95
def _obj(repo: pathlib.Path, content: bytes) -> str:
    """Store *content* in *repo*'s object store under its SHA-256 and return that ID."""
    object_id = _sha_bytes(content)
    write_object(repo, object_id, content)
    return object_id
100
101
def _pack(repo: pathlib.Path, cid: str) -> str:
    """Run ``plumbing pack-objects <cid>`` against *repo* and return its stdout.

    Asserts that the command exits with code 0; on failure the assertion
    message carries the command output.
    """
    argv = ["plumbing", "pack-objects", cid]
    outcome = runner.invoke(cli, argv, env=_env(repo))
    assert outcome.exit_code == 0, outcome.output
    return outcome.stdout
107
108
def _unpack(repo: pathlib.Path, bundle_json: str) -> dict[str, int]:
    """Feed *bundle_json* to ``plumbing unpack-objects`` on stdin for *repo*.

    Asserts that the command exits with code 0 (the assertion message carries
    the command output) and returns the JSON document parsed from its stdout.
    """
    argv = ["plumbing", "unpack-objects"]
    outcome = runner.invoke(cli, argv, input=bundle_json, env=_env(repo))
    assert outcome.exit_code == 0, outcome.output
    counts: dict[str, int] = json.loads(outcome.stdout)
    return counts
116
117
118 # ---------------------------------------------------------------------------
119 # Unit: pack-objects validation
120 # ---------------------------------------------------------------------------
121
122
class TestPackObjectsUnit:
    """``pack-objects`` invoked with the symbolic ``HEAD`` argument."""

    def test_head_resolves_correctly(self, tmp_path: pathlib.Path) -> None:
        """``HEAD`` packs the commit the current branch ref points at."""
        repo = _init_repo(tmp_path)
        head_cid = _commit(repo, "head-test", _snap(repo))

        argv = ["plumbing", "pack-objects", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(repo))
        assert outcome.exit_code == 0, outcome.output

        packed = json.loads(outcome.stdout)
        commits = packed.get("commits", [])
        assert any(entry["commit_id"] == head_cid for entry in commits)

    def test_head_on_empty_branch_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """With no commit on the branch, ``HEAD`` is rejected as a user error."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "pack-objects", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(repo))
        assert outcome.exit_code == ExitCode.USER_ERROR
137
138
139 # ---------------------------------------------------------------------------
140 # Integration: pack schema
141 # ---------------------------------------------------------------------------
142
143
class TestPackObjectsSchema:
    """Shape of the JSON bundle emitted by ``pack-objects``.

    Every test packs through the shared ``_pack`` helper so that a non-zero
    exit code fails with the CLI output attached, instead of a bare
    ``assert 1 == 0``.
    """

    def test_bundle_has_commits_snapshots_objects_keys(self, tmp_path: pathlib.Path) -> None:
        """The bundle exposes the ``commits``, ``snapshots`` and ``objects`` keys."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cid = _commit(repo, "schema", sid)
        bundle = json.loads(_pack(repo, cid))
        assert "commits" in bundle
        assert "snapshots" in bundle
        assert "objects" in bundle

    def test_objects_are_base64_encoded(self, tmp_path: pathlib.Path) -> None:
        """An object's ``content_b64`` field base64-decodes to the stored bytes."""
        content = b"hello object"
        repo = _init_repo(tmp_path)
        oid = _obj(repo, content)
        sid = _snap(repo, {"f.mid": oid})
        cid = _commit(repo, "obj-base64", sid)
        bundle = json.loads(_pack(repo, cid))
        # Use a default so a missing object fails with a readable assertion
        # rather than an opaque StopIteration from next().
        obj_entry = next((o for o in bundle["objects"] if o["object_id"] == oid), None)
        assert obj_entry is not None, f"object {oid} missing from bundle"
        decoded = base64.b64decode(obj_entry["content_b64"])
        assert decoded == content

    def test_bundle_commit_record_present(self, tmp_path: pathlib.Path) -> None:
        """The requested commit appears in the bundle's ``commits`` list."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cid = _commit(repo, "bundled", sid)
        bundle = json.loads(_pack(repo, cid))
        commit_ids = [c["commit_id"] for c in bundle["commits"]]
        assert cid in commit_ids
178
179
180 # ---------------------------------------------------------------------------
181 # Integration: --have pruning
182 # ---------------------------------------------------------------------------
183
184
class TestPackObjectsHave:
    """``--have`` pruning behaviour of ``pack-objects``."""

    def test_have_prunes_ancestor_commits(self, tmp_path: pathlib.Path) -> None:
        """A commit named via ``--have`` is left out of the resulting bundle."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        c0 = _commit(repo, "c0", sid)
        c1 = _commit(repo, "c1", sid, parent=c0)
        # Pack c1 but tell the remote it already has c0.
        result = runner.invoke(
            cli, ["plumbing", "pack-objects", "--have", c0, c1], env=_env(repo)
        )
        # Attach the CLI output so a failing invocation is diagnosable,
        # matching the other exit-code assertions in this module.
        assert result.exit_code == 0, result.output
        bundle = json.loads(result.stdout)
        commit_ids = {c["commit_id"] for c in bundle.get("commits", [])}
        assert c1 in commit_ids
        assert c0 not in commit_ids
200
201
202 # ---------------------------------------------------------------------------
203 # Integration: unpack-objects
204 # ---------------------------------------------------------------------------
205
206
class TestUnpackObjects:
    """``unpack-objects``: reported counts, round-trips, and input validation."""

    def test_unpack_returns_count_dict(self, tmp_path: pathlib.Path) -> None:
        """The JSON result carries all four counter keys."""
        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        snapshot_id = _snap(source)
        commit_id = _commit(source, "to-unpack", snapshot_id)
        packed = _pack(source, commit_id)
        report = _unpack(target, packed)
        for key in (
            "commits_written",
            "snapshots_written",
            "objects_written",
            "objects_skipped",
        ):
            assert key in report

    def test_round_trip_commit_appears_in_dst_store(self, tmp_path: pathlib.Path) -> None:
        """A packed commit is readable from the destination after unpacking."""
        from muse.core.store import read_commit

        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        snapshot_id = _snap(source)
        commit_id = _commit(source, "round-trip", snapshot_id)
        packed = _pack(source, commit_id)
        _unpack(target, packed)
        assert read_commit(target, commit_id) is not None

    def test_round_trip_snapshot_appears_in_dst(self, tmp_path: pathlib.Path) -> None:
        """The commit's snapshot is readable from the destination after unpacking."""
        from muse.core.store import read_snapshot

        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        snapshot_id = _snap(source)
        commit_id = _commit(source, "snap-rt", snapshot_id)
        packed = _pack(source, commit_id)
        _unpack(target, packed)
        assert read_snapshot(target, snapshot_id) is not None

    def test_round_trip_objects_present_in_dst(self, tmp_path: pathlib.Path) -> None:
        """An object referenced by the snapshot manifest exists in the destination."""
        from muse.core.object_store import has_object

        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        object_id = _obj(source, b"transferable blob")
        snapshot_id = _snap(source, {"f.mid": object_id})
        commit_id = _commit(source, "obj-rt", snapshot_id)
        packed = _pack(source, commit_id)
        _unpack(target, packed)
        assert has_object(target, object_id)

    def test_unpack_idempotent_second_application(self, tmp_path: pathlib.Path) -> None:
        """Applying the same bundle twice writes the commit only the first time."""
        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        snapshot_id = _snap(source)
        commit_id = _commit(source, "idempotent", snapshot_id)
        packed = _pack(source, commit_id)
        first_report = _unpack(target, packed)
        second_report = _unpack(target, packed)
        # Second unpack: the commit already exists, so no further commit is written.
        assert first_report["commits_written"] == 1
        assert second_report["commits_written"] == 0

    def test_invalid_json_stdin_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """Non-JSON stdin is rejected with the user-error exit code."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "unpack-objects"]
        outcome = runner.invoke(cli, argv, input="NOT JSON!", env=_env(repo))
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_empty_bundle_unpacks_cleanly(self, tmp_path: pathlib.Path) -> None:
        """A bundle with empty lists unpacks successfully and writes nothing."""
        repo = _init_repo(tmp_path)
        empty_bundle = {"commits": [], "snapshots": [], "objects": [], "branch_heads": {}}
        report = _unpack(repo, json.dumps(empty_bundle))
        assert report["commits_written"] == 0
        assert report["objects_written"] == 0
279
280
281 # ---------------------------------------------------------------------------
282 # Stress: 50-commit round-trip
283 # ---------------------------------------------------------------------------
284
285
class TestPackUnpackStress:
    """Round-trip larger bundles: a 50-commit chain and a 50-object snapshot.

    Packing goes through the shared ``_pack`` helper so that a failing
    ``pack-objects`` invocation is reported at its source (with CLI output)
    rather than surfacing later as a confusing ``unpack-objects`` failure.
    """

    def test_50_commit_chain_round_trip(self, tmp_path: pathlib.Path) -> None:
        """Packing the tip of a 50-commit chain transfers all 50 commits."""
        from muse.core.store import read_commit

        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")
        sid = _snap(src)
        parent: str | None = None
        cids: list[str] = []
        for i in range(50):
            # Each new commit becomes the parent of the next one.
            parent = _commit(src, f"c{i}", sid, parent=parent)
            cids.append(parent)

        # _pack asserts the pack step exited with code 0 before we unpack.
        bundle_json = _pack(src, cids[-1])
        counts = _unpack(dst, bundle_json)
        assert counts["commits_written"] == 50

        # All 50 commits readable in destination.
        for cid in cids:
            assert read_commit(dst, cid) is not None

    def test_50_object_round_trip(self, tmp_path: pathlib.Path) -> None:
        """A snapshot referencing 50 distinct objects transfers all of them."""
        from muse.core.object_store import has_object

        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")
        oids = [_obj(src, f"blob-{i}".encode()) for i in range(50)]
        manifest = {f"f{i}.mid": oid for i, oid in enumerate(oids)}
        sid = _snap(src, manifest)
        cid = _commit(src, "50-objs", sid)

        # _pack asserts the pack step exited with code 0 before we unpack.
        bundle_json = _pack(src, cid)
        counts = _unpack(dst, bundle_json)
        assert counts["objects_written"] == 50
        for oid in oids:
            assert has_object(dst, oid)