test_core_pack.py
python
| 1 | """Tests for muse.core.pack — PackBundle build and apply operations.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import datetime |
| 6 | import json |
| 7 | import pathlib |
| 8 | |
| 9 | import pytest |
| 10 | |
| 11 | from muse.core.object_store import has_object, read_object, write_object |
| 12 | from muse.core.pack import ( |
| 13 | ObjectPayload, |
| 14 | PackBundle, |
| 15 | apply_pack, |
| 16 | build_pack, |
| 17 | ) |
| 18 | from muse.core.store import ( |
| 19 | CommitRecord, |
| 20 | SnapshotRecord, |
| 21 | read_commit, |
| 22 | read_snapshot, |
| 23 | write_commit, |
| 24 | write_snapshot, |
| 25 | ) |
| 26 | |
| 27 | |
| 28 | # --------------------------------------------------------------------------- |
| 29 | # Fixtures |
| 30 | # --------------------------------------------------------------------------- |
| 31 | |
| 32 | |
| 33 | @pytest.fixture |
| 34 | def repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 35 | """Minimal .muse/ repo structure.""" |
| 36 | muse_dir = tmp_path / ".muse" |
| 37 | (muse_dir / "commits").mkdir(parents=True) |
| 38 | (muse_dir / "snapshots").mkdir(parents=True) |
| 39 | (muse_dir / "objects").mkdir(parents=True) |
| 40 | (muse_dir / "refs" / "heads").mkdir(parents=True) |
| 41 | (muse_dir / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) |
| 42 | (muse_dir / "HEAD").write_text("ref: refs/heads/main\n") |
| 43 | (muse_dir / "refs" / "heads" / "main").write_text("") |
| 44 | return tmp_path |
| 45 | |
| 46 | |
| 47 | def _make_object(root: pathlib.Path, content: bytes) -> str: |
| 48 | """Write raw bytes into the object store; return the object_id.""" |
| 49 | import hashlib |
| 50 | oid = hashlib.sha256(content).hexdigest() |
| 51 | write_object(root, oid, content) |
| 52 | return oid |
| 53 | |
| 54 | |
| 55 | def _make_snapshot( |
| 56 | root: pathlib.Path, snapshot_id: str, manifest: dict[str, str] |
| 57 | ) -> SnapshotRecord: |
| 58 | s = SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest) |
| 59 | write_snapshot(root, s) |
| 60 | return s |
| 61 | |
| 62 | |
| 63 | def _make_commit( |
| 64 | root: pathlib.Path, |
| 65 | commit_id: str, |
| 66 | snapshot_id: str, |
| 67 | message: str = "test", |
| 68 | parent: str | None = None, |
| 69 | ) -> CommitRecord: |
| 70 | c = CommitRecord( |
| 71 | commit_id=commit_id, |
| 72 | repo_id="test-repo", |
| 73 | branch="main", |
| 74 | snapshot_id=snapshot_id, |
| 75 | message=message, |
| 76 | committed_at=datetime.datetime.now(datetime.timezone.utc), |
| 77 | parent_commit_id=parent, |
| 78 | ) |
| 79 | write_commit(root, c) |
| 80 | return c |
| 81 | |
| 82 | |
| 83 | # --------------------------------------------------------------------------- |
| 84 | # build_pack tests |
| 85 | # --------------------------------------------------------------------------- |
| 86 | |
| 87 | |
| 88 | class TestBuildPack: |
| 89 | def test_single_commit_no_history(self, repo: pathlib.Path) -> None: |
| 90 | content = b"hello world" |
| 91 | oid = _make_object(repo, content) |
| 92 | _make_snapshot(repo, "s" * 64, {"file.txt": oid}) |
| 93 | _make_commit(repo, "commit1", "s" * 64) |
| 94 | |
| 95 | bundle = build_pack(repo, ["commit1"]) |
| 96 | |
| 97 | assert len(bundle.get("commits") or []) == 1 |
| 98 | assert len(bundle.get("snapshots") or []) == 1 |
| 99 | assert len(bundle.get("objects") or []) == 1 |
| 100 | assert (bundle.get("objects") or [{}])[0]["object_id"] == oid |
| 101 | |
| 102 | def test_object_content_is_raw_bytes(self, repo: pathlib.Path) -> None: |
| 103 | content = b"\x00\x01\x02\x03" |
| 104 | oid = _make_object(repo, content) |
| 105 | _make_snapshot(repo, "s" * 64, {"bin.dat": oid}) |
| 106 | _make_commit(repo, "c1", "s" * 64) |
| 107 | |
| 108 | bundle = build_pack(repo, ["c1"]) |
| 109 | |
| 110 | objs = bundle.get("objects") or [] |
| 111 | assert len(objs) == 1 |
| 112 | assert objs[0]["content"] == content |
| 113 | |
| 114 | def test_multi_commit_chain(self, repo: pathlib.Path) -> None: |
| 115 | oid1 = _make_object(repo, b"v1") |
| 116 | oid2 = _make_object(repo, b"v2") |
| 117 | _make_snapshot(repo, "s" * 64, {"f.txt": oid1}) |
| 118 | _make_snapshot(repo, "snap2", {"f.txt": oid2}) |
| 119 | _make_commit(repo, "c1", "s" * 64) |
| 120 | _make_commit(repo, "c2", "snap2", parent="c1") |
| 121 | |
| 122 | bundle = build_pack(repo, ["c2"]) |
| 123 | |
| 124 | assert len(bundle.get("commits") or []) == 2 |
| 125 | assert len(bundle.get("snapshots") or []) == 2 |
| 126 | assert len(bundle.get("objects") or []) == 2 |
| 127 | |
| 128 | def test_have_excludes_ancestor_commits(self, repo: pathlib.Path) -> None: |
| 129 | oid1 = _make_object(repo, b"v1") |
| 130 | oid2 = _make_object(repo, b"v2") |
| 131 | _make_snapshot(repo, "s" * 64, {"f.txt": oid1}) |
| 132 | _make_snapshot(repo, "snap2", {"f.txt": oid2}) |
| 133 | _make_commit(repo, "c1", "s" * 64) |
| 134 | _make_commit(repo, "c2", "snap2", parent="c1") |
| 135 | |
| 136 | bundle = build_pack(repo, ["c2"], have=["c1"]) |
| 137 | |
| 138 | # Only c2 should be in the bundle; c1 is in have. |
| 139 | commit_ids = [c["commit_id"] for c in (bundle.get("commits") or [])] |
| 140 | assert "c2" in commit_ids |
| 141 | assert "c1" not in commit_ids |
| 142 | |
| 143 | def test_deduplicates_shared_objects(self, repo: pathlib.Path) -> None: |
| 144 | shared_oid = _make_object(repo, b"shared") |
| 145 | _make_snapshot(repo, "s" * 64, {"a.txt": shared_oid}) |
| 146 | _make_snapshot(repo, "snap2", {"b.txt": shared_oid}) |
| 147 | _make_commit(repo, "c1", "s" * 64) |
| 148 | _make_commit(repo, "c2", "snap2", parent="c1") |
| 149 | |
| 150 | bundle = build_pack(repo, ["c2"]) |
| 151 | |
| 152 | # Shared object should appear only once. |
| 153 | object_ids = [o["object_id"] for o in (bundle.get("objects") or [])] |
| 154 | assert object_ids.count(shared_oid) == 1 |
| 155 | |
| 156 | def test_empty_commit_ids_returns_empty_bundle(self, repo: pathlib.Path) -> None: |
| 157 | bundle = build_pack(repo, []) |
| 158 | assert (bundle.get("commits") or []) == [] |
| 159 | assert (bundle.get("objects") or []) == [] |
| 160 | |
| 161 | def test_missing_commit_skipped_gracefully(self, repo: pathlib.Path) -> None: |
| 162 | # Should not raise even if a commit_id does not exist. |
| 163 | bundle = build_pack(repo, ["nonexistent"]) |
| 164 | assert (bundle.get("commits") or []) == [] |
| 165 | |
| 166 | def test_merge_commit_includes_both_parents(self, repo: pathlib.Path) -> None: |
| 167 | oid_a = _make_object(repo, b"branch-a") |
| 168 | oid_b = _make_object(repo, b"branch-b") |
| 169 | _make_snapshot(repo, "snap_a", {"a.txt": oid_a}) |
| 170 | _make_snapshot(repo, "snap_b", {"b.txt": oid_b}) |
| 171 | _make_snapshot(repo, "snap_m", {"a.txt": oid_a, "b.txt": oid_b}) |
| 172 | _make_commit(repo, "c_a", "snap_a") |
| 173 | _make_commit(repo, "c_b", "snap_b") |
| 174 | # Merge commit with two parents |
| 175 | c_merge = CommitRecord( |
| 176 | commit_id="c_merge", |
| 177 | repo_id="test-repo", |
| 178 | branch="main", |
| 179 | snapshot_id="snap_m", |
| 180 | message="merge", |
| 181 | committed_at=datetime.datetime.now(datetime.timezone.utc), |
| 182 | parent_commit_id="c_a", |
| 183 | parent2_commit_id="c_b", |
| 184 | ) |
| 185 | write_commit(repo, c_merge) |
| 186 | |
| 187 | bundle = build_pack(repo, ["c_merge"]) |
| 188 | commit_ids = {c["commit_id"] for c in (bundle.get("commits") or [])} |
| 189 | assert {"c_merge", "c_a", "c_b"}.issubset(commit_ids) |
| 190 | |
| 191 | |
| 192 | # --------------------------------------------------------------------------- |
| 193 | # apply_pack tests |
| 194 | # --------------------------------------------------------------------------- |
| 195 | |
| 196 | |
| 197 | class TestApplyPack: |
| 198 | def test_round_trip(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None: |
| 199 | """build_pack → apply_pack in a fresh repo produces identical data.""" |
| 200 | content = b"round trip" |
| 201 | oid = _make_object(repo, content) |
| 202 | _make_snapshot(repo, "s" * 64, {"f.txt": oid}) |
| 203 | _make_commit(repo, "c1", "s" * 64, message="initial") |
| 204 | |
| 205 | bundle = build_pack(repo, ["c1"]) |
| 206 | |
| 207 | # Apply into a fresh repo. |
| 208 | dest = tmp_path / "dest" |
| 209 | muse_dir = dest / ".muse" |
| 210 | (muse_dir / "commits").mkdir(parents=True) |
| 211 | (muse_dir / "snapshots").mkdir(parents=True) |
| 212 | (muse_dir / "objects").mkdir(parents=True) |
| 213 | |
| 214 | result = apply_pack(dest, bundle) |
| 215 | |
| 216 | assert result["objects_written"] == 1 |
| 217 | assert has_object(dest, oid) |
| 218 | assert read_object(dest, oid) == content |
| 219 | assert read_snapshot(dest, "s" * 64) is not None |
| 220 | assert read_commit(dest, "c1") is not None |
| 221 | |
| 222 | def test_idempotent_apply(self, repo: pathlib.Path) -> None: |
| 223 | """Applying the same bundle twice does not raise and new_count = 0.""" |
| 224 | content = b"idempotent" |
| 225 | oid = _make_object(repo, content) |
| 226 | _make_snapshot(repo, "s" * 64, {"f.txt": oid}) |
| 227 | _make_commit(repo, "c1", "s" * 64) |
| 228 | |
| 229 | bundle = build_pack(repo, ["c1"]) |
| 230 | apply_pack(repo, bundle) |
| 231 | result = apply_pack(repo, bundle) |
| 232 | |
| 233 | assert result["objects_written"] == 0 # All already present. |
| 234 | |
| 235 | def test_malformed_object_skipped(self, repo: pathlib.Path) -> None: |
| 236 | # content must be bytes; passing wrong type is caught gracefully |
| 237 | bundle: PackBundle = { |
| 238 | "commits": [], |
| 239 | "snapshots": [], |
| 240 | "objects": [ObjectPayload(object_id="abc123", content=b"")], |
| 241 | } |
| 242 | result = apply_pack(repo, bundle) |
| 243 | assert result["objects_written"] == 0 |
| 244 | |
| 245 | def test_empty_bundle_is_noop(self, repo: pathlib.Path) -> None: |
| 246 | bundle: PackBundle = {} |
| 247 | result = apply_pack(repo, bundle) |
| 248 | assert result["objects_written"] == 0 |
| 249 | |
| 250 | def test_apply_preserves_commit_metadata( |
| 251 | self, repo: pathlib.Path, tmp_path: pathlib.Path |
| 252 | ) -> None: |
| 253 | oid = _make_object(repo, b"data") |
| 254 | _make_snapshot(repo, "1" * 64, {"data.bin": oid}) |
| 255 | _make_commit(repo, "c1", "1" * 64, message="preserve me") |
| 256 | |
| 257 | bundle = build_pack(repo, ["c1"]) |
| 258 | |
| 259 | dest = tmp_path / "d" |
| 260 | (dest / ".muse" / "commits").mkdir(parents=True) |
| 261 | (dest / ".muse" / "snapshots").mkdir(parents=True) |
| 262 | (dest / ".muse" / "objects").mkdir(parents=True) |
| 263 | apply_pack(dest, bundle) |
| 264 | |
| 265 | commit = read_commit(dest, "c1") |
| 266 | assert commit is not None |
| 267 | assert commit.message == "preserve me" |
| 268 | assert commit.snapshot_id == "1" * 64 |
| 269 | |
| 270 | def test_apply_returns_new_object_count( |
| 271 | self, repo: pathlib.Path, tmp_path: pathlib.Path |
| 272 | ) -> None: |
| 273 | oid1 = _make_object(repo, b"obj1") |
| 274 | oid2 = _make_object(repo, b"obj2") |
| 275 | _make_snapshot(repo, "1" * 64, {"a": oid1, "b": oid2}) |
| 276 | _make_commit(repo, "c1", "1" * 64) |
| 277 | |
| 278 | bundle = build_pack(repo, ["c1"]) |
| 279 | dest = tmp_path / "d" |
| 280 | (dest / ".muse" / "commits").mkdir(parents=True) |
| 281 | (dest / ".muse" / "snapshots").mkdir(parents=True) |
| 282 | (dest / ".muse" / "objects").mkdir(parents=True) |
| 283 | |
| 284 | result = apply_pack(dest, bundle) |
| 285 | assert result["objects_written"] == 2 |