test_core_merge_engine.py
python
| 1 | """Tests for muse.core.merge_engine — three-way merge logic. |
| 2 | |
| 3 | Extended in Phase 3 to cover the structured (operation-level) merge path via |
| 4 | :func:`~muse.core.op_transform.merge_structured` and the |
| 5 | :class:`~muse.domain.StructuredMergePlugin` integration. |
| 6 | """ |
| 7 | from __future__ import annotations |
| 8 | |
| 9 | import datetime |
| 10 | import json |
| 11 | import pathlib |
| 12 | |
| 13 | import pytest |
| 14 | |
| 15 | from muse.core.merge_engine import ( |
| 16 | MergeState, |
| 17 | apply_merge, |
| 18 | clear_merge_state, |
| 19 | detect_conflicts, |
| 20 | diff_snapshots, |
| 21 | find_merge_base, |
| 22 | read_merge_state, |
| 23 | write_merge_state, |
| 24 | ) |
| 25 | from muse.core.op_transform import MergeOpsResult, merge_op_lists, merge_structured |
| 26 | from muse.core.store import CommitRecord, write_commit |
| 27 | from muse.domain import ( |
| 28 | DeleteOp, |
| 29 | DomainOp, |
| 30 | InsertOp, |
| 31 | ReplaceOp, |
| 32 | SnapshotManifest, |
| 33 | StructuredDelta, |
| 34 | StructuredMergePlugin, |
| 35 | ) |
| 36 | from muse.plugins.music.plugin import MusicPlugin |
| 37 | |
| 38 | |
@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the ``.muse`` directory skeleton under pytest's ``tmp_path``.

    Makes ``.muse/commits`` and ``.muse/refs/heads`` (with any missing
    parents) and returns ``tmp_path`` itself, not the ``.muse`` directory.
    """
    dot_muse = tmp_path / ".muse"
    for parts in (("commits",), ("refs", "heads")):
        dot_muse.joinpath(*parts).mkdir(parents=True)
    return tmp_path
| 45 | |
| 46 | |
def _commit(
    root: pathlib.Path,
    cid: str,
    parent: str | None = None,
    parent2: str | None = None,
) -> None:
    """Write a minimal commit record with id *cid* under *root*.

    The record carries fixed placeholder metadata (repo id ``"r"``, branch
    ``"main"``), a snapshot id of ``snap-<cid>``, *cid* as its message and the
    current UTC time as its commit timestamp.  *parent* and *parent2* are
    stored as the first and second parent commit ids.
    """
    record = CommitRecord(
        commit_id=cid,
        repo_id="r",
        branch="main",
        snapshot_id=f"snap-{cid}",
        message=cid,
        committed_at=datetime.datetime.now(datetime.timezone.utc),
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(root, record)
| 58 | |
| 59 | |
class TestDiffSnapshots:
    """Expectations for ``diff_snapshots`` on small dict manifests."""

    def test_no_change(self) -> None:
        """Diffing a manifest against itself yields an empty set."""
        manifest = {"a.mid": "h1", "b.mid": "h2"}
        changed = diff_snapshots(manifest, manifest)
        assert changed == set()

    def test_added(self) -> None:
        """A key present only in the second manifest is reported."""
        changed = diff_snapshots({}, {"a.mid": "h1"})
        assert changed == {"a.mid"}

    def test_removed(self) -> None:
        """A key present only in the first manifest is reported."""
        changed = diff_snapshots({"a.mid": "h1"}, {})
        assert changed == {"a.mid"}

    def test_modified(self) -> None:
        """A key whose value differs between the manifests is reported."""
        before = {"a.mid": "old"}
        after = {"a.mid": "new"}
        assert diff_snapshots(before, after) == {"a.mid"}
| 73 | |
| 74 | |
class TestDetectConflicts:
    """Expectations for ``detect_conflicts`` on two sets of changed paths."""

    def test_no_conflict(self) -> None:
        """Disjoint change sets produce no conflicts."""
        left = {"a.mid"}
        right = {"b.mid"}
        assert detect_conflicts(left, right) == set()

    def test_conflict(self) -> None:
        """Only the path present in both change sets is reported."""
        left = {"a.mid", "b.mid"}
        right = {"b.mid", "c.mid"}
        conflicts = detect_conflicts(left, right)
        assert conflicts == {"b.mid"}

    def test_both_empty(self) -> None:
        """Two empty change sets produce no conflicts."""
        conflicts = detect_conflicts(set(), set())
        assert conflicts == set()
| 84 | |
| 85 | |
class TestApplyMerge:
    """Expectations for ``apply_merge`` given base/ours/theirs manifests.

    The call shape used throughout is
    ``apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)``.
    """

    def test_clean_merge(self) -> None:
        """Each side's single change lands in the merged manifest."""
        base_manifest = {"a.mid": "h0", "b.mid": "h0"}
        ours_manifest = {"a.mid": "h_ours", "b.mid": "h0"}
        theirs_manifest = {"a.mid": "h0", "b.mid": "h_theirs"}
        merged = apply_merge(
            base_manifest,
            ours_manifest,
            theirs_manifest,
            {"a.mid"},
            {"b.mid"},
            set(),
        )
        expected = {"a.mid": "h_ours", "b.mid": "h_theirs"}
        assert merged == expected

    def test_conflict_paths_excluded(self) -> None:
        """A conflicting path keeps its base value in the result."""
        base_manifest = {"a.mid": "h0"}
        ours_manifest = {"a.mid": "h_ours"}
        theirs_manifest = {"a.mid": "h_theirs"}
        # One set object is deliberately passed for both change sets.
        both_changed = {"a.mid"}
        merged = apply_merge(
            base_manifest,
            ours_manifest,
            theirs_manifest,
            both_changed,
            both_changed,
            {"a.mid"},
        )
        # The conflicting path falls back to the base entry.
        assert merged == {"a.mid": "h0"}

    def test_ours_deletion_applied(self) -> None:
        """A path deleted on ours only is absent from the result."""
        base_manifest = {"a.mid": "h0", "b.mid": "h0"}
        # ours dropped a.mid; theirs is identical to base.
        ours_manifest = {"b.mid": "h0"}
        theirs_manifest = {"a.mid": "h0", "b.mid": "h0"}
        merged = apply_merge(
            base_manifest, ours_manifest, theirs_manifest, {"a.mid"}, set(), set()
        )
        assert "a.mid" not in merged
| 110 | |
| 111 | |
class TestMergeStateIO:
    """Round-trip checks for the merge-state write/read/clear helpers."""

    def test_write_and_read(self, repo: pathlib.Path) -> None:
        """State written to the repo is returned by a subsequent read."""
        write_merge_state(
            repo,
            base_commit="base",
            ours_commit="ours",
            theirs_commit="theirs",
            conflict_paths=["a.mid", "b.mid"],
            other_branch="feature/x",
        )
        loaded = read_merge_state(repo)
        assert loaded is not None
        assert loaded.base_commit == "base"
        assert loaded.conflict_paths == ["a.mid", "b.mid"]
        assert loaded.other_branch == "feature/x"

    def test_read_no_state(self, repo: pathlib.Path) -> None:
        """Reading from a repo with no merge state written returns None."""
        loaded = read_merge_state(repo)
        assert loaded is None

    def test_clear(self, repo: pathlib.Path) -> None:
        """After clearing, a read returns None again."""
        write_merge_state(
            repo,
            base_commit="b",
            ours_commit="o",
            theirs_commit="t",
            conflict_paths=[],
        )
        clear_merge_state(repo)
        loaded = read_merge_state(repo)
        assert loaded is None
| 135 | |
| 136 | |
class TestFindMergeBase:
    """Expectations for ``find_merge_base`` over small hand-built histories."""

    def test_direct_parent(self, repo: pathlib.Path) -> None:
        """Two children of the same commit share that commit as base."""
        _commit(repo, "root")
        for child in ("a", "b"):
            _commit(repo, child, parent="root")
        assert find_merge_base(repo, "a", "b") == "root"

    def test_same_commit(self, repo: pathlib.Path) -> None:
        """A commit compared with itself is its own base."""
        _commit(repo, "root")
        assert find_merge_base(repo, "root", "root") == "root"

    def test_linear_history(self, repo: pathlib.Path) -> None:
        """On a straight chain a <- b <- c, the base of c and b is b."""
        previous: str | None = None
        for cid in ("a", "b", "c"):
            _commit(repo, cid, parent=previous)
            previous = cid
        assert find_merge_base(repo, "c", "b") == "b"

    def test_no_common_ancestor(self, repo: pathlib.Path) -> None:
        """Two unrelated parentless commits have no base (None)."""
        for cid in ("x", "y"):
            _commit(repo, cid)
        result = find_merge_base(repo, "x", "y")
        assert result is None
| 161 | |
| 162 | |
| 163 | # =========================================================================== |
| 164 | # Phase 3 — structured merge engine integration tests |
| 165 | # =========================================================================== |
| 166 | |
| 167 | |
def _ins(addr: str, pos: int | None, cid: str) -> InsertOp:
    """Build an insert op at *addr* / *pos*; *cid* doubles as the summary."""
    insert_op = InsertOp(
        op="insert",
        address=addr,
        position=pos,
        content_id=cid,
        content_summary=cid,
    )
    return insert_op
| 170 | |
| 171 | |
def _del(addr: str, pos: int | None, cid: str) -> DeleteOp:
    """Build a delete op at *addr* / *pos*; *cid* doubles as the summary."""
    delete_op = DeleteOp(
        op="delete",
        address=addr,
        position=pos,
        content_id=cid,
        content_summary=cid,
    )
    return delete_op
| 174 | |
| 175 | |
def _rep(addr: str, old: str, new: str) -> ReplaceOp:
    """Build a replace op at *addr* swapping content id *old* for *new*.

    The position is always ``None`` and the summaries are the fixed strings
    ``"old"`` and ``"new"``.
    """
    replace_op = ReplaceOp(
        op="replace",
        address=addr,
        position=None,
        old_content_id=old,
        new_content_id=new,
        old_summary="old",
        new_summary="new",
    )
    return replace_op
| 186 | |
| 187 | |
def _delta(ops: list[DomainOp]) -> StructuredDelta:
    """Wrap *ops* in a ``"music"``-domain delta with the summary ``"test"``."""
    delta = StructuredDelta(domain="music", ops=ops, summary="test")
    return delta
| 190 | |
| 191 | |
class TestMergeStructuredIntegration:
    """Exercise ``merge_structured`` on small hand-built deltas.

    Per the original author's note, these cases are meant to verify that
    ``merge_structured`` delegates correctly to ``merge_op_lists``.
    """

    def test_clean_non_overlapping_file_ops(self) -> None:
        """Inserts at different addresses merge cleanly and both survive."""
        base_delta = _delta([])
        ours_delta = _delta([_ins("a.mid", pos=0, cid="a-hash")])
        theirs_delta = _delta([_ins("b.mid", pos=0, cid="b-hash")])
        outcome = merge_structured(base_delta, ours_delta, theirs_delta)
        assert outcome.is_clean is True
        assert len(outcome.merged_ops) == 2

    def test_conflicting_same_address_replaces_detected(self) -> None:
        """Two different replaces of the same address yield one conflict."""
        base_delta = _delta([])
        ours_delta = _delta([_rep("shared.mid", "old", "v-ours")])
        theirs_delta = _delta([_rep("shared.mid", "old", "v-theirs")])
        outcome = merge_structured(base_delta, ours_delta, theirs_delta)
        assert outcome.is_clean is False
        assert len(outcome.conflict_ops) == 1

    def test_base_ops_kept_by_both_sides_preserved(self) -> None:
        """An op present in base, ours and theirs appears in the merged ops."""
        common_op = _ins("base.mid", pos=0, cid="base-cid")
        outcome = merge_structured(
            _delta([common_op]),
            _delta([common_op]),
            _delta([common_op]),
        )
        assert outcome.is_clean is True
        wanted_key = _op_key_tuple(common_op)
        assert any(_op_key_tuple(op) == wanted_key for op in outcome.merged_ops)

    def test_position_adjustment_in_structured_merge(self) -> None:
        """Non-conflicting note inserts get position-adjusted in structured merge."""
        base_delta = _delta([])
        ours_delta = _delta([_ins("lead.mid", pos=3, cid="note-A")])
        theirs_delta = _delta([_ins("lead.mid", pos=7, cid="note-B")])
        outcome = merge_structured(base_delta, ours_delta, theirs_delta)
        assert outcome.is_clean is True

        # Index the merged insert ops by content id.
        insert_positions = {}
        for op in outcome.merged_ops:
            if op["op"] == "insert":
                insert_positions[op["content_id"]] = op["position"]

        # note-A sits at 3 and theirs has no insert at or before 3, so it stays at 3.
        assert insert_positions["note-A"] == 3
        # note-B sits at 7 and ours inserted note-A at 3 (<= 7), so it shifts to 7 + 1 = 8.
        assert insert_positions["note-B"] == 8
| 234 | |
| 235 | |
| 236 | def _op_key_tuple(op: DomainOp) -> tuple[str, ...]: |
| 237 | """Re-implementation of _op_key for test assertions.""" |
| 238 | if op["op"] == "insert": |
| 239 | return ("insert", op["address"], str(op["position"]), op["content_id"]) |
| 240 | if op["op"] == "delete": |
| 241 | return ("delete", op["address"], str(op["position"]), op["content_id"]) |
| 242 | if op["op"] == "replace": |
| 243 | return ("replace", op["address"], str(op["position"]), op["old_content_id"], op["new_content_id"]) |
| 244 | return (op["op"], op["address"]) |
| 245 | |
| 246 | |
class TestStructuredMergePluginProtocol:
    """Check ``MusicPlugin`` against the ``StructuredMergePlugin`` protocol.

    Covers the ``isinstance`` check plus several ``merge_ops`` scenarios, all
    called as ``merge_ops(base, ours_snapshot, theirs_snapshot, ours_ops, theirs_ops)``.
    """

    def test_music_plugin_isinstance_structured_merge_plugin(self) -> None:
        """A ``MusicPlugin`` instance passes the protocol ``isinstance`` check."""
        assert isinstance(MusicPlugin(), StructuredMergePlugin)

    def test_merge_ops_non_conflicting_files_is_clean(self) -> None:
        """Inserts of different files on each side merge cleanly, keeping both."""
        music = MusicPlugin()
        base_manifest = SnapshotManifest(files={}, domain="music")
        ours_manifest = SnapshotManifest(files={"a.mid": "hash-a"}, domain="music")
        theirs_manifest = SnapshotManifest(files={"b.mid": "hash-b"}, domain="music")
        our_changes: list[DomainOp] = [_ins("a.mid", pos=None, cid="hash-a")]
        their_changes: list[DomainOp] = [_ins("b.mid", pos=None, cid="hash-b")]

        outcome = music.merge_ops(
            base_manifest, ours_manifest, theirs_manifest, our_changes, their_changes
        )

        assert outcome.is_clean is True
        merged_files = outcome.merged["files"]
        assert "a.mid" in merged_files
        assert "b.mid" in merged_files

    def test_merge_ops_conflicting_same_file_replace_not_clean(self) -> None:
        """Different replaces of one file are reported as a conflict on that file."""
        music = MusicPlugin()
        base_manifest = SnapshotManifest(files={"f.mid": "base-hash"}, domain="music")
        ours_manifest = SnapshotManifest(files={"f.mid": "ours-hash"}, domain="music")
        theirs_manifest = SnapshotManifest(files={"f.mid": "theirs-hash"}, domain="music")
        our_changes: list[DomainOp] = [_rep("f.mid", "base-hash", "ours-hash")]
        their_changes: list[DomainOp] = [_rep("f.mid", "base-hash", "theirs-hash")]

        outcome = music.merge_ops(
            base_manifest, ours_manifest, theirs_manifest, our_changes, their_changes
        )

        assert not outcome.is_clean
        assert "f.mid" in outcome.conflicts

    def test_merge_ops_ours_strategy_resolves_conflict(self) -> None:
        """Conflicting replaces stay unresolved when no strategy is supplied.

        Despite the test name, no resolution strategy is configured here.
        Without ``.museattributes`` the conflict stands, so this only checks
        that the result is reported as not clean.
        """
        music = MusicPlugin()
        base_manifest = SnapshotManifest(files={"f.mid": "base"}, domain="music")
        ours_manifest = SnapshotManifest(files={"f.mid": "ours-v"}, domain="music")
        theirs_manifest = SnapshotManifest(files={"f.mid": "theirs-v"}, domain="music")
        our_changes: list[DomainOp] = [_rep("f.mid", "base", "ours-v")]
        their_changes: list[DomainOp] = [_rep("f.mid", "base", "theirs-v")]

        outcome = music.merge_ops(
            base_manifest, ours_manifest, theirs_manifest, our_changes, their_changes
        )

        assert not outcome.is_clean

    def test_merge_ops_delete_on_only_one_side_is_clean(self) -> None:
        """A delete made only on ours merges cleanly and removes the file."""
        music = MusicPlugin()
        base_manifest = SnapshotManifest(
            files={"keep.mid": "k", "remove.mid": "r"}, domain="music"
        )
        ours_manifest = SnapshotManifest(files={"keep.mid": "k"}, domain="music")
        theirs_manifest = SnapshotManifest(
            files={"keep.mid": "k", "remove.mid": "r"}, domain="music"
        )
        our_changes: list[DomainOp] = [_del("remove.mid", pos=None, cid="r")]
        their_changes: list[DomainOp] = []

        outcome = music.merge_ops(
            base_manifest, ours_manifest, theirs_manifest, our_changes, their_changes
        )

        assert outcome.is_clean is True
        merged_files = outcome.merged["files"]
        assert "keep.mid" in merged_files
        assert "remove.mid" not in merged_files

    def test_merge_ops_empty_changes_returns_base(self) -> None:
        """With no ops on either side the merged files equal the base files."""
        music = MusicPlugin()
        base_manifest = SnapshotManifest(files={"f.mid": "h"}, domain="music")

        outcome = music.merge_ops(base_manifest, base_manifest, base_manifest, [], [])

        assert outcome.is_clean is True
        assert outcome.merged["files"] == {"f.mid": "h"}