test_plumbing_snapshot_diff.py
python
| 1 | """Tests for ``muse plumbing snapshot-diff``. |
| 2 | |
| 3 | Verifies categorisation of added/modified/deleted paths, resolution of |
| 4 | snapshot IDs, commit IDs, and branch names, text-format output, and error |
| 5 | handling for unresolvable refs. |
| 6 | """ |
| 7 | |
| 8 | from __future__ import annotations |
| 9 | |
| 10 | import datetime |
| 11 | import hashlib |
| 12 | import json |
| 13 | import pathlib |
| 14 | |
| 15 | from tests.cli_test_helper import CliRunner |
| 16 | |
| 17 | cli = None # argparse migration — CliRunner ignores this arg |
| 18 | from muse.core.errors import ExitCode |
| 19 | from muse.core.object_store import write_object |
| 20 | from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot |
| 21 | |
| 22 | runner = CliRunner() |
| 23 | |
| 24 | |
| 25 | # --------------------------------------------------------------------------- |
| 26 | # Helpers |
| 27 | # --------------------------------------------------------------------------- |
| 28 | |
| 29 | |
| 30 | def _sha(data: bytes | str) -> str: |
| 31 | raw = data if isinstance(data, bytes) else data.encode() |
| 32 | return hashlib.sha256(raw).hexdigest() |
| 33 | |
| 34 | |
| 35 | def _init_repo(path: pathlib.Path) -> pathlib.Path: |
| 36 | muse = path / ".muse" |
| 37 | (muse / "commits").mkdir(parents=True) |
| 38 | (muse / "snapshots").mkdir(parents=True) |
| 39 | (muse / "objects").mkdir(parents=True) |
| 40 | (muse / "refs" / "heads").mkdir(parents=True) |
| 41 | (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 42 | (muse / "repo.json").write_text( |
| 43 | json.dumps({"repo_id": "test-repo", "domain": "midi"}), encoding="utf-8" |
| 44 | ) |
| 45 | return path |
| 46 | |
| 47 | |
| 48 | def _env(repo: pathlib.Path) -> dict[str, str]: |
| 49 | return {"MUSE_REPO_ROOT": str(repo)} |
| 50 | |
| 51 | |
| 52 | def _obj(repo: pathlib.Path, content: bytes) -> str: |
| 53 | oid = _sha(content) |
| 54 | write_object(repo, oid, content) |
| 55 | return oid |
| 56 | |
| 57 | |
| 58 | def _snap(repo: pathlib.Path, manifest: dict[str, str]) -> str: |
| 59 | sid = _sha(json.dumps(sorted(manifest.items()))) |
| 60 | write_snapshot( |
| 61 | repo, |
| 62 | SnapshotRecord( |
| 63 | snapshot_id=sid, |
| 64 | manifest=manifest, |
| 65 | created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc), |
| 66 | ), |
| 67 | ) |
| 68 | return sid |
| 69 | |
| 70 | |
| 71 | def _commit(repo: pathlib.Path, tag: str, sid: str, branch: str = "main") -> str: |
| 72 | cid = _sha(tag) |
| 73 | write_commit( |
| 74 | repo, |
| 75 | CommitRecord( |
| 76 | commit_id=cid, |
| 77 | repo_id="test-repo", |
| 78 | branch=branch, |
| 79 | snapshot_id=sid, |
| 80 | message=tag, |
| 81 | committed_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc), |
| 82 | author="tester", |
| 83 | parent_commit_id=None, |
| 84 | ), |
| 85 | ) |
| 86 | ref = repo / ".muse" / "refs" / "heads" / branch |
| 87 | ref.write_text(cid, encoding="utf-8") |
| 88 | return cid |
| 89 | |
| 90 | |
| 91 | # --------------------------------------------------------------------------- |
| 92 | # Tests |
| 93 | # --------------------------------------------------------------------------- |
| 94 | |
| 95 | |
| 96 | class TestSnapshotDiff: |
| 97 | def test_added_deleted_categorised_correctly(self, tmp_path: pathlib.Path) -> None: |
| 98 | repo = _init_repo(tmp_path) |
| 99 | shared = _obj(repo, b"shared") |
| 100 | new_obj = _obj(repo, b"new") |
| 101 | sid_a = _snap(repo, {"shared.mid": shared, "old.mid": shared}) |
| 102 | sid_b = _snap(repo, {"shared.mid": shared, "new.mid": new_obj}) |
| 103 | result = runner.invoke(cli, ["plumbing", "snapshot-diff", sid_a, sid_b], env=_env(repo)) |
| 104 | assert result.exit_code == 0, result.output |
| 105 | data = json.loads(result.stdout) |
| 106 | assert [e["path"] for e in data["added"]] == ["new.mid"] |
| 107 | assert [e["path"] for e in data["deleted"]] == ["old.mid"] |
| 108 | assert data["modified"] == [] |
| 109 | assert data["total_changes"] == 2 |
| 110 | |
| 111 | def test_modified_entry_contains_both_object_ids(self, tmp_path: pathlib.Path) -> None: |
| 112 | repo = _init_repo(tmp_path) |
| 113 | v1 = _obj(repo, b"v1") |
| 114 | v2 = _obj(repo, b"v2") |
| 115 | sid_a = _snap(repo, {"track.mid": v1}) |
| 116 | sid_b = _snap(repo, {"track.mid": v2}) |
| 117 | result = runner.invoke(cli, ["plumbing", "snapshot-diff", sid_a, sid_b], env=_env(repo)) |
| 118 | assert result.exit_code == 0, result.output |
| 119 | data = json.loads(result.stdout) |
| 120 | assert len(data["modified"]) == 1 |
| 121 | mod = data["modified"][0] |
| 122 | assert mod["path"] == "track.mid" |
| 123 | assert mod["object_id_a"] == v1 |
| 124 | assert mod["object_id_b"] == v2 |
| 125 | |
| 126 | def test_zero_changes_when_snapshots_identical(self, tmp_path: pathlib.Path) -> None: |
| 127 | repo = _init_repo(tmp_path) |
| 128 | obj = _obj(repo, b"same") |
| 129 | sid = _snap(repo, {"f.mid": obj}) |
| 130 | result = runner.invoke(cli, ["plumbing", "snapshot-diff", sid, sid], env=_env(repo)) |
| 131 | assert result.exit_code == 0, result.output |
| 132 | data = json.loads(result.stdout) |
| 133 | assert data["total_changes"] == 0 |
| 134 | |
| 135 | def test_resolves_by_branch_name(self, tmp_path: pathlib.Path) -> None: |
| 136 | repo = _init_repo(tmp_path) |
| 137 | obj_a = _obj(repo, b"a") |
| 138 | obj_b = _obj(repo, b"b") |
| 139 | _commit(repo, "cmt-main", _snap(repo, {"a.mid": obj_a}), branch="main") |
| 140 | _commit(repo, "cmt-dev", _snap(repo, {"b.mid": obj_b}), branch="dev") |
| 141 | (repo / ".muse" / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 142 | result = runner.invoke(cli, ["plumbing", "snapshot-diff", "main", "dev"], env=_env(repo)) |
| 143 | assert result.exit_code == 0, result.output |
| 144 | data = json.loads(result.stdout) |
| 145 | assert data["total_changes"] == 2 |
| 146 | |
| 147 | def test_text_format_shows_status_letters(self, tmp_path: pathlib.Path) -> None: |
| 148 | repo = _init_repo(tmp_path) |
| 149 | shared = _obj(repo, b"s") |
| 150 | new_obj = _obj(repo, b"n") |
| 151 | sid_a = _snap(repo, {"gone.mid": shared}) |
| 152 | sid_b = _snap(repo, {"new.mid": new_obj}) |
| 153 | result = runner.invoke( |
| 154 | cli, ["plumbing", "snapshot-diff", "--format", "text", sid_a, sid_b], env=_env(repo) |
| 155 | ) |
| 156 | assert result.exit_code == 0, result.output |
| 157 | assert "A new.mid" in result.stdout |
| 158 | assert "D gone.mid" in result.stdout |
| 159 | |
| 160 | def test_stat_flag_appends_summary(self, tmp_path: pathlib.Path) -> None: |
| 161 | repo = _init_repo(tmp_path) |
| 162 | sid_a = _snap(repo, {"gone.mid": _obj(repo, b"g")}) |
| 163 | sid_b = _snap(repo, {"new.mid": _obj(repo, b"n")}) |
| 164 | result = runner.invoke( |
| 165 | cli, |
| 166 | ["plumbing", "snapshot-diff", "--format", "text", "--stat", sid_a, sid_b], |
| 167 | env=_env(repo), |
| 168 | ) |
| 169 | assert result.exit_code == 0, result.output |
| 170 | assert "added" in result.stdout |
| 171 | assert "deleted" in result.stdout |
| 172 | |
| 173 | def test_unresolvable_ref_exits_user_error(self, tmp_path: pathlib.Path) -> None: |
| 174 | repo = _init_repo(tmp_path) |
| 175 | result = runner.invoke( |
| 176 | cli, ["plumbing", "snapshot-diff", "no-such-thing", "also-missing"], env=_env(repo) |
| 177 | ) |
| 178 | assert result.exit_code == ExitCode.USER_ERROR |
| 179 | assert "error" in json.loads(result.stdout) |
| 180 | |
| 181 | def test_results_sorted_lexicographically(self, tmp_path: pathlib.Path) -> None: |
| 182 | repo = _init_repo(tmp_path) |
| 183 | sid_a = _snap(repo, {}) |
| 184 | sid_b = _snap( |
| 185 | repo, {"z.mid": _obj(repo, b"z"), "a.mid": _obj(repo, b"a"), "m.mid": _obj(repo, b"m")} |
| 186 | ) |
| 187 | result = runner.invoke(cli, ["plumbing", "snapshot-diff", sid_a, sid_b], env=_env(repo)) |
| 188 | assert result.exit_code == 0, result.output |
| 189 | data = json.loads(result.stdout) |
| 190 | added_paths = [e["path"] for e in data["added"]] |
| 191 | assert added_paths == sorted(added_paths) |