cgcardona / muse public
lcs.py python
235 lines 8.2 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """LCS / Myers shortest-edit-script algorithm for ordered sequences.
2
3 Operates on ``list[str]`` where each string is a content ID (SHA-256 or
4 deterministic hash). Two elements are considered identical iff their content
5 IDs are equal — the algorithm never inspects actual content.
6
7 Public API
8 ----------
9 - :func:`myers_ses` — compute shortest edit script (keep / insert / delete).
10 - :func:`detect_moves` — post-process insert+delete pairs into ``MoveOp``\\s.
11 - :func:`diff` — end-to-end: list[str] × list[str] → ``StructuredDelta``.
12
13 Algorithm
14 ---------
15 ``myers_ses`` uses the classic O(nm) LCS dynamic-programming traceback. This
16 is the same algorithm as ``midi_diff.lcs_edit_script`` but operates on content
17 IDs (strings) rather than ``NoteKey`` dicts, making it fully generic.
18
The patience-diff and O(nd) Myers variants (see ``SequenceSchema.diff_algorithm``)
are not yet implemented; both fall back to the O(nm) LCS. They can be added later
as an optimisation without changing the public API.
22 """
23
24 import logging
25 from dataclasses import dataclass
26 from typing import Literal
27
28 from muse.core.schema import SequenceSchema
29 from muse.domain import DeleteOp, DomainOp, InsertOp, MoveOp, StructuredDelta
30
# Module-level logger named after this module; used by diff() for debug output.
logger = logging.getLogger(__name__)

# The three kinds of step that can appear in an edit script (see EditStep.kind).
EditKind = Literal["keep", "insert", "delete"]
34
35
@dataclass(frozen=True)
class EditStep:
    """A single keep / insert / delete entry emitted by :func:`myers_ses`.

    Instances are immutable (``frozen=True``).

    Attributes:
        kind: Which operation this step represents.
        base_index: Index into the base content-ID list. For an ``"insert"``
            step this is the base cursor at the moment of insertion and may
            equal ``len(base)``.
        target_index: Index into the target content-ID list. For a
            ``"delete"`` step this is the target cursor at the moment of
            deletion and may equal ``len(target)``.
        item: Content ID of the element the step refers to.
    """

    kind: EditKind
    base_index: int
    target_index: int
    item: str
44
45
46 # ---------------------------------------------------------------------------
47 # Core algorithm
48 # ---------------------------------------------------------------------------
49
50
def myers_ses(base: list[str], target: list[str]) -> list[EditStep]:
    """Return an edit script that turns *base* into *target*.

    Despite the name, this builds the full O(nm) longest-common-subsequence
    table over suffixes and then walks it once from the front. Elements are
    compared purely by content-ID equality.

    Args:
        base: Ordered content IDs of the base sequence.
        target: Ordered content IDs of the target sequence.

    Returns:
        The :class:`EditStep` entries in script order. Every matching pair
        found on the walk becomes a ``"keep"``; everything else is emitted as
        an ``"insert"`` (element of *target*) or a ``"delete"`` (element of
        *base*). When inserting and deleting are equally good, the insert is
        emitted first.
    """
    base_len = len(base)
    target_len = len(target)

    # lcs[r][c] is the LCS length of the suffixes base[r:] and target[c:].
    # The extra row/column of zeros represents the empty suffix.
    lcs: list[list[int]] = [[0] * (target_len + 1) for _ in range(base_len + 1)]
    for r in reversed(range(base_len)):
        row = lcs[r]
        below = lcs[r + 1]
        base_item = base[r]
        for c in reversed(range(target_len)):
            if base_item == target[c]:
                row[c] = below[c + 1] + 1
            else:
                row[c] = max(below[c], row[c + 1])

    script: list[EditStep] = []
    bi = ti = 0
    while bi < base_len or ti < target_len:
        has_base = bi < base_len
        has_target = ti < target_len

        # Matching heads are always kept: that is optimal for a suffix LCS.
        if has_base and has_target and base[bi] == target[ti]:
            script.append(EditStep("keep", bi, ti, base[bi]))
            bi += 1
            ti += 1
            continue

        # Insert when base is exhausted, or when skipping the target element
        # preserves at least as much common subsequence as skipping the base
        # element (ties go to insert).
        take_insert = has_target and (
            not has_base or lcs[bi][ti + 1] >= lcs[bi + 1][ti]
        )
        if take_insert:
            script.append(EditStep("insert", bi, ti, target[ti]))
            ti += 1
        else:
            script.append(EditStep("delete", bi, ti, base[bi]))
            bi += 1

    return script
92
93
94 # ---------------------------------------------------------------------------
95 # Move detection post-pass
96 # ---------------------------------------------------------------------------
97
98
def detect_moves(
    inserts: list[InsertOp],
    deletes: list[DeleteOp],
) -> tuple[list[MoveOp], list[InsertOp], list[DeleteOp]]:
    """Collapse (delete, insert) pairs that share a content ID into ``MoveOp`` entries.

    A move is a delete and an insert of the same ``content_id`` at different
    positions. Where the positions are equal, the pair is left as separate
    insert/delete ops. A ``position`` of ``None`` is treated as ``0``.

    At most one move is produced per content ID: only the *first* delete for
    a given ``content_id`` is a pairing candidate. Any further deletes of the
    same content are genuine deletes and are returned in
    ``remaining_deletes``.

    Args:
        inserts: ``InsertOp`` entries from the LCS edit script.
        deletes: ``DeleteOp`` entries from the LCS edit script.

    Returns:
        A tuple ``(moves, remaining_inserts, remaining_deletes)``. ``moves``
        holds the detected ``MoveOp`` entries; the other two lists hold the
        ops that were not paired, in their original relative order.
    """
    # Index (into *deletes*) of the first delete seen for each content ID.
    first_delete_at: dict[str, int] = {}
    for index, delete in enumerate(deletes):
        first_delete_at.setdefault(delete["content_id"], index)

    moves: list[MoveOp] = []
    remaining_inserts: list[InsertOp] = []
    # Track paired deletes by index, not by content ID: several deletes may
    # share a content ID, and only the one actually paired with an insert
    # may be dropped from the result.
    paired_deletes: set[int] = set()

    for ins in inserts:
        cid = ins["content_id"]
        delete_index = first_delete_at.get(cid)
        if delete_index is not None and delete_index not in paired_deletes:
            delete = deletes[delete_index]
            from_pos = delete["position"] if delete["position"] is not None else 0
            to_pos = ins["position"] if ins["position"] is not None else 0
            if from_pos != to_pos:
                moves.append(
                    MoveOp(
                        op="move",
                        address=ins["address"],
                        from_position=from_pos,
                        to_position=to_pos,
                        content_id=cid,
                    )
                )
                paired_deletes.add(delete_index)
                continue
        remaining_inserts.append(ins)

    remaining_deletes = [
        delete
        for index, delete in enumerate(deletes)
        if index not in paired_deletes
    ]
    return moves, remaining_inserts, remaining_deletes
150
151
152 # ---------------------------------------------------------------------------
153 # Top-level diff entry point
154 # ---------------------------------------------------------------------------
155
156
def diff(
    schema: SequenceSchema,
    base: list[str],
    target: list[str],
    *,
    domain: str,
    address: str = "",
) -> StructuredDelta:
    """Diff two ordered sequences of content IDs, returning a ``StructuredDelta``.

    Runs :func:`myers_ses`, converts its insert/delete steps into
    ``InsertOp`` / ``DeleteOp`` entries, then runs :func:`detect_moves` to
    collapse matching pairs into ``MoveOp`` entries. "keep" steps produce no
    op.

    Args:
        schema: The ``SequenceSchema``; only its ``"element_type"`` entry is
            read here, for op summaries and the overall summary text.
        base: Base (ancestor) sequence as a list of content IDs.
        target: Target (newer) sequence as a list of content IDs.
        domain: Domain tag for the returned ``StructuredDelta``.
        address: Address stamped on every generated op (e.g. a file path).

    Returns:
        A ``StructuredDelta`` whose ``ops`` are ordered deletes, then inserts,
        then moves, and whose ``summary`` is a comma-separated description
        such as ``"2 notes added, 1 note removed, 1 moved"`` (or
        ``"no <element> changes"`` when there are no ops).
    """
    steps = myers_ses(base, target)

    raw_inserts: list[InsertOp] = []
    raw_deletes: list[DeleteOp] = []
    elem = schema["element_type"]

    for step in steps:
        if step.kind == "insert":
            raw_inserts.append(
                InsertOp(
                    op="insert",
                    address=address,
                    # Inserts are positioned in the target sequence.
                    position=step.target_index,
                    content_id=step.item,
                    content_summary=f"{elem}:{step.item[:8]}",
                )
            )
        elif step.kind == "delete":
            raw_deletes.append(
                DeleteOp(
                    op="delete",
                    address=address,
                    # Deletes are positioned in the base sequence.
                    position=step.base_index,
                    content_id=step.item,
                    content_summary=f"{elem}:{step.item[:8]}",
                )
            )

    moves, remaining_inserts, remaining_deletes = detect_moves(raw_inserts, raw_deletes)
    ops: list[DomainOp] = [*remaining_deletes, *remaining_inserts, *moves]

    n_ins = len(remaining_inserts)
    n_del = len(remaining_deletes)
    n_mov = len(moves)

    parts: list[str] = []
    if n_ins:
        parts.append(f"{n_ins} {elem}{'s' if n_ins != 1 else ''} added")
    if n_del:
        parts.append(f"{n_del} {elem}{'s' if n_del != 1 else ''} removed")
    if n_mov:
        # "moved" reads the same for one or many, so no pluralisation branch
        # is needed (the previous conditional had two identical arms).
        parts.append(f"{n_mov} moved")
    summary = ", ".join(parts) if parts else f"no {elem} changes"

    logger.debug(
        "lcs.diff %r: +%d -%d ~%d ops on %d→%d elements",
        address,
        n_ins,
        n_del,
        n_mov,
        len(base),
        len(target),
    )

    return StructuredDelta(domain=domain, ops=ops, summary=summary)