test_plumbing_pack_unpack.py
python
| 1 | """Tests for ``muse plumbing pack-objects`` and ``muse plumbing unpack-objects``. |
| 2 | |
| 3 | Covers: single-commit pack, HEAD expansion, ``--have`` pruning, pack-unpack |
| 4 | round-trip (idempotent), invalid-msgpack stdin rejection, empty bundle, msgpack |
| 5 | output schema, counts reported by unpack-objects, and a stress round-trip with |
| 6 | 50 commits and 50 objects. |
| 7 | """ |
| 8 | |
| 9 | from __future__ import annotations |
| 10 | |
| 11 | import datetime |
| 12 | import hashlib |
| 13 | import json |
| 14 | import pathlib |
| 15 | import sys |
| 16 | |
| 17 | import msgpack |
| 18 | import pytest |
| 19 | from tests.cli_test_helper import CliRunner |
| 20 | |
| 21 | cli = None # argparse migration — CliRunner ignores this arg |
| 22 | from muse.core.errors import ExitCode |
| 23 | from muse.core.object_store import write_object |
| 24 | from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot |
| 25 | |
| 26 | runner = CliRunner() |
| 27 | |
| 28 | |
| 29 | # --------------------------------------------------------------------------- |
| 30 | # Helpers |
| 31 | # --------------------------------------------------------------------------- |
| 32 | |
| 33 | |
def _sha(tag: str) -> str:
    """Return the hex SHA-256 digest of *tag* (encoded with the default UTF-8)."""
    payload = tag.encode()
    digest = hashlib.sha256(payload)
    return digest.hexdigest()
| 36 | |
| 37 | |
def _sha_bytes(data: bytes) -> str:
    """Return the hex SHA-256 digest of the raw bytes *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
| 40 | |
| 41 | |
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a bare ``.muse`` directory tree under *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories, points ``HEAD`` at ``refs/heads/main`` and writes a
    ``repo.json`` carrying the fixed test repo id and domain.
    """
    dot_muse = path / ".muse"
    layout = (("commits",), ("snapshots",), ("objects",), ("refs", "heads"))
    for parts in layout:
        dot_muse.joinpath(*parts).mkdir(parents=True)

    head_file = dot_muse / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": "test-repo", "domain": "midi"}
    meta_file = dot_muse / "repo.json"
    meta_file.write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
| 53 | |
| 54 | |
def _env(repo: pathlib.Path) -> dict[str, str]:
    """Build the environment mapping that sets ``MUSE_REPO_ROOT`` to *repo*."""
    overrides: dict[str, str] = {}
    overrides["MUSE_REPO_ROOT"] = str(repo)
    return overrides
| 57 | |
| 58 | |
def _snap(repo: pathlib.Path, manifest: dict[str, str] | None = None, tag: str = "s") -> str:
    """Write a snapshot record into *repo* and return its snapshot id.

    The id is the SHA-256 of a string built from *tag* and the sorted
    manifest items. A missing (or empty) *manifest* is stored as ``{}``.
    """
    entries = manifest or {}
    snapshot_id = _sha(f"snap-{tag}-{sorted(entries.items())}")
    record = SnapshotRecord(
        snapshot_id=snapshot_id,
        manifest=entries,
        created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
    )
    write_snapshot(repo, record)
    return snapshot_id
| 71 | |
| 72 | |
def _commit(
    repo: pathlib.Path, tag: str, sid: str, branch: str = "main", parent: str | None = None
) -> str:
    """Write a commit record into *repo*, move *branch* to it, and return its id.

    The commit id is the SHA-256 of *tag*, which is also used as the commit
    message. After the record is written, the branch ref file under
    ``.muse/refs/heads`` is overwritten with the new commit id.
    """
    commit_id = _sha(tag)
    record = CommitRecord(
        commit_id=commit_id,
        repo_id="test-repo",
        branch=branch,
        snapshot_id=sid,
        message=tag,
        committed_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
        author="tester",
        parent_commit_id=parent,
    )
    write_commit(repo, record)

    # Advance the branch head so that HEAD-relative lookups see this commit.
    branch_ref = repo / ".muse" / "refs" / "heads" / branch
    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
| 94 | |
| 95 | |
def _obj(repo: pathlib.Path, content: bytes) -> str:
    """Store *content* in the object store of *repo* and return its object id.

    The object id is the SHA-256 hex digest of *content*.
    """
    object_id = _sha_bytes(content)
    write_object(repo, object_id, content)
    return object_id
| 100 | |
| 101 | |
def _pack(repo: pathlib.Path, cid: str) -> bytes:
    """Invoke ``pack-objects`` on one commit and return the raw msgpack bundle.

    Fails the calling test (showing the CLI output) if the command does not
    exit with status 0.
    """
    argv = ["plumbing", "pack-objects", cid]
    outcome = runner.invoke(cli, argv, env=_env(repo))
    assert outcome.exit_code == 0, outcome.output
    return outcome.stdout_bytes
| 107 | |
| 108 | |
def _unpack(repo: pathlib.Path, bundle_bytes: bytes) -> dict[str, int]:
    """Feed *bundle_bytes* to ``unpack-objects`` on stdin and return its JSON report.

    Fails the calling test (showing the CLI output) if the command does not
    exit with status 0. The command's stdout is decoded as JSON.
    """
    argv = ["plumbing", "unpack-objects"]
    outcome = runner.invoke(cli, argv, input=bundle_bytes, env=_env(repo))
    assert outcome.exit_code == 0, outcome.output
    report: dict[str, int] = json.loads(outcome.stdout)
    return report
| 116 | |
| 117 | |
| 118 | # --------------------------------------------------------------------------- |
| 119 | # Unit: pack-objects validation |
| 120 | # --------------------------------------------------------------------------- |
| 121 | |
| 122 | |
class TestPackObjectsUnit:
    """Handling of the ``HEAD`` argument by ``pack-objects``."""

    def test_head_resolves_correctly(self, tmp_path: pathlib.Path) -> None:
        """Packing ``HEAD`` produces a bundle that contains the commit just written."""
        repo = _init_repo(tmp_path)
        snapshot = _snap(repo)
        tip = _commit(repo, "head-test", snapshot)

        argv = ["plumbing", "pack-objects", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(repo))
        assert outcome.exit_code == 0, outcome.output

        bundle = msgpack.unpackb(outcome.stdout_bytes, raw=False)
        matches = (entry["commit_id"] == tip for entry in bundle.get("commits", []))
        assert any(matches)

    def test_head_on_empty_branch_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """Packing ``HEAD`` in a repo with no commits exits with ``USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "pack-objects", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(repo))
        assert outcome.exit_code == ExitCode.USER_ERROR
| 137 | |
| 138 | |
| 139 | # --------------------------------------------------------------------------- |
| 140 | # Integration: pack schema |
| 141 | # --------------------------------------------------------------------------- |
| 142 | |
| 143 | |
class TestPackObjectsSchema:
    """Shape of the msgpack bundle emitted by ``pack-objects``.

    Every test packs through the shared ``_pack`` helper, so a non-zero exit
    code fails with the CLI output attached instead of a bare comparison.
    """

    def test_bundle_has_commits_snapshots_objects_keys(self, tmp_path: pathlib.Path) -> None:
        """The bundle exposes ``commits``, ``snapshots`` and ``objects`` keys."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cid = _commit(repo, "schema", sid)
        bundle = msgpack.unpackb(_pack(repo, cid), raw=False)
        for key in ("commits", "snapshots", "objects"):
            assert key in bundle, f"bundle is missing {key!r}"

    def test_objects_are_raw_bytes(self, tmp_path: pathlib.Path) -> None:
        """An object referenced by the snapshot manifest is bundled byte-for-byte."""
        content = b"hello object"
        repo = _init_repo(tmp_path)
        oid = _obj(repo, content)
        sid = _snap(repo, {"f.mid": oid})
        cid = _commit(repo, "obj-bytes", sid)
        bundle = msgpack.unpackb(_pack(repo, cid), raw=False)
        # Index by id so a missing object is a readable assertion failure
        # rather than a StopIteration escaping from next().
        by_id = {entry["object_id"]: entry for entry in bundle["objects"]}
        assert oid in by_id, f"object {oid} missing from bundle"
        assert by_id[oid]["content"] == content

    def test_bundle_commit_record_present(self, tmp_path: pathlib.Path) -> None:
        """The requested commit id appears among the bundled commit records."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cid = _commit(repo, "bundled", sid)
        bundle = msgpack.unpackb(_pack(repo, cid), raw=False)
        commit_ids = [entry["commit_id"] for entry in bundle["commits"]]
        assert cid in commit_ids
| 177 | |
| 178 | |
| 179 | # --------------------------------------------------------------------------- |
| 180 | # Integration: --have pruning |
| 181 | # --------------------------------------------------------------------------- |
| 182 | |
| 183 | |
class TestPackObjectsHave:
    """Effect of ``--have`` on the set of commits that ``pack-objects`` emits."""

    def test_have_prunes_ancestor_commits(self, tmp_path: pathlib.Path) -> None:
        """A parent commit named via ``--have`` is left out of the bundle."""
        repo = _init_repo(tmp_path)
        snapshot = _snap(repo)
        base = _commit(repo, "c0", snapshot)
        tip = _commit(repo, "c1", snapshot, parent=base)

        # Ask for the tip while declaring that the receiver already holds base.
        argv = ["plumbing", "pack-objects", "--have", base, tip]
        outcome = runner.invoke(cli, argv, env=_env(repo))
        assert outcome.exit_code == 0

        bundle = msgpack.unpackb(outcome.stdout_bytes, raw=False)
        packed = {entry["commit_id"] for entry in bundle.get("commits", [])}
        assert tip in packed
        assert base not in packed
| 199 | |
| 200 | |
| 201 | # --------------------------------------------------------------------------- |
| 202 | # Integration: unpack-objects |
| 203 | # --------------------------------------------------------------------------- |
| 204 | |
| 205 | |
class TestUnpackObjects:
    """Behaviour of ``unpack-objects`` when fed bundles on stdin."""

    @staticmethod
    def _repo_pair(tmp_path: pathlib.Path) -> tuple[pathlib.Path, pathlib.Path]:
        """Initialise a source repo and a destination repo under *tmp_path*."""
        source = _init_repo(tmp_path / "src")
        destination = _init_repo(tmp_path / "dst")
        return source, destination

    def test_unpack_returns_count_dict(self, tmp_path: pathlib.Path) -> None:
        """The JSON report carries all four counter keys."""
        source, destination = self._repo_pair(tmp_path)
        commit_id = _commit(source, "to-unpack", _snap(source))
        report = _unpack(destination, _pack(source, commit_id))
        expected_keys = (
            "commits_written",
            "snapshots_written",
            "objects_written",
            "objects_skipped",
        )
        for key in expected_keys:
            assert key in report

    def test_round_trip_commit_appears_in_dst_store(self, tmp_path: pathlib.Path) -> None:
        """A packed commit can be read back from the destination store."""
        from muse.core.store import read_commit

        source, destination = self._repo_pair(tmp_path)
        commit_id = _commit(source, "round-trip", _snap(source))
        _unpack(destination, _pack(source, commit_id))
        assert read_commit(destination, commit_id) is not None

    def test_round_trip_snapshot_appears_in_dst(self, tmp_path: pathlib.Path) -> None:
        """The snapshot of a packed commit can be read back from the destination."""
        from muse.core.store import read_snapshot

        source, destination = self._repo_pair(tmp_path)
        snapshot_id = _snap(source)
        commit_id = _commit(source, "snap-rt", snapshot_id)
        _unpack(destination, _pack(source, commit_id))
        assert read_snapshot(destination, snapshot_id) is not None

    def test_round_trip_objects_present_in_dst(self, tmp_path: pathlib.Path) -> None:
        """An object listed in the snapshot manifest exists in the destination."""
        from muse.core.object_store import has_object

        source, destination = self._repo_pair(tmp_path)
        object_id = _obj(source, b"transferable blob")
        snapshot_id = _snap(source, {"f.mid": object_id})
        commit_id = _commit(source, "obj-rt", snapshot_id)
        _unpack(destination, _pack(source, commit_id))
        assert has_object(destination, object_id)

    def test_unpack_idempotent_second_application(self, tmp_path: pathlib.Path) -> None:
        """Applying the same bundle twice writes the commit only the first time."""
        source, destination = self._repo_pair(tmp_path)
        commit_id = _commit(source, "idempotent", _snap(source))
        payload = _pack(source, commit_id)
        first = _unpack(destination, payload)
        second = _unpack(destination, payload)
        # The commit already exists on the second pass, so none is written.
        assert first["commits_written"] == 1
        assert second["commits_written"] == 0

    def test_invalid_msgpack_stdin_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """Bytes that are not a valid bundle make the command exit with ``USER_ERROR``."""
        repo = _init_repo(tmp_path)
        garbage = b"\xff\xff NOT VALID"
        outcome = runner.invoke(
            cli, ["plumbing", "unpack-objects"], input=garbage, env=_env(repo)
        )
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_empty_bundle_unpacks_cleanly(self, tmp_path: pathlib.Path) -> None:
        """A bundle with empty collections unpacks and reports zero writes."""
        repo = _init_repo(tmp_path)
        skeleton = {"commits": [], "snapshots": [], "objects": [], "branch_heads": {}}
        payload = msgpack.packb(skeleton, use_bin_type=True)
        report = _unpack(repo, payload)
        assert report["commits_written"] == 0
        assert report["objects_written"] == 0
| 281 | |
| 282 | |
| 283 | # --------------------------------------------------------------------------- |
| 284 | # Stress: 50-commit round-trip |
| 285 | # --------------------------------------------------------------------------- |
| 286 | |
| 287 | |
class TestPackUnpackStress:
    """Round-trips of larger bundles: a 50-commit chain and a 50-object snapshot.

    Packing goes through the shared ``_pack`` helper so that a failing
    ``pack-objects`` run is reported directly (with its CLI output) instead of
    surfacing later as a confusing ``unpack-objects`` failure.
    """

    def test_50_commit_chain_round_trip(self, tmp_path: pathlib.Path) -> None:
        """Packing the tip of a linear 50-commit chain transfers every commit."""
        from muse.core.store import read_commit

        total = 50
        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")
        sid = _snap(src)

        # Build a linear history: each commit's parent is the previous commit.
        parent: str | None = None
        cids: list[str] = []
        for i in range(total):
            parent = _commit(src, f"c{i}", sid, parent=parent)
            cids.append(parent)

        counts = _unpack(dst, _pack(src, cids[-1]))
        assert counts["commits_written"] == total

        # Every commit of the chain is readable in the destination.
        for cid in cids:
            assert read_commit(dst, cid) is not None

    def test_50_object_round_trip(self, tmp_path: pathlib.Path) -> None:
        """A commit whose snapshot references 50 objects transfers all of them."""
        from muse.core.object_store import has_object

        total = 50
        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")
        oids = [_obj(src, f"blob-{i}".encode()) for i in range(total)]
        manifest = {f"f{i}.mid": oid for i, oid in enumerate(oids)}
        sid = _snap(src, manifest)
        cid = _commit(src, "50-objs", sid)

        counts = _unpack(dst, _pack(src, cid))
        assert counts["objects_written"] == total
        for oid in oids:
            assert has_object(dst, oid)