gabriel / muse public
_crdt_notes.py python
406 lines 14.4 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """Voice-aware Music RGA — experimental CRDT for live MIDI note sequences.
2
3 This module is a **research prototype** for the live collaboration foundation
4 described in the Muse supercharge plan. It is NOT wired into the production
5 merge path. Its purpose is to:
6
7 1. Demonstrate that concurrent note insertions can be made commutative.
8 2. Provide a benchmark harness for comparing voice-aware RGA against LSEQ.
9 3. Serve as the implementation foundation for the eventual live collaboration
10 layer (Workstream 7 / 8 in the plan).
11
12 Design
13 ------
14 Standard RGA (Roh 2011) orders concurrent insertions at the same position
15 lexicographically by op_id. For music this produces unacceptable results:
16 two agents inserting bass and soprano notes at the same beat would interleave
17 their pitches arbitrarily, producing nonsense voice crossings.
18
19 **Music-RGA** uses a multi-key position ordering:
20
21 NotePosition = (measure, beat_sub, voice_lane, op_id)
22
23 Concurrent insertions at the same ``(measure, beat_sub)`` are ordered by
24 ``voice_lane`` first (bass=0 < tenor=1 < alto=2 < soprano=3), then by
25 ``op_id`` as a tie-break. This guarantees that bass notes always precede
26 treble notes in the materialised sequence regardless of insertion order.
27
28 Voice lane assignment
29 ---------------------
30 Voice lane is determined from the note's pitch at insert time using a
31 coarse tessitura model:
32
33 pitch < 48 → 0 (bass)
34 48 ≤ pitch < 60 → 1 (tenor)
35 60 ≤ pitch < 72 → 2 (alto)
36 pitch ≥ 72 → 3 (soprano)
37
38 Agents that perform explicit voice separation can override ``voice_lane``
39 when calling :meth:`MusicRGA.insert`.
40
41 CRDT properties
42 ---------------
43 The three lattice laws are demonstrated by :func:`_verify_crdt_laws` in the
44 test suite:
45
46 1. **Commutativity**: ``merge(a, b).to_sequence() == merge(b, a).to_sequence()``
47 2. **Associativity**: ``merge(merge(a, b), c).to_sequence() == merge(a, merge(b, c)).to_sequence()``
48 3. **Idempotency**: ``merge(a, a).to_sequence() == a.to_sequence()``
49
50 Relationship to the commit DAG
51 -------------------------------
52 A live session accumulates :class:`RGANoteEntry` operations. At commit time,
53 :meth:`MusicRGA.to_domain_ops` translates the CRDT state into canonical Muse
54 :class:`~muse.domain.DomainOp` entries for storage in the commit record.
55 The CRDT state itself is ephemeral — not stored in the object store.
56
57 Public API
58 ----------
59 - :class:`NotePosition` — music-aware position key (NamedTuple).
60 - :class:`RGANoteEntry` — one element in the RGA (TypedDict).
61 - :class:`MusicRGA` — the voice-aware ordered note sequence CRDT.
62 """
63
64 import logging
65 import uuid as _uuid_mod
66 from typing import NamedTuple, TypedDict
67
68 from muse.domain import DeleteOp, DomainOp, InsertOp
69 from muse.plugins.midi.midi_diff import NoteKey, _note_content_id, _note_summary
70
71 logger = logging.getLogger(__name__)
72
73 # ---------------------------------------------------------------------------
74 # Music-aware position key
75 # ---------------------------------------------------------------------------
76
77
class NotePosition(NamedTuple):
    """Sort key that places a note in the materialised Music RGA sequence.

    Being a tuple, two positions compare field by field in declaration
    order, so the field order below *is* the ordering priority:

    1. ``measure`` — 1-indexed bar number; earlier bars sort first.
    2. ``beat_sub`` — tick offset inside the bar; earlier onsets sort first.
    3. ``voice_lane`` — voice stream (0=bass, 1=tenor, 2=alto, 3=soprano,
       4+=auxiliary); at an identical ``(measure, beat_sub)`` the lower
       lane sorts first, so bass precedes treble.
    4. ``op_id`` — UUID4 string of the inserting operation; lexicographic
       tie-break when different actors insert into the same voice at the
       same beat.
    """

    # Bar number, counted from 1.
    measure: int
    # Tick offset from the start of the bar.
    beat_sub: int
    # Voice stream index; lower values sort earlier at the same onset.
    voice_lane: int
    # Identifier of the insert operation; final tie-break.
    op_id: str
101
102
103 def _pitch_to_voice_lane(pitch: int) -> int:
104 """Map a MIDI pitch to a coarse voice lane index.
105
106 Tessiture boundaries (MIDI pitch → voice):
107 - 0–47 → 0 (bass, sub-bass)
108 - 48–59 → 1 (tenor, baritone)
109 - 60–71 → 2 (alto, mezzo)
110 - 72–127 → 3 (soprano, treble)
111 """
112 if pitch < 48:
113 return 0
114 if pitch < 60:
115 return 1
116 if pitch < 72:
117 return 2
118 return 3
119
120
121 # ---------------------------------------------------------------------------
122 # RGA entry
123 # ---------------------------------------------------------------------------
124
125
class RGANoteEntry(TypedDict):
    """Record describing a single inserted note held by :class:`MusicRGA`.

    Keys:

    ``op_id``
        Unique ID of the insertion operation (UUID4 string).
    ``actor_id``
        Agent or human that performed the insertion.
    ``note``
        The MIDI note content.
    ``position``
        Music-aware :class:`NotePosition` key used for ordering.
    ``parent_op_id``
        ``op_id`` of the entry this one was inserted after, or ``None``
        for a head insertion.
    ``tombstone``
        ``True`` once the note has been deleted. The record is kept rather
        than removed so its position stays stable for other replicas.
    """

    op_id: str
    actor_id: str
    note: NoteKey
    position: NotePosition
    parent_op_id: str | None
    tombstone: bool
146
147
148 # ---------------------------------------------------------------------------
149 # MusicRGA
150 # ---------------------------------------------------------------------------
151
152
class MusicRGA:
    """Voice-aware replicated note sequence for live MIDI collaboration.

    Each replica keeps a map of ``op_id`` → :class:`RGANoteEntry`. Deleted
    notes stay in the map as tombstones. The visible sequence is obtained by
    sorting the live entries on their :class:`NotePosition` key, which orders
    concurrent insertions by voice lane before falling back to ``op_id``.

    Entries are treated as immutable values: :meth:`delete` and
    :meth:`merge` replace an entry with a rebuilt copy instead of mutating
    it, which is what makes it safe for :meth:`merge` to share unchanged
    entries between replicas.

    Usage::

        seq = MusicRGA("agent-1")
        e1 = seq.insert(bass_note)
        e2 = seq.insert(soprano_note)
        seq.delete(e1["op_id"])

        # On another replica:
        seq2 = MusicRGA("agent-2")
        e3 = seq2.insert(tenor_note)

        merged = MusicRGA.merge(seq, seq2)
        notes = merged.to_sequence()  # deterministic, voice-ordered

    Args:
        actor_id: Stable identifier for the agent or human using this replica.
    """

    def __init__(self, actor_id: str) -> None:
        self._actor_id = actor_id
        # op_id → entry. Tombstoned entries are kept, never removed.
        self._entries: dict[str, RGANoteEntry] = {}

    @staticmethod
    def _with_tombstone(entry: RGANoteEntry, tombstone: bool) -> RGANoteEntry:
        """Return a copy of *entry* whose ``tombstone`` flag is *tombstone*."""
        return RGANoteEntry(
            op_id=entry["op_id"],
            actor_id=entry["actor_id"],
            note=entry["note"],
            position=entry["position"],
            parent_op_id=entry["parent_op_id"],
            tombstone=tombstone,
        )

    # ------------------------------------------------------------------
    # Insertion
    # ------------------------------------------------------------------

    def insert(
        self,
        note: NoteKey,
        *,
        after: str | None = None,
        voice_lane: int | None = None,
        ticks_per_beat: int = 480,
        time_sig_numerator: int = 4,
    ) -> RGANoteEntry:
        """Insert *note* into this replica and return the entry created for it.

        The note's position key is derived from ``note["start_tick"]``
        (measure and tick offset within the bar) and, unless *voice_lane*
        is given, from ``note["pitch"]`` (voice lane).

        Args:
            note: The MIDI note to insert.
            after: ``op_id`` of the entry this note is inserted after, or
                ``None`` for a head insertion. It is stored on the entry
                as ``parent_op_id``; it is not checked against this
                replica and :meth:`to_sequence` does not consult it.
            voice_lane: Override the automatic pitch-based lane assignment.
            ticks_per_beat: Ticks per beat, used to compute measure and
                beat_sub. Must be positive.
            time_sig_numerator: Beats per bar (default 4 for 4/4 time).
                Must be positive.

        Returns:
            The created :class:`RGANoteEntry`.

        Raises:
            ValueError: When *ticks_per_beat* or *time_sig_numerator* is
                not positive.
        """
        # A zero bar length would divide by zero below and a negative one
        # would yield meaningless measure numbers, so reject both up front.
        if ticks_per_beat <= 0:
            raise ValueError(
                f"ticks_per_beat must be positive, got {ticks_per_beat!r}"
            )
        if time_sig_numerator <= 0:
            raise ValueError(
                f"time_sig_numerator must be positive, got {time_sig_numerator!r}"
            )

        op_id = str(_uuid_mod.uuid4())

        ticks_per_bar = ticks_per_beat * time_sig_numerator
        bar_index, beat_sub = divmod(note["start_tick"], ticks_per_bar)
        measure = bar_index + 1  # measures are 1-indexed
        lane = (
            voice_lane
            if voice_lane is not None
            else _pitch_to_voice_lane(note["pitch"])
        )

        entry = RGANoteEntry(
            op_id=op_id,
            actor_id=self._actor_id,
            note=note,
            position=NotePosition(
                measure=measure,
                beat_sub=beat_sub,
                voice_lane=lane,
                op_id=op_id,
            ),
            parent_op_id=after,
            tombstone=False,
        )
        self._entries[op_id] = entry
        logger.debug(
            "MusicRGA insert: actor=%r pitch=%d measure=%d voice=%d op=%s",
            self._actor_id,
            note["pitch"],
            measure,
            lane,
            op_id[:8],
        )
        return entry

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------

    def delete(self, op_id: str) -> None:
        """Mark the entry with *op_id* as tombstoned.

        The entry stays in the internal map with ``tombstone=True``; it is
        only hidden from :meth:`to_sequence` and :meth:`live_count`.
        Deleting an already-tombstoned entry is a no-op in effect.

        Args:
            op_id: The op_id of the entry to delete.

        Raises:
            KeyError: When *op_id* is not found in this replica.
        """
        try:
            entry = self._entries[op_id]
        except KeyError:
            raise KeyError(f"op_id {op_id!r} not found in MusicRGA") from None
        # Replace rather than mutate, so entry dicts shared with other
        # replicas (see merge) are unaffected.
        self._entries[op_id] = self._with_tombstone(entry, True)

    # ------------------------------------------------------------------
    # Materialisation
    # ------------------------------------------------------------------

    def to_sequence(self) -> list[NoteKey]:
        """Materialise the live note sequence (excluding tombstones).

        Live entries are sorted solely by their :class:`NotePosition` key,
        ``(measure, beat_sub, voice_lane, op_id)``, so the result does not
        depend on the order in which entries were inserted or merged.

        Returns:
            Sorted list of live (non-tombstoned) :class:`NoteKey` objects.
        """
        live = sorted(
            (e for e in self._entries.values() if not e["tombstone"]),
            key=lambda e: e["position"],
        )
        return [e["note"] for e in live]

    def entry_count(self) -> int:
        """Return the total number of entries including tombstones."""
        return len(self._entries)

    def live_count(self) -> int:
        """Return the number of non-tombstoned (visible) entries."""
        return sum(1 for e in self._entries.values() if not e["tombstone"])

    # ------------------------------------------------------------------
    # CRDT merge — commutative, associative, idempotent
    # ------------------------------------------------------------------

    @staticmethod
    def merge(a: "MusicRGA", b: "MusicRGA") -> "MusicRGA":
        """Return a new MusicRGA that is the join of replicas *a* and *b*.

        The result holds the union of both replicas' entries. For an
        ``op_id`` present in both, the fields are taken from *a* and the
        entry is tombstoned if either replica tombstoned it (deletion wins).

        The laws are stated on the materialised sequence because the class
        defines no ``__eq__``, so ``==`` on replicas compares identity:

        - **Commutative**:
          ``merge(a, b).to_sequence() == merge(b, a).to_sequence()``
        - **Associative**:
          ``merge(merge(a, b), c).to_sequence() ==
          merge(a, merge(b, c)).to_sequence()``
        - **Idempotent**: ``merge(a, a).to_sequence() == a.to_sequence()``

        Args:
            a: First replica.
            b: Second replica.

        Returns:
            A new :class:`MusicRGA` containing the union of all entries from
            both replicas with tombstone-wins conflict resolution. Neither
            input is modified. Unchanged entries are shared with the
            inputs rather than copied.
        """
        merged = MusicRGA(actor_id=f"merge({a._actor_id},{b._actor_id})")
        entries = merged._entries

        # Seed with everything a knows, then fold in b.
        entries.update(a._entries)
        for op_id, entry_b in b._entries.items():
            entry_a = entries.get(op_id)
            if entry_a is None:
                # Only b has this entry.
                entries[op_id] = entry_b
            elif entry_b["tombstone"] and not entry_a["tombstone"]:
                # Tombstone wins: b deleted an entry that a still shows.
                entries[op_id] = MusicRGA._with_tombstone(entry_a, True)
            # Otherwise a's entry already carries the merged tombstone state.

        return merged

    # ------------------------------------------------------------------
    # Conversion to Muse DomainOps
    # ------------------------------------------------------------------

    def to_domain_ops(
        self,
        base_sequence: list[NoteKey],
        ticks_per_beat: int = 480,
    ) -> list[DomainOp]:
        """Convert this CRDT state to Muse DomainOps relative to a base sequence.

        Used at commit time to crystallise a live session's CRDT state into
        the canonical Muse typed delta algebra for storage in the commit record.

        Notes are compared by content ID using set membership:

        - ``InsertOp`` for each live note whose content ID is not in base,
          positioned by its index in the live sequence.
        - ``DeleteOp`` for each base note whose content ID is not in the
          live sequence, positioned by its index in *base_sequence*.

        Because membership is set-based, several notes with the same
        content ID are not told apart: a change in how many copies exist
        produces no op as long as at least one copy is on both sides.

        Args:
            base_sequence: The committed note list at the start of the session.
            ticks_per_beat: Used for human-readable summaries.

        Returns:
            List of :class:`~muse.domain.DomainOp` entries, all inserts
            first, then all deletes.
        """
        live = self.to_sequence()

        # Compute each note's content ID once; it is needed both for the
        # membership sets and for the emitted ops.
        base_ids = [_note_content_id(n) for n in base_sequence]
        live_ids = [_note_content_id(n) for n in live]
        base_id_set = set(base_ids)
        live_id_set = set(live_ids)

        ops: list[DomainOp] = []

        for i, (note, cid) in enumerate(zip(live, live_ids)):
            if cid not in base_id_set:
                ops.append(
                    InsertOp(
                        op="insert",
                        address=f"note:{i}",
                        position=i,
                        content_id=cid,
                        content_summary=_note_summary(note, ticks_per_beat),
                    )
                )

        for i, (note, cid) in enumerate(zip(base_sequence, base_ids)):
            if cid not in live_id_set:
                ops.append(
                    DeleteOp(
                        op="delete",
                        address=f"note:{i}",
                        position=i,
                        content_id=cid,
                        content_summary=_note_summary(note, ticks_per_beat),
                    )
                )

        return ops