gabriel / muse public
op_transform.py python
546 lines 21.3 KB
6d8ca4ac feat: god-tier MIDI dimension expansion + full supercharge architecture Gabriel Cardona <gabriel@tellurstori.com> 5d ago
1 """Operational transformation for Muse domain operations.
2
3 This module implements the commutativity rules and position-adjustment
4 transforms that allow the merge engine to reason over ``DomainOp`` trees
5 rather than file-path sets. The result is sub-file auto-merge: two agents
6 inserting notes at non-overlapping bars never produce a conflict.
7
8 Theory
9 ------
10 Operational Transformation (OT) is the theory behind real-time collaborative
11 editors (Google Docs, VS Code Live Share). The key insight is that two
12 *concurrent* operations — generated independently against the same base state
13 — can be applied in sequence without conflict if and only if they *commute*:
14 applying them in either order yields the same final state.
15
16 For concurrent operations that commute, OT provides a ``transform`` function
17 that adjusts positions so that the result is identical regardless of which
18 operation is applied first.
19
20 Public API
21 ----------
22 - :class:`MergeOpsResult` — structured result of merging two op lists.
23 - :func:`ops_commute` — commutativity oracle for any two ``DomainOp``\\s.
24 - :func:`transform` — position-adjusted ``(a', b')`` for commuting ops.
25 - :func:`merge_op_lists` — three-way merge at operation granularity.
26
27 Commutativity rules (summary)
28 ------------------------------
29
30 ============================================= =====================================
31 Op A Op B Commute?
32 ============================================= =====================================
33 InsertOp(pos=i) InsertOp(pos=j) Yes — if i ≠ j or either is None (unordered)
34 InsertOp(pos=i) InsertOp(pos=i) **No** — positional conflict
35 InsertOp(addr=A) DeleteOp(addr=B) Yes — if A ≠ B (different containers)
36 InsertOp(addr=A) DeleteOp(addr=A) **No** — same container
37 DeleteOp(addr=A) DeleteOp(addr=B) Yes — always (consensus delete is fine)
38 ReplaceOp(addr=A) ReplaceOp(addr=B) Yes — if A ≠ B
39 ReplaceOp(addr=A) ReplaceOp(addr=A) **No** — concurrent value conflict
40 MoveOp(from=i) MoveOp(from=j) Yes — if i ≠ j
41 MoveOp(from=i) DeleteOp(pos=i) **No** — move-delete conflict
42 PatchOp(addr=A) PatchOp(addr=B) Yes — if A ≠ B; recurse if A == B
43 ============================================= =====================================
44
45 Position adjustment
46 -------------------
47 When two ``InsertOp``\\s at different positions commute, the later-applied one
48 must have its position adjusted. Concretely, if *a* inserts at position *i*
49 and *b* inserts at position *j* with *i < j*:
50
51 - Applying *a* first shifts every element at position ≥ i by one; so *b*
52 must be adjusted to *j + 1*.
53 - Applying *b* first does not affect positions < j; so *a* stays at *i*.
54
55 For *merge_op_lists*, positions are adjusted via the **counting formula**: for
56 each InsertOp in one side's exclusive additions, add the count of the other
57 side's InsertOps that have position ≤ this op's position (on the same
58 address). This is correct for any number of concurrent insertions and avoids
59 the cascading adjustment errors that arise from naive sequential pairwise OT.
60
61 Synchronous guarantee
62 ---------------------
63 All functions are synchronous, pure, and allocation-bounded — no I/O, no
64 async, no external state.
65 """
66 from __future__ import annotations
67
68 import logging
69 from dataclasses import dataclass, field
70
71 from muse.domain import (
72 DeleteOp,
73 DomainOp,
74 InsertOp,
75 MoveOp,
76 PatchOp,
77 ReplaceOp,
78 StructuredDelta,
79 )
80
81 logger = logging.getLogger(__name__)
82
83
84 # ---------------------------------------------------------------------------
85 # Result type
86 # ---------------------------------------------------------------------------
87
88
@dataclass
class MergeOpsResult:
    """Outcome of a three-way merge performed at operation granularity.

    Attributes:
        merged_ops: Operations from both sides that can be applied to the
            common ancestor to produce the merged state.  ``InsertOp``
            positions have already been adjusted so the ops can be applied
            in ascending position order with a deterministic result.
        conflict_ops: ``(our_op, their_op)`` pairs that could not be
            auto-merged.  Each pair has to be resolved manually (or through
            a ``.museattributes`` strategy) before the merge can complete.
    """

    # Both lists default to fresh, per-instance empty lists.
    merged_ops: list[DomainOp] = field(default_factory=list)
    conflict_ops: list[tuple[DomainOp, DomainOp]] = field(default_factory=list)

    @property
    def is_clean(self) -> bool:
        """``True`` exactly when ``conflict_ops`` is empty."""
        return not self.conflict_ops
112
113
114 # ---------------------------------------------------------------------------
115 # Internal helpers
116 # ---------------------------------------------------------------------------
117
118
119 def _op_key(op: DomainOp) -> tuple[str, ...]:
120 """Return a hashable key uniquely identifying *op* for set membership tests.
121
122 The key captures all semantically significant fields so that two ops with
123 identical effect produce the same key. This is used to detect consensus
124 operations (both sides added the same op independently).
125 """
126 if op["op"] == "insert":
127 return ("insert", op["address"], str(op["position"]), op["content_id"])
128 if op["op"] == "delete":
129 return ("delete", op["address"], str(op["position"]), op["content_id"])
130 if op["op"] == "move":
131 return (
132 "move",
133 op["address"],
134 str(op["from_position"]),
135 str(op["to_position"]),
136 op["content_id"],
137 )
138 if op["op"] == "replace":
139 return (
140 "replace",
141 op["address"],
142 str(op["position"]),
143 op["old_content_id"],
144 op["new_content_id"],
145 )
146 if op["op"] == "mutate":
147 return ("mutate", op["address"], op["entity_id"], op["old_content_id"], op["new_content_id"])
148 # PatchOp — key on address and child_domain; child_ops are not hashed for
149 # performance reasons. Two patch ops on the same container are treated as
150 # the same "slot" for conflict detection purposes.
151 return ("patch", op["address"], op["child_domain"])
152
153
154 # ---------------------------------------------------------------------------
155 # Commutativity oracle
156 # ---------------------------------------------------------------------------
157
158
def ops_commute(a: DomainOp, b: DomainOp) -> bool:
    """Return ``True`` if operations *a* and *b* commute (are auto-mergeable).

    Two operations commute when applying them in either order produces the
    same final state.  The rules are evaluated in this order and cover every
    pairing of the six op kinds (insert, delete, move, replace, mutate,
    patch):

    1. ``MutateOp`` + ``MutateOp`` commute iff their ``entity_id`` values
       differ; addresses are not consulted for this pair.
    2. Any other pair whose addresses differ commutes — different containers
       are independent.
    3. Within the same address:

       - insert + insert: commute if either position is ``None`` (unordered)
         or the positions differ.
       - delete + delete: always commute (consensus delete).
       - delete + move (either order): commute if the delete is unordered
         (``position is None``) or does not hit the move's ``from_position``.
       - move + move: commute iff the ``from_position`` values differ.
       - patch + patch: commute iff every (child of *a*, child of *b*) pair
         commutes, checked recursively.
       - every other combination is conservatively treated as a conflict.

    Args:
        a: First domain operation.
        b: Second domain operation.

    Returns:
        ``True`` if the two operations can be safely auto-merged.
    """
    # Rule 1 — concurrent mutations are keyed on the entity, not the address.
    if a["op"] == "mutate" and b["op"] == "mutate":
        return a["entity_id"] != b["entity_id"]

    # Rule 2 — different containers never interfere.  This check runs before
    # any positional comparison so that a delete/move or move/move pair in
    # *different* containers is not reported as a conflict merely because the
    # numeric positions happen to coincide.
    if a["address"] != b["address"]:
        return True

    # Rule 3 — both ops target the same container from here on.
    if a["op"] == "insert" and b["op"] == "insert":
        a_pos, b_pos = a["position"], b["position"]
        # Unordered collections (position=None) always commute.
        if a_pos is None or b_pos is None:
            return True
        # Ordered sequence: only inserts at the same index collide.
        return a_pos != b_pos

    if a["op"] == "delete" and b["op"] == "delete":
        # Both branches removed something; the element is absent either way.
        return True

    if a["op"] == "delete" and b["op"] == "move":
        delete_pos = a["position"]
        if delete_pos is None:
            return True  # unordered collection: no positional conflict
        return delete_pos != b["from_position"]

    if a["op"] == "move" and b["op"] == "delete":
        delete_pos = b["position"]
        if delete_pos is None:
            return True  # unordered collection: no positional conflict
        return a["from_position"] != delete_pos

    if a["op"] == "move" and b["op"] == "move":
        # Two moves of the same source element conflict.
        return a["from_position"] != b["from_position"]

    if a["op"] == "patch" and b["op"] == "patch":
        # Same container patched on both sides: every child pair must commute.
        return all(
            ops_commute(child_a, child_b)
            for child_a in a["child_ops"]
            for child_b in b["child_ops"]
        )

    # Remaining same-address combinations (insert/delete, replace/replace,
    # mutate/non-mutate, patch/non-patch, ...) are treated as conflicts.
    return False
288
289
290 # ---------------------------------------------------------------------------
291 # OT transform
292 # ---------------------------------------------------------------------------
293
294
def transform(a: DomainOp, b: DomainOp) -> tuple[DomainOp, DomainOp]:
    """Return ``(a', b')`` such that ``apply(apply(base, a), b') == apply(apply(base, b), a')``.

    This is the core OT transform function.  It should only be called when
    :func:`ops_commute` has confirmed that *a* and *b* commute.

    Only one case needs a position adjustment: two ``InsertOp`` entries on
    the *same address* with distinct integer positions.  There, the insert
    with the larger position is shifted up by one, because the other insert
    lands before it.  Every other pair — including inserts into different
    containers, which cannot affect each other's indices — gets the identity
    transform.

    Args:
        a: First domain operation.
        b: Second domain operation (must commute with *a*).

    Returns:
        A tuple ``(a', b')`` where:

        - *a'* is the version of *a* to apply when *b* has already been applied.
        - *b'* is the version of *b* to apply when *a* has already been applied.
    """
    if (
        a["op"] == "insert"
        and b["op"] == "insert"
        # Inserts into different containers are independent: shifting one of
        # them would corrupt its position.
        and a["address"] == b["address"]
    ):
        a_pos, b_pos = a["position"], b["position"]
        if a_pos is not None and b_pos is not None and a_pos != b_pos:
            if a_pos < b_pos:
                # a inserts before b's original position → b shifts up by 1.
                b_prime = InsertOp(
                    op="insert",
                    address=b["address"],
                    position=b_pos + 1,
                    content_id=b["content_id"],
                    content_summary=b["content_summary"],
                )
                return a, b_prime
            # b inserts before a's original position → a shifts up by 1.
            a_prime = InsertOp(
                op="insert",
                address=a["address"],
                position=a_pos + 1,
                content_id=a["content_id"],
                content_summary=a["content_summary"],
            )
            return a_prime, b

    # All other commuting pairs: identity transform.
    return a, b
343
344
345 # ---------------------------------------------------------------------------
346 # Three-way merge at operation granularity
347 # ---------------------------------------------------------------------------
348
349
350 def _adjust_insert_positions(
351 ops: list[DomainOp],
352 other_ops: list[DomainOp],
353 ) -> list[DomainOp]:
354 """Adjust ``InsertOp`` positions in *ops* to account for *other_ops*.
355
356 For each ``InsertOp`` with a non-``None`` position in *ops*, the adjusted
357 position is ``original_position + count`` where ``count`` is the number of
358 ``InsertOp``\\s in *other_ops* that share the same ``address`` and have
359 ``position ≤ original_position``.
360
361 This implements the *counting formula* for multi-op position adjustment.
362 It is correct for any number of concurrent insertions on each side,
363 producing the same final sequence regardless of application order.
364
365 Non-``InsertOp`` entries and unordered inserts (``position=None``) pass
366 through unchanged.
367
368 Args:
369 ops: The list of ops whose positions need adjustment.
370 other_ops: The concurrent operations from the other branch.
371
372 Returns:
373 A new list with adjusted ``InsertOp``\\s; all other entries are copied
374 unchanged.
375 """
376 # Collect other-side InsertOp positions, grouped by address.
377 other_by_addr: dict[str, list[int]] = {}
378 for op in other_ops:
379 if op["op"] == "insert" and op["position"] is not None:
380 addr = op["address"]
381 if addr not in other_by_addr:
382 other_by_addr[addr] = []
383 other_by_addr[addr].append(op["position"])
384
385 result: list[DomainOp] = []
386 for op in ops:
387 if op["op"] == "insert" and op["position"] is not None:
388 addr = op["address"]
389 pos = op["position"]
390 others = other_by_addr.get(addr, [])
391 shift = sum(1 for p in others if p <= pos)
392 if shift:
393 result.append(
394 InsertOp(
395 op="insert",
396 address=addr,
397 position=pos + shift,
398 content_id=op["content_id"],
399 content_summary=op["content_summary"],
400 )
401 )
402 else:
403 result.append(op)
404 else:
405 result.append(op)
406
407 return result
408
409
def _op_identity(op: DomainOp) -> tuple[str, ...]:
    """Return :func:`_op_key` for *op*, extended with child identities for a ``PatchOp``."""
    key = _op_key(op)
    if op["op"] != "patch":
        return key
    # _op_key deliberately ignores child_ops; the merge needs to tell two
    # different patches on the same container apart, so append one string per
    # child (recursively, in order).
    return key + tuple(repr(_op_identity(child)) for child in op["child_ops"])


def merge_op_lists(
    base_ops: list[DomainOp],
    ours_ops: list[DomainOp],
    theirs_ops: list[DomainOp],
) -> MergeOpsResult:
    """Three-way merge at operation granularity.

    Implements the standard three-way merge algorithm applied to typed domain
    operations rather than file-path sets.  The inputs represent:

    - *base_ops*: operations present in the common ancestor.
    - *ours_ops*: operations present on our branch (superset of base for
      kept ops, plus our new additions).
    - *theirs_ops*: operations present on their branch (same structure).

    Algorithm
    ---------
    1. **Kept from base** — ops in base that both sides retained are included
       unchanged.
    2. **Consensus additions** — ops added independently by both sides (same
       identity) are included exactly once, taken from *ours_ops*.
    3. **Exclusive additions** — ops added by only one side enter the
       commutativity check:

       - Any pair (ours_exclusive, theirs_exclusive) where
         :func:`ops_commute` returns ``False`` is recorded as a conflict.
       - Exclusive additions not involved in any conflict are included in
         ``merged_ops``, with ``InsertOp`` positions adjusted via
         :func:`_adjust_insert_positions`.

    Op identity
    -----------
    Ops are matched by :func:`_op_key`, except that a ``PatchOp`` is matched
    only when its ``child_ops`` also match, in order.  Matching patches on
    address and child domain alone would classify two *different* patches of
    the same container as consensus (keeping ours and dropping theirs) or as
    "kept from base" (dropping a side's modification).  With child-aware
    identity such patches become exclusive additions and go through
    :func:`ops_commute`, which recurses into their children.

    Position adjustment note
    ------------------------
    The adjusted ``InsertOp`` positions in ``merged_ops`` are *absolute
    positions in the final merged sequence* — meaning they already account for
    all insertions from both sides.  Callers applying the merged ops to the
    base state should apply ``InsertOp`` entries in ascending position order
    to obtain the correct final sequence.

    Args:
        base_ops: Operations in the common ancestor delta.
        ours_ops: Operations on our branch.
        theirs_ops: Operations on their branch.

    Returns:
        A :class:`MergeOpsResult` with merged and conflicting op lists.
    """
    # Compute each op's identity once and carry it alongside the op.
    base_tagged = [(op, _op_identity(op)) for op in base_ops]
    ours_tagged = [(op, _op_identity(op)) for op in ours_ops]
    theirs_tagged = [(op, _op_identity(op)) for op in theirs_ops]

    base_ids = {ident for _, ident in base_tagged}
    ours_ids = {ident for _, ident in ours_tagged}
    theirs_ids = {ident for _, ident in theirs_tagged}

    # 1. Ops both sides kept from the base.
    kept: list[DomainOp] = [
        op for op, ident in base_tagged if ident in ours_ids and ident in theirs_ids
    ]

    # 2. New ops — not present in base.
    ours_new = [(op, ident) for op, ident in ours_tagged if ident not in base_ids]
    theirs_new = [(op, ident) for op, ident in theirs_tagged if ident not in base_ids]

    consensus_ids = {ident for _, ident in ours_new} & {ident for _, ident in theirs_new}

    # Consensus additions: both sides added the same op → include once.
    consensus: list[DomainOp] = [op for op, ident in ours_new if ident in consensus_ids]

    # 3. Each side's exclusive new additions.
    ours_exclusive = [(op, ident) for op, ident in ours_new if ident not in consensus_ids]
    theirs_exclusive = [
        (op, ident) for op, ident in theirs_new if ident not in consensus_ids
    ]

    # Conflict detection: any pair from both sides that does not commute.
    conflict_ops: list[tuple[DomainOp, DomainOp]] = []
    conflicting_ours: set[tuple[str, ...]] = set()
    conflicting_theirs: set[tuple[str, ...]] = set()

    for our_op, our_id in ours_exclusive:
        for their_op, their_id in theirs_exclusive:
            if not ops_commute(our_op, their_op):
                conflict_ops.append((our_op, their_op))
                conflicting_ours.add(our_id)
                conflicting_theirs.add(their_id)

    # 4. Clean ops: not involved in any conflict.
    clean_ours = [op for op, ident in ours_exclusive if ident not in conflicting_ours]
    clean_theirs = [
        op for op, ident in theirs_exclusive if ident not in conflicting_theirs
    ]

    # 5. Position adjustment using the counting formula.
    clean_ours_adjusted = _adjust_insert_positions(clean_ours, clean_theirs)
    clean_theirs_adjusted = _adjust_insert_positions(clean_theirs, clean_ours)

    merged_ops: list[DomainOp] = (
        kept + consensus + clean_ours_adjusted + clean_theirs_adjusted
    )

    logger.debug(
        "merge_op_lists: kept=%d consensus=%d ours=%d theirs=%d conflicts=%d",
        len(kept),
        len(consensus),
        len(clean_ours_adjusted),
        len(clean_theirs_adjusted),
        len(conflict_ops),
    )

    return MergeOpsResult(merged_ops=merged_ops, conflict_ops=conflict_ops)
522
523
def merge_structured(
    base_delta: StructuredDelta,
    ours_delta: StructuredDelta,
    theirs_delta: StructuredDelta,
) -> MergeOpsResult:
    """Run :func:`merge_op_lists` on the ``"ops"`` lists of three deltas.

    Convenience entry point for callers that hold
    :class:`~muse.domain.StructuredDelta` objects rather than bare op lists.

    Args:
        base_delta: Delta representing the common ancestor's operations.
        ours_delta: Delta produced by our branch.
        theirs_delta: Delta produced by their branch.

    Returns:
        The :class:`MergeOpsResult` returned by :func:`merge_op_lists`.
    """
    base_ops = base_delta["ops"]
    ours_ops = ours_delta["ops"]
    theirs_ops = theirs_delta["ops"]
    return merge_op_lists(base_ops, ours_ops, theirs_ops)