cgcardona / muse public
test_plugin_apply_and_checkout.py python
327 lines 12.0 KB
d7054e63 feat(phase-1): typed delta algebra — replace DeltaManifest with Structu… Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """Tests for plugin.apply() and the incremental checkout that wires it in.
2
3 Covers:
4 - MusicPlugin.apply() with a workdir path (files already updated on disk)
5 - MusicPlugin.apply() with a snapshot dict (in-memory removals)
6 - checkout incremental delta: only changed files are touched
7 - revert reuses parent snapshot_id (no re-scan)
8 - cherry-pick uses merged_manifest directly (no re-scan)
9 """
10 from __future__ import annotations
11
12 import pathlib
13
14 import pytest
15 from typer.testing import CliRunner
16
17 from muse.cli.app import cli
18 from muse.core.store import get_head_commit_id, read_commit, read_snapshot
19 from muse.domain import DeleteOp, SnapshotManifest, StructuredDelta
20 from muse.plugins.music.plugin import MusicPlugin
21
# Shared Typer CLI runner; the helpers and tests below use it to invoke muse commands.
runner = CliRunner()
# Single MusicPlugin instance shared by the apply() unit tests below.
plugin = MusicPlugin()
24
25
26 # ---------------------------------------------------------------------------
27 # Fixtures
28 # ---------------------------------------------------------------------------
29
30
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Provide a freshly initialised repository rooted at ``tmp_path``.

    The current working directory and the ``MUSE_REPO_ROOT`` environment
    variable are both pointed at the temporary directory before the ``init``
    command runs. The fixture fails immediately, showing the CLI output, if
    ``init`` exits with a non-zero status.

    Returns:
        The temporary directory that now serves as the repository root.
    """
    root = tmp_path
    monkeypatch.chdir(root)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(root))

    init_result = runner.invoke(cli, ["init"])
    assert init_result.exit_code == 0, init_result.output

    return root
38
39
def _write(repo: pathlib.Path, filename: str, content: str = "data") -> None:
    """Write ``content`` to ``filename`` inside the repo's ``muse-work`` directory.

    Args:
        repo: Repository root. Its ``muse-work`` directory must already exist,
            because no parent directories are created here.
        filename: Name of the file to create or overwrite under ``muse-work``.
        content: Text to store in the file.
    """
    # Pin the encoding so the bytes on disk do not depend on the platform's
    # locale default.
    (repo / "muse-work" / filename).write_text(content, encoding="utf-8")
42
43
def _commit(msg: str = "commit") -> None:
    """Run the ``commit`` CLI command with message ``msg``.

    Fails the calling test, showing the CLI output, when the command exits
    with a non-zero status.
    """
    argv = ["commit", "-m", msg]
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 0, outcome.output
47
48
def _head_id(repo: pathlib.Path) -> str:
    """Return the head commit id of ``main``, asserting that one exists."""
    head_commit_id = get_head_commit_id(repo, "main")
    assert head_commit_id is not None
    return head_commit_id
53
54
55 # ---------------------------------------------------------------------------
56 # MusicPlugin.apply() — unit tests
57 # ---------------------------------------------------------------------------
58
59
def _empty_delta() -> StructuredDelta:
    """Build a music-domain delta that carries no operations."""
    no_op_delta = StructuredDelta(domain="music", ops=[], summary="no changes")
    return no_op_delta
62
63
class TestMusicPluginApplyPath:
    """apply() given a workdir path reports what is physically on disk.

    Every test passes an empty delta together with a ``pathlib.Path``. The
    assertions therefore only hold when apply() derives its result from the
    directory contents rather than from the delta.
    """

    def test_apply_returns_snapshot_of_workdir(self, tmp_path: pathlib.Path) -> None:
        workdir = tmp_path / "work"
        workdir.mkdir()
        kept_file = workdir / "a.mid"
        removed_file = workdir / "b.mid"
        kept_file.write_bytes(b"midi-a")
        removed_file.write_bytes(b"midi-b")

        # Physically delete b.mid before apply() gets to look at the directory.
        removed_file.unlink()

        files = plugin.apply(_empty_delta(), workdir)["files"]

        assert "b.mid" not in files
        assert "a.mid" in files

    def test_apply_picks_up_added_file(self, tmp_path: pathlib.Path) -> None:
        workdir = tmp_path / "work"
        workdir.mkdir()
        (workdir / "a.mid").write_bytes(b"original")

        # A second file appears on disk before apply() is called.
        (workdir / "new.mid").write_bytes(b"new content")

        files = plugin.apply(_empty_delta(), workdir)["files"]

        assert "new.mid" in files

    def test_apply_picks_up_modified_content(self, tmp_path: pathlib.Path) -> None:
        workdir = tmp_path / "work"
        workdir.mkdir()
        target = workdir / "a.mid"
        target.write_bytes(b"v1")

        id_before = plugin.snapshot(workdir)["files"]["a.mid"]

        # Overwrite the file on disk; apply() must see the new content.
        target.write_bytes(b"v2")
        id_after = plugin.apply(_empty_delta(), workdir)["files"]["a.mid"]

        assert id_after != id_before

    def test_apply_empty_delta_returns_current_state(self, tmp_path: pathlib.Path) -> None:
        workdir = tmp_path / "work"
        workdir.mkdir()
        (workdir / "a.mid").write_bytes(b"data")

        applied = plugin.apply(_empty_delta(), workdir)
        scanned = plugin.snapshot(workdir)

        assert applied["files"] == scanned["files"]
117
118
class TestMusicPluginApplyDict:
    """apply() given a snapshot dict applies the delta's ops in memory."""

    def _delete(self, address: str, content_id: str = "x") -> DeleteOp:
        """Build a DeleteOp for ``address`` with a generated summary line."""
        summary = f"deleted: {address}"
        return DeleteOp(
            op="delete",
            address=address,
            position=None,
            content_id=content_id,
            content_summary=summary,
        )

    def test_apply_removes_deleted_paths(self) -> None:
        before = SnapshotManifest(files={"a.mid": "aaa", "b.mid": "bbb"}, domain="music")
        removal = self._delete("b.mid", "bbb")
        delta = StructuredDelta(domain="music", ops=[removal], summary="1 file removed")

        remaining = plugin.apply(delta, before)["files"]

        assert "b.mid" not in remaining
        assert "a.mid" in remaining

    def test_apply_removes_multiple_paths(self) -> None:
        before = SnapshotManifest(
            files={"a.mid": "aaa", "b.mid": "bbb", "c.mid": "ccc"},
            domain="music",
        )
        removals = [self._delete("a.mid", "aaa"), self._delete("c.mid", "ccc")]
        delta = StructuredDelta(domain="music", ops=removals, summary="2 files removed")

        after = plugin.apply(delta, before)

        assert after["files"] == {"b.mid": "bbb"}

    def test_apply_nonexistent_remove_is_noop(self) -> None:
        before = SnapshotManifest(files={"a.mid": "aaa"}, domain="music")
        # The op targets a path that the snapshot never contained.
        ghost_removal = self._delete("ghost.mid")
        delta = StructuredDelta(domain="music", ops=[ghost_removal], summary="1 file removed")

        after = plugin.apply(delta, before)

        assert after["files"] == {"a.mid": "aaa"}

    def test_apply_preserves_domain(self) -> None:
        empty_snapshot = SnapshotManifest(files={}, domain="music")
        no_op = _empty_delta()

        after = plugin.apply(no_op, empty_snapshot)

        assert after["domain"] == "music"
164
165
166 # ---------------------------------------------------------------------------
167 # Incremental checkout via plugin.apply()
168 # ---------------------------------------------------------------------------
169
170
class TestIncrementalCheckout:
    """End-to-end checks that checkout leaves the workdir in the target state.

    Every ``branch`` / ``checkout`` invocation is asserted to succeed.
    Otherwise a failing command could leave the workdir untouched and let a
    test pass without exercising checkout at all.
    """

    @staticmethod
    def _run(*args: str) -> None:
        """Invoke the CLI with ``args``; fail with its output on a non-zero exit."""
        result = runner.invoke(cli, list(args))
        assert result.exit_code == 0, result.output

    def test_checkout_branch_only_changes_delta(self, repo: pathlib.Path) -> None:
        """Files unchanged between branches are not touched on disk."""
        _write(repo, "shared.mid", "shared")
        _write(repo, "main-only.mid", "main")
        _commit("main initial")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "feature-only.mid", "feature")
        _commit("feature commit")

        self._run("checkout", "main")
        workdir = repo / "muse-work"
        assert (workdir / "shared.mid").exists()
        assert (workdir / "main-only.mid").exists()
        assert not (workdir / "feature-only.mid").exists()

    def test_checkout_restores_correct_content(self, repo: pathlib.Path) -> None:
        """Modified files get the correct content after checkout."""
        _write(repo, "beat.mid", "v1")
        _commit("v1")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "beat.mid", "v2")
        _commit("v2 on feature")

        self._run("checkout", "main")
        # Read with an explicit encoding so the comparison does not depend on
        # the platform's locale default.
        restored = (repo / "muse-work" / "beat.mid").read_text(encoding="utf-8")
        assert restored == "v1"

    def test_checkout_commit_id_incremental(self, repo: pathlib.Path) -> None:
        """Detached HEAD checkout also uses incremental apply."""
        _write(repo, "a.mid", "original")
        _commit("first")
        first_id = _head_id(repo)

        _write(repo, "b.mid", "new")
        _commit("second")

        self._run("checkout", first_id)
        workdir = repo / "muse-work"
        assert (workdir / "a.mid").exists()
        assert not (workdir / "b.mid").exists()

    def test_checkout_to_new_branch_keeps_workdir(self, repo: pathlib.Path) -> None:
        """Switching to a brand-new (no-commit) branch keeps the current workdir intact."""
        _write(repo, "beat.mid")
        _commit("some work")

        # Without the exit-code checks this test would pass even if both
        # commands failed, because beat.mid is already on disk.
        self._run("branch", "empty-branch")
        self._run("checkout", "empty-branch")
        # workdir is preserved — new branch inherits from where we branched
        assert (repo / "muse-work" / "beat.mid").exists()
223
224
225 # ---------------------------------------------------------------------------
226 # Revert reuses parent snapshot (no re-scan)
227 # ---------------------------------------------------------------------------
228
229
class TestRevertSnapshotReuse:
    """Reverting a commit must reuse the snapshot of that commit's parent.

    The ``revert`` invocation is asserted to succeed, so a CLI failure is
    reported with its own output instead of surfacing later as an unexplained
    snapshot mismatch.
    """

    def test_revert_commit_points_to_parent_snapshot(self, repo: pathlib.Path) -> None:
        """The revert commit's snapshot_id is the same as the reverted commit's parent."""
        _write(repo, "beat.mid", "base")
        _commit("base")
        base_id = _head_id(repo)
        base_commit = read_commit(repo, base_id)
        assert base_commit is not None
        parent_snapshot_id = base_commit.snapshot_id

        _write(repo, "lead.mid", "new")
        _commit("add lead")
        lead_id = _head_id(repo)

        result = runner.invoke(cli, ["revert", lead_id])
        assert result.exit_code == 0, result.output

        revert_id = _head_id(repo)
        revert_commit = read_commit(repo, revert_id)
        assert revert_commit is not None
        # The revert commit points to the same snapshot as "base" — no re-scan.
        assert revert_commit.snapshot_id == parent_snapshot_id

    def test_revert_snapshot_already_in_store(self, repo: pathlib.Path) -> None:
        """After revert, the referenced snapshot is already in the store (not re-created)."""
        _write(repo, "beat.mid", "base")
        _commit("base")
        base_id = _head_id(repo)
        base_commit = read_commit(repo, base_id)
        assert base_commit is not None

        _write(repo, "lead.mid", "new")
        _commit("add lead")
        lead_id = _head_id(repo)

        result = runner.invoke(cli, ["revert", lead_id])
        assert result.exit_code == 0, result.output

        revert_id = _head_id(repo)
        revert_commit = read_commit(repo, revert_id)
        assert revert_commit is not None

        # The snapshot must be readable from the store and describe the
        # pre-"add lead" state.
        snap = read_snapshot(repo, revert_commit.snapshot_id)
        assert snap is not None
        assert "beat.mid" in snap.manifest
        assert "lead.mid" not in snap.manifest
272
273
274 # ---------------------------------------------------------------------------
275 # Cherry-pick uses merged_manifest (no re-scan)
276 # ---------------------------------------------------------------------------
277
278
class TestCherryPickManifestReuse:
    """Cherry-picking must yield a commit whose snapshot is complete and stored.

    Every CLI step is asserted to succeed. If ``cherry-pick`` failed silently,
    the head of ``main`` would still be the base commit and the snapshot
    checks would run against the wrong commit.
    """

    @staticmethod
    def _run(*args: str) -> None:
        """Invoke the CLI with ``args``; fail with its output on a non-zero exit."""
        result = runner.invoke(cli, list(args))
        assert result.exit_code == 0, result.output

    def test_cherry_pick_commit_has_correct_snapshot(self, repo: pathlib.Path) -> None:
        """Cherry-picked commit's snapshot contains only the right files."""
        _write(repo, "base.mid", "base")
        _commit("base")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "feature.mid", "feature content")
        _commit("feature addition")
        feature_id = get_head_commit_id(repo, "feature")
        assert feature_id is not None

        self._run("checkout", "main")
        self._run("cherry-pick", feature_id)

        picked_id = _head_id(repo)
        picked_commit = read_commit(repo, picked_id)
        assert picked_commit is not None
        snap = read_snapshot(repo, picked_commit.snapshot_id)
        assert snap is not None
        assert "feature.mid" in snap.manifest
        assert "base.mid" in snap.manifest

    def test_cherry_pick_snapshot_objects_in_store(self, repo: pathlib.Path) -> None:
        """Objects in the cherry-pick snapshot are already in the store."""
        # Imported locally, as in the original test, to keep the module-level
        # import block untouched.
        from muse.core.object_store import has_object

        _write(repo, "base.mid", "base")
        _commit("base")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "extra.mid", "extra")
        _commit("feature extra")
        feature_id = get_head_commit_id(repo, "feature")
        assert feature_id is not None

        self._run("checkout", "main")
        self._run("cherry-pick", feature_id)

        picked_id = _head_id(repo)
        picked_commit = read_commit(repo, picked_id)
        assert picked_commit is not None
        snap = read_snapshot(repo, picked_commit.snapshot_id)
        assert snap is not None
        # Guard against inspecting the base commit's snapshot by mistake: the
        # cherry-picked file must be part of the manifest being checked.
        assert "extra.mid" in snap.manifest

        for oid in snap.manifest.values():
            assert has_object(repo, oid), f"Object {oid[:8]} missing from store"