cgcardona / muse public
test_stress_merge_correctness.py python
379 lines 15.0 KB
8d5137ed fix(security): full surface hardening — validation, path containment, p… Gabriel Cardona <cgcardona@gmail.com> 10h ago
1 """Adversarial stress tests for the three-way merge engine.
2
3 Covers:
4 - apply_merge edge cases: both sides delete the same file, theirs-only delete,
5 ours-only add, both add the same file with identical hash (clean).
6 - detect_conflicts: full combinatorial (empty sets, symmetric, one-sided).
7 - diff_snapshots: many files added / removed / modified.
8 - diff_snapshots then detect_conflicts → apply_merge pipeline correctness.
9 - Large manifest diffs (500 paths).
10 - MergeState round-trip with and without optional fields.
11 - Corrupt MERGE_STATE.json is silently ignored (returns None).
12 - apply_resolution raises FileNotFoundError for absent object.
13 """
14
15 import json
16 import pathlib
17 import secrets
18 import hashlib
19 import datetime
20
21 import pytest
22
23 from muse.core.merge_engine import (
24 MergeState,
25 apply_merge,
26 apply_resolution,
27 clear_merge_state,
28 detect_conflicts,
29 diff_snapshots,
30 read_merge_state,
31 write_merge_state,
32 )
33 from muse.core.object_store import write_object
34
35
36 # ---------------------------------------------------------------------------
37 # Helpers
38 # ---------------------------------------------------------------------------
39
40
def _h(label: str) -> str:
    """Return the SHA-256 hex digest of *label*, used as a stand-in content hash."""
    digest = hashlib.sha256(label.encode())
    return digest.hexdigest()
43
44
@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Provide a scratch repository root that contains ``.muse/objects``.

    Creates the ``.muse`` directory and its ``objects`` subdirectory under
    pytest's ``tmp_path`` and returns ``tmp_path`` itself.
    """
    objects_dir = tmp_path / ".muse" / "objects"
    # Two explicit mkdir calls (parent first) so an unexpectedly pre-existing
    # directory still raises instead of being silently accepted.
    objects_dir.parent.mkdir()
    objects_dir.mkdir()
    return tmp_path
51
52
53 # ===========================================================================
54 # diff_snapshots — exhaustive
55 # ===========================================================================
56
57
class TestDiffSnapshotsExhaustive:
    """Exercise ``diff_snapshots`` on added, removed and modified paths.

    Each test builds two manifests (path -> hash produced by ``_h``) and
    checks the set of paths that ``diff_snapshots`` reports as changed.
    """

    def test_identical_manifests_no_diff(self) -> None:
        """Diffing a manifest against itself reports no changed paths."""
        m = {f"file-{i}.mid": _h(f"content-{i}") for i in range(100)}
        assert diff_snapshots(m, m) == set()

    def test_all_files_added(self) -> None:
        """With an empty base, every path of the target is reported."""
        added = {f"new-{i}.mid": _h(f"new-{i}") for i in range(50)}
        result = diff_snapshots({}, added)
        assert result == set(added)

    def test_all_files_removed(self) -> None:
        """With an empty target, every path of the base is reported."""
        original = {f"old-{i}.mid": _h(f"old-{i}") for i in range(50)}
        result = diff_snapshots(original, {})
        assert result == set(original)

    def test_all_files_modified(self) -> None:
        """Same paths with different hashes are all reported."""
        base = {f"f{i}.mid": _h(f"v1-{i}") for i in range(50)}
        target = {f"f{i}.mid": _h(f"v2-{i}") for i in range(50)}
        result = diff_snapshots(base, target)
        assert result == set(base)

    def test_mixed_add_remove_modify(self) -> None:
        """Added, removed and modified paths are reported; unchanged ones are not."""
        base = {"keep.mid": _h("keep"), "remove.mid": _h("remove"), "modify.mid": _h("old")}
        target = {"keep.mid": _h("keep"), "add.mid": _h("new"), "modify.mid": _h("new")}
        result = diff_snapshots(base, target)
        assert result == {"remove.mid", "add.mid", "modify.mid"}
        assert "keep.mid" not in result

    def test_500_file_manifest_correct_diff(self) -> None:
        """500-path base with 100 modified, 50 added and 50 removed paths."""
        base = {f"path/to/file-{i:04d}.mid": _h(f"v1-{i}") for i in range(500)}
        # Describe each kind of change declaratively, then derive the target.
        modified = {f"path/to/file-{i:04d}.mid": _h(f"v2-{i}") for i in range(100)}
        added = {f"path/to/new-{i}.mid": _h(f"new-{i}") for i in range(500, 550)}
        removed = {f"path/to/file-{i:04d}.mid" for i in range(450, 500)}
        target = {
            path: digest
            for path, digest in {**base, **modified, **added}.items()
            if path not in removed
        }
        result = diff_snapshots(base, target)
        assert result == set(modified) | set(added) | removed

    def test_symmetric_diff_not_required(self) -> None:
        """A path whose hash differs is reported whichever manifest comes first.

        The test name is kept for stable test IDs; what is asserted is that
        swapping the arguments yields the same changed-path set for a path
        present in both manifests with different hashes.
        """
        a = {"f.mid": _h("hash-a")}
        b = {"f.mid": _h("hash-b")}
        assert diff_snapshots(a, b) == {"f.mid"}
        assert diff_snapshots(b, a) == {"f.mid"}
114
115
116 # ===========================================================================
117 # detect_conflicts — exhaustive
118 # ===========================================================================
119
120
class TestDetectConflictsExhaustive:
    """Check ``detect_conflicts`` for each overlap shape of two changed-path sets."""

    def test_empty_both_sides(self) -> None:
        """Two empty change sets yield no conflicts."""
        conflicts = detect_conflicts(set(), set())
        assert conflicts == set()

    def test_empty_ours(self) -> None:
        """Changes only on theirs yield no conflicts."""
        theirs = {"a.mid", "b.mid"}
        conflicts = detect_conflicts(set(), theirs)
        assert conflicts == set()

    def test_empty_theirs(self) -> None:
        """Changes only on ours yield no conflicts."""
        ours = {"a.mid", "b.mid"}
        conflicts = detect_conflicts(ours, set())
        assert conflicts == set()

    def test_full_overlap(self) -> None:
        """When both sides changed the same paths, all of them conflict."""
        paths = {f"f{i}.mid" for i in range(50)}
        conflicts = detect_conflicts(paths, paths)
        assert conflicts == paths

    def test_no_overlap(self) -> None:
        """Disjoint change sets yield no conflicts."""
        ours_paths = {f"ours-{i}.mid" for i in range(25)}
        theirs_paths = {f"theirs-{i}.mid" for i in range(25)}
        conflicts = detect_conflicts(ours_paths, theirs_paths)
        assert conflicts == set()

    def test_partial_overlap(self) -> None:
        """Only the path changed on both sides is reported."""
        ours_paths = {"shared.mid", "only-ours.mid"}
        theirs_paths = {"shared.mid", "only-theirs.mid"}
        conflicts = detect_conflicts(ours_paths, theirs_paths)
        assert conflicts == {"shared.mid"}

    def test_commutativity(self) -> None:
        """Swapping the arguments gives the same conflict set."""
        left = {f"f{i}" for i in range(30)}
        right = {f"f{i}" for i in range(20, 50)}
        forward = detect_conflicts(left, right)
        backward = detect_conflicts(right, left)
        assert forward == backward
149
150
151 # ===========================================================================
152 # apply_merge — exhaustive
153 # ===========================================================================
154
155
class TestApplyMergeExhaustive:
    """Exercise ``apply_merge`` on deletes, adds, conflicts and large manifests.

    Every call passes, in order: the base, ours and theirs manifests, the
    paths changed by ours, the paths changed by theirs, and the conflict paths.
    """

    def test_both_sides_delete_same_file_not_conflicting(self) -> None:
        """Both sides delete the same file — no conflict, file absent in merged."""
        base = {"shared.mid": _h("shared")}
        # Annotated explicitly: an empty literal gives a type checker nothing to infer from.
        ours: dict[str, str] = {}
        theirs: dict[str, str] = {}
        ours_changed = {"shared.mid"}
        theirs_changed = {"shared.mid"}
        # No conflict paths specified (caller decided it's not a conflict).
        result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, set())
        assert "shared.mid" not in result

    def test_only_theirs_adds_file(self) -> None:
        """A file added only by theirs appears in the merged manifest."""
        base: dict[str, str] = {}
        ours: dict[str, str] = {}
        theirs = {"new.mid": _h("new")}
        result = apply_merge(base, ours, theirs, set(), {"new.mid"}, set())
        assert result["new.mid"] == _h("new")

    def test_only_ours_adds_file(self) -> None:
        """A file added only by ours appears in the merged manifest."""
        base: dict[str, str] = {}
        theirs: dict[str, str] = {}
        ours = {"new.mid": _h("ours-new")}
        result = apply_merge(base, ours, theirs, {"new.mid"}, set(), set())
        assert result["new.mid"] == _h("ours-new")

    def test_both_add_same_file_same_hash_no_conflict(self) -> None:
        """Both sides independently add the same file with the same content hash — no conflict."""
        base: dict[str, str] = {}
        h = _h("identical-content")
        ours = {"new.mid": h}
        theirs = {"new.mid": h}
        # Caller detects: same hash = no conflict.
        result = apply_merge(base, ours, theirs, {"new.mid"}, {"new.mid"}, set())
        assert result["new.mid"] == h

    def test_conflict_path_falls_back_to_base(self) -> None:
        """A path listed as conflicting keeps its base hash in the merged manifest."""
        base = {"conflict.mid": _h("base")}
        ours = {"conflict.mid": _h("ours")}
        theirs = {"conflict.mid": _h("theirs")}
        result = apply_merge(
            base, ours, theirs,
            {"conflict.mid"}, {"conflict.mid"}, {"conflict.mid"},
        )
        # Neither side's change is applied to a conflict path → base value is kept.
        assert result["conflict.mid"] == _h("base")

    def test_theirs_deletion_removes_from_merged(self) -> None:
        """A file deleted only by theirs is absent from the merged manifest."""
        base = {"f.mid": _h("f"), "g.mid": _h("g")}
        ours = {"f.mid": _h("f"), "g.mid": _h("g")}
        theirs = {"f.mid": _h("f")}  # g.mid deleted on theirs
        result = apply_merge(base, ours, theirs, set(), {"g.mid"}, set())
        assert "g.mid" not in result

    def test_unrelated_changes_both_preserved(self) -> None:
        """Non-overlapping edits from both sides land in the merged manifest."""
        base = {"a.mid": _h("a0"), "b.mid": _h("b0"), "c.mid": _h("c0")}
        ours = {"a.mid": _h("a1"), "b.mid": _h("b0"), "c.mid": _h("c0")}
        theirs = {"a.mid": _h("a0"), "b.mid": _h("b1"), "c.mid": _h("c0")}
        result = apply_merge(
            base, ours, theirs, {"a.mid"}, {"b.mid"}, set()
        )
        assert result["a.mid"] == _h("a1")
        assert result["b.mid"] == _h("b1")
        assert result["c.mid"] == _h("c0")

    def test_large_manifest_clean_merge(self) -> None:
        """200 files: 100 changed by ours, 100 changed by theirs, no overlap."""
        base = {f"f{i:03d}.mid": _h(f"v0-{i}") for i in range(200)}
        # Each side's edits as path -> new hash; the keys double as the changed sets.
        ours_edits = {f"f{i:03d}.mid": _h(f"v-ours-{i}") for i in range(100)}
        theirs_edits = {f"f{i:03d}.mid": _h(f"v-theirs-{i}") for i in range(100, 200)}
        ours = {**base, **ours_edits}
        theirs = {**base, **theirs_edits}
        result = apply_merge(
            base, ours, theirs, set(ours_edits), set(theirs_edits), set()
        )
        for path, digest in ours_edits.items():
            assert result[path] == digest
        for path, digest in theirs_edits.items():
            assert result[path] == digest

    def test_pipeline_diff_detect_merge(self) -> None:
        """End-to-end: run diff → detect → apply and verify correctness.

        Scenario:
          base = {conflict.mid, ours-only.mid, theirs-only.mid, untouched.mid}
          ours:   modifies conflict.mid, deletes ours-only.mid (only ours touches it)
          theirs: modifies conflict.mid, deletes theirs-only.mid (only theirs touches it)

        Expected results:
          conflict.mid:    bilateral conflict → stays at base value
          ours-only.mid:   deleted only by ours → deleted in merged
          theirs-only.mid: deleted only by theirs → deleted in merged
          untouched.mid:   neither side changed → stays at base
        """
        base = {
            "conflict.mid": _h("c0"),
            "ours-only.mid": _h("o0"),
            "theirs-only.mid": _h("t0"),
            "untouched.mid": _h("u0"),
        }
        # ours: modifies conflict.mid, deletes ours-only.mid, leaves theirs-only and untouched
        ours = {
            "conflict.mid": _h("c-ours"),
            "theirs-only.mid": _h("t0"),
            "untouched.mid": _h("u0"),
        }
        # theirs: modifies conflict.mid, deletes theirs-only.mid, leaves ours-only and untouched
        theirs = {
            "conflict.mid": _h("c-theirs"),
            "ours-only.mid": _h("o0"),
            "untouched.mid": _h("u0"),
        }

        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed)

        result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        # conflict.mid: changed on both sides → reported as a conflict, base hash is kept.
        assert result["conflict.mid"] == _h("c0")
        # ours-only.mid: deleted by ours only → absent in merged.
        assert "ours-only.mid" not in result
        # theirs-only.mid: deleted by theirs only → absent in merged.
        assert "theirs-only.mid" not in result
        # untouched.mid: neither side touched → stays at base.
        assert result["untouched.mid"] == _h("u0")
287
288
289 # ===========================================================================
290 # MergeState I/O — adversarial
291 # ===========================================================================
292
293
class TestMergeStateIOAdversarial:
    """Round-trip and failure-mode tests for writing, reading and clearing merge state."""

    def test_conflict_paths_sorted_on_write(self, repo: pathlib.Path) -> None:
        """Conflict paths written out of order are read back in sorted order."""
        unsorted_paths = ["z.mid", "a.mid", "m.mid"]
        write_merge_state(
            repo,
            base_commit="b",
            ours_commit="o",
            theirs_commit="t",
            conflict_paths=unsorted_paths,
        )
        loaded = read_merge_state(repo)
        assert loaded is not None
        assert loaded.conflict_paths == ["a.mid", "m.mid", "z.mid"]

    def test_optional_other_branch_absent(self, repo: pathlib.Path) -> None:
        """``other_branch`` reads back as None when it was not supplied on write."""
        write_merge_state(
            repo,
            base_commit="b",
            ours_commit="o",
            theirs_commit="t",
            conflict_paths=[],
        )
        loaded = read_merge_state(repo)
        assert loaded is not None
        assert loaded.other_branch is None

    def test_corrupt_json_returns_none(self, repo: pathlib.Path) -> None:
        """A MERGE_STATE.json holding malformed JSON reads back as None."""
        state_file = repo.joinpath(".muse", "MERGE_STATE.json")
        state_file.write_text("{not valid json")
        assert read_merge_state(repo) is None

    def test_empty_json_returns_none_gracefully(self, repo: pathlib.Path) -> None:
        """An empty MERGE_STATE.json reads back as None."""
        state_file = repo.joinpath(".muse", "MERGE_STATE.json")
        state_file.write_text("")
        assert read_merge_state(repo) is None

    def test_missing_file_returns_none(self, repo: pathlib.Path) -> None:
        """Reading when no state file was ever written returns None."""
        loaded = read_merge_state(repo)
        assert loaded is None

    def test_clear_idempotent(self, repo: pathlib.Path) -> None:
        """Clearing twice with no state file present must not raise."""
        for _ in range(2):
            clear_merge_state(repo)

    def test_write_overwrite_previous(self, repo: pathlib.Path) -> None:
        """A second write replaces the state stored by the first."""
        second_base = "b" * 64
        second_ours = "c" * 64
        second_theirs = "d" * 64
        write_merge_state(
            repo,
            base_commit="1" * 64,
            ours_commit="2" * 64,
            theirs_commit="3" * 64,
            conflict_paths=["a.mid"],
        )
        write_merge_state(
            repo,
            base_commit=second_base,
            ours_commit=second_ours,
            theirs_commit=second_theirs,
            conflict_paths=["b.mid"],
        )
        loaded = read_merge_state(repo)
        assert loaded is not None
        assert loaded.base_commit == second_base
        assert loaded.conflict_paths == ["b.mid"]

    def test_100_conflict_paths_round_trip(self, repo: pathlib.Path) -> None:
        """One hundred conflict paths survive a write/read cycle, in sorted order."""
        many_paths = [f"track-{i:03d}.mid" for i in range(100)]
        write_merge_state(
            repo,
            base_commit="b" * 64,
            ours_commit="c" * 64,
            theirs_commit="d" * 64,
            conflict_paths=many_paths,
        )
        loaded = read_merge_state(repo)
        assert loaded is not None
        assert loaded.conflict_paths == sorted(many_paths)

    def test_merge_state_is_frozen_dataclass(self) -> None:
        """Setting an attribute on a MergeState must raise AttributeError or TypeError."""
        frozen_state = MergeState(conflict_paths=["a.mid"], base_commit="b")
        rejected = (AttributeError, TypeError)
        with pytest.raises(rejected):
            frozen_state.__setattr__("base_commit", "new")
353
354
355 # ===========================================================================
356 # apply_resolution
357 # ===========================================================================
358
359
class TestApplyResolution:
    """Tests for ``apply_resolution`` materialising stored objects under ``muse-work``."""

    def test_resolution_restores_correct_content(self, repo: pathlib.Path) -> None:
        """The resolved file holds exactly the bytes stored for the object id."""
        payload = b"resolved content"
        object_id = hashlib.sha256(payload).hexdigest()
        write_object(repo, object_id, payload)
        workdir = repo / "muse-work"
        workdir.mkdir()
        apply_resolution(repo, "beat.mid", object_id)
        assert (workdir / "beat.mid").read_bytes() == payload

    def test_resolution_creates_nested_dirs(self, repo: pathlib.Path) -> None:
        """A nested target path is written even though ``muse-work`` was not pre-created."""
        payload = b"nested file"
        object_id = hashlib.sha256(payload).hexdigest()
        write_object(repo, object_id, payload)
        apply_resolution(repo, "sub/dir/beat.mid", object_id)
        resolved_file = repo.joinpath("muse-work", "sub", "dir", "beat.mid")
        assert resolved_file.read_bytes() == payload

    def test_resolution_missing_object_raises(self, repo: pathlib.Path) -> None:
        """Resolving with an object id that was never stored raises FileNotFoundError."""
        absent_object_id = "a" * 64
        with pytest.raises(FileNotFoundError):
            apply_resolution(repo, "beat.mid", absent_object_id)