test_plumbing_read_snapshot.py
python
| 1 | """Tests for ``muse plumbing read-snapshot``. |
| 2 | |
| 3 | Covers: valid-snapshot lookup, manifest contents, file-count, created_at |
| 4 | timestamp, snapshot-not-found, invalid-ID format, and a stress case with |
| 5 | a large manifest. |
| 6 | """ |
| 7 | |
| 8 | from __future__ import annotations |
| 9 | |
| 10 | import datetime |
| 11 | import hashlib |
| 12 | import json |
| 13 | import pathlib |
| 14 | |
| 15 | from typer.testing import CliRunner |
| 16 | |
| 17 | from muse.cli.app import cli |
| 18 | from muse.core.errors import ExitCode |
| 19 | from muse.core.store import SnapshotRecord, write_snapshot |
| 20 | |
| 21 | runner = CliRunner() |
| 22 | |
| 23 | |
| 24 | # --------------------------------------------------------------------------- |
| 25 | # Helpers |
| 26 | # --------------------------------------------------------------------------- |
| 27 | |
| 28 | |
def _sha(tag: str) -> str:
    """Return the hex-encoded SHA-256 digest of *tag*.

    The tests use this to fabricate deterministic 64-character hex strings
    that serve as snapshot and object IDs.
    """
    # Explicit encoding: same bytes as the default, but states the intent.
    return hashlib.sha256(tag.encode("utf-8")).hexdigest()
| 31 | |
| 32 | |
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Create a minimal ``.muse`` repository skeleton under *path*.

    Lays out the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories, writes a ``HEAD`` file pointing at ``refs/heads/main`` and
    a ``repo.json`` with a fixed ``repo_id`` and the ``midi`` domain.

    Args:
        path: Directory that becomes the repository root.

    Returns:
        *path* itself, so the call can be used inline.
    """
    muse = path / ".muse"
    for parts in (("commits",), ("snapshots",), ("objects",), ("refs", "heads")):
        # exist_ok keeps the helper idempotent if it is called twice on one path.
        muse.joinpath(*parts).mkdir(parents=True, exist_ok=True)
    (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    (muse / "repo.json").write_text(
        json.dumps({"repo_id": "test-repo", "domain": "midi"}), encoding="utf-8"
    )
    return path
| 44 | |
| 45 | |
def _env(repo: pathlib.Path) -> dict[str, str]:
    """Build the environment overrides handed to ``runner.invoke`` for *repo*.

    Returns a one-entry mapping with ``MUSE_REPO_ROOT`` set to the string
    form of *repo*.
    """
    # NOTE(review): presumably the CLI reads MUSE_REPO_ROOT to locate the
    # repository instead of searching from the working directory - confirm.
    return {"MUSE_REPO_ROOT": str(repo)}
| 48 | |
| 49 | |
def _snap(repo: pathlib.Path, manifest: dict[str, str], tag: str = "snap") -> str:
    """Write a snapshot holding *manifest* into *repo* and return its ID.

    The ID is the SHA-256 of *tag* combined with the JSON dump of the sorted
    manifest items, so identical inputs always produce the same ID. Every
    snapshot is stamped with the same fixed UTC ``created_at``
    (2026-03-15 12:00:00).

    Args:
        repo: Repository root, passed through to ``write_snapshot``.
        manifest: Mapping stored as the snapshot's manifest.
        tag: Extra text mixed into the ID so that snapshots with equal
            manifests can still get distinct IDs.

    Returns:
        The snapshot ID that was written.
    """
    # Sorting makes the ID independent of dict insertion order.
    fingerprint = json.dumps(sorted(manifest.items()))
    sid = _sha(f"sid-{tag}-{fingerprint}")
    record = SnapshotRecord(
        snapshot_id=sid,
        manifest=manifest,
        created_at=datetime.datetime(2026, 3, 15, 12, 0, 0, tzinfo=datetime.timezone.utc),
    )
    write_snapshot(repo, record)
    return sid
| 61 | |
| 62 | |
| 63 | # --------------------------------------------------------------------------- |
| 64 | # Unit: ID validation |
| 65 | # --------------------------------------------------------------------------- |
| 66 | |
| 67 | |
class TestReadSnapshotUnit:
    """Failure paths of ``read-snapshot``: malformed ID and unknown ID."""

    def test_invalid_id_format_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """The malformed ID ``not-hex`` yields USER_ERROR and a JSON ``error`` key."""
        repo = _init_repo(tmp_path)
        result = runner.invoke(cli, ["plumbing", "read-snapshot", "not-hex"], env=_env(repo))
        # Attach the CLI output so a wrong exit code is diagnosable from the report.
        assert result.exit_code == ExitCode.USER_ERROR, result.output
        assert "error" in json.loads(result.stdout)

    def test_snapshot_not_found_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """A well-formed ID that was never written yields USER_ERROR and ``error``."""
        repo = _init_repo(tmp_path)
        ghost = _sha("ghost")
        result = runner.invoke(cli, ["plumbing", "read-snapshot", ghost], env=_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR, result.output
        assert "error" in json.loads(result.stdout)
| 81 | |
| 82 | |
| 83 | # --------------------------------------------------------------------------- |
| 84 | # Integration: snapshot retrieval |
| 85 | # --------------------------------------------------------------------------- |
| 86 | |
| 87 | |
class TestReadSnapshotRetrieval:
    """Successful ``read-snapshot`` lookups against snapshots written by ``_snap``."""

    def test_returns_snapshot_id_in_output(self, tmp_path: pathlib.Path) -> None:
        """The output echoes the requested snapshot ID."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        result = runner.invoke(cli, ["plumbing", "read-snapshot", sid], env=_env(repo))
        assert result.exit_code == 0, result.output
        data = json.loads(result.stdout)
        assert data["snapshot_id"] == sid

    def test_returns_correct_file_count(self, tmp_path: pathlib.Path) -> None:
        """``file_count`` equals the number of manifest entries."""
        repo = _init_repo(tmp_path)
        manifest = {"a.mid": _sha("a"), "b.mid": _sha("b"), "c.mid": _sha("c")}
        sid = _snap(repo, manifest)
        result = runner.invoke(cli, ["plumbing", "read-snapshot", sid], env=_env(repo))
        assert result.exit_code == 0, result.output
        assert json.loads(result.stdout)["file_count"] == 3

    def test_manifest_paths_and_ids_match(self, tmp_path: pathlib.Path) -> None:
        """A manifest path maps to the object ID it was written with."""
        repo = _init_repo(tmp_path)
        oid = _sha("drums-content")
        manifest = {"drums.mid": oid}
        sid = _snap(repo, manifest)
        result = runner.invoke(cli, ["plumbing", "read-snapshot", sid], env=_env(repo))
        assert result.exit_code == 0, result.output
        data = json.loads(result.stdout)
        assert data["manifest"]["drums.mid"] == oid

    def test_empty_manifest_reports_zero_files(self, tmp_path: pathlib.Path) -> None:
        """An empty snapshot reports zero files and an empty manifest."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        result = runner.invoke(cli, ["plumbing", "read-snapshot", sid], env=_env(repo))
        assert result.exit_code == 0, result.output
        data = json.loads(result.stdout)
        assert data["file_count"] == 0
        assert data["manifest"] == {}

    def test_created_at_is_iso8601_string(self, tmp_path: pathlib.Path) -> None:
        """``created_at`` parses as ISO 8601 and carries the year ``_snap`` wrote."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        result = runner.invoke(cli, ["plumbing", "read-snapshot", sid], env=_env(repo))
        assert result.exit_code == 0, result.output
        ts = json.loads(result.stdout)["created_at"]
        # Must be parseable as an ISO 8601 datetime.
        dt = datetime.datetime.fromisoformat(ts)
        assert dt.year == 2026

    def test_output_is_pretty_printed_json(self, tmp_path: pathlib.Path) -> None:
        """The JSON document itself spans several lines."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {"f.mid": _sha("f")})
        result = runner.invoke(cli, ["plumbing", "read-snapshot", sid], env=_env(repo))
        assert result.exit_code == 0, result.output
        # Strip first: a trailing newline after compact single-line JSON would
        # otherwise satisfy the check without any pretty-printing.
        assert "\n" in result.stdout.strip()

    def test_multiple_snapshots_independent(self, tmp_path: pathlib.Path) -> None:
        """Two snapshots in one repo each return only their own manifest."""
        repo = _init_repo(tmp_path)
        sid1 = _snap(repo, {"a.mid": _sha("a")}, "s1")
        sid2 = _snap(repo, {"b.mid": _sha("b")}, "s2")
        r1 = runner.invoke(cli, ["plumbing", "read-snapshot", sid1], env=_env(repo))
        r2 = runner.invoke(cli, ["plumbing", "read-snapshot", sid2], env=_env(repo))
        # Separate asserts so the report shows which invocation failed.
        assert r1.exit_code == 0, r1.output
        assert r2.exit_code == 0, r2.output
        first = json.loads(r1.stdout)["manifest"]
        second = json.loads(r2.stdout)["manifest"]
        assert "a.mid" in first
        assert "b.mid" in second
        # Independence also means neither manifest leaks into the other.
        assert "b.mid" not in first
        assert "a.mid" not in second
| 151 | |
| 152 | |
| 153 | # --------------------------------------------------------------------------- |
| 154 | # Stress: 1000-file manifest |
| 155 | # --------------------------------------------------------------------------- |
| 156 | |
| 157 | |
class TestReadSnapshotStress:
    """Large-manifest behaviour of ``read-snapshot``."""

    def test_1000_file_manifest_reads_correctly(self, tmp_path: pathlib.Path) -> None:
        """A 1000-entry manifest comes back complete and unchanged."""
        repo = _init_repo(tmp_path)
        manifest = {f"track_{i:04d}.mid": _sha(f"obj-{i}") for i in range(1000)}
        sid = _snap(repo, manifest)
        result = runner.invoke(cli, ["plumbing", "read-snapshot", sid], env=_env(repo))
        assert result.exit_code == 0, result.output
        data = json.loads(result.stdout)
        assert data["file_count"] == 1000
        assert len(data["manifest"]) == 1000
        # Counts alone would miss dropped or corrupted entries; compare contents.
        assert data["manifest"] == manifest