cgcardona / muse public
test_core_merge_engine.py python
323 lines 11.8 KB
8d5137ed fix(security): full surface hardening — validation, path containment, p… Gabriel Cardona <cgcardona@gmail.com> 10h ago
1 """Tests for muse.core.merge_engine — three-way merge logic.
2
3 Extended to cover the structured (operation-level) merge path via
4 :func:`~muse.core.op_transform.merge_structured` and the
5 :class:`~muse.domain.StructuredMergePlugin` integration.
6 """
7
8 import datetime
9 import json
10 import pathlib
11
12 import pytest
13
14 from muse.core.merge_engine import (
15 MergeState,
16 apply_merge,
17 clear_merge_state,
18 detect_conflicts,
19 diff_snapshots,
20 find_merge_base,
21 read_merge_state,
22 write_merge_state,
23 )
24 from muse.core.op_transform import MergeOpsResult, merge_op_lists, merge_structured
25 from muse.core.store import CommitRecord, write_commit
26 from muse.domain import (
27 DeleteOp,
28 DomainOp,
29 InsertOp,
30 ReplaceOp,
31 SnapshotManifest,
32 StructuredDelta,
33 StructuredMergePlugin,
34 )
35 from muse.plugins.midi.plugin import MidiPlugin
36
37
38 @pytest.fixture
39 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
40 muse_dir = tmp_path / ".muse"
41 (muse_dir / "commits").mkdir(parents=True)
42 (muse_dir / "refs" / "heads").mkdir(parents=True)
43 return tmp_path
44
45
46 def _commit(root: pathlib.Path, cid: str, parent: str | None = None, parent2: str | None = None) -> None:
47 write_commit(root, CommitRecord(
48 commit_id=cid,
49 repo_id="r",
50 branch="main",
51 snapshot_id=f"snap-{cid}",
52 message=cid,
53 committed_at=datetime.datetime.now(datetime.timezone.utc),
54 parent_commit_id=parent,
55 parent2_commit_id=parent2,
56 ))
57
58
59 class TestDiffSnapshots:
60 def test_no_change(self) -> None:
61 m = {"a.mid": "h1", "b.mid": "h2"}
62 assert diff_snapshots(m, m) == set()
63
64 def test_added(self) -> None:
65 assert diff_snapshots({}, {"a.mid": "h1"}) == {"a.mid"}
66
67 def test_removed(self) -> None:
68 assert diff_snapshots({"a.mid": "h1"}, {}) == {"a.mid"}
69
70 def test_modified(self) -> None:
71 assert diff_snapshots({"a.mid": "old"}, {"a.mid": "new"}) == {"a.mid"}
72
73
74 class TestDetectConflicts:
75 def test_no_conflict(self) -> None:
76 assert detect_conflicts({"a.mid"}, {"b.mid"}) == set()
77
78 def test_conflict(self) -> None:
79 assert detect_conflicts({"a.mid", "b.mid"}, {"b.mid", "c.mid"}) == {"b.mid"}
80
81 def test_both_empty(self) -> None:
82 assert detect_conflicts(set(), set()) == set()
83
84
85 class TestApplyMerge:
86 def test_clean_merge(self) -> None:
87 base = {"a.mid": "h0", "b.mid": "h0"}
88 ours = {"a.mid": "h_ours", "b.mid": "h0"}
89 theirs = {"a.mid": "h0", "b.mid": "h_theirs"}
90 ours_changed = {"a.mid"}
91 theirs_changed = {"b.mid"}
92 result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, set())
93 assert result == {"a.mid": "h_ours", "b.mid": "h_theirs"}
94
95 def test_conflict_paths_excluded(self) -> None:
96 base = {"a.mid": "h0"}
97 ours = {"a.mid": "h_ours"}
98 theirs = {"a.mid": "h_theirs"}
99 ours_changed = theirs_changed = {"a.mid"}
100 result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, {"a.mid"})
101 assert result == {"a.mid": "h0"} # Falls back to base
102
103 def test_ours_deletion_applied(self) -> None:
104 base = {"a.mid": "h0", "b.mid": "h0"}
105 ours = {"b.mid": "h0"} # a.mid deleted on ours
106 theirs = {"a.mid": "h0", "b.mid": "h0"}
107 result = apply_merge(base, ours, theirs, {"a.mid"}, set(), set())
108 assert "a.mid" not in result
109
110
111 class TestMergeStateIO:
112 def test_write_and_read(self, repo: pathlib.Path) -> None:
113 base_id = "b" * 64
114 ours_id = "1" * 64
115 theirs_id = "2" * 64
116 write_merge_state(
117 repo,
118 base_commit=base_id,
119 ours_commit=ours_id,
120 theirs_commit=theirs_id,
121 conflict_paths=["a.mid", "b.mid"],
122 other_branch="feature/x",
123 )
124 state = read_merge_state(repo)
125 assert state is not None
126 assert state.base_commit == base_id
127 assert state.conflict_paths == ["a.mid", "b.mid"]
128 assert state.other_branch == "feature/x"
129
130 def test_read_no_state(self, repo: pathlib.Path) -> None:
131 assert read_merge_state(repo) is None
132
133 def test_clear(self, repo: pathlib.Path) -> None:
134 write_merge_state(repo, base_commit="b" * 64, ours_commit="c" * 64, theirs_commit="d" * 64, conflict_paths=[])
135 clear_merge_state(repo)
136 assert read_merge_state(repo) is None
137
138
139 class TestFindMergeBase:
140 def test_direct_parent(self, repo: pathlib.Path) -> None:
141 _commit(repo, "root")
142 _commit(repo, "a", parent="root")
143 _commit(repo, "b", parent="root")
144 base = find_merge_base(repo, "a", "b")
145 assert base == "root"
146
147 def test_same_commit(self, repo: pathlib.Path) -> None:
148 _commit(repo, "root")
149 base = find_merge_base(repo, "root", "root")
150 assert base == "root"
151
152 def test_linear_history(self, repo: pathlib.Path) -> None:
153 _commit(repo, "a")
154 _commit(repo, "b", parent="a")
155 _commit(repo, "c", parent="b")
156 base = find_merge_base(repo, "c", "b")
157 assert base == "b"
158
159 def test_no_common_ancestor(self, repo: pathlib.Path) -> None:
160 _commit(repo, "x")
161 _commit(repo, "y")
162 assert find_merge_base(repo, "x", "y") is None
163
164
165 # ===========================================================================
166 # Structured merge engine integration tests
167 # ===========================================================================
168
169
170 def _ins(addr: str, pos: int | None, cid: str) -> InsertOp:
171 return InsertOp(op="insert", address=addr, position=pos, content_id=cid, content_summary=cid)
172
173
174 def _del(addr: str, pos: int | None, cid: str) -> DeleteOp:
175 return DeleteOp(op="delete", address=addr, position=pos, content_id=cid, content_summary=cid)
176
177
178 def _rep(addr: str, old: str, new: str) -> ReplaceOp:
179 return ReplaceOp(
180 op="replace",
181 address=addr,
182 position=None,
183 old_content_id=old,
184 new_content_id=new,
185 old_summary="old",
186 new_summary="new",
187 )
188
189
190 def _delta(ops: list[DomainOp]) -> StructuredDelta:
191 return StructuredDelta(domain="midi", ops=ops, summary="test")
192
193
194 class TestMergeStructuredIntegration:
195 """Verify merge_structured delegates correctly to merge_op_lists."""
196
197 def test_clean_non_overlapping_file_ops(self) -> None:
198 ours = _delta([_ins("a.mid", pos=0, cid="a-hash")])
199 theirs = _delta([_ins("b.mid", pos=0, cid="b-hash")])
200 result = merge_structured(_delta([]), ours, theirs)
201 assert result.is_clean is True
202 assert len(result.merged_ops) == 2
203
204 def test_conflicting_same_address_replaces_detected(self) -> None:
205 ours = _delta([_rep("shared.mid", "old", "v-ours")])
206 theirs = _delta([_rep("shared.mid", "old", "v-theirs")])
207 result = merge_structured(_delta([]), ours, theirs)
208 assert result.is_clean is False
209 assert len(result.conflict_ops) == 1
210
211 def test_base_ops_kept_by_both_sides_preserved(self) -> None:
212 shared = _ins("base.mid", pos=0, cid="base-cid")
213 result = merge_structured(
214 _delta([shared]),
215 _delta([shared]),
216 _delta([shared]),
217 )
218 assert result.is_clean is True
219 assert any(_op_key_tuple(op) == _op_key_tuple(shared) for op in result.merged_ops)
220
221 def test_position_adjustment_in_structured_merge(self) -> None:
222 """Non-conflicting note inserts get position-adjusted in structured merge."""
223 ours = _delta([_ins("lead.mid", pos=3, cid="note-A")])
224 theirs = _delta([_ins("lead.mid", pos=7, cid="note-B")])
225 result = merge_structured(_delta([]), ours, theirs)
226 assert result.is_clean is True
227 pos_by_cid = {
228 op["content_id"]: op["position"]
229 for op in result.merged_ops
230 if op["op"] == "insert"
231 }
232 # note-A(3): no theirs ≤ 3 → stays 3
233 assert pos_by_cid["note-A"] == 3
234 # note-B(7): ours A(3) ≤ 7 → 7+1 = 8
235 assert pos_by_cid["note-B"] == 8
236
237
238 def _op_key_tuple(op: DomainOp) -> tuple[str, ...]:
239 """Re-implementation of _op_key for test assertions."""
240 if op["op"] == "insert":
241 return ("insert", op["address"], str(op["position"]), op["content_id"])
242 if op["op"] == "delete":
243 return ("delete", op["address"], str(op["position"]), op["content_id"])
244 if op["op"] == "replace":
245 return ("replace", op["address"], str(op["position"]), op["old_content_id"], op["new_content_id"])
246 return (op["op"], op["address"])
247
248
249 class TestStructuredMergePluginProtocol:
250 """Verify MidiPlugin satisfies the StructuredMergePlugin protocol."""
251
252 def test_midi_plugin_isinstance_structured_merge_plugin(self) -> None:
253 plugin = MidiPlugin()
254 assert isinstance(plugin, StructuredMergePlugin)
255
256 def test_merge_ops_non_conflicting_files_is_clean(self) -> None:
257 plugin = MidiPlugin()
258 base = SnapshotManifest(files={}, domain="midi")
259 ours_snap = SnapshotManifest(files={"a.mid": "hash-a"}, domain="midi")
260 theirs_snap = SnapshotManifest(files={"b.mid": "hash-b"}, domain="midi")
261 ours_ops: list[DomainOp] = [_ins("a.mid", pos=None, cid="hash-a")]
262 theirs_ops: list[DomainOp] = [_ins("b.mid", pos=None, cid="hash-b")]
263
264 result = plugin.merge_ops(
265 base, ours_snap, theirs_snap, ours_ops, theirs_ops
266 )
267 assert result.is_clean is True
268 assert "a.mid" in result.merged["files"]
269 assert "b.mid" in result.merged["files"]
270
271 def test_merge_ops_conflicting_same_file_replace_not_clean(self) -> None:
272 plugin = MidiPlugin()
273 base = SnapshotManifest(files={"f.mid": "base-hash"}, domain="midi")
274 ours_snap = SnapshotManifest(files={"f.mid": "ours-hash"}, domain="midi")
275 theirs_snap = SnapshotManifest(files={"f.mid": "theirs-hash"}, domain="midi")
276 ours_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "ours-hash")]
277 theirs_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "theirs-hash")]
278
279 result = plugin.merge_ops(
280 base, ours_snap, theirs_snap, ours_ops, theirs_ops
281 )
282 assert not result.is_clean
283 assert "f.mid" in result.conflicts
284
285 def test_merge_ops_ours_strategy_resolves_conflict(self) -> None:
286 plugin = MidiPlugin()
287 base = SnapshotManifest(files={"f.mid": "base"}, domain="midi")
288 ours_snap = SnapshotManifest(files={"f.mid": "ours-v"}, domain="midi")
289 theirs_snap = SnapshotManifest(files={"f.mid": "theirs-v"}, domain="midi")
290 ours_ops: list[DomainOp] = [_rep("f.mid", "base", "ours-v")]
291 theirs_ops: list[DomainOp] = [_rep("f.mid", "base", "theirs-v")]
292
293 result = plugin.merge_ops(
294 base,
295 ours_snap,
296 theirs_snap,
297 ours_ops,
298 theirs_ops,
299 )
300 # Without .museattributes the conflict stands — verify conflict is reported.
301 assert not result.is_clean
302
303 def test_merge_ops_delete_on_only_one_side_is_clean(self) -> None:
304 plugin = MidiPlugin()
305 base = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="midi")
306 ours_snap = SnapshotManifest(files={"keep.mid": "k"}, domain="midi")
307 theirs_snap = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="midi")
308 ours_ops: list[DomainOp] = [_del("remove.mid", pos=None, cid="r")]
309 theirs_ops: list[DomainOp] = []
310
311 result = plugin.merge_ops(
312 base, ours_snap, theirs_snap, ours_ops, theirs_ops
313 )
314 assert result.is_clean is True
315 assert "keep.mid" in result.merged["files"]
316 assert "remove.mid" not in result.merged["files"]
317
318 def test_merge_ops_empty_changes_returns_base(self) -> None:
319 plugin = MidiPlugin()
320 base = SnapshotManifest(files={"f.mid": "h"}, domain="midi")
321 result = plugin.merge_ops(base, base, base, [], [])
322 assert result.is_clean is True
323 assert result.merged["files"] == {"f.mid": "h"}