cgcardona / muse public
test_plugin_apply_and_checkout.py python
326 lines 11.9 KB
59a915a4 refactor: repo root is the working tree — remove state/ subdirectory Gabriel Cardona <gabriel@tellurstori.com> 6h ago
1 """Tests for plugin.apply() and the incremental checkout that wires it in.
2
3 Covers:
4 - MidiPlugin.apply() with a workdir path (files already updated on disk)
5 - MidiPlugin.apply() with a snapshot dict (in-memory removals)
6 - checkout incremental delta: only changed files are touched
7 - revert reuses parent snapshot_id (no re-scan)
8 - cherry-pick uses merged_manifest directly (no re-scan)
9 """
10
11 import pathlib
12
13 import pytest
14 from typer.testing import CliRunner
15
16 from muse.cli.app import cli
17 from muse.core.store import get_head_commit_id, read_commit, read_snapshot
18 from muse.domain import DeleteOp, SnapshotManifest, StructuredDelta
19 from muse.plugins.midi.plugin import MidiPlugin
20
21 runner = CliRunner()
22 plugin = MidiPlugin()
23
24
25 # ---------------------------------------------------------------------------
26 # Fixtures
27 # ---------------------------------------------------------------------------
28
29
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Provide a freshly initialised repository rooted at ``tmp_path``.

    The process working directory and the ``MUSE_REPO_ROOT`` environment
    variable are both pointed at ``tmp_path`` before ``init`` is run through
    the CLI.  The fixture fails immediately if ``init`` exits non-zero.

    Returns:
        The repository root (the same path as ``tmp_path``).
    """
    root = tmp_path
    monkeypatch.chdir(root)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(root))

    init_result = runner.invoke(cli, ["init"])
    # Surface the CLI output in the failure message to ease debugging.
    assert init_result.exit_code == 0, init_result.output
    return root
37
38
def _write(repo: pathlib.Path, filename: str, content: str = "data") -> None:
    """Create or overwrite ``repo / filename`` with *content*.

    Args:
        repo: Directory the file is written into.
        filename: Name of the file, relative to *repo*.
        content: Text to store; defaults to ``"data"``.

    The text is always encoded as UTF-8 so the bytes on disk do not depend
    on the locale of the machine running the tests.
    """
    (repo / filename).write_text(content, encoding="utf-8")
41
42
def _commit(msg: str = "commit") -> None:
    """Run ``commit -m <msg>`` through the CLI and require a zero exit code.

    Args:
        msg: Commit message passed to ``-m``; defaults to ``"commit"``.
    """
    argv = ["commit", "-m", msg]
    outcome = runner.invoke(cli, argv)
    # Include the CLI output so a failing commit explains itself.
    assert outcome.exit_code == 0, outcome.output
46
47
def _head_id(repo: pathlib.Path, branch: str = "main") -> str:
    """Return the HEAD commit id of *branch*, failing if there is none.

    Args:
        repo: Repository root handed to ``get_head_commit_id``.
        branch: Branch whose HEAD is looked up; defaults to ``"main"`` so
            existing single-argument callers keep their behaviour.

    Returns:
        The commit id reported by ``get_head_commit_id``.
    """
    cid = get_head_commit_id(repo, branch)
    assert cid is not None, f"branch {branch!r} has no HEAD commit"
    return cid
52
53
54 # ---------------------------------------------------------------------------
55 # MidiPlugin.apply() — unit tests
56 # ---------------------------------------------------------------------------
57
58
def _empty_delta() -> StructuredDelta:
    """Build a ``midi``-domain delta that carries no operations."""
    delta = StructuredDelta(domain="midi", ops=[], summary="no changes")
    return delta
61
62
class TestMidiPluginApplyPath:
    """Exercise apply() when live_state is a directory path.

    Every test changes the filesystem first and then hands apply() an empty
    delta, so the assertions check that the returned snapshot reflects what
    is physically on disk rather than anything described by the delta.
    """

    @staticmethod
    def _make_workdir(base: pathlib.Path) -> pathlib.Path:
        """Create and return an empty ``work`` directory under *base*."""
        work = base / "work"
        work.mkdir()
        return work

    def test_apply_returns_snapshot_of_workdir(self, tmp_path: pathlib.Path) -> None:
        """A file removed from disk is absent from the applied snapshot."""
        work = self._make_workdir(tmp_path)
        kept = work / "a.mid"
        removed = work / "b.mid"
        kept.write_bytes(b"midi-a")
        removed.write_bytes(b"midi-b")

        # Physically delete b.mid before apply() sees the directory.
        removed.unlink()

        files = plugin.apply(_empty_delta(), work)["files"]

        assert "b.mid" not in files
        assert "a.mid" in files

    def test_apply_picks_up_added_file(self, tmp_path: pathlib.Path) -> None:
        """A file created on disk shows up in the applied snapshot."""
        work = self._make_workdir(tmp_path)
        (work / "a.mid").write_bytes(b"original")

        # The new file exists on disk before apply() is called.
        (work / "new.mid").write_bytes(b"new content")
        applied = plugin.apply(_empty_delta(), work)

        assert "new.mid" in applied["files"]

    def test_apply_picks_up_modified_content(self, tmp_path: pathlib.Path) -> None:
        """Rewriting a file changes the value recorded for it."""
        work = self._make_workdir(tmp_path)
        target = work / "a.mid"
        target.write_bytes(b"v1")

        before = plugin.snapshot(work)

        # Overwrite the content on disk, then let apply() look again.
        target.write_bytes(b"v2")
        after = plugin.apply(_empty_delta(), work)

        assert after["files"]["a.mid"] != before["files"]["a.mid"]

    def test_apply_empty_delta_returns_current_state(self, tmp_path: pathlib.Path) -> None:
        """With nothing changed, apply() agrees with a plain snapshot()."""
        work = self._make_workdir(tmp_path)
        (work / "a.mid").write_bytes(b"data")

        applied = plugin.apply(_empty_delta(), work)
        scanned = plugin.snapshot(work)
        assert applied["files"] == scanned["files"]
116
117
class TestMidiPluginApplyDict:
    """Exercise apply() when live_state is an in-memory snapshot manifest."""

    def _delete(self, address: str, content_id: str = "x") -> DeleteOp:
        """Build a delete op for *address* with a matching summary string."""
        summary = f"deleted: {address}"
        return DeleteOp(
            op="delete",
            address=address,
            position=None,
            content_id=content_id,
            content_summary=summary,
        )

    def test_apply_removes_deleted_paths(self) -> None:
        """A single delete op drops only the addressed file."""
        before = SnapshotManifest(files={"a.mid": "aaa", "b.mid": "bbb"}, domain="midi")
        ops = [self._delete("b.mid", "bbb")]
        delta = StructuredDelta(domain="midi", ops=ops, summary="1 file removed")

        files = plugin.apply(delta, before)["files"]

        assert "b.mid" not in files
        assert "a.mid" in files

    def test_apply_removes_multiple_paths(self) -> None:
        """Several delete ops in one delta are all honoured."""
        before = SnapshotManifest(
            files={"a.mid": "aaa", "b.mid": "bbb", "c.mid": "ccc"}, domain="midi"
        )
        ops = [self._delete("a.mid", "aaa"), self._delete("c.mid", "ccc")]
        delta = StructuredDelta(domain="midi", ops=ops, summary="2 files removed")

        after = plugin.apply(delta, before)

        assert after["files"] == {"b.mid": "bbb"}

    def test_apply_nonexistent_remove_is_noop(self) -> None:
        """Deleting a path that is not in the manifest changes nothing."""
        before = SnapshotManifest(files={"a.mid": "aaa"}, domain="midi")
        ops = [self._delete("ghost.mid")]
        delta = StructuredDelta(domain="midi", ops=ops, summary="1 file removed")

        after = plugin.apply(delta, before)

        assert after["files"] == {"a.mid": "aaa"}

    def test_apply_preserves_domain(self) -> None:
        """The domain of the input manifest is carried to the result."""
        before = SnapshotManifest(files={}, domain="midi")
        after = plugin.apply(_empty_delta(), before)
        assert after["domain"] == "midi"
163
164
165 # ---------------------------------------------------------------------------
166 # Incremental checkout via plugin.apply()
167 # ---------------------------------------------------------------------------
168
169
class TestIncrementalCheckout:
    """End-to-end checks that checkout leaves the working tree in the right state.

    Every CLI step is required to exit with code 0.  Without that, a failed
    ``branch`` or ``checkout`` would leave the working tree untouched and
    some assertions below would pass without the command ever having run.
    """

    @staticmethod
    def _run(*args: str) -> None:
        """Invoke the CLI with *args* and fail with its output on non-zero exit."""
        result = runner.invoke(cli, list(args))
        assert result.exit_code == 0, result.output

    def test_checkout_branch_only_changes_delta(self, repo: pathlib.Path) -> None:
        """Switching back to main keeps shared files and drops feature-only ones."""
        _write(repo, "shared.mid", "shared")
        _write(repo, "main-only.mid", "main")
        _commit("main initial")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "feature-only.mid", "feature")
        _commit("feature commit")

        self._run("checkout", "main")
        # Files common to both branches must survive with their content intact.
        assert (repo / "shared.mid").read_text(encoding="utf-8") == "shared"
        assert (repo / "main-only.mid").read_text(encoding="utf-8") == "main"
        assert not (repo / "feature-only.mid").exists()

    def test_checkout_restores_correct_content(self, repo: pathlib.Path) -> None:
        """Modified files get the correct content after checkout."""
        _write(repo, "beat.mid", "v1")
        _commit("v1")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "beat.mid", "v2")
        _commit("v2 on feature")

        self._run("checkout", "main")
        assert (repo / "beat.mid").read_text(encoding="utf-8") == "v1"

    def test_checkout_commit_id_incremental(self, repo: pathlib.Path) -> None:
        """Checking out an earlier commit id removes files added after it."""
        _write(repo, "a.mid", "original")
        _commit("first")
        first_id = _head_id(repo)

        _write(repo, "b.mid", "new")
        _commit("second")

        self._run("checkout", first_id)
        assert (repo / "a.mid").exists()
        assert not (repo / "b.mid").exists()

    def test_checkout_to_new_branch_keeps_workdir(self, repo: pathlib.Path) -> None:
        """Switching to a brand-new (no-commit) branch keeps the current workdir intact."""
        _write(repo, "beat.mid")
        _commit("some work")

        # These steps must be verified: if either failed, beat.mid would still
        # be on disk and the assertion below would prove nothing.
        self._run("branch", "empty-branch")
        self._run("checkout", "empty-branch")
        # The workdir is preserved: the new branch inherits from where we branched.
        assert (repo / "beat.mid").exists()
222
223
224 # ---------------------------------------------------------------------------
225 # Revert reuses parent snapshot (no re-scan)
226 # ---------------------------------------------------------------------------
227
228
class TestRevertSnapshotReuse:
    """Check that reverting a commit reuses the snapshot of that commit's parent.

    Both tests build the same history (``base`` then ``add lead``), revert
    the second commit, and require the ``revert`` command itself to succeed
    so that HEAD is known to be the revert commit when it is inspected.
    """

    @staticmethod
    def _run(*args: str) -> None:
        """Invoke the CLI with *args* and fail with its output on non-zero exit."""
        result = runner.invoke(cli, list(args))
        assert result.exit_code == 0, result.output

    def test_revert_commit_points_to_parent_snapshot(self, repo: pathlib.Path) -> None:
        """The revert commit's snapshot_id is the same as the reverted commit's parent."""
        _write(repo, "beat.mid", "base")
        _commit("base")
        base_id = _head_id(repo)
        base_commit = read_commit(repo, base_id)
        assert base_commit is not None
        parent_snapshot_id = base_commit.snapshot_id

        _write(repo, "lead.mid", "new")
        _commit("add lead")
        lead_id = _head_id(repo)

        self._run("revert", lead_id)
        revert_id = _head_id(repo)
        # Guard against HEAD not having moved at all.
        assert revert_id != lead_id, "revert did not create a new commit"
        revert_commit = read_commit(repo, revert_id)
        assert revert_commit is not None
        # The revert commit points to the same snapshot as "base": no re-scan.
        assert revert_commit.snapshot_id == parent_snapshot_id

    def test_revert_snapshot_already_in_store(self, repo: pathlib.Path) -> None:
        """After revert, the referenced snapshot is already in the store (not re-created)."""
        _write(repo, "beat.mid", "base")
        _commit("base")
        base_id = _head_id(repo)
        base_commit = read_commit(repo, base_id)
        assert base_commit is not None

        _write(repo, "lead.mid", "new")
        _commit("add lead")
        lead_id = _head_id(repo)

        self._run("revert", lead_id)
        revert_id = _head_id(repo)
        revert_commit = read_commit(repo, revert_id)
        assert revert_commit is not None
        # "Not re-created" means the revert references the snapshot that
        # was stored when "base" was committed.
        assert revert_commit.snapshot_id == base_commit.snapshot_id

        snap = read_snapshot(repo, revert_commit.snapshot_id)
        assert snap is not None
        assert "beat.mid" in snap.manifest
        assert "lead.mid" not in snap.manifest
271
272
273 # ---------------------------------------------------------------------------
274 # Cherry-pick uses merged_manifest (no re-scan)
275 # ---------------------------------------------------------------------------
276
277
class TestCherryPickManifestReuse:
    """Check the snapshot recorded by a cherry-pick from ``feature`` onto ``main``.

    Every CLI step must exit with code 0.  If ``cherry-pick`` failed
    silently, HEAD on ``main`` would still be the base commit and the
    object-store check would pass without testing the cherry-pick at all.
    """

    @staticmethod
    def _run(*args: str) -> None:
        """Invoke the CLI with *args* and fail with its output on non-zero exit."""
        result = runner.invoke(cli, list(args))
        assert result.exit_code == 0, result.output

    def test_cherry_pick_commit_has_correct_snapshot(self, repo: pathlib.Path) -> None:
        """Cherry-picked commit's snapshot contains both the base and the picked file."""
        _write(repo, "base.mid", "base")
        _commit("base")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "feature.mid", "feature content")
        _commit("feature addition")
        feature_id = get_head_commit_id(repo, "feature")
        assert feature_id is not None

        self._run("checkout", "main")
        self._run("cherry-pick", feature_id)

        picked_id = _head_id(repo)
        picked_commit = read_commit(repo, picked_id)
        assert picked_commit is not None
        snap = read_snapshot(repo, picked_commit.snapshot_id)
        assert snap is not None
        assert "feature.mid" in snap.manifest
        assert "base.mid" in snap.manifest

    def test_cherry_pick_snapshot_objects_in_store(self, repo: pathlib.Path) -> None:
        """Objects in the cherry-pick snapshot are already in the store."""
        # Imported locally, as in the original test, to leave the module's
        # top-level imports untouched.
        from muse.core.object_store import has_object

        _write(repo, "base.mid", "base")
        _commit("base")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "extra.mid", "extra")
        _commit("feature extra")
        feature_id = get_head_commit_id(repo, "feature")
        assert feature_id is not None

        self._run("checkout", "main")
        self._run("cherry-pick", feature_id)

        picked_id = _head_id(repo)
        picked_commit = read_commit(repo, picked_id)
        assert picked_commit is not None
        snap = read_snapshot(repo, picked_commit.snapshot_id)
        assert snap is not None
        # Make sure the snapshot inspected is the cherry-picked one.
        assert "extra.mid" in snap.manifest

        for oid in snap.manifest.values():
            assert has_object(repo, oid), f"Object {oid[:8]} missing from store"