gabriel / muse public
test_plumbing_pack_unpack.py python
327 lines 11.7 KB
dec4604a feat(mwp): replace JSON+base64 wire protocol with MWP binary msgpack Gabriel Cardona <gabriel@tellurstori.com> 13h ago
"""Tests for ``muse plumbing pack-objects`` and ``muse plumbing unpack-objects``.

Covers: single-commit pack, HEAD expansion, ``--have`` pruning, pack-unpack
round-trip (idempotent), invalid-msgpack stdin rejection, empty-bundle unpack,
msgpack output schema, counts reported by unpack-objects, and a stress
round-trip with 50 commits and 50 objects.
"""
8
9 from __future__ import annotations
10
11 import datetime
12 import hashlib
13 import json
14 import pathlib
15 import sys
16
17 import msgpack
18 import pytest
19 from tests.cli_test_helper import CliRunner
20
# Placeholder for the CLI app object; it is passed as the first argument to
# every ``runner.invoke`` call in this module.
cli = None # argparse migration — CliRunner ignores this arg
22 from muse.core.errors import ExitCode
23 from muse.core.object_store import write_object
24 from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
25
# Module-wide runner shared by the helpers and tests below.
runner = CliRunner()
27
28
29 # ---------------------------------------------------------------------------
30 # Helpers
31 # ---------------------------------------------------------------------------
32
33
def _sha(tag: str) -> str:
    """Return the hex SHA-256 digest of *tag* (encoded with the default UTF-8)."""
    hasher = hashlib.sha256()
    hasher.update(tag.encode())
    return hasher.hexdigest()
36
37
def _sha_bytes(data: bytes) -> str:
    """Return the hex SHA-256 digest of the raw bytes *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
40
41
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a bare ``.muse`` directory under *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories, a ``HEAD`` file pointing at ``refs/heads/main``, and a
    ``repo.json`` holding the test repo id and domain.
    """
    dot_muse = path / ".muse"
    # No ``exist_ok``: initialising the same path twice raises.
    for parts in (("commits",), ("snapshots",), ("objects",), ("refs", "heads")):
        dot_muse.joinpath(*parts).mkdir(parents=True)

    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": "test-repo", "domain": "midi"}
    (dot_muse / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
53
54
def _env(repo: pathlib.Path) -> dict[str, str]:
    """Build the environment mapping that sets ``MUSE_REPO_ROOT`` to *repo*."""
    overrides: dict[str, str] = {}
    overrides["MUSE_REPO_ROOT"] = str(repo)
    return overrides
57
58
def _snap(repo: pathlib.Path, manifest: dict[str, str] | None = None, tag: str = "s") -> str:
    """Write a snapshot record for *manifest* into *repo* and return its id.

    The id is derived from *tag* and the sorted manifest items, so the same
    inputs always yield the same snapshot id. A missing or empty *manifest*
    is stored as ``{}``. The creation timestamp is fixed.
    """
    entries = manifest if manifest else {}
    snapshot_id = _sha(f"snap-{tag}-{sorted(entries.items())}")
    fixed_time = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    record = SnapshotRecord(
        snapshot_id=snapshot_id,
        manifest=entries,
        created_at=fixed_time,
    )
    write_snapshot(repo, record)
    return snapshot_id
71
72
def _commit(
    repo: pathlib.Path, tag: str, sid: str, branch: str = "main", parent: str | None = None
) -> str:
    """Write a commit record named after *tag* and point *branch* at it.

    The commit id is the SHA-256 of *tag*, the message is *tag* itself, and
    the timestamp is fixed. After the record is handed to ``write_commit``,
    ``.muse/refs/heads/<branch>`` is overwritten with the new commit id.

    Returns the commit id.
    """
    commit_id = _sha(tag)
    fixed_time = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    record = CommitRecord(
        commit_id=commit_id,
        repo_id="test-repo",
        branch=branch,
        snapshot_id=sid,
        message=tag,
        committed_at=fixed_time,
        author="tester",
        parent_commit_id=parent,
    )
    write_commit(repo, record)

    # Advance the branch ref so that HEAD (-> refs/heads/main) can resolve.
    branch_ref = repo.joinpath(".muse", "refs", "heads", branch)
    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
94
95
def _obj(repo: pathlib.Path, content: bytes) -> str:
    """Store *content* in *repo* under its SHA-256 hex digest and return that id."""
    object_id = hashlib.sha256(content).hexdigest()
    write_object(repo, object_id, content)
    return object_id
100
101
def _pack(repo: pathlib.Path, cid: str) -> bytes:
    """Invoke ``pack-objects`` on *cid* and hand back the raw msgpack bundle.

    Fails the calling test (showing the CLI output) when the command exits
    non-zero.
    """
    argv = ["plumbing", "pack-objects", cid]
    outcome = runner.invoke(cli, argv, env=_env(repo))
    assert outcome.exit_code == 0, outcome.output
    return outcome.stdout_bytes
107
108
def _unpack(repo: pathlib.Path, bundle_bytes: bytes) -> dict[str, int]:
    """Feed *bundle_bytes* to ``unpack-objects`` on stdin and return its JSON report.

    Fails the calling test (showing the CLI output) when the command exits
    non-zero.
    """
    argv = ["plumbing", "unpack-objects"]
    outcome = runner.invoke(cli, argv, input=bundle_bytes, env=_env(repo))
    assert outcome.exit_code == 0, outcome.output
    report: dict[str, int] = json.loads(outcome.stdout)
    return report
116
117
118 # ---------------------------------------------------------------------------
119 # Unit: pack-objects validation
120 # ---------------------------------------------------------------------------
121
122
class TestPackObjectsUnit:
    """``pack-objects`` handling of the symbolic ``HEAD`` argument."""

    def test_head_resolves_correctly(self, tmp_path: pathlib.Path) -> None:
        """Packing ``HEAD`` includes the commit the current branch points at."""
        repo = _init_repo(tmp_path)
        snapshot_id = _snap(repo)
        head_commit = _commit(repo, "head-test", snapshot_id)

        argv = ["plumbing", "pack-objects", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(repo))
        assert outcome.exit_code == 0, outcome.output

        bundle = msgpack.unpackb(outcome.stdout_bytes, raw=False)
        packed = bundle.get("commits", [])
        assert any(entry["commit_id"] == head_commit for entry in packed)

    def test_head_on_empty_branch_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """With no commit on the branch, packing ``HEAD`` is a user error."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "pack-objects", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(repo))
        assert outcome.exit_code == ExitCode.USER_ERROR
137
138
139 # ---------------------------------------------------------------------------
140 # Integration: pack schema
141 # ---------------------------------------------------------------------------
142
143
class TestPackObjectsSchema:
    """Shape of the msgpack bundle emitted by ``pack-objects``.

    Every test packs through ``_pack``, which asserts a zero exit code and
    shows the CLI output when the command fails.
    """

    def test_bundle_has_commits_snapshots_objects_keys(self, tmp_path: pathlib.Path) -> None:
        """The bundle exposes ``commits``, ``snapshots`` and ``objects`` keys."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cid = _commit(repo, "schema", sid)
        bundle = msgpack.unpackb(_pack(repo, cid), raw=False)
        for key in ("commits", "snapshots", "objects"):
            assert key in bundle, f"bundle is missing top-level key {key!r}"

    def test_objects_are_raw_bytes(self, tmp_path: pathlib.Path) -> None:
        """Object content travels as raw bytes equal to what was stored."""
        content = b"hello object"
        repo = _init_repo(tmp_path)
        oid = _obj(repo, content)
        sid = _snap(repo, {"f.mid": oid})
        cid = _commit(repo, "obj-bytes", sid)
        bundle = msgpack.unpackb(_pack(repo, cid), raw=False)
        # Default of None turns a missing object into an assertion failure
        # rather than a bare StopIteration.
        obj_entry = next((o for o in bundle["objects"] if o["object_id"] == oid), None)
        assert obj_entry is not None, f"object {oid} missing from bundle"
        assert obj_entry["content"] == content

    def test_bundle_commit_record_present(self, tmp_path: pathlib.Path) -> None:
        """The requested commit appears in the bundle's ``commits`` list."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cid = _commit(repo, "bundled", sid)
        bundle = msgpack.unpackb(_pack(repo, cid), raw=False)
        commit_ids = [c["commit_id"] for c in bundle["commits"]]
        assert cid in commit_ids
177
178
179 # ---------------------------------------------------------------------------
180 # Integration: --have pruning
181 # ---------------------------------------------------------------------------
182
183
class TestPackObjectsHave:
    """``--have`` pruning: commits the receiver already holds are left out."""

    def test_have_prunes_ancestor_commits(self, tmp_path: pathlib.Path) -> None:
        """Packing ``c1`` with ``--have c0`` ships ``c1`` but not its parent."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        c0 = _commit(repo, "c0", sid)
        c1 = _commit(repo, "c1", sid, parent=c0)
        # Pack c1 but tell the remote it already has c0.
        result = runner.invoke(
            cli, ["plumbing", "pack-objects", "--have", c0, c1], env=_env(repo)
        )
        # Include the CLI output so a failing invocation is diagnosable.
        assert result.exit_code == 0, result.output
        bundle = msgpack.unpackb(result.stdout_bytes, raw=False)
        commit_ids = {c["commit_id"] for c in bundle.get("commits", [])}
        assert c1 in commit_ids
        assert c0 not in commit_ids
199
200
201 # ---------------------------------------------------------------------------
202 # Integration: unpack-objects
203 # ---------------------------------------------------------------------------
204
205
class TestUnpackObjects:
    """``unpack-objects`` behaviour: reported counts, round-trips and bad input."""

    def test_unpack_returns_count_dict(self, tmp_path: pathlib.Path) -> None:
        """The JSON report carries all four counter keys."""
        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        snapshot_id = _snap(source)
        commit_id = _commit(source, "to-unpack", snapshot_id)
        report = _unpack(target, _pack(source, commit_id))
        expected_keys = (
            "commits_written",
            "snapshots_written",
            "objects_written",
            "objects_skipped",
        )
        for key in expected_keys:
            assert key in report

    def test_round_trip_commit_appears_in_dst_store(self, tmp_path: pathlib.Path) -> None:
        """A packed commit is readable from the destination after unpacking."""
        from muse.core.store import read_commit

        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        snapshot_id = _snap(source)
        commit_id = _commit(source, "round-trip", snapshot_id)
        _unpack(target, _pack(source, commit_id))
        assert read_commit(target, commit_id) is not None

    def test_round_trip_snapshot_appears_in_dst(self, tmp_path: pathlib.Path) -> None:
        """The commit's snapshot is readable from the destination after unpacking."""
        from muse.core.store import read_snapshot

        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        snapshot_id = _snap(source)
        commit_id = _commit(source, "snap-rt", snapshot_id)
        _unpack(target, _pack(source, commit_id))
        assert read_snapshot(target, snapshot_id) is not None

    def test_round_trip_objects_present_in_dst(self, tmp_path: pathlib.Path) -> None:
        """An object referenced by the snapshot manifest lands in the destination."""
        from muse.core.object_store import has_object

        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        object_id = _obj(source, b"transferable blob")
        snapshot_id = _snap(source, {"f.mid": object_id})
        commit_id = _commit(source, "obj-rt", snapshot_id)
        _unpack(target, _pack(source, commit_id))
        assert has_object(target, object_id)

    def test_unpack_idempotent_second_application(self, tmp_path: pathlib.Path) -> None:
        """Applying the same bundle twice writes the commit only the first time."""
        source = _init_repo(tmp_path / "src")
        target = _init_repo(tmp_path / "dst")
        snapshot_id = _snap(source)
        commit_id = _commit(source, "idempotent", snapshot_id)
        payload = _pack(source, commit_id)
        first = _unpack(target, payload)
        second = _unpack(target, payload)
        # The second pass finds the commit already present and writes nothing.
        assert first["commits_written"] == 1
        assert second["commits_written"] == 0

    def test_invalid_msgpack_stdin_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """A payload that is not a valid bundle is rejected as a user error."""
        repo = _init_repo(tmp_path)
        # NOTE(review): 0xff on its own decodes as the msgpack integer -1, so
        # this payload is presumably rejected for its trailing bytes / wrong
        # top-level type rather than for an illegal leading byte — confirm.
        garbage = b"\xff\xff NOT VALID"
        outcome = runner.invoke(
            cli, ["plumbing", "unpack-objects"], input=garbage, env=_env(repo)
        )
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_empty_bundle_unpacks_cleanly(self, tmp_path: pathlib.Path) -> None:
        """A well-formed bundle with no entries unpacks and writes nothing."""
        repo = _init_repo(tmp_path)
        nothing = {"commits": [], "snapshots": [], "objects": [], "branch_heads": {}}
        report = _unpack(repo, msgpack.packb(nothing, use_bin_type=True))
        assert report["commits_written"] == 0
        assert report["objects_written"] == 0
281
282
283 # ---------------------------------------------------------------------------
284 # Stress: 50-commit round-trip
285 # ---------------------------------------------------------------------------
286
287
class TestPackUnpackStress:
    """Round-trips with 50 commits and 50 objects.

    Packing goes through ``_pack`` so that a failing ``pack-objects`` call is
    reported where it happens (with the CLI output) instead of showing up
    later as an unpack failure on a bad payload.
    """

    def test_50_commit_chain_round_trip(self, tmp_path: pathlib.Path) -> None:
        """Packing the tip of a 50-commit chain transfers every commit in it."""
        from muse.core.store import read_commit

        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")
        sid = _snap(src)
        parent: str | None = None
        cids: list[str] = []
        for i in range(50):
            # Each commit becomes the parent of the next, forming a linear chain.
            parent = _commit(src, f"c{i}", sid, parent=parent)
            cids.append(parent)

        bundle_bytes = _pack(src, cids[-1])
        counts = _unpack(dst, bundle_bytes)
        assert counts["commits_written"] == len(cids)

        # All 50 commits readable in destination.
        for cid in cids:
            assert read_commit(dst, cid) is not None

    def test_50_object_round_trip(self, tmp_path: pathlib.Path) -> None:
        """A snapshot referencing 50 distinct objects transfers all of them."""
        from muse.core.object_store import has_object

        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")
        oids = [_obj(src, f"blob-{i}".encode()) for i in range(50)]
        manifest = {f"f{i}.mid": oid for i, oid in enumerate(oids)}
        sid = _snap(src, manifest)
        cid = _commit(src, "50-objs", sid)
        bundle_bytes = _pack(src, cid)
        counts = _unpack(dst, bundle_bytes)
        assert counts["objects_written"] == len(oids)
        for oid in oids:
            assert has_object(dst, oid)