cgcardona / muse public
test_core_merge_engine.py python
144 lines 4.6 KB
1d9234e8 Replace Maestro-coupled tests with new architecture test suite Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """Tests for muse.core.merge_engine — three-way merge logic."""
2 from __future__ import annotations
3
4 import json
5 import pathlib
6
7 import pytest
8
9 from muse.core.merge_engine import (
10 MergeState,
11 apply_merge,
12 clear_merge_state,
13 detect_conflicts,
14 diff_snapshots,
15 find_merge_base,
16 read_merge_state,
17 write_merge_state,
18 )
19 from muse.core.store import CommitRecord, write_commit
20 import datetime
21
22
@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Provide a temporary repository root containing a bare ``.muse`` tree.

    Creates ``.muse/commits`` and ``.muse/refs/heads`` beneath pytest's
    ``tmp_path`` and returns ``tmp_path`` itself (the root, not ``.muse``).
    """
    dot_muse = tmp_path.joinpath(".muse")
    # Same creation order as before: commits first, then refs/heads.
    for parts in (("commits",), ("refs", "heads")):
        dot_muse.joinpath(*parts).mkdir(parents=True)
    return tmp_path
29
30
def _commit(root: pathlib.Path, cid: str, parent: str | None = None, parent2: str | None = None) -> None:
    """Write a minimal commit record with id *cid* into the repo at *root*.

    The record uses fixed placeholder metadata (``repo_id="r"``,
    ``branch="main"``), derives ``snapshot_id`` as ``"snap-<cid>"``, reuses
    *cid* as the commit message, and stamps the current UTC time.

    Args:
        root: Repository root handed to ``write_commit``.
        cid: Commit id; also used for the message and the snapshot id suffix.
        parent: Stored as ``parent_commit_id`` (``None`` when omitted).
        parent2: Stored as ``parent2_commit_id`` (``None`` when omitted).
    """
    record = CommitRecord(
        commit_id=cid,
        repo_id="r",
        branch="main",
        snapshot_id=f"snap-{cid}",
        message=cid,
        committed_at=datetime.datetime.now(datetime.timezone.utc),
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(root, record)
42
43
class TestDiffSnapshots:
    """``diff_snapshots`` yields the set of paths that differ between two manifests.

    Manifests here are plain dicts keyed by path; the tests cover the
    unchanged, added, removed and modified cases.
    """

    def test_no_change(self) -> None:
        """Identical manifests produce an empty diff."""
        manifest = {"a.mid": "h1", "b.mid": "h2"}
        # Same object on both sides (the original check).
        assert diff_snapshots(manifest, manifest) == set()
        # Equal content held in a distinct dict: makes sure "no change" is
        # decided by comparing contents, not by both arguments being the
        # very same object.
        assert diff_snapshots(manifest, dict(manifest)) == set()

    def test_added(self) -> None:
        """A path present only in the second manifest is reported."""
        assert diff_snapshots({}, {"a.mid": "h1"}) == {"a.mid"}

    def test_removed(self) -> None:
        """A path present only in the first manifest is reported."""
        assert diff_snapshots({"a.mid": "h1"}, {}) == {"a.mid"}

    def test_modified(self) -> None:
        """A path whose value differs between the manifests is reported."""
        assert diff_snapshots({"a.mid": "old"}, {"a.mid": "new"}) == {"a.mid"}
57
58
class TestDetectConflicts:
    """``detect_conflicts`` returns the paths that appear in both changed-path sets."""

    def test_no_conflict(self) -> None:
        """Disjoint change sets have no conflicting paths."""
        left = {"a.mid"}
        right = {"b.mid"}
        overlap = detect_conflicts(left, right)
        assert overlap == set()

    def test_conflict(self) -> None:
        """Only the path present in both change sets is reported."""
        left = {"a.mid", "b.mid"}
        right = {"b.mid", "c.mid"}
        overlap = detect_conflicts(left, right)
        assert overlap == {"b.mid"}

    def test_both_empty(self) -> None:
        """Two empty change sets yield an empty result."""
        overlap = detect_conflicts(set(), set())
        assert overlap == set()
68
69
class TestApplyMerge:
    """``apply_merge`` combines base/ours/theirs manifests into a merged manifest.

    Each test passes the three manifests, the per-side changed-path sets and
    the set of conflicting paths, then checks the returned mapping.
    """

    def test_clean_merge(self) -> None:
        """Non-overlapping edits from both sides both land in the result."""
        base = {"a.mid": "h0", "b.mid": "h0"}
        ours = {"a.mid": "h_ours", "b.mid": "h0"}
        theirs = {"a.mid": "h0", "b.mid": "h_theirs"}
        ours_changed = {"a.mid"}
        theirs_changed = {"b.mid"}
        result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, set())
        assert result == {"a.mid": "h_ours", "b.mid": "h_theirs"}

    def test_conflict_paths_excluded(self) -> None:
        """A conflicting path keeps its base value instead of either side's."""
        base = {"a.mid": "h0"}
        ours = {"a.mid": "h_ours"}
        theirs = {"a.mid": "h_theirs"}
        # Build a separate set for each argument. Binding both names to one
        # shared set would alias the inputs, so any in-place change to one
        # would silently alter the other and blur what the test exercises.
        ours_changed = {"a.mid"}
        theirs_changed = {"a.mid"}
        conflicts = {"a.mid"}
        result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)
        assert result == {"a.mid": "h0"}  # Falls back to base

    def test_ours_deletion_applied(self) -> None:
        """A path deleted on our side is absent from the merged manifest."""
        base = {"a.mid": "h0", "b.mid": "h0"}
        ours = {"b.mid": "h0"}  # a.mid deleted on ours
        theirs = {"a.mid": "h0", "b.mid": "h0"}
        result = apply_merge(base, ours, theirs, {"a.mid"}, set(), set())
        assert "a.mid" not in result
94
95
class TestMergeStateIO:
    """Round-trip tests for ``write_merge_state`` / ``read_merge_state`` / ``clear_merge_state``."""

    def test_write_and_read(self, repo: pathlib.Path) -> None:
        """State written to the repo can be read back with the same values."""
        written = {
            "base_commit": "base",
            "ours_commit": "ours",
            "theirs_commit": "theirs",
            "conflict_paths": ["a.mid", "b.mid"],
            "other_branch": "feature/x",
        }
        write_merge_state(repo, **written)

        loaded = read_merge_state(repo)
        assert loaded is not None
        assert loaded.base_commit == "base"
        # Compared against a fresh literal rather than the list handed to the writer.
        assert loaded.conflict_paths == ["a.mid", "b.mid"]
        assert loaded.other_branch == "feature/x"

    def test_read_no_state(self, repo: pathlib.Path) -> None:
        """Reading from a repo with no merge state written yields ``None``."""
        loaded = read_merge_state(repo)
        assert loaded is None

    def test_clear(self, repo: pathlib.Path) -> None:
        """After clearing, a previously written state is no longer readable."""
        write_merge_state(
            repo,
            base_commit="b",
            ours_commit="o",
            theirs_commit="t",
            conflict_paths=[],
        )
        clear_merge_state(repo)
        loaded = read_merge_state(repo)
        assert loaded is None
119
120
class TestFindMergeBase:
    """``find_merge_base`` locates the common ancestor of two commits in the repo."""

    def test_direct_parent(self, repo: pathlib.Path) -> None:
        """Two commits sharing the same parent resolve to that parent."""
        history = (("root", None), ("a", "root"), ("b", "root"))
        for cid, parent_id in history:
            _commit(repo, cid, parent=parent_id)
        assert find_merge_base(repo, "a", "b") == "root"

    def test_same_commit(self, repo: pathlib.Path) -> None:
        """A commit compared with itself is its own merge base."""
        _commit(repo, "root")
        assert find_merge_base(repo, "root", "root") == "root"

    def test_linear_history(self, repo: pathlib.Path) -> None:
        """On a straight chain a -> b -> c, the base of c and b is b."""
        history = (("a", None), ("b", "a"), ("c", "b"))
        for cid, parent_id in history:
            _commit(repo, cid, parent=parent_id)
        assert find_merge_base(repo, "c", "b") == "b"

    def test_no_common_ancestor(self, repo: pathlib.Path) -> None:
        """Two unrelated parentless commits have no merge base."""
        for cid in ("x", "y"):
            _commit(repo, cid)
        merge_base = find_merge_base(repo, "x", "y")
        assert merge_base is None