cgcardona / muse public
test_stress_diff_algorithms.py python
407 lines 15.2 KB
119290fc Add mission-critical stress test suite (9 new files, 1716 tests total) (#76) Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Stress tests for the diff algorithm library.
2
3 Covers:
4 - LCS / Myers: empty sequences, identical, reversed, interleaved, long sequences.
5 Round-trip property: applying the edit script to the base must reproduce target.
6 - Tree edit: single node, add subtree, remove subtree, deep nesting.
7 - Numerical diff: all three modes (sparse / block / full), epsilon tolerance.
8 - Set diff: add / remove / replace elements.
9 - snapshot_diff: file-level diffing via ScaffoldPlugin.
10 - detect_moves: move detection from paired insert+delete.
11 """
12 from __future__ import annotations
13
14 import hashlib
15 import random
16 from typing import Literal
17
18 import pytest
19
20 from muse.core.diff_algorithms.lcs import myers_ses, detect_moves, EditStep
21 from muse.core.diff_algorithms.tree_edit import TreeNode, diff as tree_diff
22 from muse.core.diff_algorithms.numerical import diff as num_diff
23 from muse.core.diff_algorithms.set_ops import diff as set_diff
24 from muse.core.schema import (
25 SequenceSchema,
26 TensorSchema,
27 SetSchema,
28 TreeSchema,
29 )
30 from muse.domain import InsertOp, DeleteOp, DomainOp, SnapshotManifest
31 from muse.plugins.scaffold.plugin import ScaffoldPlugin
32
33
34 # ---------------------------------------------------------------------------
35 # Helpers
36 # ---------------------------------------------------------------------------
37
38
39 def _cid(label: str) -> str:
40 return hashlib.sha256(label.encode()).hexdigest()[:16]
41
42
43 def _apply_ses(base: list[str], steps: list[EditStep]) -> list[str]:
44 """Apply an edit-script to base to reconstruct target."""
45 result: list[str] = []
46 for step in steps:
47 if step.kind == "keep":
48 result.append(base[step.base_index])
49 elif step.kind == "insert":
50 result.append(step.item)
51 # "delete" steps are skipped.
52 return result
53
54
55 def _seq_schema(addr: str = "test") -> SequenceSchema:
56 return SequenceSchema(
57 field_name=addr,
58 address=addr,
59 diff_algorithm="lcs",
60 element_type="content_id",
61 )
62
63
64 def _tensor_schema(
65 mode: Literal["sparse", "block", "full"] = "sparse", epsilon: float = 1e-9
66 ) -> TensorSchema:
67 return TensorSchema(
68 field_name="tensor",
69 address="tensor",
70 shape=[],
71 dtype="float64",
72 diff_mode=mode,
73 epsilon=epsilon,
74 )
75
76
77 def _set_schema() -> SetSchema:
78 return SetSchema(field_name="elems", address="elems", element_type="str")
79
80
81 def _tree_schema() -> TreeSchema:
82 return TreeSchema(kind="tree", node_type="generic", diff_algorithm="zhang_shasha")
83
84
85 # ===========================================================================
86 # LCS / Myers
87 # ===========================================================================
88
89
90 class TestMyersSES:
91 def test_empty_base_to_nonempty(self) -> None:
92 target = [_cid(f"x{i}") for i in range(5)]
93 steps = myers_ses([], target)
94 assert all(s.kind == "insert" for s in steps)
95
96 def test_nonempty_to_empty(self) -> None:
97 base = [_cid(f"x{i}") for i in range(5)]
98 steps = myers_ses(base, [])
99 assert all(s.kind == "delete" for s in steps)
100
101 def test_identical_sequences_all_keep(self) -> None:
102 seq = [_cid(f"x{i}") for i in range(20)]
103 steps = myers_ses(seq, seq)
104 assert all(s.kind == "keep" for s in steps)
105
106 def test_single_insert_at_beginning(self) -> None:
107 base = [_cid("b"), _cid("c")]
108 target = [_cid("a"), _cid("b"), _cid("c")]
109 steps = myers_ses(base, target)
110 result = _apply_ses(base, steps)
111 assert result == target
112
113 def test_single_delete_from_middle(self) -> None:
114 base = [_cid("a"), _cid("b"), _cid("c")]
115 target = [_cid("a"), _cid("c")]
116 steps = myers_ses(base, target)
117 result = _apply_ses(base, steps)
118 assert result == target
119
120 def test_reversed_sequence(self) -> None:
121 base = [_cid(f"x{i}") for i in range(10)]
122 target = list(reversed(base))
123 steps = myers_ses(base, target)
124 result = _apply_ses(base, steps)
125 assert result == target
126
127 def test_interleaved_sequences(self) -> None:
128 base = [_cid("a"), _cid("b"), _cid("c"), _cid("d")]
129 target = [_cid("a"), _cid("X"), _cid("b"), _cid("Y"), _cid("c"), _cid("Z"), _cid("d")]
130 steps = myers_ses(base, target)
131 result = _apply_ses(base, steps)
132 assert result == target
133
134 def test_round_trip_property_random_sequences(self) -> None:
135 """Applying the SES to base must always reproduce target."""
136 rng = random.Random(42)
137 alphabet = [_cid(f"elem-{i}") for i in range(20)]
138 for _ in range(50):
139 base = rng.choices(alphabet, k=rng.randint(0, 15))
140 target = rng.choices(alphabet, k=rng.randint(0, 15))
141 steps = myers_ses(base, target)
142 assert _apply_ses(base, steps) == target
143
144 def test_long_sequence_1000_items(self) -> None:
145 base = [_cid(f"item-{i}") for i in range(1000)]
146 target = [_cid(f"item-{i}") for i in range(500, 1000)] + [_cid(f"new-{i}") for i in range(500)]
147 steps = myers_ses(base, target)
148 result = _apply_ses(base, steps)
149 assert result == target
150
151 def test_empty_both_sides(self) -> None:
152 assert myers_ses([], []) == []
153
154
155 # ===========================================================================
156 # detect_moves
157 # ===========================================================================
158
159
160 class TestDetectMoves:
161 def _make_insert(self, cid: str, pos: int | None = None) -> InsertOp:
162 return InsertOp(op="insert", address="test", position=pos, content_id=cid, content_summary=cid)
163
164 def _make_delete(self, cid: str, pos: int | None = None) -> DeleteOp:
165 return DeleteOp(op="delete", address="test", position=pos, content_id=cid, content_summary=cid)
166
167 def test_matching_insert_delete_becomes_move(self) -> None:
168 cid = _cid("moved-item")
169 inserts = [self._make_insert(cid, pos=5)]
170 deletes = [self._make_delete(cid, pos=2)]
171 moves, remaining_ins, remaining_del = detect_moves(inserts, deletes)
172 assert len(moves) == 1
173 assert moves[0]["op"] == "move"
174
175 def test_no_match_when_different_content_ids(self) -> None:
176 inserts = [self._make_insert(_cid("a"), pos=5)]
177 deletes = [self._make_delete(_cid("b"), pos=2)]
178 moves, remaining_ins, remaining_del = detect_moves(inserts, deletes)
179 assert len(moves) == 0
180
181 def test_no_moves_on_empty_inputs(self) -> None:
182 moves, ins, dels = detect_moves([], [])
183 assert moves == []
184 assert ins == []
185 assert dels == []
186
187 def test_same_position_not_a_move(self) -> None:
188 """Insert and delete at same position: not a move."""
189 cid = _cid("same-pos")
190 inserts = [self._make_insert(cid, pos=3)]
191 deletes = [self._make_delete(cid, pos=3)]
192 moves, remaining_ins, remaining_del = detect_moves(inserts, deletes)
193 # Same position = not a move by definition.
194 assert len(moves) == 0
195
196
197 # ===========================================================================
198 # Tree edit
199 # ===========================================================================
200
201
202 class TestTreeEditDiff:
203 def _node(self, label: str, cid: str, children: tuple[TreeNode, ...] = ()) -> TreeNode:
204 return TreeNode(id=label, label=label, content_id=cid, children=children)
205
206 def test_identical_trees_no_ops(self) -> None:
207 tree = self._node("root", _cid("root"), (
208 self._node("child-a", _cid("a")),
209 self._node("child-b", _cid("b")),
210 ))
211 schema = _tree_schema()
212 delta = tree_diff(schema, tree, tree, domain="test")
213 assert delta["ops"] == []
214
215 def test_replace_root_content(self) -> None:
216 base = self._node("root", _cid("v1"))
217 target = self._node("root", _cid("v2"))
218 schema = _tree_schema()
219 delta = tree_diff(schema, base, target, domain="test")
220 assert any(op["op"] == "replace" for op in delta["ops"])
221
222 def test_add_child_node(self) -> None:
223 base = self._node("root", _cid("root"), (self._node("a", _cid("a")),))
224 target = self._node("root", _cid("root"), (
225 self._node("a", _cid("a")),
226 self._node("b", _cid("b")),
227 ))
228 schema = _tree_schema()
229 delta = tree_diff(schema, base, target, domain="test")
230 assert any(op["op"] == "insert" for op in delta["ops"])
231
232 def test_remove_child_node(self) -> None:
233 base = self._node("root", _cid("root"), (
234 self._node("a", _cid("a")),
235 self._node("b", _cid("b")),
236 ))
237 target = self._node("root", _cid("root"), (self._node("a", _cid("a")),))
238 schema = _tree_schema()
239 delta = tree_diff(schema, base, target, domain="test")
240 assert any(op["op"] == "delete" for op in delta["ops"])
241
242 def test_empty_tree_to_populated(self) -> None:
243 base = self._node("root", _cid("root"))
244 target = self._node("root", _cid("root"), tuple(
245 self._node(f"child-{i}", _cid(f"c{i}")) for i in range(5)
246 ))
247 schema = _tree_schema()
248 delta = tree_diff(schema, base, target, domain="test")
249 assert any(op["op"] == "insert" for op in delta["ops"])
250
251
252 # ===========================================================================
253 # Numerical diff
254 # ===========================================================================
255
256
257 class TestNumericalDiff:
258 def test_identical_arrays_no_ops(self) -> None:
259 arr = [float(i) for i in range(100)]
260 schema = _tensor_schema("sparse")
261 delta = num_diff(schema, arr, arr, domain="test")
262 assert delta["ops"] == []
263
264 def test_sparse_mode_one_change(self) -> None:
265 base = [0.0] * 10
266 target = [0.0] * 10
267 target[5] = 1.0
268 schema = _tensor_schema("sparse")
269 delta = num_diff(schema, base, target, domain="test")
270 assert len(delta["ops"]) == 1
271 assert delta["ops"][0]["op"] == "replace"
272
273 def test_block_mode_adjacent_changes_grouped(self) -> None:
274 base = [0.0] * 10
275 target = [0.0] * 10
276 for i in range(3, 7):
277 target[i] = float(i)
278 schema = _tensor_schema("block")
279 delta = num_diff(schema, base, target, domain="test")
280 # Block mode should group adjacent changes.
281 assert len(delta["ops"]) <= 4
282
283 def test_full_mode_single_op_for_any_change(self) -> None:
284 base = [0.0] * 100
285 target = list(base)
286 target[50] = 1.0
287 target[99] = 2.0
288 schema = _tensor_schema("full")
289 delta = num_diff(schema, base, target, domain="test")
290 assert len(delta["ops"]) == 1
291
292 def test_epsilon_tolerance_no_spurious_diffs(self) -> None:
293 base = [1.0, 2.0, 3.0]
294 target = [1.0 + 1e-12, 2.0 - 1e-12, 3.0 + 1e-12]
295 schema = _tensor_schema("sparse", epsilon=1e-9)
296 delta = num_diff(schema, base, target, domain="test")
297 assert delta["ops"] == []
298
299 def test_epsilon_threshold_triggers_diff(self) -> None:
300 base = [1.0]
301 target = [1.0 + 1e-5]
302 schema = _tensor_schema("sparse", epsilon=1e-9)
303 delta = num_diff(schema, base, target, domain="test")
304 assert len(delta["ops"]) == 1
305
306 def test_empty_arrays_no_ops(self) -> None:
307 schema = _tensor_schema("sparse")
308 delta = num_diff(schema, [], [], domain="test")
309 assert delta["ops"] == []
310
311 def test_all_values_changed_sparse(self) -> None:
312 base = [0.0] * 50
313 target = [1.0] * 50
314 schema = _tensor_schema("sparse")
315 delta = num_diff(schema, base, target, domain="test")
316 assert len(delta["ops"]) == 50
317
318
319 # ===========================================================================
320 # Set diff
321 # ===========================================================================
322
323
324 class TestSetDiff:
325 def test_identical_sets_no_ops(self) -> None:
326 s = frozenset(f"elem-{i}" for i in range(20))
327 schema = _set_schema()
328 delta = set_diff(schema, s, s, domain="test")
329 assert delta["ops"] == []
330
331 def test_add_elements(self) -> None:
332 base: frozenset[str] = frozenset()
333 target = frozenset(f"new-{i}" for i in range(10))
334 schema = _set_schema()
335 delta = set_diff(schema, base, target, domain="test")
336 assert all(op["op"] == "insert" for op in delta["ops"])
337 assert len(delta["ops"]) == 10
338
339 def test_remove_elements(self) -> None:
340 base = frozenset(f"elem-{i}" for i in range(10))
341 target: frozenset[str] = frozenset()
342 schema = _set_schema()
343 delta = set_diff(schema, base, target, domain="test")
344 assert all(op["op"] == "delete" for op in delta["ops"])
345 assert len(delta["ops"]) == 10
346
347 def test_mixed_add_remove(self) -> None:
348 base = frozenset(f"base-{i}" for i in range(5))
349 target = frozenset(f"base-{i}" for i in range(3)) | frozenset(f"new-{i}" for i in range(3))
350 schema = _set_schema()
351 delta = set_diff(schema, base, target, domain="test")
352 ops_by_kind = {"insert": 0, "delete": 0}
353 for op in delta["ops"]:
354 ops_by_kind[op["op"]] += 1
355 assert ops_by_kind["insert"] == 3
356 assert ops_by_kind["delete"] == 2
357
358 def test_empty_base_to_empty_target(self) -> None:
359 schema = _set_schema()
360 delta = set_diff(schema, frozenset(), frozenset(), domain="test")
361 assert delta["ops"] == []
362
363
364 # ===========================================================================
365 # snapshot_diff (via ScaffoldPlugin which uses it internally)
366 # ===========================================================================
367
368
369 class TestSnapshotDiff:
370 """Test snapshot_diff indirectly via ScaffoldPlugin.diff() which delegates to it."""
371
372 def _diff(self, base: dict[str, str], target: dict[str, str]) -> list[DomainOp]:
373 plugin = ScaffoldPlugin()
374 base_snap = SnapshotManifest(files=base, domain="scaffold")
375 target_snap = SnapshotManifest(files=target, domain="scaffold")
376 delta = plugin.diff(base_snap, target_snap)
377 return list(delta["ops"])
378
379 def test_identical_snapshots_empty_ops(self) -> None:
380 manifest = {f"f{i}.py": _cid(f"v{i}") for i in range(10)}
381 ops = self._diff(manifest, manifest)
382 assert ops == []
383
384 def test_added_files_produce_insert_ops(self) -> None:
385 ops = self._diff({}, {"new.py": _cid("new")})
386 assert any(op["op"] == "insert" and op["address"] == "new.py" for op in ops)
387
388 def test_removed_files_produce_delete_ops(self) -> None:
389 ops = self._diff({"old.py": _cid("old")}, {})
390 assert any(op["op"] == "delete" and op["address"] == "old.py" for op in ops)
391
392 def test_modified_files_produce_replace_ops(self) -> None:
393 ops = self._diff({"f.py": _cid("v1")}, {"f.py": _cid("v2")})
394 assert any(op["op"] == "replace" and op["address"] == "f.py" for op in ops)
395
396 def test_50_file_snapshot_diff_complete(self) -> None:
397 base = {f"f{i:03d}.py": _cid(f"v1-{i}") for i in range(50)}
398 target = {f"f{i:03d}.py": _cid(f"v2-{i}") for i in range(50)}
399 ops = self._diff(base, target)
400 assert len(ops) == 50
401 assert all(op["op"] == "replace" for op in ops)
402
403 def test_ops_sorted_by_address(self) -> None:
404 target = {f"z{i:02d}.py": _cid(f"v{i}") for i in range(10)}
405 ops = self._diff({}, target)
406 addresses = [op["address"] for op in ops]
407 assert addresses == sorted(addresses)