cgcardona / muse public
test_op_transform.py python
628 lines 24.4 KB
d855b718 refactor: strip phase/v2 workflow labels from all source, tests, and docs Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for the operation-level merge engine.
2
3 Covers every commutativity rule from the spec table, the OT transform function,
4 and the full three-way ``merge_op_lists`` algorithm. Each test is named after
5 the specific behaviour it verifies so that a failure message is self-documenting.
6 """
7 from __future__ import annotations
8
9 import pytest
10
11 from muse.core.op_transform import (
12 MergeOpsResult,
13 _adjust_insert_positions,
14 _op_key,
15 merge_op_lists,
16 merge_structured,
17 ops_commute,
18 transform,
19 )
20 from muse.domain import (
21 DeleteOp,
22 DomainOp,
23 InsertOp,
24 MoveOp,
25 PatchOp,
26 ReplaceOp,
27 StructuredDelta,
28 )
29
30
31 # ---------------------------------------------------------------------------
32 # Helpers for building typed ops
33 # ---------------------------------------------------------------------------
34
35
36 def _ins(addr: str, pos: int | None, cid: str = "cid-a") -> InsertOp:
37 return InsertOp(op="insert", address=addr, position=pos, content_id=cid, content_summary=cid)
38
39
40 def _del(addr: str, pos: int | None, cid: str = "cid-a") -> DeleteOp:
41 return DeleteOp(op="delete", address=addr, position=pos, content_id=cid, content_summary=cid)
42
43
44 def _mov(addr: str, from_pos: int, to_pos: int, cid: str = "cid-a") -> MoveOp:
45 return MoveOp(op="move", address=addr, from_position=from_pos, to_position=to_pos, content_id=cid)
46
47
48 def _rep(addr: str, old: str, new: str) -> ReplaceOp:
49 return ReplaceOp(
50 op="replace",
51 address=addr,
52 position=None,
53 old_content_id=old,
54 new_content_id=new,
55 old_summary="old",
56 new_summary="new",
57 )
58
59
60 def _patch(addr: str, child_ops: list[DomainOp] | None = None) -> PatchOp:
61 return PatchOp(
62 op="patch",
63 address=addr,
64 child_ops=child_ops or [],
65 child_domain="test",
66 child_summary="test patch",
67 )
68
69
70 def _delta(ops: list[DomainOp], *, domain: str = "music") -> StructuredDelta:
71 return StructuredDelta(domain=domain, ops=ops, summary="test")
72
73
74 # ===========================================================================
75 # Part 1 — ops_commute: commutativity oracle
76 # ===========================================================================
77
78
79 class TestOpsCommuteInserts:
80 def test_inserts_at_different_positions_commute(self) -> None:
81 a = _ins("f.mid", pos=2)
82 b = _ins("f.mid", pos=5)
83 assert ops_commute(a, b) is True
84
85 def test_inserts_at_same_position_do_not_commute(self) -> None:
86 a = _ins("f.mid", pos=3)
87 b = _ins("f.mid", pos=3)
88 assert ops_commute(a, b) is False
89
90 def test_inserts_with_none_position_commute_unordered(self) -> None:
91 a = _ins("files/", pos=None, cid="aa")
92 b = _ins("files/", pos=None, cid="bb")
93 assert ops_commute(a, b) is True
94
95 def test_inserts_with_none_and_int_position_commute_unordered(self) -> None:
96 # If either side is unordered, treat as commuting.
97 a = _ins("files/", pos=None)
98 b = _ins("files/", pos=3)
99 assert ops_commute(a, b) is True
100
101 def test_inserts_at_different_addresses_commute(self) -> None:
102 a = _ins("a.mid", pos=0)
103 b = _ins("b.mid", pos=0)
104 assert ops_commute(a, b) is True
105
106
107 class TestOpsCommuteDeletes:
108 def test_deletes_at_different_addresses_commute(self) -> None:
109 assert ops_commute(_del("a.mid", 0), _del("b.mid", 0)) is True
110
111 def test_consensus_delete_same_address_commutes(self) -> None:
112 # Both branches deleted the same file — idempotent, not a conflict.
113 a = _del("f.mid", pos=0, cid="same")
114 b = _del("f.mid", pos=0, cid="same")
115 assert ops_commute(a, b) is True
116
117 def test_deletes_at_same_address_different_content_still_commute(self) -> None:
118 # Two deletes always commute — the result is "both deleted something".
119 a = _del("f.mid", pos=1, cid="c1")
120 b = _del("f.mid", pos=2, cid="c2")
121 assert ops_commute(a, b) is True
122
123
124 class TestOpsCommuteReplaces:
125 def test_replaces_at_different_addresses_commute(self) -> None:
126 assert ops_commute(_rep("a.mid", "o", "n"), _rep("b.mid", "o", "n")) is True
127
128 def test_replaces_at_same_address_do_not_commute(self) -> None:
129 assert ops_commute(_rep("f.mid", "old", "v1"), _rep("f.mid", "old", "v2")) is False
130
131
132 class TestOpsCommuteMoves:
133 def test_moves_from_different_positions_commute(self) -> None:
134 assert ops_commute(_mov("f.mid", 2, 5), _mov("f.mid", 7, 1)) is True
135
136 def test_moves_from_same_position_do_not_commute(self) -> None:
137 assert ops_commute(_mov("f.mid", 3, 0), _mov("f.mid", 3, 9)) is False
138
139 def test_move_and_delete_same_position_do_not_commute(self) -> None:
140 move = _mov("f.mid", 5, 9)
141 delete = _del("f.mid", pos=5)
142 assert ops_commute(move, delete) is False
143
144 def test_move_and_delete_different_positions_commute(self) -> None:
145 move = _mov("f.mid", 5, 9)
146 delete = _del("f.mid", pos=2)
147 assert ops_commute(move, delete) is True
148
149 def test_delete_and_move_same_position_is_symmetric(self) -> None:
150 move = _mov("f.mid", 5, 9)
151 delete = _del("f.mid", pos=5)
152 # commute(move, delete) == commute(delete, move)
153 assert ops_commute(delete, move) is False
154
155 def test_delete_with_none_position_and_move_commute(self) -> None:
156 move = _mov("f.mid", 5, 9)
157 delete = _del("files/", pos=None)
158 assert ops_commute(move, delete) is True
159
160
161 class TestOpsCommutePatches:
162 def test_patches_at_different_addresses_commute(self) -> None:
163 a = _patch("a.mid")
164 b = _patch("b.mid")
165 assert ops_commute(a, b) is True
166
167 def test_patch_at_same_address_with_non_conflicting_children_commutes(self) -> None:
168 child_a = _ins("note:0", pos=1)
169 child_b = _ins("note:0", pos=5)
170 a = _patch("f.mid", child_ops=[child_a])
171 b = _patch("f.mid", child_ops=[child_b])
172 assert ops_commute(a, b) is True
173
174 def test_patch_at_same_address_with_conflicting_children_does_not_commute(self) -> None:
175 child_a = _rep("note:0", "old", "v1")
176 child_b = _rep("note:0", "old", "v2")
177 a = _patch("f.mid", child_ops=[child_a])
178 b = _patch("f.mid", child_ops=[child_b])
179 assert ops_commute(a, b) is False
180
181 def test_empty_patch_children_always_commute(self) -> None:
182 a = _patch("f.mid", child_ops=[])
183 b = _patch("f.mid", child_ops=[])
184 assert ops_commute(a, b) is True
185
186
187 class TestOpsCommuteMixedTypes:
188 def test_insert_and_delete_at_different_addresses_commute(self) -> None:
189 assert ops_commute(_ins("a.mid", 0), _del("b.mid", 0)) is True
190
191 def test_insert_and_delete_at_same_address_do_not_commute(self) -> None:
192 assert ops_commute(_ins("f.mid", 2), _del("f.mid", 5)) is False
193
194 def test_delete_and_insert_symmetry(self) -> None:
195 a = _ins("f.mid", 2)
196 b = _del("f.mid", 5)
197 assert ops_commute(a, b) == ops_commute(b, a)
198
199 def test_replace_and_insert_at_different_addresses_commute(self) -> None:
200 assert ops_commute(_rep("a.mid", "o", "n"), _ins("b.mid", 0)) is True
201
202 def test_replace_and_insert_at_same_address_do_not_commute(self) -> None:
203 assert ops_commute(_rep("f.mid", "o", "n"), _ins("f.mid", 0)) is False
204
205 def test_patch_and_replace_at_different_addresses_commute(self) -> None:
206 assert ops_commute(_patch("a.mid"), _rep("b.mid", "o", "n")) is True
207
208 def test_patch_and_replace_at_same_address_do_not_commute(self) -> None:
209 assert ops_commute(_patch("f.mid"), _rep("f.mid", "o", "n")) is False
210
211
212 # ===========================================================================
213 # Part 2 — transform: position adjustment for commuting ops
214 # ===========================================================================
215
216
217 class TestTransform:
218 def test_insert_before_insert_shifts_later_op(self) -> None:
219 # a inserts at pos 2, b inserts at pos 5. a < b, so b' = 6.
220 a = _ins("f.mid", pos=2, cid="a")
221 b = _ins("f.mid", pos=5, cid="b")
222 a_prime, b_prime = transform(a, b)
223 assert a_prime["position"] == 2 # unchanged
224 assert b_prime["position"] == 6 # shifted by a
225
226 def test_insert_after_insert_shifts_earlier_op(self) -> None:
227 # a inserts at pos 7, b inserts at pos 3. a > b, so a' = 8.
228 a = _ins("f.mid", pos=7, cid="a")
229 b = _ins("f.mid", pos=3, cid="b")
230 a_prime, b_prime = transform(a, b)
231 assert a_prime["position"] == 8 # shifted by b
232 assert b_prime["position"] == 3 # unchanged
233
234 def test_transform_preserves_content_id(self) -> None:
235 a = _ins("f.mid", pos=1, cid="note-a")
236 b = _ins("f.mid", pos=10, cid="note-b")
237 a_prime, b_prime = transform(a, b)
238 assert a_prime["content_id"] == "note-a"
239 assert b_prime["content_id"] == "note-b"
240
241 def test_transform_unordered_inserts_identity(self) -> None:
242 # position=None → identity transform (unordered collection).
243 a = _ins("files/", pos=None, cid="a")
244 b = _ins("files/", pos=None, cid="b")
245 a_prime, b_prime = transform(a, b)
246 assert a_prime is a
247 assert b_prime is b
248
249 def test_transform_non_insert_ops_identity(self) -> None:
250 # For all other commuting pairs, transform returns identity.
251 a = _del("a.mid", pos=3)
252 b = _del("b.mid", pos=7)
253 a_prime, b_prime = transform(a, b)
254 assert a_prime is a
255 assert b_prime is b
256
257 def test_transform_replace_ops_identity(self) -> None:
258 a = _rep("a.mid", "o", "n")
259 b = _rep("b.mid", "o", "n")
260 a_prime, b_prime = transform(a, b)
261 assert a_prime is a
262 assert b_prime is b
263
264 def test_transform_diamond_property_two_inserts(self) -> None:
265 """Verify that a ∘ b' == b ∘ a' — the fundamental OT diamond property.
266
267 We simulate applying inserts to a sequence and check the final order
268 matches regardless of which is applied first.
269 """
270 # Start with base list indices [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
271 # a = insert 'X' at position 3; b = insert 'Y' at position 7
272 a = _ins("seq", pos=3, cid="X")
273 b = _ins("seq", pos=7, cid="Y")
274 a_prime, b_prime = transform(a, b)
275
276 # Apply a then b': X at 3, Y at 8 → [0,1,2,X,3,4,5,6,7,Y,8,9]
277 seq = list(range(10))
278 a_pos = a["position"]
279 b_prime_pos = b_prime["position"]
280 assert a_pos is not None and b_prime_pos is not None
281 seq.insert(a_pos, "X")
282 seq.insert(b_prime_pos, "Y")
283 path_ab = seq[:]
284
285 # Apply b then a': Y at 7, X at 3 → [0,1,2,X,3,4,5,6,Y,7,8,9]
286 seq2 = list(range(10))
287 b_pos = b["position"]
288 a_prime_pos = a_prime["position"]
289 assert b_pos is not None and a_prime_pos is not None
290 seq2.insert(b_pos, "Y")
291 seq2.insert(a_prime_pos, "X")
292 path_ba = seq2[:]
293
294 assert path_ab == path_ba
295
296
297 # ===========================================================================
298 # Part 3 — _adjust_insert_positions (counting formula)
299 # ===========================================================================
300
301
302 class TestAdjustInsertPositions:
303 def test_no_other_ops_identity(self) -> None:
304 ops = [_ins("f.mid", pos=5, cid="a")]
305 result = _adjust_insert_positions(ops, [])
306 assert result[0]["position"] == 5
307
308 def test_single_other_before_shifts_position(self) -> None:
309 ops = [_ins("f.mid", pos=5, cid="a")]
310 others = [_ins("f.mid", pos=3, cid="x")]
311 result = _adjust_insert_positions(ops, others)
312 assert result[0]["position"] == 6 # shifted by 1
313
314 def test_other_after_does_not_shift(self) -> None:
315 ops = [_ins("f.mid", pos=3, cid="a")]
316 others = [_ins("f.mid", pos=5, cid="x")]
317 result = _adjust_insert_positions(ops, others)
318 assert result[0]["position"] == 3 # unchanged
319
320 def test_multiple_others_all_before_shifts_by_count(self) -> None:
321 ops = [_ins("f.mid", pos=10, cid="a")]
322 others = [_ins("f.mid", pos=2, cid="x"), _ins("f.mid", pos=7, cid="y")]
323 result = _adjust_insert_positions(ops, others)
324 assert result[0]["position"] == 12 # shifted by 2
325
326 def test_mixed_addresses_does_not_cross_contaminate(self) -> None:
327 ops = [_ins("a.mid", pos=3, cid="a")]
328 others = [_ins("b.mid", pos=1, cid="x")] # different address
329 result = _adjust_insert_positions(ops, others)
330 assert result[0]["position"] == 3 # not shifted
331
332 def test_non_insert_ops_pass_through_unchanged(self) -> None:
333 ops: list[DomainOp] = [_del("f.mid", pos=3, cid="x")]
334 result = _adjust_insert_positions(ops, [_ins("f.mid", pos=1, cid="y")])
335 assert result[0] is ops[0]
336
337 def test_unordered_insert_passes_through(self) -> None:
338 ops = [_ins("files/", pos=None, cid="a")]
339 others = [_ins("files/", pos=None, cid="x")]
340 result = _adjust_insert_positions(ops, others)
341 assert result[0]["position"] is None
342
343 def test_concrete_example_four_note_insertions(self) -> None:
344 """Verify counting formula on the four-note example from the spec."""
345 ours = [_ins("f.mid", pos=5, cid="V"), _ins("f.mid", pos=10, cid="W")]
346 theirs = [_ins("f.mid", pos=3, cid="X"), _ins("f.mid", pos=8, cid="Y")]
347
348 ours_adj = _adjust_insert_positions(ours, theirs)
349 theirs_adj = _adjust_insert_positions(theirs, ours)
350
351 # V(5) shifted by X(3) which is ≤ 5: V → 6
352 assert ours_adj[0]["position"] == 6
353 # W(10) shifted by X(3) and Y(8) both ≤ 10: W → 12
354 assert ours_adj[1]["position"] == 12
355 # X(3) no ours inserts ≤ 3: stays 3
356 assert theirs_adj[0]["position"] == 3
357 # Y(8) shifted by V(5) ≤ 8: Y → 9
358 assert theirs_adj[1]["position"] == 9
359
360
361 # ===========================================================================
362 # Part 4 — merge_op_lists: three-way merge
363 # ===========================================================================
364
365
366 class TestMergeOpLists:
367 def test_empty_inputs_return_empty_result(self) -> None:
368 result = merge_op_lists([], [], [])
369 assert result.merged_ops == []
370 assert result.conflict_ops == []
371 assert result.is_clean is True
372
373 def test_ours_only_additions_pass_through(self) -> None:
374 op = _ins("f.mid", pos=2, cid="x")
375 result = merge_op_lists([], [op], [])
376 assert len(result.merged_ops) == 1
377 assert result.conflict_ops == []
378
379 def test_theirs_only_additions_pass_through(self) -> None:
380 op = _del("f.mid", pos=0)
381 result = merge_op_lists([], [], [op])
382 assert len(result.merged_ops) == 1
383 assert result.conflict_ops == []
384
385 def test_non_conflicting_inserts_both_included(self) -> None:
386 ours_op = _ins("f.mid", pos=2, cid="V")
387 theirs_op = _ins("f.mid", pos=5, cid="W")
388 result = merge_op_lists([], [ours_op], [theirs_op])
389 assert result.is_clean is True
390 positions = {op["position"] for op in result.merged_ops if op["op"] == "insert"}
391 # Ours at 2 stays 2 (no theirs ≤ 2); theirs at 5 → 6 (ours at 2 ≤ 5).
392 assert 2 in positions
393 assert 6 in positions
394
395 def test_same_position_insert_produces_conflict(self) -> None:
396 ours_op = _ins("f.mid", pos=3, cid="A")
397 theirs_op = _ins("f.mid", pos=3, cid="B")
398 result = merge_op_lists([], [ours_op], [theirs_op])
399 assert not result.is_clean
400 assert len(result.conflict_ops) == 1
401 assert result.conflict_ops[0][0]["content_id"] == "A"
402 assert result.conflict_ops[0][1]["content_id"] == "B"
403
404 def test_consensus_addition_included_once(self) -> None:
405 op = _ins("f.mid", pos=4, cid="shared")
406 result = merge_op_lists([], [op], [op])
407 # Consensus: both added the same op → include exactly once.
408 assert len(result.merged_ops) == 1
409 assert result.conflict_ops == []
410
411 def test_base_ops_kept_by_both_sides_included(self) -> None:
412 base_op = _ins("f.mid", pos=0, cid="base")
413 # Both sides still have the base op.
414 result = merge_op_lists([base_op], [base_op], [base_op])
415 assert base_op in result.merged_ops
416
417 def test_base_op_deleted_by_ours_not_in_merged(self) -> None:
418 base_op = _ins("f.mid", pos=0, cid="base")
419 # Ours removed it, theirs kept it.
420 result = merge_op_lists([base_op], [], [base_op])
421 # The base op is NOT in kept (ours removed it) and NOT in ours_new
422 # (it was in base). It remains in theirs, so theirs "kept" it.
423 # Only ops in base AND in both branches end up in kept.
424 assert base_op not in result.merged_ops
425
426 def test_replace_conflict_at_same_address(self) -> None:
427 ours_op = _rep("f.mid", "old", "v-ours")
428 theirs_op = _rep("f.mid", "old", "v-theirs")
429 result = merge_op_lists([], [ours_op], [theirs_op])
430 assert not result.is_clean
431 assert len(result.conflict_ops) == 1
432
433 def test_replace_at_different_addresses_no_conflict(self) -> None:
434 ours_op = _rep("a.mid", "old", "new-a")
435 theirs_op = _rep("b.mid", "old", "new-b")
436 result = merge_op_lists([], [ours_op], [theirs_op])
437 assert result.is_clean
438 assert len(result.merged_ops) == 2
439
440 def test_consensus_delete_included_once(self) -> None:
441 del_op = _del("f.mid", pos=2, cid="gone")
442 result = merge_op_lists([], [del_op], [del_op])
443 assert len(result.merged_ops) == 1
444
445 def test_note_level_multi_insert_positions_adjusted_correctly(self) -> None:
446 """Simulate two musicians adding notes at non-overlapping bars."""
447 ours_ops: list[DomainOp] = [
448 _ins("lead.mid", pos=5, cid="note-A"),
449 _ins("lead.mid", pos=10, cid="note-B"),
450 ]
451 theirs_ops: list[DomainOp] = [
452 _ins("lead.mid", pos=3, cid="note-X"),
453 _ins("lead.mid", pos=8, cid="note-Y"),
454 ]
455 result = merge_op_lists([], ours_ops, theirs_ops)
456 assert result.is_clean is True
457 assert len(result.merged_ops) == 4
458
459 # Expected positions after adjustment (counting formula):
460 # note-A(5) → 5 + count(theirs ≤ 5) = 5 + 1[X(3)] = 6
461 # note-B(10) → 10 + 2[X(3),Y(8)] = 12
462 # note-X(3) → 3 + 0 = 3
463 # note-Y(8) → 8 + 1[A(5)] = 9
464 pos_by_cid = {
465 op["content_id"]: op["position"]
466 for op in result.merged_ops
467 if op["op"] == "insert"
468 }
469 assert pos_by_cid["note-A"] == 6
470 assert pos_by_cid["note-B"] == 12
471 assert pos_by_cid["note-X"] == 3
472 assert pos_by_cid["note-Y"] == 9
473
474 def test_mixed_conflict_and_clean_ops(self) -> None:
475 """A conflict on one file should not contaminate clean ops on others."""
476 conflict_ours = _rep("shared.mid", "old", "v-ours")
477 conflict_theirs = _rep("shared.mid", "old", "v-theirs")
478 clean_ours = _ins("only-ours.mid", pos=0, cid="ours-new-file")
479 clean_theirs = _del("only-theirs.mid", pos=2, cid="their-del")
480
481 result = merge_op_lists(
482 [],
483 [conflict_ours, clean_ours],
484 [conflict_theirs, clean_theirs],
485 )
486 assert len(result.conflict_ops) == 1
487 # Clean ops from both sides should appear in merged.
488 merged_cids = {
489 op.get("content_id", "") or op.get("new_content_id", "")
490 for op in result.merged_ops
491 }
492 assert "ours-new-file" in merged_cids
493 assert "their-del" in merged_cids
494
495 def test_patch_ops_at_different_files_both_included(self) -> None:
496 ours_op = _patch("track-a.mid")
497 theirs_op = _patch("track-b.mid")
498 result = merge_op_lists([], [ours_op], [theirs_op])
499 assert result.is_clean is True
500 assert len(result.merged_ops) == 2
501
502 def test_patch_ops_at_same_file_with_non_conflicting_children(self) -> None:
503 child_a = _ins("note:0", pos=1, cid="note-1")
504 child_b = _ins("note:0", pos=4, cid="note-2")
505 ours_op = _patch("f.mid", child_ops=[child_a])
506 theirs_op = _patch("f.mid", child_ops=[child_b])
507 result = merge_op_lists([], [ours_op], [theirs_op])
508 # PatchOps at same address with commuting children should commute.
509 assert result.is_clean is True
510
511 def test_move_and_delete_conflict_detected(self) -> None:
512 move_op = _mov("f.mid", from_pos=5, to_pos=0)
513 del_op = _del("f.mid", pos=5)
514 result = merge_op_lists([], [move_op], [del_op])
515 assert not result.is_clean
516
517 def test_merge_op_lists_is_deterministic(self) -> None:
518 """Same inputs → same output on every call."""
519 ours = [_ins("f.mid", pos=2, cid="a"), _del("g.mid", pos=0, cid="b")]
520 theirs = [_ins("f.mid", pos=7, cid="c"), _rep("h.mid", "x", "y")]
521 r1 = merge_op_lists([], ours, theirs)
522 r2 = merge_op_lists([], ours, theirs)
523 assert [_op_key(o) for o in r1.merged_ops] == [_op_key(o) for o in r2.merged_ops]
524 assert r1.conflict_ops == r2.conflict_ops
525
526
527 # ===========================================================================
528 # Part 5 — merge_structured: StructuredDelta entry point
529 # ===========================================================================
530
531
532 class TestMergeStructured:
533 def test_empty_deltas_produce_clean_result(self) -> None:
534 base = _delta([])
535 ours = _delta([])
536 theirs = _delta([])
537 result = merge_structured(base, ours, theirs)
538 assert result.is_clean is True
539 assert result.merged_ops == []
540
541 def test_non_conflicting_deltas_auto_merge(self) -> None:
542 op_a = _ins("a.mid", pos=1, cid="A")
543 op_b = _ins("b.mid", pos=2, cid="B")
544 result = merge_structured(_delta([]), _delta([op_a]), _delta([op_b]))
545 assert result.is_clean is True
546 assert len(result.merged_ops) == 2
547
548 def test_conflicting_deltas_reported(self) -> None:
549 op_a = _rep("shared.mid", "old", "v-a")
550 op_b = _rep("shared.mid", "old", "v-b")
551 result = merge_structured(_delta([]), _delta([op_a]), _delta([op_b]))
552 assert not result.is_clean
553 assert len(result.conflict_ops) == 1
554
555 def test_base_ops_respected_by_both_sides(self) -> None:
556 shared = _ins("f.mid", pos=0, cid="shared")
557 result = merge_structured(
558 _delta([shared]),
559 _delta([shared, _ins("f.mid", pos=5, cid="extra-ours")]),
560 _delta([shared]),
561 )
562 assert result.is_clean is True
563 # The 'shared' op is kept; 'extra-ours' is new and passes through.
564 assert len(result.merged_ops) >= 1
565
566
567 # ===========================================================================
568 # Part 6 — MergeOpsResult
569 # ===========================================================================
570
571
572 class TestMergeOpsResult:
573 def test_is_clean_when_no_conflicts(self) -> None:
574 r = MergeOpsResult(merged_ops=[], conflict_ops=[])
575 assert r.is_clean is True
576
577 def test_is_not_clean_when_conflicts_present(self) -> None:
578 a = _ins("f.mid", pos=1)
579 b = _ins("f.mid", pos=1)
580 r = MergeOpsResult(merged_ops=[], conflict_ops=[(a, b)])
581 assert r.is_clean is False
582
583 def test_default_factory_empty_lists(self) -> None:
584 r = MergeOpsResult()
585 assert r.merged_ops == []
586 assert r.conflict_ops == []
587
588
589 # ===========================================================================
590 # Part 7 — _op_key determinism and uniqueness
591 # ===========================================================================
592
593
594 class TestOpKey:
595 def test_insert_key_includes_all_fields(self) -> None:
596 op = _ins("f.mid", pos=3, cid="abc")
597 key = _op_key(op)
598 assert "insert" in key
599 assert "f.mid" in key
600 assert "3" in key
601 assert "abc" in key
602
603 def test_same_op_produces_same_key(self) -> None:
604 op = _del("f.mid", pos=2, cid="xyz")
605 assert _op_key(op) == _op_key(op)
606
607 def test_different_positions_produce_different_keys(self) -> None:
608 a = _ins("f.mid", pos=1, cid="c")
609 b = _ins("f.mid", pos=2, cid="c")
610 assert _op_key(a) != _op_key(b)
611
612 def test_move_key_includes_from_and_to(self) -> None:
613 op = _mov("f.mid", from_pos=3, to_pos=7)
614 key = _op_key(op)
615 assert "3" in key
616 assert "7" in key
617
618 def test_replace_key_includes_old_and_new(self) -> None:
619 op = _rep("f.mid", "old-id", "new-id")
620 key = _op_key(op)
621 assert "old-id" in key
622 assert "new-id" in key
623
624 def test_patch_key_includes_address_and_domain(self) -> None:
625 op = _patch("f.mid")
626 key = _op_key(op)
627 assert "patch" in key
628 assert "f.mid" in key