cgcardona / muse public
test_diff_algorithms.py python
602 lines 23.6 KB
d1dfb412 feat: implement 3 missing plan items — property tests, format_version, … Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Tests for the diff algorithm library.
2
3 Covers all four algorithm modules (lcs, tree_edit, numerical, set_ops) and
4 the schema-driven dispatch in ``muse.core.diff_algorithms``.
5
6 Each algorithm is tested at three levels:
7 1. **Unit** — the core function in isolation.
8 2. **Output shape** — the returned ``StructuredDelta`` is well-formed.
9 3. **Dispatch** — ``diff_by_schema`` routes correctly for each schema kind.
10 """
11 from __future__ import annotations
12
13 import hashlib
14
15 import pytest
16 from typing import Literal
17
18 from muse.core.diff_algorithms import (
19 DiffInput,
20 MapInput,
21 SequenceInput,
22 SetInput,
23 TensorInput,
24 TreeInput,
25 TreeNode,
26 diff_by_schema,
27 snapshot_diff,
28 )
29 from muse.core.diff_algorithms import lcs as lcs_mod
30 from muse.core.diff_algorithms import numerical as numerical_mod
31 from muse.core.diff_algorithms import set_ops as set_ops_mod
32 from muse.core.diff_algorithms import tree_edit as tree_edit_mod
33 from muse.core.schema import (
34 DomainSchema,
35 DimensionSpec,
36 MapSchema,
37 SequenceSchema,
38 SetSchema,
39 TensorSchema,
40 TreeSchema,
41 )
42 from muse.domain import SnapshotManifest, StructuredDelta
43
44
45 # ---------------------------------------------------------------------------
46 # Helpers
47 # ---------------------------------------------------------------------------
48
49
50 def _cid(s: str) -> str:
51 """Return a deterministic SHA-256 hex for a short string."""
52 return hashlib.sha256(s.encode()).hexdigest()
53
54
55 def _seq_schema(element_type: str = "item") -> SequenceSchema:
56 return SequenceSchema(
57 kind="sequence",
58 element_type=element_type,
59 identity="by_position",
60 diff_algorithm="lcs",
61 alphabet=None,
62 )
63
64
65 def _set_schema(element_type: str = "file") -> SetSchema:
66 return SetSchema(kind="set", element_type=element_type, identity="by_content")
67
68
69 DiffMode = Literal["sparse", "block", "full"]
70
71
72 def _tensor_schema(
73 mode: DiffMode = "sparse", epsilon: float = 0.0
74 ) -> TensorSchema:
75 return TensorSchema(
76 kind="tensor",
77 dtype="float32",
78 rank=1,
79 epsilon=epsilon,
80 diff_mode=mode,
81 )
82
83
84 def _tree_schema() -> TreeSchema:
85 return TreeSchema(kind="tree", node_type="node", diff_algorithm="zhang_shasha")
86
87
88 def _map_schema() -> MapSchema:
89 return MapSchema(
90 kind="map",
91 key_type="key",
92 value_schema=_seq_schema(),
93 identity="by_key",
94 )
95
96
97 def _leaf(label: str) -> TreeNode:
98 return TreeNode(id=label, label=label, content_id=_cid(label), children=())
99
100
101 def _node(label: str, *children: TreeNode) -> TreeNode:
102 return TreeNode(
103 id=label, label=label, content_id=_cid(label), children=tuple(children)
104 )
105
106
107 def _is_valid_delta(d: StructuredDelta) -> bool:
108 return isinstance(d["ops"], list) and isinstance(d["summary"], str)
109
110
111 # ===========================================================================
112 # LCS / Myers tests
113 # ===========================================================================
114
115
116 class TestLCSMyersSES:
117 def test_empty_to_empty_returns_no_steps(self) -> None:
118 steps = lcs_mod.myers_ses([], [])
119 assert steps == []
120
121 def test_empty_to_sequence_all_inserts(self) -> None:
122 steps = lcs_mod.myers_ses([], ["a", "b", "c"])
123 kinds = [s.kind for s in steps]
124 assert kinds == ["insert", "insert", "insert"]
125
126 def test_sequence_to_empty_all_deletes(self) -> None:
127 steps = lcs_mod.myers_ses(["a", "b", "c"], [])
128 kinds = [s.kind for s in steps]
129 assert kinds == ["delete", "delete", "delete"]
130
131 def test_identical_sequences_all_keeps(self) -> None:
132 ids = ["a", "b", "c"]
133 steps = lcs_mod.myers_ses(ids, ids)
134 assert all(s.kind == "keep" for s in steps)
135 assert len(steps) == 3
136
137 def test_single_insert_in_middle(self) -> None:
138 base = ["a", "c"]
139 target = ["a", "b", "c"]
140 steps = lcs_mod.myers_ses(base, target)
141 inserts = [s for s in steps if s.kind == "insert"]
142 assert len(inserts) == 1
143 assert inserts[0].item == "b"
144
145 def test_single_delete_in_middle(self) -> None:
146 base = ["a", "b", "c"]
147 target = ["a", "c"]
148 steps = lcs_mod.myers_ses(base, target)
149 deletes = [s for s in steps if s.kind == "delete"]
150 assert len(deletes) == 1
151 assert deletes[0].item == "b"
152
153 def test_lcs_is_minimal(self) -> None:
154 base = ["a", "b", "c", "d"]
155 target = ["a", "x", "c", "d"]
156 steps = lcs_mod.myers_ses(base, target)
157 keeps = [s for s in steps if s.kind == "keep"]
158 inserts = [s for s in steps if s.kind == "insert"]
159 deletes = [s for s in steps if s.kind == "delete"]
160 assert len(keeps) == 3 # a, c, d are kept
161 assert len(inserts) == 1
162 assert len(deletes) == 1
163
164 def test_step_indices_are_consistent(self) -> None:
165 base = ["x", "y", "z"]
166 target = ["y", "z", "w"]
167 steps = lcs_mod.myers_ses(base, target)
168 for s in steps:
169 if s.kind == "delete":
170 assert s.item == base[s.base_index]
171 elif s.kind == "insert":
172 assert s.item == target[s.target_index]
173
174
175 class TestLCSDetectMoves:
176 def test_paired_delete_insert_becomes_move(self) -> None:
177 from muse.domain import DeleteOp, InsertOp
178
179 cid = _cid("note")
180 ins_op = InsertOp(op="insert", address="", position=3, content_id=cid, content_summary="")
181 del_op = DeleteOp(op="delete", address="", position=0, content_id=cid, content_summary="")
182 moves, rem_ins, rem_del = lcs_mod.detect_moves([ins_op], [del_op])
183 assert len(moves) == 1
184 assert moves[0]["op"] == "move"
185 assert moves[0]["from_position"] == 0
186 assert moves[0]["to_position"] == 3
187 assert len(rem_ins) == 0
188 assert len(rem_del) == 0
189
190 def test_same_position_not_a_move(self) -> None:
191 from muse.domain import DeleteOp, InsertOp
192
193 cid = _cid("item")
194 ins_op = InsertOp(op="insert", address="", position=1, content_id=cid, content_summary="")
195 del_op = DeleteOp(op="delete", address="", position=1, content_id=cid, content_summary="")
196 moves, rem_ins, rem_del = lcs_mod.detect_moves([ins_op], [del_op])
197 assert len(moves) == 0
198 assert len(rem_ins) == 1
199 assert len(rem_del) == 1
200
201 def test_no_paired_content_no_moves(self) -> None:
202 from muse.domain import DeleteOp, InsertOp
203
204 ins_op = InsertOp(op="insert", address="", position=0, content_id=_cid("a"), content_summary="")
205 del_op = DeleteOp(op="delete", address="", position=0, content_id=_cid("b"), content_summary="")
206 moves, rem_ins, rem_del = lcs_mod.detect_moves([ins_op], [del_op])
207 assert len(moves) == 0
208 assert len(rem_ins) == 1
209 assert len(rem_del) == 1
210
211
212 class TestLCSDiff:
213 def test_empty_to_sequence_is_all_inserts(self) -> None:
214 delta = lcs_mod.diff(_seq_schema(), [], ["a", "b"], domain="test")
215 ops = [op for op in delta["ops"] if op["op"] == "insert"]
216 assert len(ops) == 2
217
218 def test_sequence_to_empty_is_all_deletes(self) -> None:
219 delta = lcs_mod.diff(_seq_schema(), ["a", "b"], [], domain="test")
220 ops = [op for op in delta["ops"] if op["op"] == "delete"]
221 assert len(ops) == 2
222
223 def test_identical_sequences_returns_no_ops(self) -> None:
224 delta = lcs_mod.diff(_seq_schema(), ["a", "b", "c"], ["a", "b", "c"], domain="test")
225 assert delta["ops"] == []
226
227 def test_produces_valid_structured_delta(self) -> None:
228 delta = lcs_mod.diff(_seq_schema("note"), ["x"], ["x", "y"], domain="midi")
229 assert _is_valid_delta(delta)
230 assert delta["domain"] == "midi"
231
232 def test_move_detected_from_delete_plus_insert(self) -> None:
233 a, b, c = _cid("a"), _cid("b"), _cid("c")
234 delta = lcs_mod.diff(_seq_schema(), [a, b, c], [b, c, a], domain="test")
235 ops_by_kind = {op["op"] for op in delta["ops"]}
236 assert "move" in ops_by_kind
237
238 def test_summary_is_human_readable(self) -> None:
239 delta = lcs_mod.diff(_seq_schema("note"), ["a"], ["a", "b"], domain="test")
240 assert "note" in delta["summary"]
241 assert "added" in delta["summary"]
242
243
244 # ===========================================================================
245 # Tree edit tests
246 # ===========================================================================
247
248
249 class TestTreeEditDiff:
250 def test_identical_trees_returns_no_ops(self) -> None:
251 root = _node("root", _leaf("A"), _leaf("B"))
252 delta = tree_edit_mod.diff(_tree_schema(), root, root, domain="test")
253 assert delta["ops"] == []
254
255 def test_leaf_relabel_is_replace(self) -> None:
256 base = _leaf("A")
257 old_cid = _cid("A")
258 new_node = TreeNode(id="A", label="A", content_id=_cid("A_new"), children=())
259 target = TreeNode(id="root", label="root", content_id=_cid("root"),
260 children=(new_node,))
261 base_root = TreeNode(id="root", label="root", content_id=_cid("root"),
262 children=(base,))
263 delta = tree_edit_mod.diff(_tree_schema(), base_root, target, domain="test")
264 replace_ops = [op for op in delta["ops"] if op["op"] == "replace"]
265 assert len(replace_ops) == 1
266
267 def test_node_insert(self) -> None:
268 base = _node("root", _leaf("A"))
269 target = _node("root", _leaf("A"), _leaf("B"))
270 delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="test")
271 insert_ops = [op for op in delta["ops"] if op["op"] == "insert"]
272 assert len(insert_ops) >= 1
273
274 def test_node_delete(self) -> None:
275 base = _node("root", _leaf("A"), _leaf("B"))
276 target = _node("root", _leaf("A"))
277 delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="test")
278 delete_ops = [op for op in delta["ops"] if op["op"] == "delete"]
279 assert len(delete_ops) >= 1
280
281 def test_subtree_move(self) -> None:
282 leaf_a = _leaf("A")
283 leaf_b = _leaf("B")
284 base = _node("root", leaf_a, leaf_b)
285 # Move: leaf_b before leaf_a
286 target = _node("root", leaf_b, leaf_a)
287 delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="test")
288 # Should produce a move or a pair of delete/insert
289 op_kinds = {op["op"] for op in delta["ops"]}
290 assert op_kinds & {"move", "insert", "delete"}
291
292 def test_produces_valid_structured_delta(self) -> None:
293 base = _node("root", _leaf("X"))
294 target = _node("root", _leaf("Y"))
295 delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="midi")
296 assert _is_valid_delta(delta)
297 assert delta["domain"] == "midi"
298
299 def test_summary_is_human_readable(self) -> None:
300 base = _node("root", _leaf("A"))
301 target = _node("root", _leaf("A"), _leaf("B"))
302 delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="test")
303 assert isinstance(delta["summary"], str)
304 assert len(delta["summary"]) > 0
305
306
307 # ===========================================================================
308 # Numerical diff tests
309 # ===========================================================================
310
311
312 class TestNumericalDiff:
313 def test_within_epsilon_returns_no_ops(self) -> None:
314 schema = _tensor_schema(epsilon=1.0)
315 delta = numerical_mod.diff(schema, [1.0, 2.0, 3.0], [1.4, 2.0, 3.0], domain="test")
316 assert delta["ops"] == []
317
318 def test_outside_epsilon_returns_replace(self) -> None:
319 schema = _tensor_schema(epsilon=0.1)
320 delta = numerical_mod.diff(schema, [1.0, 2.0, 3.0], [1.0, 5.0, 3.0], domain="test")
321 assert len(delta["ops"]) == 1
322 assert delta["ops"][0]["op"] == "replace"
323
324 def test_identical_arrays_returns_no_ops(self) -> None:
325 schema = _tensor_schema()
326 delta = numerical_mod.diff(schema, [1.0, 2.0], [1.0, 2.0], domain="test")
327 assert delta["ops"] == []
328
329 def test_sparse_mode_one_op_per_element(self) -> None:
330 schema = _tensor_schema(mode="sparse", epsilon=0.0)
331 base = [1.0, 2.0, 3.0]
332 target = [9.0, 2.0, 9.0]
333 delta = numerical_mod.diff(schema, base, target, domain="test")
334 assert len(delta["ops"]) == 2 # positions 0 and 2
335 for op in delta["ops"]:
336 assert op["op"] == "replace"
337
338 def test_block_mode_groups_adjacent(self) -> None:
339 schema = _tensor_schema(mode="block", epsilon=0.0)
340 base = [1.0, 2.0, 3.0, 4.0, 5.0]
341 target = [9.0, 9.0, 3.0, 9.0, 9.0]
342 delta = numerical_mod.diff(schema, base, target, domain="test")
343 # Changes at 0,1 and 3,4 → two blocks
344 assert len(delta["ops"]) == 2
345
346 def test_full_mode_single_op(self) -> None:
347 schema = _tensor_schema(mode="full", epsilon=0.0)
348 base = [1.0, 2.0, 3.0]
349 target = [1.0, 99.0, 3.0]
350 delta = numerical_mod.diff(schema, base, target, domain="test")
351 assert len(delta["ops"]) == 1
352 assert delta["ops"][0]["op"] == "replace"
353
354 def test_length_mismatch_returns_single_replace(self) -> None:
355 schema = _tensor_schema()
356 delta = numerical_mod.diff(schema, [1.0, 2.0], [1.0, 2.0, 3.0], domain="test")
357 assert len(delta["ops"]) == 1
358 assert delta["ops"][0]["op"] == "replace"
359
360 def test_produces_valid_structured_delta(self) -> None:
361 schema = _tensor_schema(epsilon=0.5)
362 delta = numerical_mod.diff(schema, [0.0, 1.0], [0.0, 2.0], domain="midi")
363 assert _is_valid_delta(delta)
364 assert delta["domain"] == "midi"
365
366
367 # ===========================================================================
368 # Set ops tests
369 # ===========================================================================
370
371
372 class TestSetOpsDiff:
373 def test_add_returns_insert(self) -> None:
374 schema = _set_schema()
375 base: frozenset[str] = frozenset()
376 target = frozenset({_cid("file_a")})
377 delta = set_ops_mod.diff(schema, base, target, domain="test")
378 assert len(delta["ops"]) == 1
379 assert delta["ops"][0]["op"] == "insert"
380
381 def test_remove_returns_delete(self) -> None:
382 schema = _set_schema()
383 cid = _cid("file_a")
384 base = frozenset({cid})
385 target: frozenset[str] = frozenset()
386 delta = set_ops_mod.diff(schema, base, target, domain="test")
387 assert len(delta["ops"]) == 1
388 assert delta["ops"][0]["op"] == "delete"
389
390 def test_no_change_returns_empty(self) -> None:
391 schema = _set_schema()
392 cids = frozenset({_cid("a"), _cid("b")})
393 delta = set_ops_mod.diff(schema, cids, cids, domain="test")
394 assert delta["ops"] == []
395
396 def test_all_ops_have_none_position(self) -> None:
397 schema = _set_schema()
398 base: frozenset[str] = frozenset()
399 target = frozenset({_cid("x"), _cid("y")})
400 delta = set_ops_mod.diff(schema, base, target, domain="test")
401 for op in delta["ops"]:
402 assert op["position"] is None
403
404 def test_produces_valid_structured_delta(self) -> None:
405 schema = _set_schema("audio_file")
406 base = frozenset({_cid("drums"), _cid("bass")})
407 target = frozenset({_cid("drums"), _cid("guitar")})
408 delta = set_ops_mod.diff(schema, base, target, domain="midi")
409 assert _is_valid_delta(delta)
410 assert delta["domain"] == "midi"
411 assert "audio_file" in delta["summary"]
412
413
414 # ===========================================================================
415 # Schema dispatch (diff_by_schema) tests
416 # ===========================================================================
417
418
419 class TestDiffBySchema:
420 def test_dispatch_sequence_schema_calls_lcs(self) -> None:
421 schema = _seq_schema("note")
422 base: DiffInput = SequenceInput(kind="sequence", items=["a"])
423 target: DiffInput = SequenceInput(kind="sequence", items=["a", "b"])
424 delta = diff_by_schema(schema, base, target, domain="test")
425 assert delta["domain"] == "test"
426 insert_ops = [op for op in delta["ops"] if op["op"] == "insert"]
427 assert len(insert_ops) == 1
428
429 def test_dispatch_set_schema_calls_set_ops(self) -> None:
430 schema = _set_schema("file")
431 cid_a = _cid("a")
432 base: DiffInput = SetInput(kind="set", items=frozenset({cid_a}))
433 target: DiffInput = SetInput(kind="set", items=frozenset())
434 delta = diff_by_schema(schema, base, target, domain="test")
435 delete_ops = [op for op in delta["ops"] if op["op"] == "delete"]
436 assert len(delete_ops) == 1
437
438 def test_dispatch_tensor_schema_calls_numerical(self) -> None:
439 schema = _tensor_schema(epsilon=0.0)
440 base: DiffInput = TensorInput(kind="tensor", values=[1.0, 2.0])
441 target: DiffInput = TensorInput(kind="tensor", values=[1.0, 9.0])
442 delta = diff_by_schema(schema, base, target, domain="test")
443 replace_ops = [op for op in delta["ops"] if op["op"] == "replace"]
444 assert len(replace_ops) == 1
445
446 def test_dispatch_tree_schema_calls_tree_edit(self) -> None:
447 schema = _tree_schema()
448 base_tree = _node("root", _leaf("A"))
449 target_tree = _node("root", _leaf("A"), _leaf("B"))
450 base: DiffInput = TreeInput(kind="tree", root=base_tree)
451 target: DiffInput = TreeInput(kind="tree", root=target_tree)
452 delta = diff_by_schema(schema, base, target, domain="test")
453 assert _is_valid_delta(delta)
454
455 def test_dispatch_map_schema_recurses(self) -> None:
456 schema = _map_schema()
457 cid_a, cid_b = _cid("va"), _cid("vb")
458 base: DiffInput = MapInput(kind="map", entries={"key1": cid_a})
459 target: DiffInput = MapInput(kind="map", entries={"key1": cid_b, "key2": cid_a})
460 delta = diff_by_schema(schema, base, target, domain="test")
461 assert _is_valid_delta(delta)
462 # key2 added → insert op; key1 changed → replace op
463 op_kinds = [op["op"] for op in delta["ops"]]
464 assert "insert" in op_kinds
465 assert "replace" in op_kinds
466
467 def test_type_error_on_mismatched_schema_and_input(self) -> None:
468 schema = _seq_schema()
469 wrong_input: DiffInput = SetInput(kind="set", items=frozenset())
470 with pytest.raises(TypeError, match="sequence schema requires SequenceInput"):
471 diff_by_schema(schema, wrong_input, wrong_input, domain="test")
472
473 def test_identical_sequence_produces_no_ops(self) -> None:
474 schema = _seq_schema()
475 items = ["a", "b", "c"]
476 base: DiffInput = SequenceInput(kind="sequence", items=items)
477 target: DiffInput = SequenceInput(kind="sequence", items=items)
478 delta = diff_by_schema(schema, base, target, domain="test")
479 assert delta["ops"] == []
480
481 def test_map_add_key_is_insert(self) -> None:
482 schema = _map_schema()
483 base: DiffInput = MapInput(kind="map", entries={})
484 target: DiffInput = MapInput(kind="map", entries={"chr1": _cid("seq")})
485 delta = diff_by_schema(schema, base, target, domain="genomics")
486 assert delta["ops"][0]["op"] == "insert"
487
488 def test_map_remove_key_is_delete(self) -> None:
489 schema = _map_schema()
490 base: DiffInput = MapInput(kind="map", entries={"chr1": _cid("seq")})
491 target: DiffInput = MapInput(kind="map", entries={})
492 delta = diff_by_schema(schema, base, target, domain="genomics")
493 assert delta["ops"][0]["op"] == "delete"
494
495 def test_map_unchanged_returns_no_ops(self) -> None:
496 schema = _map_schema()
497 entries = {"k1": _cid("v1"), "k2": _cid("v2")}
498 base: DiffInput = MapInput(kind="map", entries=entries)
499 target: DiffInput = MapInput(kind="map", entries=entries)
500 delta = diff_by_schema(schema, base, target, domain="test")
501 assert delta["ops"] == []
502
503
504 # ---------------------------------------------------------------------------
505 # snapshot_diff — schema-driven auto-diff for SnapshotManifests
506 # ---------------------------------------------------------------------------
507
508
509 def _minimal_schema(domain: str) -> DomainSchema:
510 """Minimal DomainSchema for snapshot_diff tests."""
511 return DomainSchema(
512 domain=domain,
513 description="Test domain",
514 dimensions=[],
515 top_level=SetSchema(kind="set", element_type="file", identity="by_content"),
516 merge_mode="three_way",
517 schema_version=1,
518 )
519
520
521 class TestSnapshotDiff:
522 """snapshot_diff provides schema-driven file-level diffs for any plugin."""
523
524 def test_added_file_is_insert_op(self) -> None:
525 schema = _minimal_schema("mydomain")
526 base: SnapshotManifest = {"files": {}, "domain": "mydomain"}
527 target: SnapshotManifest = {"files": {"data.txt": _cid("hello")}, "domain": "mydomain"}
528 delta = snapshot_diff(schema, base, target)
529 assert len(delta["ops"]) == 1
530 assert delta["ops"][0]["op"] == "insert"
531 assert delta["ops"][0]["address"] == "data.txt"
532
533 def test_removed_file_is_delete_op(self) -> None:
534 schema = _minimal_schema("mydomain")
535 base: SnapshotManifest = {"files": {"data.txt": _cid("hello")}, "domain": "mydomain"}
536 target: SnapshotManifest = {"files": {}, "domain": "mydomain"}
537 delta = snapshot_diff(schema, base, target)
538 assert len(delta["ops"]) == 1
539 assert delta["ops"][0]["op"] == "delete"
540
541 def test_modified_file_is_replace_op(self) -> None:
542 schema = _minimal_schema("mydomain")
543 base: SnapshotManifest = {"files": {"data.txt": _cid("v1")}, "domain": "mydomain"}
544 target: SnapshotManifest = {"files": {"data.txt": _cid("v2")}, "domain": "mydomain"}
545 delta = snapshot_diff(schema, base, target)
546 assert len(delta["ops"]) == 1
547 assert delta["ops"][0]["op"] == "replace"
548
549 def test_identical_snapshots_have_no_ops(self) -> None:
550 schema = _minimal_schema("mydomain")
551 manifest: SnapshotManifest = {"files": {"a.txt": _cid("a"), "b.txt": _cid("b")}, "domain": "mydomain"}
552 delta = snapshot_diff(schema, manifest, manifest)
553 assert delta["ops"] == []
554 assert delta["summary"] == "no changes"
555
556 def test_domain_tag_taken_from_schema(self) -> None:
557 schema = _minimal_schema("myplugin")
558 base: SnapshotManifest = {"files": {}, "domain": "myplugin"}
559 target: SnapshotManifest = {"files": {"f.txt": _cid("x")}, "domain": "myplugin"}
560 delta = snapshot_diff(schema, base, target)
561 assert delta["domain"] == "myplugin"
562
563 def test_multiple_changes_produce_correct_op_mix(self) -> None:
564 schema = _minimal_schema("mydomain")
565 base: SnapshotManifest = {
566 "files": {
567 "keep.txt": _cid("same"),
568 "modify.txt": _cid("old"),
569 "delete.txt": _cid("gone"),
570 },
571 "domain": "mydomain",
572 }
573 target: SnapshotManifest = {
574 "files": {
575 "keep.txt": _cid("same"),
576 "modify.txt": _cid("new"),
577 "add.txt": _cid("fresh"),
578 },
579 "domain": "mydomain",
580 }
581 delta = snapshot_diff(schema, base, target)
582 ops_by_kind = {op["op"] for op in delta["ops"]}
583 assert "insert" in ops_by_kind # add.txt
584 assert "delete" in ops_by_kind # delete.txt
585 assert "replace" in ops_by_kind # modify.txt
586 assert len(delta["ops"]) == 3
587
588 def test_scaffold_plugin_uses_snapshot_diff(self) -> None:
589 """ScaffoldPlugin.diff() delegates to snapshot_diff — no custom set-algebra needed."""
590 from muse.plugins.scaffold.plugin import ScaffoldPlugin
591
592 plugin = ScaffoldPlugin()
593 base: SnapshotManifest = {"files": {"a.scaffold": _cid("v1")}, "domain": "scaffold"}
594 target: SnapshotManifest = {
595 "files": {"a.scaffold": _cid("v2"), "b.scaffold": _cid("new")},
596 "domain": "scaffold",
597 }
598 delta = plugin.diff(base, target)
599 op_types = {op["op"] for op in delta["ops"]}
600 assert "replace" in op_types
601 assert "insert" in op_types
602 assert delta["domain"] == "scaffold"