gabriel / muse public
test_plumbing_pack_unpack.py python
325 lines 11.6 KB
86000da9 fix: replace typer CliRunner with argparse-compatible test helper Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Tests for ``muse plumbing pack-objects`` and ``muse plumbing unpack-objects``.
2
Covers: single-commit pack, HEAD expansion, ``--have`` pruning, pack-unpack
round-trip (idempotent), invalid-JSON stdin rejection, empty-bundle unpack,
JSON output schema, counts reported by unpack-objects, and a stress round-trip
with 50 commits and 50 objects.
7 """
8
9 from __future__ import annotations
10
11 import base64
12 import datetime
13 import hashlib
14 import json
15 import pathlib
16 import sys
17
18 import pytest
19 from tests.cli_test_helper import CliRunner
20
# Placeholder handed as the first positional argument to every
# ``runner.invoke`` call in this module.
cli = None # argparse migration — CliRunner ignores this arg
22 from muse.core.errors import ExitCode
23 from muse.core.object_store import write_object
24 from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
25
# Single runner instance shared by the helpers and tests in this module.
runner = CliRunner()
27
28
29 # ---------------------------------------------------------------------------
30 # Helpers
31 # ---------------------------------------------------------------------------
32
33
def _sha(tag: str) -> str:
    """Return the hex SHA-256 digest of *tag* (encoded with ``str.encode()``)."""
    encoded = tag.encode()
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()
36
37
def _sha_bytes(data: bytes) -> str:
    """Return the hex SHA-256 digest of the raw bytes *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
40
41
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal ``.muse`` directory under *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories, a ``HEAD`` file containing ``ref: refs/heads/main``, and a
    ``repo.json`` file holding the test repo id and domain.
    """
    dot_muse = path / ".muse"
    layout = (("commits",), ("snapshots",), ("objects",), ("refs", "heads"))
    for parts in layout:
        # No ``exist_ok``: an already-initialised directory raises.
        dot_muse.joinpath(*parts).mkdir(parents=True)

    head_file = dot_muse / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": "test-repo", "domain": "midi"}
    meta_file = dot_muse / "repo.json"
    meta_file.write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
53
54
def _env(repo: pathlib.Path) -> dict[str, str]:
    """Build the environment mapping that sets ``MUSE_REPO_ROOT`` to *repo*."""
    repo_root = str(repo)
    return dict(MUSE_REPO_ROOT=repo_root)
57
58
def _snap(repo: pathlib.Path, manifest: dict[str, str] | None = None, tag: str = "s") -> str:
    """Write a snapshot record into *repo* and return its id.

    The id is the SHA-256 of a string built from *tag* and the sorted manifest
    items. A missing or empty *manifest* is stored as ``{}``.
    """
    entries = manifest or {}
    ordered_items = sorted(entries.items())
    snapshot_id = _sha(f"snap-{tag}-{ordered_items}")

    # Fixed timestamp so that records are identical from run to run.
    fixed_time = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    record = SnapshotRecord(
        snapshot_id=snapshot_id,
        manifest=entries,
        created_at=fixed_time,
    )
    write_snapshot(repo, record)
    return snapshot_id
71
72
def _commit(
    repo: pathlib.Path, tag: str, sid: str, branch: str = "main", parent: str | None = None
) -> str:
    """Write a commit record into *repo*, point *branch* at it, and return its id.

    The commit id is the SHA-256 of *tag*; *tag* is also used as the commit
    message. The branch ref file under ``.muse/refs/heads`` is overwritten
    with the new commit id.
    """
    commit_id = _sha(tag)

    # Fixed timestamp so that records are identical from run to run.
    fixed_time = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    record = CommitRecord(
        commit_id=commit_id,
        repo_id="test-repo",
        branch=branch,
        snapshot_id=sid,
        message=tag,
        committed_at=fixed_time,
        author="tester",
        parent_commit_id=parent,
    )
    write_commit(repo, record)

    heads_dir = repo / ".muse" / "refs" / "heads"
    branch_ref = heads_dir / branch
    # A branch name containing "/" needs its intermediate directories.
    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
94
95
def _obj(repo: pathlib.Path, content: bytes) -> str:
    """Store *content* in *repo*'s object store under its SHA-256 and return that id."""
    object_id = hashlib.sha256(content).hexdigest()
    write_object(repo, object_id, content)
    return object_id
100
101
def _pack(repo: pathlib.Path, cid: str) -> str:
    """Run ``pack-objects`` for one commit in *repo* and return the raw JSON bundle.

    Fails the calling test, showing the command output, when the exit code is
    non-zero.
    """
    argv = ["plumbing", "pack-objects", cid]
    outcome = runner.invoke(cli, argv, env=_env(repo))
    assert outcome.exit_code == 0, outcome.output
    return outcome.stdout
107
108
def _unpack(repo: pathlib.Path, bundle_json: str) -> dict[str, int]:
    """Run ``unpack-objects`` in *repo* with *bundle_json* as input; return the parsed counts.

    Fails the calling test, showing the command output, when the exit code is
    non-zero.
    """
    argv = ["plumbing", "unpack-objects"]
    outcome = runner.invoke(cli, argv, input=bundle_json, env=_env(repo))
    assert outcome.exit_code == 0, outcome.output
    counts: dict[str, int] = json.loads(outcome.stdout)
    return counts
116
117
118 # ---------------------------------------------------------------------------
119 # Unit: pack-objects validation
120 # ---------------------------------------------------------------------------
121
122
class TestPackObjectsUnit:
    """Checks how ``pack-objects`` handles the ``HEAD`` argument."""

    def test_head_resolves_correctly(self, tmp_path: pathlib.Path) -> None:
        """Packing ``HEAD`` yields a bundle listing the commit the branch points at."""
        repo = _init_repo(tmp_path)
        snapshot_id = _snap(repo)
        head_commit = _commit(repo, "head-test", snapshot_id)

        argv = ["plumbing", "pack-objects", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(repo))

        assert outcome.exit_code == 0, outcome.output
        packed = json.loads(outcome.stdout)
        packed_commits = packed.get("commits", [])
        assert any(entry["commit_id"] == head_commit for entry in packed_commits)

    def test_head_on_empty_branch_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """Packing ``HEAD`` in a repo with no commits exits with ``USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "pack-objects", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(repo))
        assert outcome.exit_code == ExitCode.USER_ERROR
137
138
139 # ---------------------------------------------------------------------------
140 # Integration: pack schema
141 # ---------------------------------------------------------------------------
142
143
class TestPackObjectsSchema:
    """Checks the shape of the JSON bundle emitted by ``pack-objects``.

    Every test packs through the module's ``_pack`` helper, so a non-zero exit
    code fails the test with the command output attached.
    """

    def test_bundle_has_commits_snapshots_objects_keys(self, tmp_path: pathlib.Path) -> None:
        """The bundle exposes ``commits``, ``snapshots`` and ``objects`` at top level."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cid = _commit(repo, "schema", sid)
        bundle = json.loads(_pack(repo, cid))
        for key in ("commits", "snapshots", "objects"):
            assert key in bundle, f"bundle is missing top-level key {key!r}"

    def test_objects_are_base64_encoded(self, tmp_path: pathlib.Path) -> None:
        """An object's ``content_b64`` field decodes back to the stored bytes."""
        content = b"hello object"
        repo = _init_repo(tmp_path)
        oid = _obj(repo, content)
        sid = _snap(repo, {"f.mid": oid})
        cid = _commit(repo, "obj-base64", sid)
        bundle = json.loads(_pack(repo, cid))
        # Use a default so a missing object is a readable assertion failure
        # rather than a bare StopIteration.
        obj_entry = next((o for o in bundle["objects"] if o["object_id"] == oid), None)
        assert obj_entry is not None, f"object {oid} not found in bundle"
        decoded = base64.b64decode(obj_entry["content_b64"])
        assert decoded == content

    def test_bundle_commit_record_present(self, tmp_path: pathlib.Path) -> None:
        """The requested commit's id appears among the bundled commit records."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cid = _commit(repo, "bundled", sid)
        bundle = json.loads(_pack(repo, cid))
        commit_ids = [c["commit_id"] for c in bundle["commits"]]
        assert cid in commit_ids
178
179
180 # ---------------------------------------------------------------------------
181 # Integration: --have pruning
182 # ---------------------------------------------------------------------------
183
184
class TestPackObjectsHave:
    """Checks that ``--have`` removes already-known commits from the bundle."""

    def test_have_prunes_ancestor_commits(self, tmp_path: pathlib.Path) -> None:
        """With ``--have <parent>``, only the child commit is bundled."""
        repo = _init_repo(tmp_path)
        snapshot_id = _snap(repo)
        ancestor = _commit(repo, "c0", snapshot_id)
        tip = _commit(repo, "c1", snapshot_id, parent=ancestor)

        # Ask for the tip while declaring the ancestor as already present
        # on the receiving side.
        argv = ["plumbing", "pack-objects", "--have", ancestor, tip]
        outcome = runner.invoke(cli, argv, env=_env(repo))

        assert outcome.exit_code == 0
        packed = json.loads(outcome.stdout)
        packed_ids = {entry["commit_id"] for entry in packed.get("commits", [])}
        assert tip in packed_ids
        assert ancestor not in packed_ids
200
201
202 # ---------------------------------------------------------------------------
203 # Integration: unpack-objects
204 # ---------------------------------------------------------------------------
205
206
class TestUnpackObjects:
    """Checks ``unpack-objects``: reported counts, round-trips, and bad input."""

    @staticmethod
    def _src_and_dst(tmp_path: pathlib.Path) -> tuple[pathlib.Path, pathlib.Path]:
        """Create the ``src`` repo, then the ``dst`` repo, under *tmp_path*."""
        source = _init_repo(tmp_path / "src")
        destination = _init_repo(tmp_path / "dst")
        return source, destination

    def test_unpack_returns_count_dict(self, tmp_path: pathlib.Path) -> None:
        """The unpack output carries all four counter keys."""
        source, destination = self._src_and_dst(tmp_path)
        commit_id = _commit(source, "to-unpack", _snap(source))
        counts = _unpack(destination, _pack(source, commit_id))
        expected_keys = (
            "commits_written",
            "snapshots_written",
            "objects_written",
            "objects_skipped",
        )
        for key in expected_keys:
            assert key in counts

    def test_round_trip_commit_appears_in_dst_store(self, tmp_path: pathlib.Path) -> None:
        """A packed commit can be read from the destination after unpacking."""
        from muse.core.store import read_commit

        source, destination = self._src_and_dst(tmp_path)
        commit_id = _commit(source, "round-trip", _snap(source))
        _unpack(destination, _pack(source, commit_id))
        assert read_commit(destination, commit_id) is not None

    def test_round_trip_snapshot_appears_in_dst(self, tmp_path: pathlib.Path) -> None:
        """The commit's snapshot can be read from the destination after unpacking."""
        from muse.core.store import read_snapshot

        source, destination = self._src_and_dst(tmp_path)
        snapshot_id = _snap(source)
        commit_id = _commit(source, "snap-rt", snapshot_id)
        _unpack(destination, _pack(source, commit_id))
        assert read_snapshot(destination, snapshot_id) is not None

    def test_round_trip_objects_present_in_dst(self, tmp_path: pathlib.Path) -> None:
        """An object referenced by the snapshot manifest exists in the destination."""
        from muse.core.object_store import has_object

        source, destination = self._src_and_dst(tmp_path)
        object_id = _obj(source, b"transferable blob")
        snapshot_id = _snap(source, {"f.mid": object_id})
        commit_id = _commit(source, "obj-rt", snapshot_id)
        _unpack(destination, _pack(source, commit_id))
        assert has_object(destination, object_id)

    def test_unpack_idempotent_second_application(self, tmp_path: pathlib.Path) -> None:
        """Applying the same bundle twice writes the commit only the first time."""
        source, destination = self._src_and_dst(tmp_path)
        commit_id = _commit(source, "idempotent", _snap(source))
        bundle = _pack(source, commit_id)
        first_run = _unpack(destination, bundle)
        second_run = _unpack(destination, bundle)
        # The commit already exists on the second pass, so it is not rewritten.
        assert first_run["commits_written"] == 1
        assert second_run["commits_written"] == 0

    def test_invalid_json_stdin_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """Non-JSON input makes ``unpack-objects`` exit with ``USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "unpack-objects"]
        outcome = runner.invoke(cli, argv, input="NOT JSON!", env=_env(repo))
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_empty_bundle_unpacks_cleanly(self, tmp_path: pathlib.Path) -> None:
        """A bundle with empty collections unpacks with zero commits and objects written."""
        repo = _init_repo(tmp_path)
        empty_bundle = {"commits": [], "snapshots": [], "objects": [], "branch_heads": {}}
        counts = _unpack(repo, json.dumps(empty_bundle))
        assert counts["commits_written"] == 0
        assert counts["objects_written"] == 0
279
280
281 # ---------------------------------------------------------------------------
282 # Stress: 50-commit round-trip
283 # ---------------------------------------------------------------------------
284
285
class TestPackUnpackStress:
    """Round-trips larger bundles: a 50-commit chain and a 50-object snapshot.

    Packing goes through the module's ``_pack`` helper, which asserts the exit
    code, so a failed ``pack-objects`` run is reported at the pack step instead
    of surfacing later as an unpack error.
    """

    def test_50_commit_chain_round_trip(self, tmp_path: pathlib.Path) -> None:
        """Packing the tip of a 50-commit chain transfers every commit."""
        from muse.core.store import read_commit

        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")
        sid = _snap(src)
        parent: str | None = None
        cids: list[str] = []
        for i in range(50):
            # Each commit's parent is the one created just before it.
            cid = _commit(src, f"c{i}", sid, parent=parent)
            cids.append(cid)
            parent = cid

        bundle_json = _pack(src, cids[-1])
        counts = _unpack(dst, bundle_json)
        assert counts["commits_written"] == 50

        # All 50 commits readable in destination.
        for cid in cids:
            assert read_commit(dst, cid) is not None

    def test_50_object_round_trip(self, tmp_path: pathlib.Path) -> None:
        """A snapshot referencing 50 distinct objects transfers every object."""
        from muse.core.object_store import has_object

        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")
        oids = [_obj(src, f"blob-{i}".encode()) for i in range(50)]
        manifest = {f"f{i}.mid": oid for i, oid in enumerate(oids)}
        sid = _snap(src, manifest)
        cid = _commit(src, "50-objs", sid)

        bundle_json = _pack(src, cid)
        counts = _unpack(dst, bundle_json)
        assert counts["objects_written"] == 50
        for oid in oids:
            assert has_object(dst, oid)