cgcardona / muse public
midi_merge.py python
587 lines 22.7 KB
04004b82 Rename MusicRGA → MidiRGA and purge all 'music plugin' terminology Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """MIDI dimension-aware merge for the Muse MIDI plugin.
2
3 This module implements the multidimensional merge that makes Muse meaningfully
4 different from git. Git treats every file as an opaque byte sequence: any
5 two-branch change to the same file is a conflict. Muse understands that a
6 MIDI file has *independent orthogonal axes*, and two collaborators can touch
7 different axes of the same file without conflicting.
8
9 Dimensions
10 ----------
11
12 MIDI carries far more independent axes than a naive "notes vs. everything else"
13 split. The full dimension taxonomy maps every MIDI event type to exactly one
14 internal bucket:
15
16 +----------------------+--------------------------------------------------+
17 | Internal dimension | MIDI event types / CC numbers |
18 +======================+==================================================+
19 | ``notes`` | ``note_on`` / ``note_off`` |
20 +----------------------+--------------------------------------------------+
21 | ``pitch_bend`` | ``pitchwheel`` |
22 +----------------------+--------------------------------------------------+
23 | ``channel_pressure`` | ``aftertouch`` (mono channel pressure) |
24 +----------------------+--------------------------------------------------+
25 | ``poly_pressure`` | ``polytouch`` (per-note polyphonic aftertouch) |
26 +----------------------+--------------------------------------------------+
27 | ``cc_modulation`` | CC 1 — modulation wheel |
28 +----------------------+--------------------------------------------------+
29 | ``cc_volume`` | CC 7 — channel volume |
30 +----------------------+--------------------------------------------------+
31 | ``cc_pan`` | CC 10 — stereo pan |
32 +----------------------+--------------------------------------------------+
33 | ``cc_expression`` | CC 11 — expression controller |
34 +----------------------+--------------------------------------------------+
35 | ``cc_sustain`` | CC 64 — damper / sustain pedal |
36 +----------------------+--------------------------------------------------+
37 | ``cc_sostenuto`` | CC 66 — sostenuto pedal |
38 +----------------------+--------------------------------------------------+
39 | ``cc_soft_pedal`` | CC 67 — soft pedal (una corda) |
40 +----------------------+--------------------------------------------------+
41 | ``cc_portamento`` | CC 65 — portamento on/off |
42 +----------------------+--------------------------------------------------+
43 | ``cc_reverb`` | CC 91 — reverb send level |
44 +----------------------+--------------------------------------------------+
45 | ``cc_chorus`` | CC 93 — chorus send level |
46 +----------------------+--------------------------------------------------+
47 | ``cc_other`` | All other CC events (numbered controllers) |
48 +----------------------+--------------------------------------------------+
49 | ``program_change`` | ``program_change`` (patch / instrument select) |
50 +----------------------+--------------------------------------------------+
51 | ``tempo_map`` | ``set_tempo`` meta events |
52 +----------------------+--------------------------------------------------+
53 | ``time_signatures`` | ``time_signature`` meta events |
54 +----------------------+--------------------------------------------------+
55 | ``key_signatures`` | ``key_signature`` meta events |
56 +----------------------+--------------------------------------------------+
57 | ``markers`` | ``marker``, ``cue_marker``, ``text``, |
58 | | ``lyrics``, ``copyright`` meta events |
59 +----------------------+--------------------------------------------------+
60 | ``track_structure`` | ``track_name``, ``instrument_name``, ``sysex``, |
61 | | ``sequencer_specific`` and unknown meta events |
62 +----------------------+--------------------------------------------------+
63
64 Why fine-grained dimensions matter
65 -----------------------------------
66 With the old 4-bucket model, changing sustain pedal (CC64) and changing channel
67 volume (CC7) were the same dimension: they always conflicted. With 21 internal
68 dimensions they are independent — two agents can edit different aspects of the
69 same MIDI file without ever conflicting.
70
71 Independence rules
72 ------------------
73 - **Independent** (``independent_merge=True``): notes, pitch_bend, all CC
74 dimensions, channel_pressure, poly_pressure, program_change, key_signatures,
75 markers. Conflicts in these dimensions never block merging others.
76 - **Non-independent** (``independent_merge=False``): tempo_map, time_signatures,
77 track_structure. A conflict here blocks merging other dimensions until
78 resolved, because a tempo change shifts the musical meaning of every subsequent
79 tick position, and track structure changes affect routing.
80
81 Merge algorithm
82 ---------------
83 1. Parse ``base``, ``left``, and ``right`` MIDI bytes into event streams.
84 2. Convert to absolute-tick representation and bucket by dimension.
85 3. Hash each bucket; compare ``base ↔ left`` and ``base ↔ right`` to detect
86 per-dimension changes.
87 4. For each dimension apply the winning side determined by ``.museattributes``
88 strategy (or the standard one-sided-change rule when no conflict exists).
89 5. Reconstruct a valid MIDI file by merging winning dimension slices, sorting
90 by absolute tick, converting back to delta-time, and writing to bytes.
91
92 Public API
93 ----------
94 - :func:`extract_dimensions` — parse MIDI bytes → ``MidiDimensions``
95 - :func:`merge_midi_dimensions` — three-way dimension merge → bytes or ``None``
96 - :func:`dimension_conflict_detail` — per-dimension change report for logging
97 - :data:`INTERNAL_DIMS` — ordered list of all internal dimension names
98 - :data:`DIM_ALIAS` — user-facing ``.museattributes`` name → internal bucket
99 - :data:`NON_INDEPENDENT_DIMS` — dimensions that block others on conflict
100 """
101
102 import hashlib
103 import io
104 import json
105 from dataclasses import dataclass, field
106
107 import mido
108
109 from muse.core.attributes import AttributeRule, resolve_strategy
110
111 # ---------------------------------------------------------------------------
112 # Dimension constants — the complete MIDI dimension taxonomy
113 # ---------------------------------------------------------------------------
114
115 #: Internal dimension names, ordered canonically.
116 #: Each MIDI event type maps to exactly one of these buckets.
117 INTERNAL_DIMS: list[str] = [
118 # --- Expressive note content ---
119 "notes", # note_on / note_off
120 "pitch_bend", # pitchwheel
121 "channel_pressure", # aftertouch (mono)
122 "poly_pressure", # polytouch (per-note)
123 # --- Named CC controllers (individually mergeable) ---
124 "cc_modulation", # CC 1
125 "cc_volume", # CC 7
126 "cc_pan", # CC 10
127 "cc_expression", # CC 11
128 "cc_sustain", # CC 64
129 "cc_portamento", # CC 65
130 "cc_sostenuto", # CC 66
131 "cc_soft_pedal", # CC 67
132 "cc_reverb", # CC 91
133 "cc_chorus", # CC 93
134 "cc_other", # all remaining CC numbers
135 # --- Patch / program selection ---
136 "program_change",
137 # --- Timeline / notation metadata (non-independent) ---
138 "tempo_map", # set_tempo — non-independent: affects all tick positions
139 "time_signatures", # time_signature — non-independent: affects bar structure
140 # --- Tonal context and notation ---
141 "key_signatures", # key_signature
142 "markers", # marker, cue_marker, text, lyrics, copyright
143 # --- Track structure (non-independent) ---
144 "track_structure", # track_name, instrument_name, sysex, unknown meta
145 ]
146
147 #: Dimensions whose conflicts block merging all other dimensions until resolved.
148 #: All other dimensions are merged in parallel regardless of conflicts here.
149 NON_INDEPENDENT_DIMS: frozenset[str] = frozenset({
150 "tempo_map",
151 "time_signatures",
152 "track_structure",
153 })
154
155 #: User-facing dimension names from .museattributes mapped to internal buckets.
156 #: Agents and humans use these names in merge strategy declarations.
157 DIM_ALIAS: dict[str, str] = {
158 "pitch_bend": "pitch_bend",
159 "aftertouch": "channel_pressure",
160 "poly_aftertouch": "poly_pressure",
161 "modulation": "cc_modulation",
162 "volume": "cc_volume",
163 "pan": "cc_pan",
164 "expression": "cc_expression",
165 "sustain": "cc_sustain",
166 "portamento": "cc_portamento",
167 "sostenuto": "cc_sostenuto",
168 "soft_pedal": "cc_soft_pedal",
169 "reverb": "cc_reverb",
170 "chorus": "cc_chorus",
171 "automation": "cc_other",
172 "program": "program_change",
173 "tempo": "tempo_map",
174 "time_sig": "time_signatures",
175 "key_sig": "key_signatures",
176 "markers": "markers",
177 "track_structure": "track_structure",
178 }
179
180 #: All valid names (aliases + internal) → internal bucket.
181 _CANONICAL: dict[str, str] = {**DIM_ALIAS, **{d: d for d in INTERNAL_DIMS}}
182
183 #: CC number → internal dimension name for named controllers.
184 _CC_DIM: dict[int, str] = {
185 1: "cc_modulation",
186 7: "cc_volume",
187 10: "cc_pan",
188 11: "cc_expression",
189 64: "cc_sustain",
190 65: "cc_portamento",
191 66: "cc_sostenuto",
192 67: "cc_soft_pedal",
193 91: "cc_reverb",
194 93: "cc_chorus",
195 }
196
197
198 # ---------------------------------------------------------------------------
199 # Data types
200 # ---------------------------------------------------------------------------
201
202
203 @dataclass
204 class DimensionSlice:
205 """Events belonging to one dimension of a MIDI file.
206
207 ``events`` is a list of ``(abs_tick, mido.Message)`` pairs sorted by
208 ascending absolute tick. ``content_hash`` is the SHA-256 digest of the
209 canonical JSON serialisation of the event list (used for change detection
210 without loading file bytes).
211 """
212
213 name: str
214 events: list[tuple[int, mido.Message]] = field(default_factory=list)
215 content_hash: str = ""
216
217 def __post_init__(self) -> None:
218 if not self.content_hash:
219 self.content_hash = _hash_events(self.events)
220
221
222 @dataclass
223 class MidiDimensions:
224 """All dimension slices extracted from one MIDI file.
225
226 ``slices`` maps internal dimension name → :class:`DimensionSlice`.
227 Every internal dimension in :data:`INTERNAL_DIMS` has an entry, even if
228 the corresponding event list is empty (hash of empty list is stable).
229 """
230
231 ticks_per_beat: int
232 file_type: int
233 slices: dict[str, DimensionSlice]
234
235 def get(self, dim: str) -> DimensionSlice:
236 """Return the slice for a user-facing or internal dimension name."""
237 internal = _CANONICAL.get(dim, dim)
238 return self.slices[internal]
239
240
241 # ---------------------------------------------------------------------------
242 # Internal helpers
243 # ---------------------------------------------------------------------------
244
245
246 def _classify_event(msg: mido.Message) -> str | None:
247 """Map a mido Message to an internal dimension bucket.
248
249 Returns ``None`` for events that should be excluded from all buckets
250 (e.g. ``end_of_track`` is handled during reconstruction, not stored here).
251 Unknown messages that are meta events fall back to ``"track_structure"``.
252 True unknowns (no ``is_meta`` attribute) are discarded.
253 """
254 t = msg.type
255
256 # --- Note events ---
257 if t in ("note_on", "note_off"):
258 return "notes"
259
260 # --- Pitch / pressure ---
261 if t == "pitchwheel":
262 return "pitch_bend"
263 if t == "aftertouch":
264 return "channel_pressure"
265 if t == "polytouch":
266 return "poly_pressure"
267
268 # --- CC — split by controller number ---
269 if t == "control_change":
270 return _CC_DIM.get(msg.control, "cc_other")
271
272 # --- Program change ---
273 if t == "program_change":
274 return "program_change"
275
276 # --- Timeline metadata ---
277 if t == "set_tempo":
278 return "tempo_map"
279 if t == "time_signature":
280 return "time_signatures"
281 if t == "key_signature":
282 return "key_signatures"
283
284 # --- Section markers and text annotations ---
285 if t in ("marker", "cue_marker", "text", "lyrics", "copyright"):
286 return "markers"
287
288 # --- Track structure and routing ---
289 if t in ("track_name", "instrument_name", "sysex", "sequencer_specific"):
290 return "track_structure"
291
292 # --- End-of-track is reconstructed, not stored ---
293 if t == "end_of_track":
294 return None
295
296 # --- Unknown meta events → track structure (safe default) ---
297 if getattr(msg, "is_meta", False):
298 return "track_structure"
299
300 return None
301
302
303 type _MsgVal = int | str | list[int]
304
305
306 def _msg_to_dict(msg: mido.Message) -> dict[str, _MsgVal]:
307 """Serialise a mido Message to a JSON-compatible dict."""
308 d: dict[str, _MsgVal] = {"type": msg.type}
309 for attr in (
310 "channel", "note", "velocity", "control", "value",
311 "pitch", "program", "numerator", "denominator",
312 "clocks_per_click", "notated_32nd_notes_per_beat",
313 "tempo", "key", "scale", "text", "data",
314 ):
315 if hasattr(msg, attr):
316 raw = getattr(msg, attr)
317 if isinstance(raw, (bytes, bytearray)):
318 d[attr] = list(raw)
319 elif isinstance(raw, str):
320 d[attr] = raw
321 elif isinstance(raw, int):
322 d[attr] = raw
323 return d
324
325
326 def _hash_events(events: list[tuple[int, mido.Message]]) -> str:
327 """SHA-256 of the canonical JSON representation of an event list."""
328 payload = json.dumps(
329 [(tick, _msg_to_dict(msg)) for tick, msg in events],
330 sort_keys=True,
331 separators=(",", ":"),
332 ).encode()
333 return hashlib.sha256(payload).hexdigest()
334
335
336 def _to_absolute(track: mido.MidiTrack) -> list[tuple[int, mido.Message]]:
337 """Convert a delta-time track to a list of ``(abs_tick, msg)`` pairs."""
338 result: list[tuple[int, mido.Message]] = []
339 abs_tick = 0
340 for msg in track:
341 abs_tick += msg.time
342 result.append((abs_tick, msg))
343 return result
344
345
346 # ---------------------------------------------------------------------------
347 # Public: extract_dimensions
348 # ---------------------------------------------------------------------------
349
350
351 def extract_dimensions(midi_bytes: bytes) -> MidiDimensions:
352 """Parse *midi_bytes* and bucket events by dimension.
353
354 Every event type in the MIDI spec maps to exactly one of the
355 :data:`INTERNAL_DIMS` buckets. Empty buckets are present with an empty
356 event list so that callers can always index by dimension name.
357
358 Args:
359 midi_bytes: Raw bytes of a ``.mid`` file.
360
361 Returns:
362 A :class:`MidiDimensions` with one :class:`DimensionSlice` per
363 internal dimension. Events within each slice are sorted by ascending
364 absolute tick, then by event type for determinism when multiple events
365 share the same tick.
366
367 Raises:
368 ValueError: If *midi_bytes* cannot be parsed as a MIDI file.
369 """
370 try:
371 mid = mido.MidiFile(file=io.BytesIO(midi_bytes))
372 except Exception as exc:
373 raise ValueError(f"Failed to parse MIDI data: {exc}") from exc
374
375 buckets: dict[str, list[tuple[int, mido.Message]]] = {
376 dim: [] for dim in INTERNAL_DIMS
377 }
378
379 for track in mid.tracks:
380 for abs_tick, msg in _to_absolute(track):
381 bucket = _classify_event(msg)
382 if bucket is not None:
383 buckets[bucket].append((abs_tick, msg))
384
385 for dim in INTERNAL_DIMS:
386 buckets[dim].sort(key=lambda x: (x[0], x[1].type))
387
388 slices = {
389 dim: DimensionSlice(name=dim, events=events)
390 for dim, events in buckets.items()
391 }
392 return MidiDimensions(
393 ticks_per_beat=mid.ticks_per_beat,
394 file_type=mid.type,
395 slices=slices,
396 )
397
398
399 # ---------------------------------------------------------------------------
400 # Public: dimension_conflict_detail
401 # ---------------------------------------------------------------------------
402
403
404 def dimension_conflict_detail(
405 base: MidiDimensions,
406 left: MidiDimensions,
407 right: MidiDimensions,
408 ) -> dict[str, str]:
409 """Return a per-dimension change report for a conflicting file.
410
411 Returns a dict mapping internal dimension name to one of:
412
413 - ``"unchanged"`` — neither side changed this dimension.
414 - ``"left_only"`` — only the left (ours) side changed.
415 - ``"right_only"`` — only the right (theirs) side changed.
416 - ``"both"`` — both sides changed; a dimension-level conflict.
417
418 This is used by :func:`merge_midi_dimensions` and surfaced in
419 ``muse merge`` output for human-readable conflict diagnostics.
420 """
421 report: dict[str, str] = {}
422 for dim in INTERNAL_DIMS:
423 base_hash = base.slices[dim].content_hash
424 left_hash = left.slices[dim].content_hash
425 right_hash = right.slices[dim].content_hash
426 left_changed = base_hash != left_hash
427 right_changed = base_hash != right_hash
428 if left_changed and right_changed:
429 report[dim] = "both"
430 elif left_changed:
431 report[dim] = "left_only"
432 elif right_changed:
433 report[dim] = "right_only"
434 else:
435 report[dim] = "unchanged"
436 return report
437
438
439 # ---------------------------------------------------------------------------
440 # Reconstruction helpers
441 # ---------------------------------------------------------------------------
442
443
444 def _events_to_track(
445 events: list[tuple[int, mido.Message]],
446 ) -> mido.MidiTrack:
447 """Convert absolute-tick events to a mido MidiTrack with delta times."""
448 track = mido.MidiTrack()
449 prev_tick = 0
450 for abs_tick, msg in sorted(events, key=lambda x: (x[0], x[1].type)):
451 delta = abs_tick - prev_tick
452 new_msg = msg.copy(time=delta)
453 track.append(new_msg)
454 prev_tick = abs_tick
455 if not track or track[-1].type != "end_of_track":
456 track.append(mido.MetaMessage("end_of_track", time=0))
457 return track
458
459
460 def _reconstruct(
461 ticks_per_beat: int,
462 winning_slices: dict[str, list[tuple[int, mido.Message]]],
463 ) -> bytes:
464 """Build a type-0 MIDI file from winning dimension event lists.
465
466 All dimension events are merged into a single track (type-0) for
467 maximum compatibility. The absolute-tick ordering is preserved and
468 duplicate end_of_track messages are removed.
469 """
470 all_events: list[tuple[int, mido.Message]] = []
471 for events in winning_slices.values():
472 all_events.extend(events)
473
474 all_events = [
475 (tick, msg) for tick, msg in all_events
476 if msg.type != "end_of_track"
477 ]
478 all_events.sort(key=lambda x: (x[0], x[1].type))
479
480 track = _events_to_track(all_events)
481 mid = mido.MidiFile(type=0, ticks_per_beat=ticks_per_beat)
482 mid.tracks.append(track)
483
484 buf = io.BytesIO()
485 mid.save(file=buf)
486 return buf.getvalue()
487
488
489 # ---------------------------------------------------------------------------
490 # Public: merge_midi_dimensions
491 # ---------------------------------------------------------------------------
492
493
494 def merge_midi_dimensions(
495 base_bytes: bytes,
496 left_bytes: bytes,
497 right_bytes: bytes,
498 attrs_rules: list[AttributeRule],
499 path: str,
500 ) -> tuple[bytes, dict[str, str]] | None:
501 """Attempt a dimension-level three-way merge of a MIDI file.
502
503 For each internal dimension (all 21 of them):
504
505 - If neither side changed → keep base.
506 - If only one side changed → take that side (clean auto-merge).
507 - If both sides changed → consult ``.museattributes`` strategy:
508
509 * ``ours`` / ``theirs`` → take the specified side; record in report.
510 * ``manual`` / ``auto`` / ``union`` → unresolvable; return ``None``.
511
512 Non-independent dimensions (``tempo_map``, ``time_signatures``,
513 ``track_structure``) that have bilateral conflicts cause an immediate
514 ``None`` return — their conflicts cannot be auto-resolved because they
515 affect the semantic meaning of all other dimensions.
516
517 Args:
518 base_bytes: MIDI bytes for the common ancestor.
519 left_bytes: MIDI bytes for the ours (left) branch.
520 right_bytes: MIDI bytes for the theirs (right) branch.
521 attrs_rules: Rule list from :func:`muse.core.attributes.load_attributes`.
522 path: Workspace-relative POSIX path (used for strategy lookup).
523
524 Returns:
525 A ``(merged_bytes, dimension_report)`` tuple when all dimension
526 conflicts can be resolved, or ``None`` when at least one dimension
527 conflict has no resolvable strategy.
528
529 *dimension_report* maps each internal dimension name to the side
530 chosen: ``"base"``, ``"left"``, ``"right"``, or the strategy string.
531 Only dimensions with non-empty event lists or conflicts are included.
532
533 Raises:
534 ValueError: If any of the byte strings cannot be parsed as MIDI.
535 """
536 base_dims = extract_dimensions(base_bytes)
537 left_dims = extract_dimensions(left_bytes)
538 right_dims = extract_dimensions(right_bytes)
539
540 detail = dimension_conflict_detail(base_dims, left_dims, right_dims)
541
542 winning_slices: dict[str, list[tuple[int, mido.Message]]] = {}
543 dimension_report: dict[str, str] = {}
544
545 for dim in INTERNAL_DIMS:
546 change = detail[dim]
547
548 if change == "unchanged":
549 winning_slices[dim] = base_dims.slices[dim].events
550 if base_dims.slices[dim].events:
551 dimension_report[dim] = "base"
552
553 elif change == "left_only":
554 winning_slices[dim] = left_dims.slices[dim].events
555 dimension_report[dim] = "left"
556
557 elif change == "right_only":
558 winning_slices[dim] = right_dims.slices[dim].events
559 dimension_report[dim] = "right"
560
561 else:
562 # Both sides changed — resolve via .museattributes strategy.
563 # Look up by user-facing aliases first, then internal name.
564 user_dim_names = [k for k, v in DIM_ALIAS.items() if v == dim]
565 user_dim_names.append(dim) # internal name is also a valid alias
566
567 strategy = "auto"
568 for user_dim in user_dim_names:
569 s = resolve_strategy(attrs_rules, path, user_dim)
570 if s != "auto":
571 strategy = s
572 break
573 if strategy == "auto":
574 strategy = resolve_strategy(attrs_rules, path, "*")
575
576 if strategy == "ours":
577 winning_slices[dim] = left_dims.slices[dim].events
578 dimension_report[dim] = f"ours ({dim})"
579 elif strategy == "theirs":
580 winning_slices[dim] = right_dims.slices[dim].events
581 dimension_report[dim] = f"theirs ({dim})"
582 else:
583 # Unresolvable conflict. Non-independent dims fail fast.
584 return None
585
586 merged_bytes = _reconstruct(base_dims.ticks_per_beat, winning_slices)
587 return merged_bytes, dimension_report