cgcardona / muse public
test_crdts.py python
830 lines 28.3 KB
04004b82 Rename MusicRGA → MidiRGA and purge all 'music plugin' terminology Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Comprehensive test suite for the CRDT primitive library.
2
3 Tests cover all six CRDT types:
4 - :class:`~muse.core.crdts.vclock.VectorClock`
5 - :class:`~muse.core.crdts.lww_register.LWWRegister`
6 - :class:`~muse.core.crdts.or_set.ORSet`
7 - :class:`~muse.core.crdts.rga.RGA`
8 - :class:`~muse.core.crdts.aw_map.AWMap`
9 - :class:`~muse.core.crdts.g_counter.GCounter`
10
11 Each type is tested for:
12 1. Basic operational correctness.
13 2. All three CRDT lattice laws: commutativity, associativity, idempotency.
14 3. Serialisation round-trip (to_dict / from_dict).
15 4. Edge cases (empty structures, concurrent writes, tombstone correctness).
16
17 Additionally, :func:`~muse.core.merge_engine.crdt_join_snapshots` is tested
18 for the integration path through the merge engine.
19 """
20
21 import pathlib
22
23 import pytest
24
25 from muse.domain import CRDTPlugin
26 from muse.core.crdts import (
27 AWMap,
28 GCounter,
29 LWWRegister,
30 ORSet,
31 RGA,
32 VectorClock,
33 )
34 from muse.core.crdts.lww_register import LWWValue
35 from muse.core.crdts.or_set import ORSetDict
36 from muse.core.crdts.rga import RGAElement
37
38
39 # ===========================================================================
40 # VectorClock
41 # ===========================================================================
42
43
class TestVectorClock:
    """Checks for ``VectorClock``: increment, merge, causal ordering, round-trip."""

    def test_increment_own_agent(self) -> None:
        """One increment on an empty clock records a count of 1 for that agent."""
        bumped = VectorClock().increment("agent-A")
        assert bumped.to_dict() == {"agent-A": 1}

    def test_increment_twice(self) -> None:
        """Two increments for the same agent record a count of 2."""
        once = VectorClock().increment("agent-A")
        twice = once.increment("agent-A")
        assert twice.to_dict()["agent-A"] == 2

    def test_merge_takes_max_per_agent(self) -> None:
        """Merging keeps the larger counter for each agent seen on either side."""
        left = VectorClock({"agent-A": 3, "agent-B": 1})
        right = VectorClock({"agent-A": 1, "agent-B": 5, "agent-C": 2})
        expected = {"agent-A": 3, "agent-B": 5, "agent-C": 2}
        assert left.merge(right).to_dict() == expected

    def test_happens_before_simple(self) -> None:
        """A strictly smaller single-agent clock happens before the larger one."""
        earlier = VectorClock({"agent-A": 1})
        later = VectorClock({"agent-A": 2})
        assert earlier.happens_before(later)
        assert not later.happens_before(earlier)

    def test_happens_before_multi_agent(self) -> None:
        """A clock smaller on every agent happens before the larger clock."""
        earlier = VectorClock({"agent-A": 1, "agent-B": 2})
        later = VectorClock({"agent-A": 2, "agent-B": 3})
        assert earlier.happens_before(later)

    def test_not_happens_before_concurrent(self) -> None:
        """Clocks that each lead on a different agent are unordered."""
        leads_on_a = VectorClock({"agent-A": 2, "agent-B": 1})
        leads_on_b = VectorClock({"agent-A": 1, "agent-B": 2})
        assert not leads_on_a.happens_before(leads_on_b)
        assert not leads_on_b.happens_before(leads_on_a)

    def test_concurrent_with_neither_dominates(self) -> None:
        """``concurrent_with`` is true in both directions when neither dominates."""
        leads_on_a = VectorClock({"agent-A": 2, "agent-B": 1})
        leads_on_b = VectorClock({"agent-A": 1, "agent-B": 2})
        assert leads_on_a.concurrent_with(leads_on_b)
        assert leads_on_b.concurrent_with(leads_on_a)

    def test_not_concurrent_with_itself(self) -> None:
        """A clock is not reported as concurrent with itself."""
        clock = VectorClock({"agent-A": 1})
        assert not clock.concurrent_with(clock)

    def test_idempotent_merge(self) -> None:
        """Merging a clock with itself is equivalent to the clock."""
        clock = VectorClock({"agent-A": 3, "agent-B": 1})
        assert clock.merge(clock).equivalent(clock)

    def test_merge_commutativity(self) -> None:
        """Merge order does not matter."""
        left = VectorClock({"agent-A": 3, "agent-B": 1})
        right = VectorClock({"agent-A": 1, "agent-B": 5})
        forward = left.merge(right)
        assert forward.equivalent(right.merge(left))

    def test_merge_associativity(self) -> None:
        """Merge grouping does not matter."""
        first = VectorClock({"agent-A": 1})
        second = VectorClock({"agent-B": 2})
        third = VectorClock({"agent-C": 3})
        left_grouped = first.merge(second).merge(third)
        assert left_grouped.equivalent(first.merge(second.merge(third)))

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` reproduces an equivalent clock."""
        original = VectorClock({"agent-A": 5, "agent-B": 3})
        restored = VectorClock.from_dict(original.to_dict())
        assert restored.equivalent(original)

    def test_empty_clock_happens_before_non_empty(self) -> None:
        """The empty clock happens before a clock with one recorded event."""
        blank = VectorClock()
        populated = VectorClock({"agent-A": 1})
        assert blank.happens_before(populated)

    def test_equal_clocks_not_happens_before(self) -> None:
        """Two clocks with identical contents are not ordered either way."""
        one = VectorClock({"agent-A": 1})
        other = VectorClock({"agent-A": 1})
        assert not one.happens_before(other)
        assert not other.happens_before(one)
116
117
118 # ===========================================================================
119 # LWWRegister
120 # ===========================================================================
121
122
class TestLWWRegister:
    """Checks for ``LWWRegister``: read/write, tie-breaking, join laws, round-trip."""

    def _make(self, value: str, ts: float, author: str) -> LWWRegister:
        """Build a register from its serialised ``LWWValue`` form."""
        payload: LWWValue = {"value": value, "timestamp": ts, "author": author}
        return LWWRegister.from_dict(payload)

    def test_read_returns_value(self) -> None:
        """``read`` returns the value the register was built with."""
        register = self._make("C major", 1.0, "agent-1")
        assert register.read() == "C major"

    def test_lww_later_timestamp_wins(self) -> None:
        """The later timestamp wins regardless of join direction."""
        older = self._make("C major", 1.0, "agent-1")
        newer = self._make("G major", 2.0, "agent-2")
        assert older.join(newer).read() == "G major"
        assert newer.join(older).read() == "G major"

    def test_lww_same_timestamp_author_tiebreak(self) -> None:
        """With equal timestamps the lexicographically larger author wins."""
        from_a = self._make("C major", 1.0, "agent-A")
        from_b = self._make("G major", 1.0, "agent-B")
        # "agent-B" sorts after "agent-A", so its value is expected both ways.
        a_then_b = from_a.join(from_b)
        assert a_then_b.read() == "G major"
        b_then_a = from_b.join(from_a)
        assert b_then_a.read() == "G major"

    def test_join_is_commutative(self) -> None:
        """Join order does not matter."""
        older = self._make("C major", 1.0, "agent-1")
        newer = self._make("G major", 2.0, "agent-2")
        forward = older.join(newer)
        assert forward.equivalent(newer.join(older))

    def test_join_is_associative(self) -> None:
        """Join grouping does not matter."""
        first = self._make("C major", 1.0, "agent-1")
        second = self._make("G major", 2.0, "agent-2")
        third = self._make("D minor", 3.0, "agent-3")
        left_grouped = first.join(second).join(third)
        assert left_grouped.equivalent(first.join(second.join(third)))

    def test_join_is_idempotent(self) -> None:
        """Joining a register with itself is equivalent to the register."""
        register = self._make("C major", 1.0, "agent-1")
        assert register.join(register).equivalent(register)

    def test_write_returns_winner(self) -> None:
        """A write carrying an older timestamp does not replace the current value."""
        current = self._make("C major", 5.0, "agent-1")
        after_stale_write = current.write("G major", 3.0, "agent-2")
        assert after_stale_write.read() == "C major"

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` reproduces an equivalent register."""
        original = self._make("A minor", 42.0, "agent-x")
        restored = LWWRegister.from_dict(original.to_dict())
        assert restored.equivalent(original)
172
173
174 # ===========================================================================
175 # ORSet
176 # ===========================================================================
177
178
class TestORSet:
    """Checks for ``ORSet``: add/remove by token, join laws, round-trip."""

    def test_add_element(self) -> None:
        """An added element is reported as a member."""
        members, _ = ORSet().add("note-A")
        assert "note-A" in members

    def test_remove_element(self) -> None:
        """Removing an element by its add-token drops it from the set."""
        members, token = ORSet().add("note-A")
        members = members.remove("note-A", {token})
        assert "note-A" not in members

    def test_add_survives_concurrent_remove(self) -> None:
        """A fresh add on one replica survives a remove of an older token."""
        # Replica one adds note-A; this yields the first token.
        replica_one, first_token = ORSet().add("note-A")

        # Replica two adds note-A under its own token, then removes only
        # replica one's first token.
        replica_two, _ = ORSet().add("note-A")
        replica_two = replica_two.remove("note-A", {first_token})

        # Replica one adds note-A again, which yields a new token.
        replica_one_readded, _ = replica_one.add("note-A")

        # After the join the element must still be present.
        merged = replica_one_readded.join(replica_two)
        assert "note-A" in merged

    def test_remove_observed_element_works(self) -> None:
        """Removing with every token from ``tokens_for`` drops the element."""
        members, _ = ORSet().add("note-B")
        observed = members.tokens_for("note-B")
        members = members.remove("note-B", observed)
        assert "note-B" not in members

    def test_join_is_commutative(self) -> None:
        """Join order does not change the visible elements."""
        only_x, _ = ORSet().add("X")
        only_y, _ = ORSet().add("Y")

        forward = only_x.join(only_y)
        assert forward.elements() == only_y.join(only_x).elements()

    def test_join_is_associative(self) -> None:
        """Join grouping does not change the visible elements."""
        only_x, _ = ORSet().add("X")
        only_y, _ = ORSet().add("Y")
        only_z, _ = ORSet().add("Z")

        left_grouped = only_x.join(only_y).join(only_z)
        right_grouped = only_x.join(only_y.join(only_z))
        assert left_grouped.elements() == right_grouped.elements()

    def test_join_is_idempotent(self) -> None:
        """Joining a set with itself leaves the visible elements unchanged."""
        members, _ = ORSet().add("X")
        assert members.join(members).elements() == members.elements()

    def test_tokens_for_returns_live_tokens(self) -> None:
        """The token returned by ``add`` appears in ``tokens_for``."""
        members, token = ORSet().add("X")
        assert token in members.tokens_for("X")

    def test_contains_dunder(self) -> None:
        """``in`` is true for added elements and false for absent ones."""
        members, _ = ORSet().add("Z")
        assert "Z" in members
        assert "W" not in members

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` preserves the visible elements."""
        members, _ = ORSet().add("A")
        members, _ = members.add("B")
        serialised: ORSetDict = members.to_dict()
        restored = ORSet.from_dict(serialised)
        assert restored.elements() == members.elements()

    def test_add_multiple_same_element(self) -> None:
        """Adding the same element twice keeps both tokens live."""
        members, first_token = ORSet().add("X")
        members, second_token = members.add("X")
        assert first_token in members.tokens_for("X")
        assert second_token in members.tokens_for("X")
267
268
269 # ===========================================================================
270 # RGA
271 # ===========================================================================
272
273
class TestRGA:
    """Checks for ``RGA``: insert ordering, tombstones, join laws, round-trip."""

    def test_insert_after_none_is_prepend(self) -> None:
        """Inserting with a ``None`` anchor into an empty RGA yields that element."""
        seq = RGA().insert(None, "a", element_id="1@agent")
        assert seq.to_sequence() == ["a"]

    def test_insert_at_end(self) -> None:
        """Inserting after the only element appends to the sequence."""
        head_id = "1@agent"
        seq = RGA().insert(None, "a", element_id=head_id)
        seq = seq.insert(head_id, "b", element_id="2@agent")
        assert seq.to_sequence() == ["a", "b"]

    def test_insert_in_middle(self) -> None:
        """Two inserts at one anchor: the larger ID lands left of the smaller."""
        # "c" goes in first with the smaller ID and "b" second with the larger
        # ID, both anchored on "a"; the expected visible order is a, b, c.
        head_id = "1@agent"
        seq = RGA().insert(None, "a", element_id=head_id)
        seq = seq.insert(head_id, "c", element_id="2@agent")
        seq = seq.insert(head_id, "b", element_id="3@agent")
        assert seq.to_sequence() == ["a", "b", "c"]

    def test_delete_marks_tombstone(self) -> None:
        """Deleting the only element leaves an empty visible sequence."""
        head_id = "1@agent"
        seq = RGA().insert(None, "a", element_id=head_id)
        seq = seq.delete(head_id)
        assert seq.to_sequence() == []

    def test_delete_unknown_id_is_noop(self) -> None:
        """Deleting an ID that was never inserted changes nothing visible."""
        seq = RGA().insert(None, "a", element_id="1@agent")
        after_delete = seq.delete("nonexistent-id")
        assert after_delete.to_sequence() == ["a"]

    def test_concurrent_insert_same_position_deterministic(self) -> None:
        """Concurrent inserts at one anchor converge regardless of join order.

        Only convergence is asserted here; the resulting order itself is not.
        """
        head_id = "1@agent"
        shared = RGA().insert(None, "a", element_id=head_id)

        with_larger_id = shared.insert(head_id, "B", element_id="2@agent-z")
        with_smaller_id = shared.insert(head_id, "C", element_id="2@agent-a")

        larger_first = with_larger_id.join(with_smaller_id)
        smaller_first = with_smaller_id.join(with_larger_id)

        assert larger_first.to_sequence() == smaller_first.to_sequence()

    def test_join_is_commutative(self) -> None:
        """Join order does not change the visible sequence."""
        only_x = RGA().insert(None, "X", element_id="1@a")
        only_y = RGA().insert(None, "Y", element_id="1@b")

        forward = only_x.join(only_y)
        assert forward.to_sequence() == only_y.join(only_x).to_sequence()

    def test_join_is_associative(self) -> None:
        """Join grouping does not change the visible sequence."""
        only_x = RGA().insert(None, "X", element_id="1@a")
        only_y = RGA().insert(None, "Y", element_id="1@b")
        only_z = RGA().insert(None, "Z", element_id="1@c")

        left_grouped = only_x.join(only_y).join(only_z)
        right_grouped = only_x.join(only_y.join(only_z))
        assert left_grouped.to_sequence() == right_grouped.to_sequence()

    def test_join_is_idempotent(self) -> None:
        """Joining an RGA with itself leaves the visible sequence unchanged."""
        seq = RGA().insert(None, "X", element_id="1@agent")
        assert seq.join(seq).to_sequence() == seq.to_sequence()

    def test_to_sequence_excludes_tombstones(self) -> None:
        """A deleted element is omitted while its successor stays visible."""
        head_id = "1@agent"
        seq = RGA().insert(None, "A", element_id=head_id)
        seq = seq.insert(head_id, "B", element_id="2@agent")
        seq = seq.delete(head_id)
        assert seq.to_sequence() == ["B"]

    def test_rga_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` preserves the visible sequence."""
        head_id = "1@agent"
        seq = RGA().insert(None, "A", element_id=head_id)
        seq = seq.insert(head_id, "B", element_id="2@agent")
        serialised: list[RGAElement] = seq.to_dict()
        restored = RGA.from_dict(serialised)
        assert restored.to_sequence() == seq.to_sequence()

    def test_len_excludes_tombstones(self) -> None:
        """``len`` counts only the elements that were not deleted."""
        head_id = "1@agent"
        seq = RGA().insert(None, "A", element_id=head_id)
        seq = seq.insert(head_id, "B", element_id="2@agent")
        seq = seq.delete(head_id)
        assert len(seq) == 1

    def test_tombstone_survives_join(self) -> None:
        """An element deleted on one replica stays deleted after a join."""
        head_id = "1@agent"
        live_replica = RGA().insert(None, "A", element_id=head_id)
        deleted_replica = live_replica.delete(head_id)

        merged = live_replica.join(deleted_replica)
        assert "A" not in merged.to_sequence()
390
391
392 # ===========================================================================
393 # AWMap
394 # ===========================================================================
395
396
class TestAWMap:
    """Checks for ``AWMap``: set/get/remove, add-wins merge, join laws, round-trip."""

    def test_set_and_get(self) -> None:
        """A value that was set can be read back by key."""
        mapping = AWMap().set("tempo", "120bpm")
        assert mapping.get("tempo") == "120bpm"

    def test_get_absent_returns_none(self) -> None:
        """Looking up a key that was never set returns ``None``."""
        assert AWMap().get("nonexistent") is None

    def test_overwrite_key(self) -> None:
        """Setting a key twice leaves the second value visible."""
        mapping = AWMap().set("key", "C major")
        mapping = mapping.set("key", "G major")
        assert mapping.get("key") == "G major"

    def test_remove_key(self) -> None:
        """A removed key reads back as ``None``."""
        mapping = AWMap().set("tempo", "120bpm")
        mapping = mapping.remove("tempo")
        assert mapping.get("tempo") is None

    def test_add_wins_concurrent_remove(self) -> None:
        """A concurrent re-set on one replica survives a remove on another."""
        # Replica A sets "tempo".
        replica_a = AWMap().set("tempo", "120bpm")

        # Replica B independently sets and then removes "tempo".
        replica_b = AWMap().set("tempo", "120bpm")
        replica_b = replica_b.remove("tempo")

        # Replica A concurrently writes a new value.
        replica_a_updated = replica_a.set("tempo", "140bpm")

        # After the join the newer value is expected to be visible.
        merged = replica_a_updated.join(replica_b)
        assert merged.get("tempo") == "140bpm"

    def test_join_is_commutative(self) -> None:
        """Join order does not change the plain-dict view."""
        has_a = AWMap().set("A", "1")
        has_b = AWMap().set("B", "2")
        forward = has_a.join(has_b)
        assert forward.to_plain_dict() == has_b.join(has_a).to_plain_dict()

    def test_join_is_associative(self) -> None:
        """Join grouping does not change the plain-dict view."""
        has_a = AWMap().set("A", "1")
        has_b = AWMap().set("B", "2")
        has_c = AWMap().set("C", "3")
        left_grouped = has_a.join(has_b).join(has_c)
        right_grouped = has_a.join(has_b.join(has_c))
        assert left_grouped.to_plain_dict() == right_grouped.to_plain_dict()

    def test_join_is_idempotent(self) -> None:
        """Joining a map with itself leaves the plain-dict view unchanged."""
        mapping = AWMap().set("A", "1")
        assert mapping.join(mapping).to_plain_dict() == mapping.to_plain_dict()

    def test_keys_returns_live_keys(self) -> None:
        """``keys`` returns exactly the keys that were set, as a frozenset."""
        mapping = AWMap().set("X", "1")
        mapping = mapping.set("Y", "2")
        assert mapping.keys() == frozenset({"X", "Y"})

    def test_contains(self) -> None:
        """``in`` is true for set keys and false for absent ones."""
        mapping = AWMap().set("K", "V")
        assert "K" in mapping
        assert "Z" not in mapping

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` preserves the plain-dict view."""
        original = AWMap().set("A", "1").set("B", "2")
        restored = AWMap.from_dict(original.to_dict())
        assert restored.to_plain_dict() == original.to_plain_dict()
470
471
472 # ===========================================================================
473 # GCounter
474 # ===========================================================================
475
476
class TestGCounter:
    """Checks for ``GCounter``: increments, validation, join laws, round-trip."""

    def test_initial_value_is_zero(self) -> None:
        """A fresh counter totals zero."""
        assert GCounter().value() == 0

    def test_increment(self) -> None:
        """A default increment adds one to both the total and the agent slot."""
        counter = GCounter().increment("agent-1")
        assert counter.value() == 1
        assert counter.value_for("agent-1") == 1

    def test_increment_by_n(self) -> None:
        """``by=5`` raises the total by five."""
        counter = GCounter().increment("agent-1", by=5)
        assert counter.value() == 5

    def test_increment_rejects_zero(self) -> None:
        """Incrementing by zero raises ``ValueError``."""
        fresh = GCounter()
        with pytest.raises(ValueError):
            fresh.increment("agent-1", by=0)

    def test_increment_rejects_negative(self) -> None:
        """Incrementing by a negative amount raises ``ValueError``."""
        fresh = GCounter()
        with pytest.raises(ValueError):
            fresh.increment("agent-1", by=-1)

    def test_join_takes_max_per_agent(self) -> None:
        """Join keeps the larger per-agent count; the total is their sum."""
        left = GCounter({"agent-A": 3, "agent-B": 1})
        right = GCounter({"agent-A": 1, "agent-B": 5})
        joined = left.join(right)
        assert joined.value_for("agent-A") == 3
        assert joined.value_for("agent-B") == 5
        assert joined.value() == 8

    def test_join_is_commutative(self) -> None:
        """Join order does not matter."""
        left = GCounter({"agent-A": 3})
        right = GCounter({"agent-B": 7})
        forward = left.join(right)
        assert forward.equivalent(right.join(left))

    def test_join_is_associative(self) -> None:
        """Join grouping does not matter."""
        first = GCounter({"agent-A": 1})
        second = GCounter({"agent-B": 2})
        third = GCounter({"agent-C": 3})
        left_grouped = first.join(second).join(third)
        assert left_grouped.equivalent(first.join(second.join(third)))

    def test_join_is_idempotent(self) -> None:
        """Joining a counter with itself is equivalent to the counter."""
        counter = GCounter({"agent-A": 5})
        assert counter.join(counter).equivalent(counter)

    def test_value_for_absent_agent_is_zero(self) -> None:
        """An agent that never incremented reads as zero."""
        assert GCounter().value_for("ghost-agent") == 0

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` reproduces an equivalent counter."""
        original = GCounter({"a": 1, "b": 2})
        restored = GCounter.from_dict(original.to_dict())
        assert restored.equivalent(original)
530
531
532 # ===========================================================================
533 # CRDTPlugin integration — merge engine CRDT path
534 # ===========================================================================
535
536
class TestCRDTMergeEngineIntegration:
    """Tests for :func:`~muse.core.merge_engine.crdt_join_snapshots`.

    Since there is no production CRDTPlugin implementation yet (the MIDI plugin
    is still three-way mode), we create a minimal stub that satisfies the
    CRDTPlugin protocol.
    """

    def _make_stub_plugin(self) -> CRDTPlugin:
        """Return a minimal CRDTPlugin stub.

        The stub's ``join`` unions ``files`` and ``crdt_state`` and merges the
        vector clocks. When both sides carry the same key with different
        values, the larger value is kept so that ``join(a, b)`` and
        ``join(b, a)`` agree; a plain ``{**a, **b}`` union would let the
        right-hand argument win and break commutativity.
        """
        from muse.domain import (
            CRDTSnapshotManifest,
            DriftReport,
            LiveState,
            MergeResult,
            StateSnapshot,
            StateDelta,
            StructuredDelta,
        )
        from muse.core.schema import CRDTDimensionSpec, DomainSchema, SetSchema

        def _union_keep_max(
            left: dict[str, str], right: dict[str, str]
        ) -> dict[str, str]:
            """Union two maps, keeping the larger value on a shared key.

            NOTE(review): assumes values are mutually comparable (strings such
            as content hashes, as in every test in this class) — confirm
            against ``CRDTSnapshotManifest``.
            """
            merged = dict(left)
            for key, value in right.items():
                merged[key] = max(merged[key], value) if key in merged else value
            return merged

        class StubCRDTPlugin(CRDTPlugin):
            def snapshot(self, live_state: LiveState) -> StateSnapshot:
                return {"files": {}, "domain": "stub"}

            def diff(
                self,
                base: StateSnapshot,
                target: StateSnapshot,
                *,
                repo_root: pathlib.Path | None = None,
            ) -> StateDelta:
                empty_delta: StructuredDelta = {
                    "domain": "stub",
                    "ops": [],
                    "summary": "no changes",
                }
                return empty_delta

            def drift(self, committed: StateSnapshot, live_state: LiveState) -> DriftReport:
                return DriftReport(has_drift=False)

            def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState:
                return live_state

            def merge(
                self,
                base: StateSnapshot,
                left: StateSnapshot,
                right: StateSnapshot,
                *,
                repo_root: pathlib.Path | None = None,
            ) -> MergeResult:
                return MergeResult(merged=base)

            def schema(self) -> DomainSchema:
                schema: SetSchema = {
                    "kind": "set",
                    "element_type": "str",
                    "identity": "by_content",
                }
                return {
                    "domain": "stub",
                    "description": "Stub CRDT domain",
                    "dimensions": [],
                    "top_level": schema,
                    "merge_mode": "crdt",
                    "schema_version": 1,
                }

            def crdt_schema(self) -> list[CRDTDimensionSpec]:
                return []

            def join(
                self,
                a: CRDTSnapshotManifest,
                b: CRDTSnapshotManifest,
            ) -> CRDTSnapshotManifest:
                # Order-independent merge: max vclock, and a union of files
                # and crdt_state that resolves shared keys deterministically.
                vc_a = VectorClock.from_dict(a["vclock"])
                vc_b = VectorClock.from_dict(b["vclock"])
                merged_vc = vc_a.merge(vc_b)
                result: CRDTSnapshotManifest = {
                    "files": _union_keep_max(a["files"], b["files"]),
                    "domain": a["domain"],
                    "vclock": merged_vc.to_dict(),
                    "crdt_state": _union_keep_max(a["crdt_state"], b["crdt_state"]),
                    "schema_version": 1,
                }
                return result

            def to_crdt_state(self, snapshot: StateSnapshot) -> CRDTSnapshotManifest:
                result: CRDTSnapshotManifest = {
                    "files": snapshot.get("files", {}),
                    "domain": snapshot.get("domain", "stub"),
                    "vclock": {},
                    "crdt_state": {},
                    "schema_version": 1,
                }
                return result

            def from_crdt_state(self, crdt: CRDTSnapshotManifest) -> StateSnapshot:
                return {"files": crdt["files"], "domain": crdt["domain"]}

        return StubCRDTPlugin()

    def test_crdt_join_produces_merge_result(self) -> None:
        """Joining two disjoint snapshots yields a clean, conflict-free result."""
        from muse.core.merge_engine import crdt_join_snapshots

        plugin = self._make_stub_plugin()
        result = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"track.mid": "hash-a"},
            b_snapshot={"beat.mid": "hash-b"},
            a_vclock={"agent-1": 1},
            b_vclock={"agent-2": 1},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        assert result.is_clean
        assert result.conflicts == []

    def test_crdt_join_merges_files(self) -> None:
        """Files from both snapshots appear in the merged result."""
        from muse.core.merge_engine import crdt_join_snapshots

        plugin = self._make_stub_plugin()
        result = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"track.mid": "hash-a"},
            b_snapshot={"beat.mid": "hash-b"},
            a_vclock={},
            b_vclock={},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        assert "track.mid" in result.merged["files"]
        assert "beat.mid" in result.merged["files"]

    def test_crdt_join_is_commutative(self) -> None:
        """Swapping the two sides yields the same set of merged file paths."""
        from muse.core.merge_engine import crdt_join_snapshots

        plugin = self._make_stub_plugin()
        result_ab = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"track.mid": "hash-a"},
            b_snapshot={"beat.mid": "hash-b"},
            a_vclock={"agent-1": 1},
            b_vclock={"agent-2": 2},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        result_ba = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"beat.mid": "hash-b"},
            b_snapshot={"track.mid": "hash-a"},
            a_vclock={"agent-2": 2},
            b_vclock={"agent-1": 1},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        assert set(result_ab.merged["files"].keys()) == set(result_ba.merged["files"].keys())

    def test_crdt_merge_never_produces_conflicts(self) -> None:
        """Two replicas touching the same path still join without conflicts."""
        from muse.core.merge_engine import crdt_join_snapshots

        plugin = self._make_stub_plugin()
        # Even when both replicas modify the same file, CRDT join never conflicts
        result = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"shared.mid": "hash-a"},
            b_snapshot={"shared.mid": "hash-b"},
            a_vclock={"agent-1": 1},
            b_vclock={"agent-2": 1},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        assert result.is_clean
        assert len(result.conflicts) == 0

    def test_crdt_join_requires_crdt_plugin_protocol(self) -> None:
        """Check that the stub used throughout this class is a ``CRDTPlugin``.

        The static type of ``crdt_join_snapshots(plugin=...)`` is
        ``MuseDomainPlugin``.  Callers that don't implement ``CRDTPlugin``
        are rejected at the call site by mypy.  The runtime ``isinstance``
        check exists as a defensive guard for duck-typed callers.

        This test does not exercise the rejection path; it confirms that the
        entry point is importable and that the stub satisfies the protocol.
        """
        from muse.core.merge_engine import crdt_join_snapshots

        assert callable(crdt_join_snapshots)

        plugin = self._make_stub_plugin()
        assert isinstance(plugin, CRDTPlugin)

    def test_crdt_plugin_join_commutes(self) -> None:
        """join(a,b) == join(b,a) at the CRDT primitive level.

        Both manifests carry ``shared.mid`` with different hashes so the
        assertion covers conflicting values, not just disjoint key sets.
        """
        from muse.domain import CRDTSnapshotManifest

        plugin = self._make_stub_plugin()
        assert isinstance(plugin, CRDTPlugin)

        a: CRDTSnapshotManifest = {
            "files": {"a.mid": "ha", "shared.mid": "hash-a"},
            "domain": "stub",
            "vclock": {"x": 1},
            "crdt_state": {},
            "schema_version": 1,
        }
        b: CRDTSnapshotManifest = {
            "files": {"b.mid": "hb", "shared.mid": "hash-b"},
            "domain": "stub",
            "vclock": {"y": 1},
            "crdt_state": {},
            "schema_version": 1,
        }
        ab = plugin.join(a, b)
        ba = plugin.join(b, a)
        assert set(ab["files"].keys()) == set(ba["files"].keys())
        # Values must agree as well, including for the path both sides touch.
        assert ab["files"] == ba["files"]
768
769
770 # ===========================================================================
771 # Cross-module: CRDT primitives satisfy lattice laws end-to-end
772 # ===========================================================================
773
774
class TestLatticeProperties:
    """Property-style checks that every CRDT satisfies all three lattice laws.

    Each test asserts commutativity, associativity and idempotency, in that
    order, for one CRDT type.
    """

    def test_vector_clock_lattice_laws(self) -> None:
        """``VectorClock.merge`` obeys the three laws on overlapping clocks."""
        a = VectorClock({"x": 1, "y": 2})
        b = VectorClock({"x": 3, "z": 1})
        c = VectorClock({"y": 5})

        # Commutativity
        assert a.merge(b).equivalent(b.merge(a))
        # Associativity
        assert a.merge(b).merge(c).equivalent(a.merge(b.merge(c)))
        # Idempotency
        assert a.merge(a).equivalent(a)

    def test_g_counter_lattice_laws(self) -> None:
        """``GCounter.join`` obeys the three laws on overlapping counters."""
        a = GCounter({"x": 1, "y": 2})
        b = GCounter({"x": 3, "z": 1})
        c = GCounter({"y": 5})

        assert a.join(b).equivalent(b.join(a))
        assert a.join(b).join(c).equivalent(a.join(b.join(c)))
        assert a.join(a).equivalent(a)

    def test_lww_register_lattice_laws(self) -> None:
        """``LWWRegister.join`` obeys the three laws on distinct timestamps."""
        def make(v: str, ts: float, author: str) -> LWWRegister:
            return LWWRegister(v, ts, author)

        a = make("val-a", 1.0, "agent-a")
        b = make("val-b", 2.0, "agent-b")
        c = make("val-c", 3.0, "agent-c")

        assert a.join(b).equivalent(b.join(a))
        assert a.join(b).join(c).equivalent(a.join(b.join(c)))
        assert a.join(a).equivalent(a)

    def test_or_set_lattice_laws(self) -> None:
        """``ORSet.join`` obeys the three laws, compared by visible elements."""
        s1 = ORSet()
        s1, _ = s1.add("X")
        s2 = ORSet()
        s2, _ = s2.add("Y")
        s3 = ORSet()
        s3, _ = s3.add("Z")

        assert s1.join(s2).elements() == s2.join(s1).elements()
        assert s1.join(s2).join(s3).elements() == s1.join(s2.join(s3)).elements()
        assert s1.join(s1).elements() == s1.elements()

    def test_rga_lattice_laws(self) -> None:
        """``RGA.join`` obeys the three laws, compared by visible sequence."""
        r1 = RGA().insert(None, "X", element_id="1@a")
        r2 = RGA().insert(None, "Y", element_id="1@b")
        r3 = RGA().insert(None, "Z", element_id="1@c")

        assert r1.join(r2).to_sequence() == r2.join(r1).to_sequence()
        assert r1.join(r2).join(r3).to_sequence() == r1.join(r2.join(r3)).to_sequence()
        assert r1.join(r1).to_sequence() == r1.to_sequence()

    def test_aw_map_lattice_laws(self) -> None:
        """``AWMap.join`` obeys the three laws, compared by plain-dict view."""
        m1 = AWMap().set("A", "1")
        m2 = AWMap().set("B", "2")
        m3 = AWMap().set("C", "3")

        assert m1.join(m2).to_plain_dict() == m2.join(m1).to_plain_dict()
        assert m1.join(m2).join(m3).to_plain_dict() == m1.join(m2.join(m3)).to_plain_dict()
        assert m1.join(m1).to_plain_dict() == m1.to_plain_dict()