cgcardona / muse public
op_transform.py python
547 lines 21.3 KB
bda49bdb feat: redesign .museignore as TOML with domain-scoped sections (#100) Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Operational transformation for Muse domain operations.
2
3 This module implements the commutativity rules and position-adjustment
4 transforms that allow the merge engine to reason over ``DomainOp`` trees
5 rather than file-path sets. The result is sub-file auto-merge: two agents
6 inserting notes at non-overlapping bars never produce a conflict.
7
8 Theory
9 ------
10 Operational Transformation (OT) is the theory behind real-time collaborative
11 editors (Google Docs, VS Code Live Share). The key insight is that two
12 *concurrent* operations — generated independently against the same base state
13 — can be applied in sequence without conflict if and only if they *commute*:
14 applying them in either order yields the same final state.
15
16 For concurrent operations that commute, OT provides a ``transform`` function
17 that adjusts positions so that the result is identical regardless of which
18 operation is applied first.
19
20 Public API
21 ----------
22 - :class:`MergeOpsResult` — structured result of merging two op lists.
- :func:`ops_commute` — commutativity oracle for any two ``DomainOp``\\s.
24 - :func:`transform` — position-adjusted ``(a', b')`` for commuting ops.
- :func:`merge_op_lists` — three-way merge at operation granularity.
- :func:`merge_structured` — the same merge, taking ``StructuredDelta`` inputs.
26
27 Commutativity rules (summary)
28 ------------------------------
29
30 ============================================= =====================================
31 Op A Op B Commute?
32 ============================================= =====================================
33 InsertOp(pos=i) InsertOp(pos=j) Yes — if i ≠ j or both None (unordered)
34 InsertOp(pos=i) InsertOp(pos=i) **No** — positional conflict
35 InsertOp(addr=A) DeleteOp(addr=B) Yes — if A ≠ B (different containers)
36 InsertOp(addr=A) DeleteOp(addr=A) **No** — same container
37 DeleteOp(addr=A) DeleteOp(addr=B) Yes — always (consensus delete is fine)
38 ReplaceOp(addr=A) ReplaceOp(addr=B) Yes — if A ≠ B
39 ReplaceOp(addr=A) ReplaceOp(addr=A) **No** — concurrent value conflict
40 MoveOp(from=i) MoveOp(from=j) Yes — if i ≠ j
41 MoveOp(from=i) DeleteOp(pos=i) **No** — move-delete conflict
42 PatchOp(addr=A) PatchOp(addr=B) Yes — if A ≠ B; recurse if A == B
43 ============================================= =====================================
44
45 Position adjustment
46 -------------------
47 When two ``InsertOp``\\s at different positions commute, the later-applied one
48 must have its position adjusted. Concretely, if *a* inserts at position *i*
49 and *b* inserts at position *j* with *i < j*:
50
51 - Applying *a* first shifts every element at position ≥ i by one; so *b*
52 must be adjusted to *j + 1*.
53 - Applying *b* first does not affect positions < j; so *a* stays at *i*.
54
55 For *merge_op_lists*, positions are adjusted via the **counting formula**: for
56 each InsertOp in one side's exclusive additions, add the count of the other
57 side's InsertOps that have position ≤ this op's position (on the same
58 address). This is correct for any number of concurrent insertions and avoids
59 the cascading adjustment errors that arise from naive sequential pairwise OT.
60
61 Synchronous guarantee
62 ---------------------
63 All functions are synchronous, pure, and allocation-bounded — no I/O, no
64 async, no external state.
65 """
66
67 from __future__ import annotations
68
69 import logging
70 from dataclasses import dataclass, field
71
72 from muse.domain import (
73 DeleteOp,
74 DomainOp,
75 InsertOp,
76 MoveOp,
77 PatchOp,
78 ReplaceOp,
79 StructuredDelta,
80 )
81
82 logger = logging.getLogger(__name__)
83
84
85 # ---------------------------------------------------------------------------
86 # Result type
87 # ---------------------------------------------------------------------------
88
89
90 @dataclass
91 class MergeOpsResult:
92 """Result of a three-way operation-level merge.
93
94 ``merged_ops`` contains the operations from both sides that can be applied
95 to the common ancestor to produce the merged state. Positions in any
96 ``InsertOp`` entries have been adjusted so that the ops can be applied in
97 ascending position order to produce a deterministic result.
98
99 ``conflict_ops`` contains pairs ``(our_op, their_op)`` where the two
100 operations cannot be auto-merged. Each pair must be resolved manually
101 (or via ``.museattributes`` strategy) before the merge can complete.
102
103 ``is_clean`` is ``True`` when ``conflict_ops`` is empty.
104 """
105
106 merged_ops: list[DomainOp] = field(default_factory=list)
107 conflict_ops: list[tuple[DomainOp, DomainOp]] = field(default_factory=list)
108
109 @property
110 def is_clean(self) -> bool:
111 """``True`` when no conflicting operation pairs were found."""
112 return len(self.conflict_ops) == 0
113
114
115 # ---------------------------------------------------------------------------
116 # Internal helpers
117 # ---------------------------------------------------------------------------
118
119
120 def _op_key(op: DomainOp) -> tuple[str, ...]:
121 """Return a hashable key uniquely identifying *op* for set membership tests.
122
123 The key captures all semantically significant fields so that two ops with
124 identical effect produce the same key. This is used to detect consensus
125 operations (both sides added the same op independently).
126 """
127 if op["op"] == "insert":
128 return ("insert", op["address"], str(op["position"]), op["content_id"])
129 if op["op"] == "delete":
130 return ("delete", op["address"], str(op["position"]), op["content_id"])
131 if op["op"] == "move":
132 return (
133 "move",
134 op["address"],
135 str(op["from_position"]),
136 str(op["to_position"]),
137 op["content_id"],
138 )
139 if op["op"] == "replace":
140 return (
141 "replace",
142 op["address"],
143 str(op["position"]),
144 op["old_content_id"],
145 op["new_content_id"],
146 )
147 if op["op"] == "mutate":
148 return ("mutate", op["address"], op["entity_id"], op["old_content_id"], op["new_content_id"])
149 # PatchOp — key on address and child_domain; child_ops are not hashed for
150 # performance reasons. Two patch ops on the same container are treated as
151 # the same "slot" for conflict detection purposes.
152 return ("patch", op["address"], op["child_domain"])
153
154
155 # ---------------------------------------------------------------------------
156 # Commutativity oracle
157 # ---------------------------------------------------------------------------
158
159
160 def ops_commute(a: DomainOp, b: DomainOp) -> bool:
161 """Return ``True`` if operations *a* and *b* commute (are auto-mergeable).
162
163 Two operations commute when applying them in either order produces the
164 same final state. This function implements the commutativity rules table
165 for all 25 op-kind pairs.
166
167 For ``PatchOp`` at the same address, commmutativity is determined
168 recursively by checking all child-op pairs.
169
170 Args:
171 a: First domain operation.
172 b: Second domain operation.
173
174 Returns:
175 ``True`` if the two operations can be safely auto-merged.
176 """
177 # ------------------------------------------------------------------
178 # InsertOp + *
179 # ------------------------------------------------------------------
180 if a["op"] == "insert":
181 if b["op"] == "insert":
182 # Different containers always commute — they are completely independent.
183 if a["address"] != b["address"]:
184 return True
185 a_pos, b_pos = a["position"], b["position"]
186 # Unordered collections (position=None) always commute.
187 if a_pos is None or b_pos is None:
188 return True
189 # Ordered sequences within the same container: conflict only at equal positions.
190 return a_pos != b_pos
191 if b["op"] == "delete":
192 # Conservative: inserts and deletes at the same container conflict.
193 return a["address"] != b["address"]
194 if b["op"] == "move":
195 return a["address"] != b["address"]
196 if b["op"] == "replace":
197 return a["address"] != b["address"]
198 # b is PatchOp (exhaustion of DeleteOp | MoveOp | ReplaceOp | PatchOp)
199 return a["address"] != b["address"]
200
201 # ------------------------------------------------------------------
202 # DeleteOp + *
203 # ------------------------------------------------------------------
204 if a["op"] == "delete":
205 if b["op"] == "insert":
206 return a["address"] != b["address"]
207 if b["op"] == "delete":
208 # Consensus delete (same or different address) always commutes.
209 # Two branches that both removed the same element produce the same
210 # result: the element is absent.
211 return True
212 if b["op"] == "move":
213 # Conflict if the delete's position matches the move's source.
214 a_pos = a["position"]
215 if a_pos is None:
216 return True # unordered collection: no positional conflict
217 return a_pos != b["from_position"]
218 if b["op"] == "replace":
219 return a["address"] != b["address"]
220 # b is PatchOp
221 return a["address"] != b["address"]
222
223 # ------------------------------------------------------------------
224 # MoveOp + *
225 # ------------------------------------------------------------------
226 if a["op"] == "move":
227 if b["op"] == "insert":
228 return a["address"] != b["address"]
229 if b["op"] == "delete":
230 b_pos = b["position"]
231 if b_pos is None:
232 return True
233 return a["from_position"] != b_pos
234 if b["op"] == "move":
235 # Two moves from different source positions commute.
236 return a["from_position"] != b["from_position"]
237 if b["op"] == "replace":
238 return a["address"] != b["address"]
239 # b is PatchOp
240 return a["address"] != b["address"]
241
242 # ------------------------------------------------------------------
243 # ReplaceOp + *
244 # ------------------------------------------------------------------
245 if a["op"] == "replace":
246 if b["op"] == "insert":
247 return a["address"] != b["address"]
248 if b["op"] == "delete":
249 return a["address"] != b["address"]
250 if b["op"] == "move":
251 return a["address"] != b["address"]
252 if b["op"] == "replace":
253 # Two replaces at the same address conflict (concurrent value change).
254 return a["address"] != b["address"]
255 # b is PatchOp
256 return a["address"] != b["address"]
257
258 # ------------------------------------------------------------------
259 # MutateOp + * (a["op"] == "mutate" commutes with everything at a
260 # different address; same-entity concurrent mutations conflict)
261 # ------------------------------------------------------------------
262 if a["op"] == "mutate":
263 if b["op"] == "mutate":
264 return a["entity_id"] != b["entity_id"]
265 return a["address"] != b["address"]
266
267 # ------------------------------------------------------------------
268 # PatchOp + * (a["op"] == "patch" after all checks above)
269 # ------------------------------------------------------------------
270 if b["op"] == "insert":
271 return a["address"] != b["address"]
272 if b["op"] == "delete":
273 return a["address"] != b["address"]
274 if b["op"] == "move":
275 return a["address"] != b["address"]
276 if b["op"] == "replace":
277 return a["address"] != b["address"]
278 if b["op"] == "mutate":
279 return a["address"] != b["address"]
280 # b is PatchOp
281 if a["address"] != b["address"]:
282 return True
283 # Same address: recurse into child ops — all child pairs must commute.
284 for child_a in a["child_ops"]:
285 for child_b in b["child_ops"]:
286 if not ops_commute(child_a, child_b):
287 return False
288 return True
289
290
291 # ---------------------------------------------------------------------------
292 # OT transform
293 # ---------------------------------------------------------------------------
294
295
296 def transform(a: DomainOp, b: DomainOp) -> tuple[DomainOp, DomainOp]:
297 """Return ``(a', b')`` such that ``apply(apply(base, a), b') == apply(apply(base, b), a')``.
298
299 This is the core OT transform function. It should only be called when
300 :func:`ops_commute` has confirmed that *a* and *b* commute. For all
301 commuting pairs except ordered InsertOp+InsertOp, the identity transform
302 is returned — the operations do not interfere with each other's positions.
303
304 For the InsertOp+InsertOp case with integer positions (the most common
305 case in practice), positions are adjusted so the diamond property holds:
306 the same final sequence is produced regardless of application order.
307
308 Args:
309 a: First domain operation.
310 b: Second domain operation (must commute with *a*).
311
312 Returns:
313 A tuple ``(a', b')`` where:
314
315 - *a'* is the version of *a* to apply when *b* has already been applied.
316 - *b'* is the version of *b* to apply when *a* has already been applied.
317 """
318 if a["op"] == "insert" and b["op"] == "insert":
319 a_pos, b_pos = a["position"], b["position"]
320 if a_pos is not None and b_pos is not None and a_pos != b_pos:
321 if a_pos < b_pos:
322 # a inserts before b's original position → b shifts up by 1.
323 b_prime = InsertOp(
324 op="insert",
325 address=b["address"],
326 position=b_pos + 1,
327 content_id=b["content_id"],
328 content_summary=b["content_summary"],
329 )
330 return a, b_prime
331 else:
332 # b inserts before a's original position → a shifts up by 1.
333 a_prime = InsertOp(
334 op="insert",
335 address=a["address"],
336 position=a_pos + 1,
337 content_id=a["content_id"],
338 content_summary=a["content_summary"],
339 )
340 return a_prime, b
341
342 # All other commuting pairs: identity transform.
343 return a, b
344
345
346 # ---------------------------------------------------------------------------
347 # Three-way merge at operation granularity
348 # ---------------------------------------------------------------------------
349
350
351 def _adjust_insert_positions(
352 ops: list[DomainOp],
353 other_ops: list[DomainOp],
354 ) -> list[DomainOp]:
355 """Adjust ``InsertOp`` positions in *ops* to account for *other_ops*.
356
357 For each ``InsertOp`` with a non-``None`` position in *ops*, the adjusted
358 position is ``original_position + count`` where ``count`` is the number of
359 ``InsertOp``\\s in *other_ops* that share the same ``address`` and have
360 ``position ≤ original_position``.
361
362 This implements the *counting formula* for multi-op position adjustment.
363 It is correct for any number of concurrent insertions on each side,
364 producing the same final sequence regardless of application order.
365
366 Non-``InsertOp`` entries and unordered inserts (``position=None``) pass
367 through unchanged.
368
369 Args:
370 ops: The list of ops whose positions need adjustment.
371 other_ops: The concurrent operations from the other branch.
372
373 Returns:
374 A new list with adjusted ``InsertOp``\\s; all other entries are copied
375 unchanged.
376 """
377 # Collect other-side InsertOp positions, grouped by address.
378 other_by_addr: dict[str, list[int]] = {}
379 for op in other_ops:
380 if op["op"] == "insert" and op["position"] is not None:
381 addr = op["address"]
382 if addr not in other_by_addr:
383 other_by_addr[addr] = []
384 other_by_addr[addr].append(op["position"])
385
386 result: list[DomainOp] = []
387 for op in ops:
388 if op["op"] == "insert" and op["position"] is not None:
389 addr = op["address"]
390 pos = op["position"]
391 others = other_by_addr.get(addr, [])
392 shift = sum(1 for p in others if p <= pos)
393 if shift:
394 result.append(
395 InsertOp(
396 op="insert",
397 address=addr,
398 position=pos + shift,
399 content_id=op["content_id"],
400 content_summary=op["content_summary"],
401 )
402 )
403 else:
404 result.append(op)
405 else:
406 result.append(op)
407
408 return result
409
410
411 def merge_op_lists(
412 base_ops: list[DomainOp],
413 ours_ops: list[DomainOp],
414 theirs_ops: list[DomainOp],
415 ) -> MergeOpsResult:
416 """Three-way merge at operation granularity.
417
418 Implements the standard three-way merge algorithm applied to typed domain
419 operations rather than file-path sets. The inputs represent:
420
421 - *base_ops*: operations present in the common ancestor.
422 - *ours_ops*: operations present on our branch (superset of base for
423 kept ops, plus our new additions).
424 - *theirs_ops*: operations present on their branch (same structure).
425
426 Algorithm
427 ---------
428 1. **Kept from base** — ops in base that both sides retained are included
429 unchanged.
430 2. **Consensus additions** — ops added independently by both sides (same
431 key) are included exactly once (idempotent).
432 3. **Exclusive additions** — ops added by only one side enter the
433 commmutativity check:
434
435 - Any pair (ours_exclusive, theirs_exclusive) where
436 :func:`ops_commute` returns ``False`` is recorded as a conflict.
437 - Exclusive additions not involved in any conflict are included in
438 ``merged_ops``, with ``InsertOp`` positions adjusted via
439 :func:`_adjust_insert_positions`.
440
441 Position adjustment note
442 ------------------------
443 The adjusted ``InsertOp`` positions in ``merged_ops`` are *absolute
444 positions in the final merged sequence* — meaning they already account for
445 all insertions from both sides. Callers applying the merged ops to the
446 base state should apply ``InsertOp``\\s in ascending position order to
447 obtain the correct final sequence.
448
449 Args:
450 base_ops: Operations in the common ancestor delta.
451 ours_ops: Operations on our branch.
452 theirs_ops: Operations on their branch.
453
454 Returns:
455 A :class:`MergeOpsResult` with merged and conflicting op lists.
456 """
457 base_key_set = {_op_key(op) for op in base_ops}
458 ours_key_set = {_op_key(op) for op in ours_ops}
459 theirs_key_set = {_op_key(op) for op in theirs_ops}
460
461 # 1. Ops both sides kept from the base.
462 kept: list[DomainOp] = [
463 op
464 for op in base_ops
465 if _op_key(op) in ours_key_set and _op_key(op) in theirs_key_set
466 ]
467
468 # 2. New ops — not present in base.
469 ours_new = [op for op in ours_ops if _op_key(op) not in base_key_set]
470 theirs_new = [op for op in theirs_ops if _op_key(op) not in base_key_set]
471
472 ours_new_keys = {_op_key(op) for op in ours_new}
473 theirs_new_keys = {_op_key(op) for op in theirs_new}
474 consensus_keys = ours_new_keys & theirs_new_keys
475
476 # Consensus additions: both sides added the same op → include once.
477 consensus: list[DomainOp] = [
478 op for op in ours_new if _op_key(op) in consensus_keys
479 ]
480
481 # 3. Each side's exclusive new additions.
482 ours_exclusive = [op for op in ours_new if _op_key(op) not in consensus_keys]
483 theirs_exclusive = [op for op in theirs_new if _op_key(op) not in consensus_keys]
484
485 # Conflict detection: any pair from both sides that does not commute.
486 conflict_ops: list[tuple[DomainOp, DomainOp]] = []
487 conflicting_ours_keys: set[tuple[str, ...]] = set()
488 conflicting_theirs_keys: set[tuple[str, ...]] = set()
489
490 for our_op in ours_exclusive:
491 for their_op in theirs_exclusive:
492 if not ops_commute(our_op, their_op):
493 conflict_ops.append((our_op, their_op))
494 conflicting_ours_keys.add(_op_key(our_op))
495 conflicting_theirs_keys.add(_op_key(their_op))
496
497 # 4. Clean ops: not involved in any conflict.
498 clean_ours = [
499 op for op in ours_exclusive if _op_key(op) not in conflicting_ours_keys
500 ]
501 clean_theirs = [
502 op for op in theirs_exclusive if _op_key(op) not in conflicting_theirs_keys
503 ]
504
505 # 5. Position adjustment using the counting formula.
506 clean_ours_adjusted = _adjust_insert_positions(clean_ours, clean_theirs)
507 clean_theirs_adjusted = _adjust_insert_positions(clean_theirs, clean_ours)
508
509 merged_ops: list[DomainOp] = (
510 list(kept) + list(consensus) + clean_ours_adjusted + clean_theirs_adjusted
511 )
512
513 logger.debug(
514 "merge_op_lists: kept=%d consensus=%d ours=%d theirs=%d conflicts=%d",
515 len(kept),
516 len(consensus),
517 len(clean_ours_adjusted),
518 len(clean_theirs_adjusted),
519 len(conflict_ops),
520 )
521
522 return MergeOpsResult(merged_ops=merged_ops, conflict_ops=conflict_ops)
523
524
525 def merge_structured(
526 base_delta: StructuredDelta,
527 ours_delta: StructuredDelta,
528 theirs_delta: StructuredDelta,
529 ) -> MergeOpsResult:
530 """Merge two structured deltas against a common base delta.
531
532 A convenience wrapper over :func:`merge_op_lists` that accepts
533 :class:`~muse.domain.StructuredDelta` objects directly.
534
535 Args:
536 base_delta: Delta representing the common ancestor's operations.
537 ours_delta: Delta produced by our branch.
538 theirs_delta: Delta produced by their branch.
539
540 Returns:
541 A :class:`MergeOpsResult` describing the merged and conflicting ops.
542 """
543 return merge_op_lists(
544 base_delta["ops"],
545 ours_delta["ops"],
546 theirs_delta["ops"],
547 )