_crdt_notes.py
python
| 1 | """Voice-aware Music RGA — experimental CRDT for live MIDI note sequences. |
| 2 | |
| 3 | This module is a **research prototype** for the live collaboration foundation |
| 4 | described in the Muse supercharge plan. It is NOT wired into the production |
| 5 | merge path. Its purpose is to: |
| 6 | |
| 7 | 1. Demonstrate that concurrent note insertions can be made commutative. |
| 8 | 2. Provide a benchmark harness for comparing voice-aware RGA against LSEQ. |
| 9 | 3. Serve as the implementation foundation for the eventual live collaboration |
| 10 | layer (Workstream 7 / 8 in the plan). |
| 11 | |
| 12 | Design |
| 13 | ------ |
| 14 | Standard RGA (Roh 2011) orders concurrent insertions at the same position |
| 15 | lexicographically by op_id. For music this produces unacceptable results: |
| 16 | two agents inserting bass and soprano notes at the same beat would interleave |
| 17 | their pitches arbitrarily, producing nonsense voice crossings. |
| 18 | |
| 19 | **Music-RGA** uses a multi-key position ordering: |
| 20 | |
| 21 | NotePosition = (measure, beat_sub, voice_lane, op_id) |
| 22 | |
| 23 | Concurrent insertions at the same ``(measure, beat_sub)`` are ordered by |
| 24 | ``voice_lane`` first (bass=0 < tenor=1 < alto=2 < soprano=3), then by |
| 25 | ``op_id`` as a tie-break. This guarantees that bass notes always precede |
| 26 | treble notes in the materialised sequence regardless of insertion order. |
| 27 | |
| 28 | Voice lane assignment |
| 29 | --------------------- |
| 30 | Voice lane is determined from the note's pitch at insert time using a |
| 31 | coarse tessitura model: |
| 32 | |
| 33 | pitch < 48 → 0 (bass) |
| 34 | 48 ≤ pitch < 60 → 1 (tenor) |
| 35 | 60 ≤ pitch < 72 → 2 (alto) |
| 36 | pitch ≥ 72 → 3 (soprano) |
| 37 | |
| 38 | Agents that perform explicit voice separation can override ``voice_lane`` |
| 39 | when calling :meth:`MusicRGA.insert`. |
| 40 | |
| 41 | CRDT properties |
| 42 | --------------- |
| 43 | The three lattice laws are demonstrated by :func:`_verify_crdt_laws` in the |
| 44 | test suite: |
| 45 | |
| 46 | 1. **Commutativity**: ``merge(a, b).to_sequence() == merge(b, a).to_sequence()`` |
| 47 | 2. **Associativity**: ``merge(merge(a, b), c).to_sequence() == merge(a, merge(b, c)).to_sequence()`` |
| 48 | 3. **Idempotency**: ``merge(a, a).to_sequence() == a.to_sequence()`` |
| 49 | |
| 50 | Relationship to the commit DAG |
| 51 | ------------------------------- |
| 52 | A live session accumulates :class:`RGANoteEntry` operations. At commit time, |
| 53 | :meth:`MusicRGA.to_domain_ops` translates the CRDT state into canonical Muse |
| 54 | :class:`~muse.domain.DomainOp` entries for storage in the commit record. |
| 55 | The CRDT state itself is ephemeral — not stored in the object store. |
| 56 | |
| 57 | Public API |
| 58 | ---------- |
| 59 | - :class:`NotePosition` — music-aware position key (NamedTuple). |
| 60 | - :class:`RGANoteEntry` — one element in the RGA (TypedDict). |
| 61 | - :class:`MusicRGA` — the voice-aware ordered note sequence CRDT. |
| 62 | """ |
| 63 | |
| 64 | import logging |
| 65 | import uuid as _uuid_mod |
| 66 | from typing import NamedTuple, TypedDict |
| 67 | |
| 68 | from muse.domain import DeleteOp, DomainOp, InsertOp |
| 69 | from muse.plugins.midi.midi_diff import NoteKey, _note_content_id, _note_summary |
| 70 | |
| 71 | logger = logging.getLogger(__name__) |
| 72 | |
| 73 | # --------------------------------------------------------------------------- |
| 74 | # Music-aware position key |
| 75 | # --------------------------------------------------------------------------- |
| 76 | |
| 77 | |
class NotePosition(NamedTuple):
    """Composite sort key that places a note inside the Music RGA.

    Being a tuple, two positions compare field by field in declaration
    order, so the fields are listed from most to least significant:

    ``measure``
        Bar number (1-indexed). An earlier bar sorts before a later one.
    ``beat_sub``
        Onset expressed as a tick offset from the start of the bar; within
        one bar the smaller offset sorts first.
    ``voice_lane``
        Voice stream index: 0=bass, 1=tenor, 2=alto, 3=soprano, 4 and above
        for auxiliary voices. For equal ``(measure, beat_sub)`` the lower
        lane sorts first, so bass is placed before treble.
    ``op_id``
        Identifier of the inserting operation (a UUID4 string). Compared
        lexicographically as the final tie-break between insertions that
        share the same bar, onset and lane.
    """

    measure: int
    beat_sub: int
    voice_lane: int
    op_id: str
| 101 | |
| 102 | |
| 103 | def _pitch_to_voice_lane(pitch: int) -> int: |
| 104 | """Map a MIDI pitch to a coarse voice lane index. |
| 105 | |
| 106 | Tessiture boundaries (MIDI pitch → voice): |
| 107 | - 0–47 → 0 (bass, sub-bass) |
| 108 | - 48–59 → 1 (tenor, baritone) |
| 109 | - 60–71 → 2 (alto, mezzo) |
| 110 | - 72–127 → 3 (soprano, treble) |
| 111 | """ |
| 112 | if pitch < 48: |
| 113 | return 0 |
| 114 | if pitch < 60: |
| 115 | return 1 |
| 116 | if pitch < 72: |
| 117 | return 2 |
| 118 | return 3 |
| 119 | |
| 120 | |
| 121 | # --------------------------------------------------------------------------- |
| 122 | # RGA entry |
| 123 | # --------------------------------------------------------------------------- |
| 124 | |
| 125 | |
class RGANoteEntry(TypedDict):
    """A single record stored by :class:`MusicRGA`, keyed by its ``op_id``.

    Keys:

    ``op_id``
        Unique identifier of the insertion operation (UUID4 string).
    ``actor_id``
        Identifier of the agent or human whose replica created the entry.
    ``note``
        The MIDI note payload.
    ``position``
        The :class:`NotePosition` key used to order the entry.
    ``parent_op_id``
        ``op_id`` of the entry this one was inserted after, or ``None`` when
        it was inserted at the head.
    ``tombstone``
        ``True`` once the note has been deleted. The record itself is kept
        (tombstone semantics) so other replicas can still refer to it.
    """

    op_id: str
    actor_id: str
    note: NoteKey
    position: NotePosition
    parent_op_id: str | None
    tombstone: bool
| 146 | |
| 147 | |
| 148 | # --------------------------------------------------------------------------- |
| 149 | # MusicRGA |
| 150 | # --------------------------------------------------------------------------- |
| 151 | |
| 152 | |
class MusicRGA:
    """Voice-aware replicated note sequence for live MIDI collaboration.

    Modelled on the RGA CRDT (Roh et al., 2011). Each replica keeps a map of
    ``op_id`` → :class:`RGANoteEntry`. Entries are never removed from the
    map: deleting a note only sets its ``tombstone`` flag. The visible
    sequence is produced by sorting the live entries on their
    :class:`NotePosition` key, so concurrent insertions at the same onset are
    ordered by voice lane first and by ``op_id`` last.

    Note that ``parent_op_id`` is recorded on every entry but is not consulted
    when the sequence is materialised; ordering comes from ``position`` alone.

    Usage::

        seq = MusicRGA("agent-1")
        e1 = seq.insert(bass_note)
        e2 = seq.insert(soprano_note)
        seq.delete(e1["op_id"])

        # On another replica:
        seq2 = MusicRGA("agent-2")
        e3 = seq2.insert(tenor_note)

        merged = MusicRGA.merge(seq, seq2)
        notes = merged.to_sequence()   # deterministic, voice-ordered

    Args:
        actor_id: Stable identifier for the agent or human using this replica.
    """

    def __init__(self, actor_id: str) -> None:
        self._actor_id = actor_id
        self._entries: dict[str, RGANoteEntry] = {}  # op_id → entry

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _with_tombstone(entry: RGANoteEntry, tombstone: bool) -> RGANoteEntry:
        """Return a fresh copy of *entry* with its ``tombstone`` flag replaced.

        A new dict is built rather than mutating *entry* because entry objects
        are shared by reference between replicas after :meth:`merge`.
        """
        return RGANoteEntry(
            op_id=entry["op_id"],
            actor_id=entry["actor_id"],
            note=entry["note"],
            position=entry["position"],
            parent_op_id=entry["parent_op_id"],
            tombstone=tombstone,
        )

    # ------------------------------------------------------------------
    # Insertion
    # ------------------------------------------------------------------

    def insert(
        self,
        note: NoteKey,
        *,
        after: str | None = None,
        voice_lane: int | None = None,
        ticks_per_beat: int = 480,
        time_sig_numerator: int = 4,
    ) -> RGANoteEntry:
        """Insert *note* into the sequence, optionally after entry *after*.

        The note's ``start_tick`` is split into a 1-indexed measure and a tick
        offset within that measure; together with the voice lane and a freshly
        generated UUID4 ``op_id`` they form the entry's :class:`NotePosition`.

        Args:
            note:               The MIDI note to insert.
            after:              ``op_id`` of the entry to insert after, stored
                                as ``parent_op_id``. ``None`` means a head
                                insertion. The value is not checked against
                                this replica's entries.
            voice_lane:         Override the automatic pitch-based lane
                                assignment.
            ticks_per_beat:     Ticks per beat, used to compute measure and
                                beat_sub. Must be positive.
            time_sig_numerator: Beats per bar (default 4 for 4/4 time). Must
                                be positive.

        Returns:
            The created :class:`RGANoteEntry`.

        Raises:
            ValueError: When *ticks_per_beat* or *time_sig_numerator* is not
                positive (the bar length would be zero or negative).
        """
        if ticks_per_beat <= 0 or time_sig_numerator <= 0:
            raise ValueError(
                "ticks_per_beat and time_sig_numerator must be positive, "
                f"got {ticks_per_beat!r} and {time_sig_numerator!r}"
            )

        op_id = str(_uuid_mod.uuid4())

        ticks_per_bar = ticks_per_beat * time_sig_numerator
        bar_index, beat_sub = divmod(note["start_tick"], ticks_per_bar)
        measure = bar_index + 1  # measures are 1-indexed
        lane = voice_lane if voice_lane is not None else _pitch_to_voice_lane(note["pitch"])

        entry = RGANoteEntry(
            op_id=op_id,
            actor_id=self._actor_id,
            note=note,
            position=NotePosition(
                measure=measure,
                beat_sub=beat_sub,
                voice_lane=lane,
                op_id=op_id,
            ),
            parent_op_id=after,
            tombstone=False,
        )
        self._entries[op_id] = entry
        logger.debug(
            "MusicRGA insert: actor=%r pitch=%d measure=%d voice=%d op=%s",
            self._actor_id,
            note["pitch"],
            measure,
            lane,
            op_id[:8],
        )
        return entry

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------

    def delete(self, op_id: str) -> None:
        """Mark the entry with *op_id* as tombstoned.

        The entry stays in the internal map; only its ``tombstone`` flag
        changes. Deleting an already-tombstoned entry is a no-op in effect.

        Args:
            op_id: The op_id of the entry to delete.

        Raises:
            KeyError: When *op_id* is not found in this replica.
        """
        try:
            entry = self._entries[op_id]
        except KeyError:
            raise KeyError(f"op_id {op_id!r} not found in MusicRGA") from None
        # Replace rather than mutate: the old dict may be shared with other
        # replicas produced by merge().
        self._entries[op_id] = self._with_tombstone(entry, True)

    # ------------------------------------------------------------------
    # Materialisation
    # ------------------------------------------------------------------

    def to_sequence(self) -> list[NoteKey]:
        """Materialise the live note sequence (excluding tombstones).

        Live entries are sorted by their :class:`NotePosition` key
        ``(measure, beat_sub, voice_lane, op_id)``, so the result does not
        depend on the order in which entries were inserted or merged.

        Returns:
            Sorted list of the notes of all non-tombstoned entries.
        """
        live = (e for e in self._entries.values() if not e["tombstone"])
        return [e["note"] for e in sorted(live, key=lambda e: e["position"])]

    def entry_count(self) -> int:
        """Return the total number of entries including tombstones."""
        return len(self._entries)

    def live_count(self) -> int:
        """Return the number of non-tombstoned (visible) entries."""
        return sum(1 for e in self._entries.values() if not e["tombstone"])

    # ------------------------------------------------------------------
    # CRDT merge — commutative, associative, idempotent
    # ------------------------------------------------------------------

    @staticmethod
    def merge(a: "MusicRGA", b: "MusicRGA") -> "MusicRGA":
        """Return a new MusicRGA that is the join of replicas *a* and *b*.

        Neither input is modified. The laws below are stated on the
        materialised sequence, because this class does not define ``__eq__``
        (``==`` on two instances is an identity check):

        - **Commutative**: ``merge(a, b).to_sequence() == merge(b, a).to_sequence()``
        - **Associative**: ``merge(merge(a, b), c).to_sequence() ==
          merge(a, merge(b, c)).to_sequence()``
        - **Idempotent**: ``merge(a, a).to_sequence() == a.to_sequence()``

        The result holds the union of both entry maps. For an ``op_id``
        present in both replicas the fields are taken from *a* and the entry
        is tombstoned if either side has it tombstoned, so a deletion seen by
        any replica survives the merge.

        Args:
            a: First replica.
            b: Second replica.

        Returns:
            A new :class:`MusicRGA` whose actor id is
            ``"merge(<a actor>,<b actor>)"`` and whose entries are the
            tombstone-wins union of both replicas.
        """
        merged = MusicRGA(actor_id=f"merge({a._actor_id},{b._actor_id})")
        merged_entries = merged._entries
        b_entries = b._entries

        # Pass 1: everything a knows about, reconciled with b where shared.
        for op_id, entry_a in a._entries.items():
            entry_b = b_entries.get(op_id)
            if entry_b is None:
                merged_entries[op_id] = entry_a
            else:
                # Tombstone wins: deleted on either side means deleted.
                merged_entries[op_id] = MusicRGA._with_tombstone(
                    entry_a, entry_a["tombstone"] or entry_b["tombstone"]
                )

        # Pass 2: entries only b knows about (shared ones are already set).
        for op_id, entry_b in b_entries.items():
            merged_entries.setdefault(op_id, entry_b)

        return merged

    # ------------------------------------------------------------------
    # Conversion to Muse DomainOps
    # ------------------------------------------------------------------

    def to_domain_ops(
        self,
        base_sequence: list[NoteKey],
        ticks_per_beat: int = 480,
    ) -> list[DomainOp]:
        """Convert this CRDT state to Muse DomainOps relative to a base sequence.

        Notes are compared by content id (``_note_content_id``) using set
        membership, so several notes with the same content id are treated as
        one for the purpose of the comparison.

        The result lists, in this order:

        - one ``InsertOp`` per live note whose content id is absent from
          *base_sequence*, positioned by its index in the live sequence;
        - one ``DeleteOp`` per base note whose content id is absent from the
          live sequence, positioned by its index in *base_sequence*.

        Args:
            base_sequence:  The committed note list at the start of the session.
            ticks_per_beat: Passed to ``_note_summary`` for the summaries.

        Returns:
            List of :class:`~muse.domain.DomainOp` entries.
        """
        live = self.to_sequence()

        # Compute each content id once and reuse it for both the membership
        # sets and the emitted ops.
        live_cids = [_note_content_id(n) for n in live]
        base_cids = [_note_content_id(n) for n in base_sequence]
        live_cid_set = set(live_cids)
        base_cid_set = set(base_cids)

        ops: list[DomainOp] = []

        for i, (note, cid) in enumerate(zip(live, live_cids)):
            if cid not in base_cid_set:
                ops.append(
                    InsertOp(
                        op="insert",
                        address=f"note:{i}",
                        position=i,
                        content_id=cid,
                        content_summary=_note_summary(note, ticks_per_beat),
                    )
                )

        for i, (note, cid) in enumerate(zip(base_sequence, base_cids)):
            if cid not in live_cid_set:
                ops.append(
                    DeleteOp(
                        op="delete",
                        address=f"note:{i}",
                        position=i,
                        content_id=cid,
                        content_summary=_note_summary(note, ticks_per_beat),
                    )
                )

        return ops