gabriel / muse public
_crdt_notes.py python
407 lines 14.5 KB
9ee9c39c refactor: rename music→midi domain, strip all 5-dim backward compat Gabriel Cardona <gabriel@tellurstori.com> 5d ago
1 """Voice-aware Music RGA — experimental CRDT for live MIDI note sequences.
2
3 This module is a **research prototype** for the live collaboration foundation
4 described in the Muse supercharge plan. It is NOT wired into the production
5 merge path. Its purpose is to:
6
7 1. Demonstrate that concurrent note insertions can be made commutative.
8 2. Provide a benchmark harness for comparing voice-aware RGA against LSEQ.
9 3. Serve as the implementation foundation for the eventual live collaboration
10 layer (Workstream 7 / 8 in the plan).
11
12 Design
13 ------
14 Standard RGA (Roh 2011) orders concurrent insertions at the same position
15 lexicographically by op_id. For music this produces unacceptable results:
16 two agents inserting bass and soprano notes at the same beat would interleave
17 their pitches arbitrarily, producing nonsense voice crossings.
18
19 **Music-RGA** uses a multi-key position ordering:
20
21 NotePosition = (measure, beat_sub, voice_lane, op_id)
22
23 Concurrent insertions at the same ``(measure, beat_sub)`` are ordered by
24 ``voice_lane`` first (bass=0 < tenor=1 < alto=2 < soprano=3), then by
25 ``op_id`` as a tie-break. This guarantees that bass notes always precede
26 treble notes in the materialised sequence regardless of insertion order.
27
28 Voice lane assignment
29 ---------------------
30 Voice lane is determined from the note's pitch at insert time using a
31 coarse tessitura model:
32
33 pitch < 48 → 0 (bass)
34 48 ≤ pitch < 60 → 1 (tenor)
35 60 ≤ pitch < 72 → 2 (alto)
36 pitch ≥ 72 → 3 (soprano)
37
38 Agents that perform explicit voice separation can override ``voice_lane``
39 when calling :meth:`MusicRGA.insert`.
40
41 CRDT properties
42 ---------------
43 The three lattice laws are demonstrated by :func:`_verify_crdt_laws` in the
44 test suite:
45
46 1. **Commutativity**: ``merge(a, b).to_sequence() == merge(b, a).to_sequence()``
47 2. **Associativity**: ``merge(merge(a, b), c).to_sequence() == merge(a, merge(b, c)).to_sequence()``
48 3. **Idempotency**: ``merge(a, a).to_sequence() == a.to_sequence()``
49
50 Relationship to the commit DAG
51 -------------------------------
52 A live session accumulates :class:`RGANoteEntry` operations. At commit time,
53 :meth:`MusicRGA.to_domain_ops` translates the CRDT state into canonical Muse
54 :class:`~muse.domain.DomainOp` entries for storage in the commit record.
55 The CRDT state itself is ephemeral — not stored in the object store.
56
57 Public API
58 ----------
59 - :class:`NotePosition` — music-aware position key (NamedTuple).
60 - :class:`RGANoteEntry` — one element in the RGA (TypedDict).
61 - :class:`MusicRGA` — the voice-aware ordered note sequence CRDT.
62 """
63 from __future__ import annotations
64
65 import logging
66 import uuid as _uuid_mod
67 from typing import NamedTuple, TypedDict
68
69 from muse.domain import DeleteOp, DomainOp, InsertOp
70 from muse.plugins.midi.midi_diff import NoteKey, _note_content_id, _note_summary
71
72 logger = logging.getLogger(__name__)
73
74 # ---------------------------------------------------------------------------
75 # Music-aware position key
76 # ---------------------------------------------------------------------------
77
78
class NotePosition(NamedTuple):
    """Sort key that places a note in the Music RGA's materialised order.

    Being a tuple, two positions compare field by field from left to right,
    so the declaration order below *is* the comparison priority:

    1. ``measure`` — 1-indexed bar number; an earlier bar always sorts first.
    2. ``beat_sub`` — tick offset inside the bar; an earlier onset sorts
       first within the same bar.
    3. ``voice_lane`` — voice stream (0=bass, 1=tenor, 2=alto, 3=soprano,
       4+=auxiliary); at an identical ``(measure, beat_sub)`` the lower lane
       sorts first, so bass notes precede treble notes.
    4. ``op_id`` — UUID4 string of the inserting operation; lexicographic
       tie-break when different actors insert into the same voice at the
       same beat.
    """

    measure: int  # 1-indexed bar number
    beat_sub: int  # onset as a tick offset within the bar
    voice_lane: int  # 0=bass, 1=tenor, 2=alto, 3=soprano, 4+=auxiliary
    op_id: str  # insertion op identifier, final tie-break
102
103
104 def _pitch_to_voice_lane(pitch: int) -> int:
105 """Map a MIDI pitch to a coarse voice lane index.
106
107 Tessiture boundaries (MIDI pitch → voice):
108 - 0–47 → 0 (bass, sub-bass)
109 - 48–59 → 1 (tenor, baritone)
110 - 60–71 → 2 (alto, mezzo)
111 - 72–127 → 3 (soprano, treble)
112 """
113 if pitch < 48:
114 return 0
115 if pitch < 60:
116 return 1
117 if pitch < 72:
118 return 2
119 return 3
120
121
122 # ---------------------------------------------------------------------------
123 # RGA entry
124 # ---------------------------------------------------------------------------
125
126
class RGANoteEntry(TypedDict):
    """A single element stored by :class:`MusicRGA`, keyed there by ``op_id``.

    Keys:

    ``op_id``
        Unique ID of the insertion operation (UUID4).
    ``actor_id``
        The agent or human that performed the insertion.
    ``note``
        The MIDI note content.
    ``position``
        Music-aware :class:`NotePosition` key used for ordering.
    ``parent_op_id``
        ``op_id`` of the element this one was inserted after, or ``None``
        for a head insertion.
    ``tombstone``
        ``True`` once the note has been deleted. Following standard RGA
        tombstone semantics the entry is kept rather than removed, so its
        position stays stable for other replicas.
    """

    op_id: str  # unique insertion op identifier
    actor_id: str  # who inserted the note
    note: NoteKey  # the MIDI note payload
    position: NotePosition  # ordering key
    parent_op_id: str | None  # predecessor's op_id; None for head insertions
    tombstone: bool  # True when deleted
147
148
149 # ---------------------------------------------------------------------------
150 # MusicRGA
151 # ---------------------------------------------------------------------------
152
153
class MusicRGA:
    """Voice-aware Replicated Growable Array for live MIDI note sequences.

    Implements an RGA-style CRDT (Roh et al., 2011) whose elements are ordered
    by a music-aware position key (:class:`NotePosition`). Concurrent
    insertions at the same beat are ordered by voice lane before falling back
    to ``op_id``, preventing voice crossings in concurrent collaborative
    edits.

    Entries are treated as immutable values: deleting or merging never mutates
    an existing :class:`RGANoteEntry` in place, it stores a new one. That is
    what makes it safe for :meth:`merge` to share entry objects between
    replicas.

    Usage::

        seq = MusicRGA("agent-1")
        e1 = seq.insert(bass_note)
        e2 = seq.insert(soprano_note)
        seq.delete(e1["op_id"])

        # On another replica:
        seq2 = MusicRGA("agent-2")
        e3 = seq2.insert(tenor_note)

        merged = MusicRGA.merge(seq, seq2)
        notes = merged.to_sequence()  # deterministic, voice-ordered

    Args:
        actor_id: Stable identifier for the agent or human using this replica.
    """

    def __init__(self, actor_id: str) -> None:
        self._actor_id = actor_id
        self._entries: dict[str, RGANoteEntry] = {}  # op_id → entry

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _with_tombstone(entry: RGANoteEntry) -> RGANoteEntry:
        """Return a tombstoned copy of *entry*, leaving *entry* untouched."""
        return RGANoteEntry(
            op_id=entry["op_id"],
            actor_id=entry["actor_id"],
            note=entry["note"],
            position=entry["position"],
            parent_op_id=entry["parent_op_id"],
            tombstone=True,
        )

    # ------------------------------------------------------------------
    # Insertion
    # ------------------------------------------------------------------

    def insert(
        self,
        note: NoteKey,
        *,
        after: str | None = None,
        voice_lane: int | None = None,
        ticks_per_beat: int = 480,
        time_sig_numerator: int = 4,
    ) -> RGANoteEntry:
        """Insert *note* into the sequence under a freshly generated op_id.

        The note's place in the materialised sequence is derived from its
        ``start_tick`` and voice lane (see :class:`NotePosition`), not from
        *after*: *after* is only recorded on the entry as ``parent_op_id``
        and is not validated against this replica.

        Args:
            note: The MIDI note to insert. Its ``"start_tick"`` and
                ``"pitch"`` keys are read.
            after: ``op_id`` of the entry this note is inserted after;
                ``None`` denotes a head insertion.
            voice_lane: Override the automatic pitch-based lane assignment.
            ticks_per_beat: Used to compute measure and beat_sub.
            time_sig_numerator: Beats per bar (default 4 for 4/4 time).

        Returns:
            The created :class:`RGANoteEntry`.

        Raises:
            ValueError: When *ticks_per_beat* or *time_sig_numerator* is not
                positive (the bar length would be zero or negative).
        """
        # Check each factor separately: two negative factors would multiply
        # into a positive, plausible-looking bar length.
        if ticks_per_beat <= 0 or time_sig_numerator <= 0:
            raise ValueError(
                "ticks_per_beat and time_sig_numerator must be positive, "
                f"got {ticks_per_beat!r} and {time_sig_numerator!r}"
            )

        op_id = str(_uuid_mod.uuid4())

        ticks_per_bar = ticks_per_beat * time_sig_numerator
        # Bars are 1-indexed; beat_sub is the tick offset inside the bar.
        measure, beat_sub = divmod(note["start_tick"], ticks_per_bar)
        measure += 1
        lane = voice_lane if voice_lane is not None else _pitch_to_voice_lane(note["pitch"])

        entry: RGANoteEntry = RGANoteEntry(
            op_id=op_id,
            actor_id=self._actor_id,
            note=note,
            position=NotePosition(
                measure=measure,
                beat_sub=beat_sub,
                voice_lane=lane,
                op_id=op_id,
            ),
            parent_op_id=after,
            tombstone=False,
        )
        self._entries[op_id] = entry
        logger.debug(
            "MusicRGA insert: actor=%r pitch=%d measure=%d voice=%d op=%s",
            self._actor_id,
            note["pitch"],
            measure,
            lane,
            op_id[:8],
        )
        return entry

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------

    def delete(self, op_id: str) -> None:
        """Mark the entry with *op_id* as tombstoned.

        The entry stays in the internal map (it still counts towards
        :meth:`entry_count`); it is merely hidden from :meth:`to_sequence`.
        Deleting an already-tombstoned entry is a no-op.

        Args:
            op_id: The op_id of the entry to delete.

        Raises:
            KeyError: When *op_id* is not found in this replica.
        """
        try:
            entry = self._entries[op_id]
        except KeyError:
            raise KeyError(f"op_id {op_id!r} not found in MusicRGA") from None
        if not entry["tombstone"]:
            # Store a new entry instead of mutating: the old object may be
            # shared with other replicas produced by merge().
            self._entries[op_id] = self._with_tombstone(entry)

    # ------------------------------------------------------------------
    # Materialisation
    # ------------------------------------------------------------------

    def to_sequence(self) -> list[NoteKey]:
        """Materialise the live note sequence (excluding tombstones).

        Entries are sorted by their :class:`NotePosition` key:
        ``(measure, beat_sub, voice_lane, op_id)``. Because the key ends in
        the unique ``op_id`` the order is total, so the result does not
        depend on the order in which entries were inserted or merged.

        Returns:
            Sorted list of live (non-tombstoned) :class:`NoteKey` objects.
        """
        live = sorted(
            (e for e in self._entries.values() if not e["tombstone"]),
            key=lambda e: e["position"],
        )
        return [e["note"] for e in live]

    def entry_count(self) -> int:
        """Return the total number of entries including tombstones."""
        return len(self._entries)

    def live_count(self) -> int:
        """Return the number of non-tombstoned (visible) entries."""
        return sum(1 for e in self._entries.values() if not e["tombstone"])

    # ------------------------------------------------------------------
    # CRDT merge — commutative, associative, idempotent
    # ------------------------------------------------------------------

    @staticmethod
    def merge(a: "MusicRGA", b: "MusicRGA") -> "MusicRGA":
        """Return a new MusicRGA that is the join of replicas *a* and *b*.

        Neither input is modified. The laws below are stated on the
        materialised sequence because :class:`MusicRGA` defines no ``__eq__``
        (``==`` on two replicas is an identity check):

        - **Commutative**: ``merge(a, b).to_sequence() == merge(b, a).to_sequence()``
        - **Associative**: ``merge(merge(a, b), c).to_sequence()
          == merge(a, merge(b, c)).to_sequence()``
        - **Idempotent**: ``merge(a, a).to_sequence() == a.to_sequence()``

        For entries present in both replicas, deletion wins: the merged entry
        is tombstoned when either side's is. A delete is therefore never
        undone by merging with a replica that has not seen it yet. The other
        fields are taken from *a*'s copy; both copies of one ``op_id`` are
        expected to agree on them.

        Args:
            a: First replica.
            b: Second replica.

        Returns:
            A new :class:`MusicRGA` containing the union of all entries from
            both replicas with tombstone-wins conflict resolution.
        """
        merged = MusicRGA(actor_id=f"merge({a._actor_id},{b._actor_id})")

        # Start from a shallow copy of a's map, then fold b in.
        merged._entries = dict(a._entries)
        for op_id, entry_b in b._entries.items():
            existing = merged._entries.get(op_id)
            if existing is None:
                # Only b knows this entry.
                merged._entries[op_id] = entry_b
            elif entry_b["tombstone"] and not existing["tombstone"]:
                # Known to both, deleted only on b's side: tombstone wins.
                merged._entries[op_id] = MusicRGA._with_tombstone(existing)
            # Otherwise a's copy already carries the merged tombstone state.

        return merged

    # ------------------------------------------------------------------
    # Conversion to Muse DomainOps
    # ------------------------------------------------------------------

    def to_domain_ops(
        self,
        base_sequence: list[NoteKey],
        ticks_per_beat: int = 480,
    ) -> list[DomainOp]:
        """Convert this CRDT state to Muse DomainOps relative to a base sequence.

        Used at commit time to crystallise a live session's CRDT state into
        the canonical Muse typed delta algebra for storage in the commit record.

        Notes are matched by content id as a *multiset*: each base note can
        account for at most one live note with the same content id and vice
        versa. The conversion emits, in this order:

        - ``InsertOp`` for every live note with no unmatched base counterpart,
          addressed by its index in the live sequence.
        - ``DeleteOp`` for every base note with no unmatched live counterpart,
          addressed by its index in *base_sequence*.

        When several notes share a content id, the earliest occurrences are
        the ones treated as matched.

        Args:
            base_sequence: The committed note list at the start of the session.
            ticks_per_beat: Used for human-readable summaries.

        Returns:
            List of :class:`~muse.domain.DomainOp` entries.
        """
        # Function-scope import so this method does not depend on a
        # module-level import being added alongside it.
        from collections import Counter

        live = self.to_sequence()
        base_ids = [_note_content_id(n) for n in base_sequence]
        live_ids = [_note_content_id(n) for n in live]

        ops: list[DomainOp] = []

        # Inserts: consume one base occurrence per matching live note, so a
        # duplicated live note beyond what base holds is still an insert.
        unmatched_base = Counter(base_ids)
        for i, (note, cid) in enumerate(zip(live, live_ids)):
            if unmatched_base[cid] > 0:
                unmatched_base[cid] -= 1
                continue
            ops.append(
                InsertOp(
                    op="insert",
                    address=f"note:{i}",
                    position=i,
                    content_id=cid,
                    content_summary=_note_summary(note, ticks_per_beat),
                )
            )

        # Deletes: the mirror image, consuming live occurrences per base note.
        unmatched_live = Counter(live_ids)
        for i, (note, cid) in enumerate(zip(base_sequence, base_ids)):
            if unmatched_live[cid] > 0:
                unmatched_live[cid] -= 1
                continue
            ops.append(
                DeleteOp(
                    op="delete",
                    address=f"note:{i}",
                    position=i,
                    content_id=cid,
                    content_summary=_note_summary(note, ticks_per_beat),
                )
            )

        return ops