gabriel / muse public
op_transform.py python
533 lines 20.7 KB
53d2d9ce feat(phase-3): operation-level merge engine — OT-based auto-merge for n… Gabriel Cardona <cgcardona@gmail.com> 6d ago
1 """Operational transformation for Muse domain operations — Phase 3.
2
3 This module implements the commutativity rules and position-adjustment
4 transforms that allow the merge engine to reason over ``DomainOp`` trees
5 rather than file-path sets. The result is sub-file auto-merge: two agents
6 inserting notes at non-overlapping bars never produce a conflict.
7
8 Theory
9 ------
10 Operational Transformation (OT) is the theory behind real-time collaborative
11 editors (Google Docs, VS Code Live Share). The key insight is that two
12 *concurrent* operations — generated independently against the same base state
13 — can be applied in sequence without conflict if and only if they *commute*:
14 applying them in either order yields the same final state.
15
16 For concurrent operations that commute, OT provides a ``transform`` function
17 that adjusts positions so that the result is identical regardless of which
18 operation is applied first.
19
20 Public API
21 ----------
22 - :class:`MergeOpsResult` — structured result of merging two op lists.
23 - :func:`ops_commute` — commutativity oracle for any two ``DomainOp``\\s.
24 - :func:`transform` — position-adjusted ``(a', b')`` for commuting ops.
25 - :func:`merge_op_lists` — three-way merge at operation granularity.
26
27 Commutativity rules (summary)
28 ------------------------------
29
30 ============================================= =====================================
31 Op A Op B Commute?
32 ============================================= =====================================
33 InsertOp(pos=i) InsertOp(pos=j) Yes — if i ≠ j or either is None (unordered)
34 InsertOp(pos=i) InsertOp(pos=i) **No** — positional conflict
35 InsertOp(addr=A) DeleteOp(addr=B) Yes — if A ≠ B (different containers)
36 InsertOp(addr=A) DeleteOp(addr=A) **No** — same container
37 DeleteOp(addr=A) DeleteOp(addr=B) Yes — always (consensus delete is fine)
38 ReplaceOp(addr=A) ReplaceOp(addr=B) Yes — if A ≠ B
39 ReplaceOp(addr=A) ReplaceOp(addr=A) **No** — concurrent value conflict
40 MoveOp(from=i) MoveOp(from=j) Yes — if i ≠ j
41 MoveOp(from=i) DeleteOp(pos=i) **No** — move-delete conflict
42 PatchOp(addr=A) PatchOp(addr=B) Yes — if A ≠ B; recurse if A == B
43 ============================================= =====================================
44
45 Position adjustment
46 -------------------
47 When two ``InsertOp``\\s at different positions commute, the later-applied one
48 must have its position adjusted. Concretely, if *a* inserts at position *i*
49 and *b* inserts at position *j* with *i < j*:
50
51 - Applying *a* first shifts every element at position ≥ i by one; so *b*
52 must be adjusted to *j + 1*.
53 - Applying *b* first does not affect positions < j; so *a* stays at *i*.
54
55 For *merge_op_lists*, positions are adjusted via the **counting formula**: for
56 each InsertOp in one side's exclusive additions, add the count of the other
57 side's InsertOps that have position ≤ this op's position (on the same
58 address). This is correct for any number of concurrent insertions and avoids
59 the cascading adjustment errors that arise from naive sequential pairwise OT.
60
61 Synchronous guarantee
62 ---------------------
63 All functions are synchronous, pure, and allocation-bounded — no I/O, no
64 async, no external state (apart from ``merge_op_lists`` emitting a debug log line).
65 """
66 from __future__ import annotations
67
68 import logging
69 from dataclasses import dataclass, field
70
71 from muse.domain import (
72 DeleteOp,
73 DomainOp,
74 InsertOp,
75 MoveOp,
76 PatchOp,
77 ReplaceOp,
78 StructuredDelta,
79 )
80
81 logger = logging.getLogger(__name__)
82
83
84 # ---------------------------------------------------------------------------
85 # Result type
86 # ---------------------------------------------------------------------------
87
88
@dataclass
class MergeOpsResult:
    """Outcome of merging two operation lists against a shared ancestor.

    Attributes:
        merged_ops: Operations from both branches that can be applied to the
            common ancestor to produce the merged state. ``InsertOp``
            positions have already been adjusted so the ops can be applied in
            ascending position order for a deterministic result.
        conflict_ops: ``(our_op, their_op)`` pairs that could not be merged
            automatically. Each pair must be resolved manually (or via a
            ``.museattributes`` strategy) before the merge can complete.

    The merge is *clean* exactly when ``conflict_ops`` is empty; see
    :attr:`is_clean`.
    """

    # Both lists get a fresh empty list per instance (never a shared default).
    merged_ops: list[DomainOp] = field(default_factory=list)
    conflict_ops: list[tuple[DomainOp, DomainOp]] = field(default_factory=list)

    @property
    def is_clean(self) -> bool:
        """Whether the merge finished without any conflicting operation pairs."""
        return not self.conflict_ops
112
113
114 # ---------------------------------------------------------------------------
115 # Internal helpers
116 # ---------------------------------------------------------------------------
117
118
119 def _op_key(op: DomainOp) -> tuple[str, ...]:
120 """Return a hashable key uniquely identifying *op* for set membership tests.
121
122 The key captures all semantically significant fields so that two ops with
123 identical effect produce the same key. This is used to detect consensus
124 operations (both sides added the same op independently).
125 """
126 if op["op"] == "insert":
127 return ("insert", op["address"], str(op["position"]), op["content_id"])
128 if op["op"] == "delete":
129 return ("delete", op["address"], str(op["position"]), op["content_id"])
130 if op["op"] == "move":
131 return (
132 "move",
133 op["address"],
134 str(op["from_position"]),
135 str(op["to_position"]),
136 op["content_id"],
137 )
138 if op["op"] == "replace":
139 return (
140 "replace",
141 op["address"],
142 str(op["position"]),
143 op["old_content_id"],
144 op["new_content_id"],
145 )
146 # PatchOp — key on address and child_domain; child_ops are not hashed for
147 # performance reasons. Two patch ops on the same container are treated as
148 # the same "slot" for conflict detection purposes.
149 return ("patch", op["address"], op["child_domain"])
150
151
152 # ---------------------------------------------------------------------------
153 # Commutativity oracle
154 # ---------------------------------------------------------------------------
155
156
def ops_commute(a: DomainOp, b: DomainOp) -> bool:
    """Return ``True`` if operations *a* and *b* commute (are auto-mergeable).

    Two operations commute when applying them in either order produces the
    same final state. The rules are:

    - Ops on different containers (different ``address``) always commute.
      Positions are container-relative, so ops on distinct containers can
      never collide.
    - Within the same container:

      - ``InsertOp`` + ``InsertOp``: commute unless both positions are
        integers and equal (``position=None`` marks an unordered collection).
      - ``DeleteOp`` + ``DeleteOp``: always commute (consensus delete — the
        element ends up absent either way).
      - ``DeleteOp`` + ``MoveOp`` (either order): conflict only when the
        delete's position equals the move's ``from_position``. A delete with
        ``position=None`` never conflicts.
      - ``MoveOp`` + ``MoveOp``: conflict only on equal ``from_position``.
      - ``PatchOp`` + ``PatchOp``: commute only if every pair of child ops
        commutes (checked recursively).
      - Every other pairing conservatively conflicts.

    Args:
        a: First domain operation.
        b: Second domain operation.

    Returns:
        ``True`` if the two operations can be safely auto-merged.
    """
    # Different containers are completely independent. Checking this first
    # also keeps the positional move/delete rules below from comparing
    # indices that belong to two unrelated containers.
    if a["address"] != b["address"]:
        return True

    # ------------------------------------------------------------------
    # Same container from here on.
    # ------------------------------------------------------------------
    if a["op"] == "insert" and b["op"] == "insert":
        a_pos, b_pos = a["position"], b["position"]
        if a_pos is None or b_pos is None:
            return True  # unordered collection: no positional conflict
        return a_pos != b_pos

    if a["op"] == "delete" and b["op"] == "delete":
        return True

    if a["op"] == "delete" and b["op"] == "move":
        delete_pos = a["position"]
        return delete_pos is None or delete_pos != b["from_position"]

    if a["op"] == "move" and b["op"] == "delete":
        delete_pos = b["position"]
        return delete_pos is None or a["from_position"] != delete_pos

    if a["op"] == "move" and b["op"] == "move":
        # Two moves from different source positions commute.
        return a["from_position"] != b["from_position"]

    if a["op"] == "patch" and b["op"] == "patch":
        # Same container: every child pair must commute.
        return all(
            ops_commute(child_a, child_b)
            for child_a in a["child_ops"]
            for child_b in b["child_ops"]
        )

    # Remaining same-container pairings (insert vs. delete/move/replace/patch,
    # replace vs. anything, patch vs. non-patch) conflict conservatively.
    return False
275
276
277 # ---------------------------------------------------------------------------
278 # OT transform
279 # ---------------------------------------------------------------------------
280
281
def transform(a: DomainOp, b: DomainOp) -> tuple[DomainOp, DomainOp]:
    """Return ``(a', b')`` such that ``apply(apply(base, a), b') == apply(apply(base, b), a')``.

    This is the core OT transform function. It should only be called when
    :func:`ops_commute` has confirmed that *a* and *b* commute.

    Only one case is non-trivial: two ``InsertOp``\\s into the *same*
    container at distinct integer positions. The op with the larger position
    is shifted up by one, because the other insert lands before it. Every
    other commuting pair is returned unchanged (identity transform). This
    includes inserts into different containers, whose positions are
    independent of each other.

    Args:
        a: First domain operation.
        b: Second domain operation (must commute with *a*).

    Returns:
        A tuple ``(a', b')`` where:

        - *a'* is the version of *a* to apply when *b* has already been applied.
        - *b'* is the version of *b* to apply when *a* has already been applied.

        A shifted insert is rebuilt from its ``address``, ``content_id`` and
        ``content_summary`` with the new ``position``.
    """
    if (
        a["op"] == "insert"
        and b["op"] == "insert"
        and a["address"] == b["address"]  # positions only interact within one container
    ):
        a_pos, b_pos = a["position"], b["position"]
        if a_pos is not None and b_pos is not None and a_pos != b_pos:
            if a_pos < b_pos:
                # a inserts before b's original position → b shifts up by 1.
                b_prime = InsertOp(
                    op="insert",
                    address=b["address"],
                    position=b_pos + 1,
                    content_id=b["content_id"],
                    content_summary=b["content_summary"],
                )
                return a, b_prime
            # b inserts before a's original position → a shifts up by 1.
            a_prime = InsertOp(
                op="insert",
                address=a["address"],
                position=a_pos + 1,
                content_id=a["content_id"],
                content_summary=a["content_summary"],
            )
            return a_prime, b

    # All other commuting pairs: identity transform.
    return a, b
330
331
332 # ---------------------------------------------------------------------------
333 # Three-way merge at operation granularity
334 # ---------------------------------------------------------------------------
335
336
337 def _adjust_insert_positions(
338 ops: list[DomainOp],
339 other_ops: list[DomainOp],
340 ) -> list[DomainOp]:
341 """Adjust ``InsertOp`` positions in *ops* to account for *other_ops*.
342
343 For each ``InsertOp`` with a non-``None`` position in *ops*, the adjusted
344 position is ``original_position + count`` where ``count`` is the number of
345 ``InsertOp``\\s in *other_ops* that share the same ``address`` and have
346 ``position ≤ original_position``.
347
348 This implements the *counting formula* for multi-op position adjustment.
349 It is correct for any number of concurrent insertions on each side,
350 producing the same final sequence regardless of application order.
351
352 Non-``InsertOp`` entries and unordered inserts (``position=None``) pass
353 through unchanged.
354
355 Args:
356 ops: The list of ops whose positions need adjustment.
357 other_ops: The concurrent operations from the other branch.
358
359 Returns:
360 A new list with adjusted ``InsertOp``\\s; all other entries are copied
361 unchanged.
362 """
363 # Collect other-side InsertOp positions, grouped by address.
364 other_by_addr: dict[str, list[int]] = {}
365 for op in other_ops:
366 if op["op"] == "insert" and op["position"] is not None:
367 addr = op["address"]
368 if addr not in other_by_addr:
369 other_by_addr[addr] = []
370 other_by_addr[addr].append(op["position"])
371
372 result: list[DomainOp] = []
373 for op in ops:
374 if op["op"] == "insert" and op["position"] is not None:
375 addr = op["address"]
376 pos = op["position"]
377 others = other_by_addr.get(addr, [])
378 shift = sum(1 for p in others if p <= pos)
379 if shift:
380 result.append(
381 InsertOp(
382 op="insert",
383 address=addr,
384 position=pos + shift,
385 content_id=op["content_id"],
386 content_summary=op["content_summary"],
387 )
388 )
389 else:
390 result.append(op)
391 else:
392 result.append(op)
393
394 return result
395
396
def merge_op_lists(
    base_ops: list[DomainOp],
    ours_ops: list[DomainOp],
    theirs_ops: list[DomainOp],
) -> MergeOpsResult:
    """Three-way merge at operation granularity.

    Implements the standard three-way merge algorithm applied to typed domain
    operations rather than file-path sets. The inputs represent:

    - *base_ops*: operations present in the common ancestor.
    - *ours_ops*: operations present on our branch (superset of base for
      kept ops, plus our new additions).
    - *theirs_ops*: operations present on their branch (same structure).

    Ops are identified by :func:`_op_key`.

    Algorithm
    ---------
    1. **Kept from base** — ops in base whose key both sides retained are
       included unchanged.
    2. **Consensus additions** — ops added independently by both sides (same
       key) are included exactly once, even if a side lists the op more than
       once.
    3. **Exclusive additions** — ops added by only one side enter the
       commutativity check:

       - Any pair (ours_exclusive, theirs_exclusive) where
         :func:`ops_commute` returns ``False`` is recorded as a conflict.
       - Exclusive additions not involved in any conflict are included in
         ``merged_ops``, with ``InsertOp`` positions adjusted via
         :func:`_adjust_insert_positions`.

    ``merged_ops`` is ordered: kept, consensus, our clean additions, their
    clean additions.

    Position adjustment note
    ------------------------
    The adjusted ``InsertOp`` positions in ``merged_ops`` are intended as
    *absolute positions in the final merged sequence*, already accounting
    for the other side's clean insertions. Callers applying the merged ops to
    the base state should apply ``InsertOp``\\s in ascending position order.

    Args:
        base_ops: Operations in the common ancestor delta.
        ours_ops: Operations on our branch.
        theirs_ops: Operations on their branch.

    Returns:
        A :class:`MergeOpsResult` with merged and conflicting op lists.
    """
    # Compute each op's key once; _op_key recurses into PatchOp children, so
    # re-deriving it in every membership test below would be wasted work.
    base_keyed = [(_op_key(op), op) for op in base_ops]
    ours_keyed = [(_op_key(op), op) for op in ours_ops]
    theirs_keyed = [(_op_key(op), op) for op in theirs_ops]

    base_keys = {key for key, _ in base_keyed}
    ours_keys = {key for key, _ in ours_keyed}
    theirs_keys = {key for key, _ in theirs_keyed}

    # 1. Ops both sides kept from the base.
    kept: list[DomainOp] = [
        op for key, op in base_keyed if key in ours_keys and key in theirs_keys
    ]

    # 2. New ops — not present in base.
    ours_new = [(key, op) for key, op in ours_keyed if key not in base_keys]
    theirs_new = [(key, op) for key, op in theirs_keyed if key not in base_keys]
    consensus_keys = {key for key, _ in ours_new} & {key for key, _ in theirs_new}

    # Consensus additions: both sides added the same op → include it once.
    # The dict keeps the first occurrence and preserves our branch's order.
    consensus_by_key: dict[tuple[str, ...], DomainOp] = {}
    for key, op in ours_new:
        if key in consensus_keys:
            consensus_by_key.setdefault(key, op)
    consensus: list[DomainOp] = list(consensus_by_key.values())

    # 3. Each side's exclusive new additions.
    ours_exclusive = [(key, op) for key, op in ours_new if key not in consensus_keys]
    theirs_exclusive = [
        (key, op) for key, op in theirs_new if key not in consensus_keys
    ]

    # Conflict detection: any cross-side pair that does not commute.
    conflict_ops: list[tuple[DomainOp, DomainOp]] = []
    conflicting_ours_keys: set[tuple[str, ...]] = set()
    conflicting_theirs_keys: set[tuple[str, ...]] = set()
    for our_key, our_op in ours_exclusive:
        for their_key, their_op in theirs_exclusive:
            if not ops_commute(our_op, their_op):
                conflict_ops.append((our_op, their_op))
                conflicting_ours_keys.add(our_key)
                conflicting_theirs_keys.add(their_key)

    # 4. Clean ops: not involved in any conflict.
    clean_ours = [
        op for key, op in ours_exclusive if key not in conflicting_ours_keys
    ]
    clean_theirs = [
        op for key, op in theirs_exclusive if key not in conflicting_theirs_keys
    ]

    # 5. Position adjustment using the counting formula.
    clean_ours_adjusted = _adjust_insert_positions(clean_ours, clean_theirs)
    clean_theirs_adjusted = _adjust_insert_positions(clean_theirs, clean_ours)

    merged_ops: list[DomainOp] = [
        *kept,
        *consensus,
        *clean_ours_adjusted,
        *clean_theirs_adjusted,
    ]

    logger.debug(
        "merge_op_lists: kept=%d consensus=%d ours=%d theirs=%d conflicts=%d",
        len(kept),
        len(consensus),
        len(clean_ours_adjusted),
        len(clean_theirs_adjusted),
        len(conflict_ops),
    )

    return MergeOpsResult(merged_ops=merged_ops, conflict_ops=conflict_ops)
509
510
def merge_structured(
    base_delta: StructuredDelta,
    ours_delta: StructuredDelta,
    theirs_delta: StructuredDelta,
) -> MergeOpsResult:
    """Three-way merge of whole :class:`~muse.domain.StructuredDelta` objects.

    Thin adapter: reads the ``"ops"`` list from each delta (base, then ours,
    then theirs) and delegates to :func:`merge_op_lists`.

    Args:
        base_delta: Delta representing the common ancestor's operations.
        ours_delta: Delta produced by our branch.
        theirs_delta: Delta produced by their branch.

    Returns:
        The :class:`MergeOpsResult` returned by :func:`merge_op_lists`.
    """
    base_ops = base_delta["ops"]
    ours_ops = ours_delta["ops"]
    theirs_ops = theirs_delta["ops"]
    return merge_op_lists(base_ops, ours_ops, theirs_ops)