cgcardona / muse public
test_core_merge_engine.py python
320 lines 11.7 KB
04004b82 Rename MusicRGA → MidiRGA and purge all 'music plugin' terminology Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Tests for muse.core.merge_engine — three-way merge logic.
2
3 Extended to cover the structured (operation-level) merge path via
4 :func:`~muse.core.op_transform.merge_structured` and the
5 :class:`~muse.domain.StructuredMergePlugin` integration.
6 """
7
8 import datetime
9 import json
10 import pathlib
11
12 import pytest
13
14 from muse.core.merge_engine import (
15 MergeState,
16 apply_merge,
17 clear_merge_state,
18 detect_conflicts,
19 diff_snapshots,
20 find_merge_base,
21 read_merge_state,
22 write_merge_state,
23 )
24 from muse.core.op_transform import MergeOpsResult, merge_op_lists, merge_structured
25 from muse.core.store import CommitRecord, write_commit
26 from muse.domain import (
27 DeleteOp,
28 DomainOp,
29 InsertOp,
30 ReplaceOp,
31 SnapshotManifest,
32 StructuredDelta,
33 StructuredMergePlugin,
34 )
35 from muse.plugins.midi.plugin import MidiPlugin
36
37
38 @pytest.fixture
39 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
40 muse_dir = tmp_path / ".muse"
41 (muse_dir / "commits").mkdir(parents=True)
42 (muse_dir / "refs" / "heads").mkdir(parents=True)
43 return tmp_path
44
45
46 def _commit(root: pathlib.Path, cid: str, parent: str | None = None, parent2: str | None = None) -> None:
47 write_commit(root, CommitRecord(
48 commit_id=cid,
49 repo_id="r",
50 branch="main",
51 snapshot_id=f"snap-{cid}",
52 message=cid,
53 committed_at=datetime.datetime.now(datetime.timezone.utc),
54 parent_commit_id=parent,
55 parent2_commit_id=parent2,
56 ))
57
58
59 class TestDiffSnapshots:
60 def test_no_change(self) -> None:
61 m = {"a.mid": "h1", "b.mid": "h2"}
62 assert diff_snapshots(m, m) == set()
63
64 def test_added(self) -> None:
65 assert diff_snapshots({}, {"a.mid": "h1"}) == {"a.mid"}
66
67 def test_removed(self) -> None:
68 assert diff_snapshots({"a.mid": "h1"}, {}) == {"a.mid"}
69
70 def test_modified(self) -> None:
71 assert diff_snapshots({"a.mid": "old"}, {"a.mid": "new"}) == {"a.mid"}
72
73
74 class TestDetectConflicts:
75 def test_no_conflict(self) -> None:
76 assert detect_conflicts({"a.mid"}, {"b.mid"}) == set()
77
78 def test_conflict(self) -> None:
79 assert detect_conflicts({"a.mid", "b.mid"}, {"b.mid", "c.mid"}) == {"b.mid"}
80
81 def test_both_empty(self) -> None:
82 assert detect_conflicts(set(), set()) == set()
83
84
85 class TestApplyMerge:
86 def test_clean_merge(self) -> None:
87 base = {"a.mid": "h0", "b.mid": "h0"}
88 ours = {"a.mid": "h_ours", "b.mid": "h0"}
89 theirs = {"a.mid": "h0", "b.mid": "h_theirs"}
90 ours_changed = {"a.mid"}
91 theirs_changed = {"b.mid"}
92 result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, set())
93 assert result == {"a.mid": "h_ours", "b.mid": "h_theirs"}
94
95 def test_conflict_paths_excluded(self) -> None:
96 base = {"a.mid": "h0"}
97 ours = {"a.mid": "h_ours"}
98 theirs = {"a.mid": "h_theirs"}
99 ours_changed = theirs_changed = {"a.mid"}
100 result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, {"a.mid"})
101 assert result == {"a.mid": "h0"} # Falls back to base
102
103 def test_ours_deletion_applied(self) -> None:
104 base = {"a.mid": "h0", "b.mid": "h0"}
105 ours = {"b.mid": "h0"} # a.mid deleted on ours
106 theirs = {"a.mid": "h0", "b.mid": "h0"}
107 result = apply_merge(base, ours, theirs, {"a.mid"}, set(), set())
108 assert "a.mid" not in result
109
110
111 class TestMergeStateIO:
112 def test_write_and_read(self, repo: pathlib.Path) -> None:
113 write_merge_state(
114 repo,
115 base_commit="base",
116 ours_commit="ours",
117 theirs_commit="theirs",
118 conflict_paths=["a.mid", "b.mid"],
119 other_branch="feature/x",
120 )
121 state = read_merge_state(repo)
122 assert state is not None
123 assert state.base_commit == "base"
124 assert state.conflict_paths == ["a.mid", "b.mid"]
125 assert state.other_branch == "feature/x"
126
127 def test_read_no_state(self, repo: pathlib.Path) -> None:
128 assert read_merge_state(repo) is None
129
130 def test_clear(self, repo: pathlib.Path) -> None:
131 write_merge_state(repo, base_commit="b", ours_commit="o", theirs_commit="t", conflict_paths=[])
132 clear_merge_state(repo)
133 assert read_merge_state(repo) is None
134
135
136 class TestFindMergeBase:
137 def test_direct_parent(self, repo: pathlib.Path) -> None:
138 _commit(repo, "root")
139 _commit(repo, "a", parent="root")
140 _commit(repo, "b", parent="root")
141 base = find_merge_base(repo, "a", "b")
142 assert base == "root"
143
144 def test_same_commit(self, repo: pathlib.Path) -> None:
145 _commit(repo, "root")
146 base = find_merge_base(repo, "root", "root")
147 assert base == "root"
148
149 def test_linear_history(self, repo: pathlib.Path) -> None:
150 _commit(repo, "a")
151 _commit(repo, "b", parent="a")
152 _commit(repo, "c", parent="b")
153 base = find_merge_base(repo, "c", "b")
154 assert base == "b"
155
156 def test_no_common_ancestor(self, repo: pathlib.Path) -> None:
157 _commit(repo, "x")
158 _commit(repo, "y")
159 assert find_merge_base(repo, "x", "y") is None
160
161
162 # ===========================================================================
163 # Structured merge engine integration tests
164 # ===========================================================================
165
166
167 def _ins(addr: str, pos: int | None, cid: str) -> InsertOp:
168 return InsertOp(op="insert", address=addr, position=pos, content_id=cid, content_summary=cid)
169
170
171 def _del(addr: str, pos: int | None, cid: str) -> DeleteOp:
172 return DeleteOp(op="delete", address=addr, position=pos, content_id=cid, content_summary=cid)
173
174
175 def _rep(addr: str, old: str, new: str) -> ReplaceOp:
176 return ReplaceOp(
177 op="replace",
178 address=addr,
179 position=None,
180 old_content_id=old,
181 new_content_id=new,
182 old_summary="old",
183 new_summary="new",
184 )
185
186
187 def _delta(ops: list[DomainOp]) -> StructuredDelta:
188 return StructuredDelta(domain="midi", ops=ops, summary="test")
189
190
191 class TestMergeStructuredIntegration:
192 """Verify merge_structured delegates correctly to merge_op_lists."""
193
194 def test_clean_non_overlapping_file_ops(self) -> None:
195 ours = _delta([_ins("a.mid", pos=0, cid="a-hash")])
196 theirs = _delta([_ins("b.mid", pos=0, cid="b-hash")])
197 result = merge_structured(_delta([]), ours, theirs)
198 assert result.is_clean is True
199 assert len(result.merged_ops) == 2
200
201 def test_conflicting_same_address_replaces_detected(self) -> None:
202 ours = _delta([_rep("shared.mid", "old", "v-ours")])
203 theirs = _delta([_rep("shared.mid", "old", "v-theirs")])
204 result = merge_structured(_delta([]), ours, theirs)
205 assert result.is_clean is False
206 assert len(result.conflict_ops) == 1
207
208 def test_base_ops_kept_by_both_sides_preserved(self) -> None:
209 shared = _ins("base.mid", pos=0, cid="base-cid")
210 result = merge_structured(
211 _delta([shared]),
212 _delta([shared]),
213 _delta([shared]),
214 )
215 assert result.is_clean is True
216 assert any(_op_key_tuple(op) == _op_key_tuple(shared) for op in result.merged_ops)
217
218 def test_position_adjustment_in_structured_merge(self) -> None:
219 """Non-conflicting note inserts get position-adjusted in structured merge."""
220 ours = _delta([_ins("lead.mid", pos=3, cid="note-A")])
221 theirs = _delta([_ins("lead.mid", pos=7, cid="note-B")])
222 result = merge_structured(_delta([]), ours, theirs)
223 assert result.is_clean is True
224 pos_by_cid = {
225 op["content_id"]: op["position"]
226 for op in result.merged_ops
227 if op["op"] == "insert"
228 }
229 # note-A(3): no theirs ≤ 3 → stays 3
230 assert pos_by_cid["note-A"] == 3
231 # note-B(7): ours A(3) ≤ 7 → 7+1 = 8
232 assert pos_by_cid["note-B"] == 8
233
234
235 def _op_key_tuple(op: DomainOp) -> tuple[str, ...]:
236 """Re-implementation of _op_key for test assertions."""
237 if op["op"] == "insert":
238 return ("insert", op["address"], str(op["position"]), op["content_id"])
239 if op["op"] == "delete":
240 return ("delete", op["address"], str(op["position"]), op["content_id"])
241 if op["op"] == "replace":
242 return ("replace", op["address"], str(op["position"]), op["old_content_id"], op["new_content_id"])
243 return (op["op"], op["address"])
244
245
246 class TestStructuredMergePluginProtocol:
247 """Verify MidiPlugin satisfies the StructuredMergePlugin protocol."""
248
249 def test_midi_plugin_isinstance_structured_merge_plugin(self) -> None:
250 plugin = MidiPlugin()
251 assert isinstance(plugin, StructuredMergePlugin)
252
253 def test_merge_ops_non_conflicting_files_is_clean(self) -> None:
254 plugin = MidiPlugin()
255 base = SnapshotManifest(files={}, domain="midi")
256 ours_snap = SnapshotManifest(files={"a.mid": "hash-a"}, domain="midi")
257 theirs_snap = SnapshotManifest(files={"b.mid": "hash-b"}, domain="midi")
258 ours_ops: list[DomainOp] = [_ins("a.mid", pos=None, cid="hash-a")]
259 theirs_ops: list[DomainOp] = [_ins("b.mid", pos=None, cid="hash-b")]
260
261 result = plugin.merge_ops(
262 base, ours_snap, theirs_snap, ours_ops, theirs_ops
263 )
264 assert result.is_clean is True
265 assert "a.mid" in result.merged["files"]
266 assert "b.mid" in result.merged["files"]
267
268 def test_merge_ops_conflicting_same_file_replace_not_clean(self) -> None:
269 plugin = MidiPlugin()
270 base = SnapshotManifest(files={"f.mid": "base-hash"}, domain="midi")
271 ours_snap = SnapshotManifest(files={"f.mid": "ours-hash"}, domain="midi")
272 theirs_snap = SnapshotManifest(files={"f.mid": "theirs-hash"}, domain="midi")
273 ours_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "ours-hash")]
274 theirs_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "theirs-hash")]
275
276 result = plugin.merge_ops(
277 base, ours_snap, theirs_snap, ours_ops, theirs_ops
278 )
279 assert not result.is_clean
280 assert "f.mid" in result.conflicts
281
282 def test_merge_ops_ours_strategy_resolves_conflict(self) -> None:
283 plugin = MidiPlugin()
284 base = SnapshotManifest(files={"f.mid": "base"}, domain="midi")
285 ours_snap = SnapshotManifest(files={"f.mid": "ours-v"}, domain="midi")
286 theirs_snap = SnapshotManifest(files={"f.mid": "theirs-v"}, domain="midi")
287 ours_ops: list[DomainOp] = [_rep("f.mid", "base", "ours-v")]
288 theirs_ops: list[DomainOp] = [_rep("f.mid", "base", "theirs-v")]
289
290 result = plugin.merge_ops(
291 base,
292 ours_snap,
293 theirs_snap,
294 ours_ops,
295 theirs_ops,
296 )
297 # Without .museattributes the conflict stands — verify conflict is reported.
298 assert not result.is_clean
299
300 def test_merge_ops_delete_on_only_one_side_is_clean(self) -> None:
301 plugin = MidiPlugin()
302 base = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="midi")
303 ours_snap = SnapshotManifest(files={"keep.mid": "k"}, domain="midi")
304 theirs_snap = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="midi")
305 ours_ops: list[DomainOp] = [_del("remove.mid", pos=None, cid="r")]
306 theirs_ops: list[DomainOp] = []
307
308 result = plugin.merge_ops(
309 base, ours_snap, theirs_snap, ours_ops, theirs_ops
310 )
311 assert result.is_clean is True
312 assert "keep.mid" in result.merged["files"]
313 assert "remove.mid" not in result.merged["files"]
314
315 def test_merge_ops_empty_changes_returns_base(self) -> None:
316 plugin = MidiPlugin()
317 base = SnapshotManifest(files={"f.mid": "h"}, domain="midi")
318 result = plugin.merge_ops(base, base, base, [], [])
319 assert result.is_clean is True
320 assert result.merged["files"] == {"f.mid": "h"}