gabriel / muse public
test_crdts.py python
831 lines 28.4 KB
8aa515d5 refactor: consolidate schema_version to single source of truth Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """Comprehensive test suite for the CRDT primitive library.
2
3 Tests cover all six CRDT types:
4 - :class:`~muse.core.crdts.vclock.VectorClock`
5 - :class:`~muse.core.crdts.lww_register.LWWRegister`
6 - :class:`~muse.core.crdts.or_set.ORSet`
7 - :class:`~muse.core.crdts.rga.RGA`
8 - :class:`~muse.core.crdts.aw_map.AWMap`
9 - :class:`~muse.core.crdts.g_counter.GCounter`
10
11 Each type is tested for:
12 1. Basic operational correctness.
13 2. All three CRDT lattice laws: commutativity, associativity, idempotency.
14 3. Serialisation round-trip (to_dict / from_dict).
15 4. Edge cases (empty structures, concurrent writes, tombstone correctness).
16
17 Additionally, :func:`~muse.core.merge_engine.crdt_join_snapshots` is tested
18 for the integration path through the merge engine.
19 """
20
21 import pathlib
22
23 import pytest
24
25 from muse._version import __version__
26 from muse.domain import CRDTPlugin
27 from muse.core.crdts import (
28 AWMap,
29 GCounter,
30 LWWRegister,
31 ORSet,
32 RGA,
33 VectorClock,
34 )
35 from muse.core.crdts.lww_register import LWWValue
36 from muse.core.crdts.or_set import ORSetDict
37 from muse.core.crdts.rga import RGAElement
38
39
40 # ===========================================================================
41 # VectorClock
42 # ===========================================================================
43
44
45 class TestVectorClock:
46 def test_increment_own_agent(self) -> None:
47 vc = VectorClock()
48 vc2 = vc.increment("agent-A")
49 assert vc2.to_dict() == {"agent-A": 1}
50
51 def test_increment_twice(self) -> None:
52 vc = VectorClock().increment("agent-A").increment("agent-A")
53 assert vc.to_dict()["agent-A"] == 2
54
55 def test_merge_takes_max_per_agent(self) -> None:
56 a = VectorClock({"agent-A": 3, "agent-B": 1})
57 b = VectorClock({"agent-A": 1, "agent-B": 5, "agent-C": 2})
58 merged = a.merge(b)
59 assert merged.to_dict() == {"agent-A": 3, "agent-B": 5, "agent-C": 2}
60
61 def test_happens_before_simple(self) -> None:
62 a = VectorClock({"agent-A": 1})
63 b = VectorClock({"agent-A": 2})
64 assert a.happens_before(b)
65 assert not b.happens_before(a)
66
67 def test_happens_before_multi_agent(self) -> None:
68 a = VectorClock({"agent-A": 1, "agent-B": 2})
69 b = VectorClock({"agent-A": 2, "agent-B": 3})
70 assert a.happens_before(b)
71
72 def test_not_happens_before_concurrent(self) -> None:
73 a = VectorClock({"agent-A": 2, "agent-B": 1})
74 b = VectorClock({"agent-A": 1, "agent-B": 2})
75 assert not a.happens_before(b)
76 assert not b.happens_before(a)
77
78 def test_concurrent_with_neither_dominates(self) -> None:
79 a = VectorClock({"agent-A": 2, "agent-B": 1})
80 b = VectorClock({"agent-A": 1, "agent-B": 2})
81 assert a.concurrent_with(b)
82 assert b.concurrent_with(a)
83
84 def test_not_concurrent_with_itself(self) -> None:
85 a = VectorClock({"agent-A": 1})
86 assert not a.concurrent_with(a)
87
88 def test_idempotent_merge(self) -> None:
89 a = VectorClock({"agent-A": 3, "agent-B": 1})
90 assert a.merge(a).equivalent(a)
91
92 def test_merge_commutativity(self) -> None:
93 a = VectorClock({"agent-A": 3, "agent-B": 1})
94 b = VectorClock({"agent-A": 1, "agent-B": 5})
95 assert a.merge(b).equivalent(b.merge(a))
96
97 def test_merge_associativity(self) -> None:
98 a = VectorClock({"agent-A": 1})
99 b = VectorClock({"agent-B": 2})
100 c = VectorClock({"agent-C": 3})
101 assert a.merge(b).merge(c).equivalent(a.merge(b.merge(c)))
102
103 def test_round_trip_to_from_dict(self) -> None:
104 vc = VectorClock({"agent-A": 5, "agent-B": 3})
105 assert VectorClock.from_dict(vc.to_dict()).equivalent(vc)
106
107 def test_empty_clock_happens_before_non_empty(self) -> None:
108 empty = VectorClock()
109 non_empty = VectorClock({"agent-A": 1})
110 assert empty.happens_before(non_empty)
111
112 def test_equal_clocks_not_happens_before(self) -> None:
113 a = VectorClock({"agent-A": 1})
114 b = VectorClock({"agent-A": 1})
115 assert not a.happens_before(b)
116 assert not b.happens_before(a)
117
118
119 # ===========================================================================
120 # LWWRegister
121 # ===========================================================================
122
123
124 class TestLWWRegister:
125 def _make(self, value: str, ts: float, author: str) -> LWWRegister:
126 data: LWWValue = {"value": value, "timestamp": ts, "author": author}
127 return LWWRegister.from_dict(data)
128
129 def test_read_returns_value(self) -> None:
130 r = self._make("C major", 1.0, "agent-1")
131 assert r.read() == "C major"
132
133 def test_lww_later_timestamp_wins(self) -> None:
134 a = self._make("C major", 1.0, "agent-1")
135 b = self._make("G major", 2.0, "agent-2")
136 assert a.join(b).read() == "G major"
137 assert b.join(a).read() == "G major"
138
139 def test_lww_same_timestamp_author_tiebreak(self) -> None:
140 # Lexicographically larger author wins
141 a = self._make("C major", 1.0, "agent-A")
142 b = self._make("G major", 1.0, "agent-B")
143 # "agent-B" > "agent-A" lexicographically
144 result = a.join(b)
145 assert result.read() == "G major"
146 result2 = b.join(a)
147 assert result2.read() == "G major"
148
149 def test_join_is_commutative(self) -> None:
150 a = self._make("C major", 1.0, "agent-1")
151 b = self._make("G major", 2.0, "agent-2")
152 assert a.join(b).equivalent(b.join(a))
153
154 def test_join_is_associative(self) -> None:
155 a = self._make("C major", 1.0, "agent-1")
156 b = self._make("G major", 2.0, "agent-2")
157 c = self._make("D minor", 3.0, "agent-3")
158 assert a.join(b).join(c).equivalent(a.join(b.join(c)))
159
160 def test_join_is_idempotent(self) -> None:
161 a = self._make("C major", 1.0, "agent-1")
162 assert a.join(a).equivalent(a)
163
164 def test_write_returns_winner(self) -> None:
165 r = self._make("C major", 5.0, "agent-1")
166 r2 = r.write("G major", 3.0, "agent-2")
167 # older write loses
168 assert r2.read() == "C major"
169
170 def test_round_trip_to_from_dict(self) -> None:
171 r = self._make("A minor", 42.0, "agent-x")
172 assert LWWRegister.from_dict(r.to_dict()).equivalent(r)
173
174
175 # ===========================================================================
176 # ORSet
177 # ===========================================================================
178
179
180 class TestORSet:
181 def test_add_element(self) -> None:
182 s = ORSet()
183 s, _ = s.add("note-A")
184 assert "note-A" in s
185
186 def test_remove_element(self) -> None:
187 s = ORSet()
188 s, tok = s.add("note-A")
189 s = s.remove("note-A", {tok})
190 assert "note-A" not in s
191
192 def test_add_survives_concurrent_remove(self) -> None:
193 # Agent 1 adds note-A with token_1
194 s1 = ORSet()
195 s1, tok1 = s1.add("note-A")
196
197 # Agent 2 removes note-A by tombstoning token_1
198 s2 = ORSet()
199 s2, _ = s2.add("note-A") # s2 adds with its own token
200 s2 = s2.remove("note-A", {tok1}) # removes agent-1's token only
201
202 # Agent 1 concurrently adds note-A again with token_2 (new token, survives)
203 s1_v2, tok2 = s1.add("note-A")
204
205 # Merge: agent-1's new add survives agent-2's remove of old token
206 merged = s1_v2.join(s2)
207 assert "note-A" in merged
208
209 def test_remove_observed_element_works(self) -> None:
210 s = ORSet()
211 s, tok = s.add("note-B")
212 tokens = s.tokens_for("note-B")
213 s = s.remove("note-B", tokens)
214 assert "note-B" not in s
215
216 def test_join_is_commutative(self) -> None:
217 s1 = ORSet()
218 s1, _ = s1.add("X")
219
220 s2 = ORSet()
221 s2, _ = s2.add("Y")
222
223 assert s1.join(s2).elements() == s2.join(s1).elements()
224
225 def test_join_is_associative(self) -> None:
226 s1 = ORSet()
227 s1, _ = s1.add("X")
228 s2 = ORSet()
229 s2, _ = s2.add("Y")
230 s3 = ORSet()
231 s3, _ = s3.add("Z")
232
233 left = s1.join(s2).join(s3)
234 right = s1.join(s2.join(s3))
235 assert left.elements() == right.elements()
236
237 def test_join_is_idempotent(self) -> None:
238 s = ORSet()
239 s, _ = s.add("X")
240 assert s.join(s).elements() == s.elements()
241
242 def test_tokens_for_returns_live_tokens(self) -> None:
243 s = ORSet()
244 s, tok = s.add("X")
245 assert tok in s.tokens_for("X")
246
247 def test_contains_dunder(self) -> None:
248 s = ORSet()
249 s, _ = s.add("Z")
250 assert "Z" in s
251 assert "W" not in s
252
253 def test_round_trip_to_from_dict(self) -> None:
254 s = ORSet()
255 s, _ = s.add("A")
256 s, _ = s.add("B")
257 data: ORSetDict = s.to_dict()
258 s2 = ORSet.from_dict(data)
259 assert s2.elements() == s.elements()
260
261 def test_add_multiple_same_element(self) -> None:
262 s = ORSet()
263 s, tok1 = s.add("X")
264 s, tok2 = s.add("X")
265 # Both tokens are live
266 assert tok1 in s.tokens_for("X")
267 assert tok2 in s.tokens_for("X")
268
269
270 # ===========================================================================
271 # RGA
272 # ===========================================================================
273
274
275 class TestRGA:
276 def test_insert_after_none_is_prepend(self) -> None:
277 rga = RGA()
278 rga = rga.insert(None, "a", element_id="1@agent")
279 assert rga.to_sequence() == ["a"]
280
281 def test_insert_at_end(self) -> None:
282 id_a = "1@agent"
283 rga = RGA()
284 rga = rga.insert(None, "a", element_id=id_a)
285 rga = rga.insert(id_a, "b", element_id="2@agent")
286 assert rga.to_sequence() == ["a", "b"]
287
288 def test_insert_in_middle(self) -> None:
289 # In RGA, more-recently-inserted elements (larger ID) at the same anchor
290 # appear to the LEFT of earlier-inserted elements. So insert "c" first
291 # with a smaller ID, then insert "b" with a larger ID at the same anchor
292 # to get b appearing before c in the visible sequence.
293 id_a = "1@agent"
294 rga = RGA()
295 rga = rga.insert(None, "a", element_id=id_a)
296 rga = rga.insert(id_a, "c", element_id="2@agent") # inserted first → smaller ID
297 rga = rga.insert(id_a, "b", element_id="3@agent") # inserted second → larger ID → goes left
298 assert rga.to_sequence() == ["a", "b", "c"]
299
300 def test_delete_marks_tombstone(self) -> None:
301 id_a = "1@agent"
302 rga = RGA()
303 rga = rga.insert(None, "a", element_id=id_a)
304 rga = rga.delete(id_a)
305 assert rga.to_sequence() == []
306
307 def test_delete_unknown_id_is_noop(self) -> None:
308 rga = RGA()
309 rga = rga.insert(None, "a", element_id="1@agent")
310 rga2 = rga.delete("nonexistent-id")
311 assert rga2.to_sequence() == ["a"]
312
313 def test_concurrent_insert_same_position_deterministic(self) -> None:
314 # Two agents both insert after id_a; larger ID goes first
315 id_a = "1@agent"
316 rga = RGA()
317 rga = rga.insert(None, "a", element_id=id_a)
318
319 rga_agent1 = rga.insert(id_a, "B", element_id="2@agent-z") # larger ID
320 rga_agent2 = rga.insert(id_a, "C", element_id="2@agent-a") # smaller ID
321
322 merged_1_then_2 = rga_agent1.join(rga_agent2)
323 merged_2_then_1 = rga_agent2.join(rga_agent1)
324
325 # Both orderings must produce the same sequence (commutativity)
326 assert merged_1_then_2.to_sequence() == merged_2_then_1.to_sequence()
327
328 def test_join_is_commutative(self) -> None:
329 rga1 = RGA()
330 rga1 = rga1.insert(None, "X", element_id="1@a")
331
332 rga2 = RGA()
333 rga2 = rga2.insert(None, "Y", element_id="1@b")
334
335 assert rga1.join(rga2).to_sequence() == rga2.join(rga1).to_sequence()
336
337 def test_join_is_associative(self) -> None:
338 rga1 = RGA()
339 rga1 = rga1.insert(None, "X", element_id="1@a")
340
341 rga2 = RGA()
342 rga2 = rga2.insert(None, "Y", element_id="1@b")
343
344 rga3 = RGA()
345 rga3 = rga3.insert(None, "Z", element_id="1@c")
346
347 left = rga1.join(rga2).join(rga3)
348 right = rga1.join(rga2.join(rga3))
349 assert left.to_sequence() == right.to_sequence()
350
351 def test_join_is_idempotent(self) -> None:
352 rga = RGA()
353 rga = rga.insert(None, "X", element_id="1@agent")
354 assert rga.join(rga).to_sequence() == rga.to_sequence()
355
356 def test_to_sequence_excludes_tombstones(self) -> None:
357 id1 = "1@agent"
358 rga = RGA()
359 rga = rga.insert(None, "A", element_id=id1)
360 rga = rga.insert(id1, "B", element_id="2@agent")
361 rga = rga.delete(id1)
362 assert rga.to_sequence() == ["B"]
363
364 def test_rga_round_trip_to_from_dict(self) -> None:
365 id1 = "1@agent"
366 rga = RGA()
367 rga = rga.insert(None, "A", element_id=id1)
368 rga = rga.insert(id1, "B", element_id="2@agent")
369 data: list[RGAElement] = rga.to_dict()
370 rga2 = RGA.from_dict(data)
371 assert rga2.to_sequence() == rga.to_sequence()
372
373 def test_len_excludes_tombstones(self) -> None:
374 id1 = "1@agent"
375 rga = RGA()
376 rga = rga.insert(None, "A", element_id=id1)
377 rga = rga.insert(id1, "B", element_id="2@agent")
378 rga = rga.delete(id1)
379 assert len(rga) == 1
380
381 def test_tombstone_survives_join(self) -> None:
382 id1 = "1@agent"
383 rga1 = RGA()
384 rga1 = rga1.insert(None, "A", element_id=id1)
385
386 rga2 = rga1.delete(id1)
387
388 merged = rga1.join(rga2)
389 # Once deleted in either replica, stays deleted
390 assert "A" not in merged.to_sequence()
391
392
393 # ===========================================================================
394 # AWMap
395 # ===========================================================================
396
397
398 class TestAWMap:
399 def test_set_and_get(self) -> None:
400 m = AWMap()
401 m = m.set("tempo", "120bpm")
402 assert m.get("tempo") == "120bpm"
403
404 def test_get_absent_returns_none(self) -> None:
405 m = AWMap()
406 assert m.get("nonexistent") is None
407
408 def test_overwrite_key(self) -> None:
409 m = AWMap()
410 m = m.set("key", "C major")
411 m = m.set("key", "G major")
412 assert m.get("key") == "G major"
413
414 def test_remove_key(self) -> None:
415 m = AWMap()
416 m = m.set("tempo", "120bpm")
417 m = m.remove("tempo")
418 assert m.get("tempo") is None
419
420 def test_add_wins_concurrent_remove(self) -> None:
421 # Agent A sets "tempo"
422 m1 = AWMap()
423 m1 = m1.set("tempo", "120bpm")
424
425 # Agent B removes "tempo" by tombstoning its tokens
426 m2 = AWMap()
427 m2 = m2.set("tempo", "120bpm")
428 m2 = m2.remove("tempo")
429
430 # Agent A concurrently adds a new value (new token)
431 m1_v2 = m1.set("tempo", "140bpm")
432
433 # Merge: the new add survives because it has a fresh token
434 merged = m1_v2.join(m2)
435 assert merged.get("tempo") == "140bpm"
436
437 def test_join_is_commutative(self) -> None:
438 m1 = AWMap()
439 m1 = m1.set("A", "1")
440 m2 = AWMap()
441 m2 = m2.set("B", "2")
442 assert m1.join(m2).to_plain_dict() == m2.join(m1).to_plain_dict()
443
444 def test_join_is_associative(self) -> None:
445 m1 = AWMap().set("A", "1")
446 m2 = AWMap().set("B", "2")
447 m3 = AWMap().set("C", "3")
448 left = m1.join(m2).join(m3)
449 right = m1.join(m2.join(m3))
450 assert left.to_plain_dict() == right.to_plain_dict()
451
452 def test_join_is_idempotent(self) -> None:
453 m = AWMap().set("A", "1")
454 assert m.join(m).to_plain_dict() == m.to_plain_dict()
455
456 def test_keys_returns_live_keys(self) -> None:
457 m = AWMap()
458 m = m.set("X", "1")
459 m = m.set("Y", "2")
460 assert m.keys() == frozenset({"X", "Y"})
461
462 def test_contains(self) -> None:
463 m = AWMap().set("K", "V")
464 assert "K" in m
465 assert "Z" not in m
466
467 def test_round_trip_to_from_dict(self) -> None:
468 m = AWMap().set("A", "1").set("B", "2")
469 m2 = AWMap.from_dict(m.to_dict())
470 assert m2.to_plain_dict() == m.to_plain_dict()
471
472
473 # ===========================================================================
474 # GCounter
475 # ===========================================================================
476
477
478 class TestGCounter:
479 def test_initial_value_is_zero(self) -> None:
480 c = GCounter()
481 assert c.value() == 0
482
483 def test_increment(self) -> None:
484 c = GCounter().increment("agent-1")
485 assert c.value() == 1
486 assert c.value_for("agent-1") == 1
487
488 def test_increment_by_n(self) -> None:
489 c = GCounter().increment("agent-1", by=5)
490 assert c.value() == 5
491
492 def test_increment_rejects_zero(self) -> None:
493 with pytest.raises(ValueError):
494 GCounter().increment("agent-1", by=0)
495
496 def test_increment_rejects_negative(self) -> None:
497 with pytest.raises(ValueError):
498 GCounter().increment("agent-1", by=-1)
499
500 def test_join_takes_max_per_agent(self) -> None:
501 c1 = GCounter({"agent-A": 3, "agent-B": 1})
502 c2 = GCounter({"agent-A": 1, "agent-B": 5})
503 merged = c1.join(c2)
504 assert merged.value_for("agent-A") == 3
505 assert merged.value_for("agent-B") == 5
506 assert merged.value() == 8
507
508 def test_join_is_commutative(self) -> None:
509 c1 = GCounter({"agent-A": 3})
510 c2 = GCounter({"agent-B": 7})
511 assert c1.join(c2).equivalent(c2.join(c1))
512
513 def test_join_is_associative(self) -> None:
514 c1 = GCounter({"agent-A": 1})
515 c2 = GCounter({"agent-B": 2})
516 c3 = GCounter({"agent-C": 3})
517 assert c1.join(c2).join(c3).equivalent(c1.join(c2.join(c3)))
518
519 def test_join_is_idempotent(self) -> None:
520 c = GCounter({"agent-A": 5})
521 assert c.join(c).equivalent(c)
522
523 def test_value_for_absent_agent_is_zero(self) -> None:
524 c = GCounter()
525 assert c.value_for("ghost-agent") == 0
526
527 def test_round_trip_to_from_dict(self) -> None:
528 c = GCounter({"a": 1, "b": 2})
529 c2 = GCounter.from_dict(c.to_dict())
530 assert c2.equivalent(c)
531
532
533 # ===========================================================================
534 # CRDTPlugin integration — merge engine CRDT path
535 # ===========================================================================
536
537
538 class TestCRDTMergeEngineIntegration:
539 """Tests for :func:`~muse.core.merge_engine.crdt_join_snapshots`.
540
541 Since there is no production CRDTPlugin implementation yet (the MIDI plugin
542 is still three-way mode), we create a minimal stub that satisfies the
543 CRDTPlugin protocol.
544 """
545
546 def _make_stub_plugin(self) -> CRDTPlugin:
547 """Return a minimal CRDTPlugin stub."""
548 from muse.domain import (
549 CRDTPlugin,
550 CRDTSnapshotManifest,
551 DriftReport,
552 LiveState,
553 MergeResult,
554 StateSnapshot,
555 StateDelta,
556 StructuredDelta,
557 )
558 from muse.core.schema import CRDTDimensionSpec, DomainSchema
559
560 class StubCRDTPlugin(CRDTPlugin):
561 def snapshot(self, live_state: LiveState) -> StateSnapshot:
562 return {"files": {}, "domain": "stub"}
563
564 def diff(
565 self,
566 base: StateSnapshot,
567 target: StateSnapshot,
568 *,
569 repo_root: pathlib.Path | None = None,
570 ) -> StateDelta:
571 empty_delta: StructuredDelta = {
572 "domain": "stub",
573 "ops": [],
574 "summary": "no changes",
575 }
576 return empty_delta
577
578 def drift(self, committed: StateSnapshot, live_state: LiveState) -> DriftReport:
579 return DriftReport(has_drift=False)
580
581 def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState:
582 return live_state
583
584 def merge(
585 self,
586 base: StateSnapshot,
587 left: StateSnapshot,
588 right: StateSnapshot,
589 *,
590 repo_root: pathlib.Path | None = None,
591 ) -> MergeResult:
592 return MergeResult(merged=base)
593
594 def schema(self) -> DomainSchema:
595 from muse.core.schema import SetSchema
596 schema: SetSchema = {
597 "kind": "set",
598 "element_type": "str",
599 "identity": "by_content",
600 }
601 return {
602 "domain": "stub",
603 "description": "Stub CRDT domain",
604 "dimensions": [],
605 "top_level": schema,
606 "merge_mode": "crdt",
607 "schema_version": __version__,
608 }
609
610 def crdt_schema(self) -> list[CRDTDimensionSpec]:
611 return []
612
613 def join(
614 self,
615 a: CRDTSnapshotManifest,
616 b: CRDTSnapshotManifest,
617 ) -> CRDTSnapshotManifest:
618 # Simple merge: union of files, max vclock, union crdt_state
619 from muse.core.crdts.vclock import VectorClock
620
621 vc_a = VectorClock.from_dict(a["vclock"])
622 vc_b = VectorClock.from_dict(b["vclock"])
623 merged_vc = vc_a.merge(vc_b)
624 merged_files = {**a["files"], **b["files"]}
625 merged_crdt_state = {**a["crdt_state"], **b["crdt_state"]}
626 result: CRDTSnapshotManifest = {
627 "files": merged_files,
628 "domain": a["domain"],
629 "vclock": merged_vc.to_dict(),
630 "crdt_state": merged_crdt_state,
631 "schema_version": __version__,
632 }
633 return result
634
635 def to_crdt_state(self, snapshot: StateSnapshot) -> CRDTSnapshotManifest:
636 result: CRDTSnapshotManifest = {
637 "files": snapshot.get("files", {}),
638 "domain": snapshot.get("domain", "stub"),
639 "vclock": {},
640 "crdt_state": {},
641 "schema_version": __version__,
642 }
643 return result
644
645 def from_crdt_state(self, crdt: CRDTSnapshotManifest) -> StateSnapshot:
646 return {"files": crdt["files"], "domain": crdt["domain"]}
647
648 return StubCRDTPlugin()
649
650 def test_crdt_join_produces_merge_result(self) -> None:
651 from muse.core.merge_engine import crdt_join_snapshots
652
653 plugin = self._make_stub_plugin()
654 result = crdt_join_snapshots(
655 plugin=plugin,
656 a_snapshot={"track.mid": "hash-a"},
657 b_snapshot={"beat.mid": "hash-b"},
658 a_vclock={"agent-1": 1},
659 b_vclock={"agent-2": 1},
660 a_crdt_state={},
661 b_crdt_state={},
662 domain="stub",
663 )
664 assert result.is_clean
665 assert result.conflicts == []
666
667 def test_crdt_join_merges_files(self) -> None:
668 from muse.core.merge_engine import crdt_join_snapshots
669
670 plugin = self._make_stub_plugin()
671 result = crdt_join_snapshots(
672 plugin=plugin,
673 a_snapshot={"track.mid": "hash-a"},
674 b_snapshot={"beat.mid": "hash-b"},
675 a_vclock={},
676 b_vclock={},
677 a_crdt_state={},
678 b_crdt_state={},
679 domain="stub",
680 )
681 assert "track.mid" in result.merged["files"]
682 assert "beat.mid" in result.merged["files"]
683
684 def test_crdt_join_is_commutative(self) -> None:
685 from muse.core.merge_engine import crdt_join_snapshots
686
687 plugin = self._make_stub_plugin()
688 result_ab = crdt_join_snapshots(
689 plugin=plugin,
690 a_snapshot={"track.mid": "hash-a"},
691 b_snapshot={"beat.mid": "hash-b"},
692 a_vclock={"agent-1": 1},
693 b_vclock={"agent-2": 2},
694 a_crdt_state={},
695 b_crdt_state={},
696 domain="stub",
697 )
698 result_ba = crdt_join_snapshots(
699 plugin=plugin,
700 a_snapshot={"beat.mid": "hash-b"},
701 b_snapshot={"track.mid": "hash-a"},
702 a_vclock={"agent-2": 2},
703 b_vclock={"agent-1": 1},
704 a_crdt_state={},
705 b_crdt_state={},
706 domain="stub",
707 )
708 assert set(result_ab.merged["files"].keys()) == set(result_ba.merged["files"].keys())
709
710 def test_crdt_merge_never_produces_conflicts(self) -> None:
711 from muse.core.merge_engine import crdt_join_snapshots
712
713 plugin = self._make_stub_plugin()
714 # Even when both replicas modify the same file, CRDT join never conflicts
715 result = crdt_join_snapshots(
716 plugin=plugin,
717 a_snapshot={"shared.mid": "hash-a"},
718 b_snapshot={"shared.mid": "hash-b"},
719 a_vclock={"agent-1": 1},
720 b_vclock={"agent-2": 1},
721 a_crdt_state={},
722 b_crdt_state={},
723 domain="stub",
724 )
725 assert result.is_clean
726 assert len(result.conflicts) == 0
727
728 def test_crdt_join_requires_crdt_plugin_protocol(self) -> None:
729 """Verify the protocol check is documented in the function signature.
730
731 The static type of ``crdt_join_snapshots(plugin=...)`` is
732 ``MuseDomainPlugin``. Callers that don't implement ``CRDTPlugin``
733 are rejected at the call site by mypy. The runtime ``isinstance``
734 check exists as a defensive guard for duck-typed callers.
735 """
736 from muse.core.merge_engine import crdt_join_snapshots
737
738 # A plugin that implements MuseDomainPlugin but NOT CRDTPlugin
739 # would pass static type-checking but fail at runtime.
740 # We verify the docstring is accurate by checking the stub IS a CRDTPlugin.
741 plugin = self._make_stub_plugin()
742 assert isinstance(plugin, CRDTPlugin)
743
744 def test_crdt_plugin_join_commutes(self) -> None:
745 """join(a,b) == join(b,a) at the CRDT primitive level."""
746 from muse.domain import CRDTSnapshotManifest
747
748 plugin = self._make_stub_plugin()
749 from muse.domain import CRDTPlugin
750 assert isinstance(plugin, CRDTPlugin)
751
752 a: CRDTSnapshotManifest = {
753 "files": {"a.mid": "ha"},
754 "domain": "stub",
755 "vclock": {"x": 1},
756 "crdt_state": {},
757 "schema_version": __version__,
758 }
759 b: CRDTSnapshotManifest = {
760 "files": {"b.mid": "hb"},
761 "domain": "stub",
762 "vclock": {"y": 1},
763 "crdt_state": {},
764 "schema_version": __version__,
765 }
766 ab = plugin.join(a, b)
767 ba = plugin.join(b, a)
768 assert set(ab["files"].keys()) == set(ba["files"].keys())
769
770
771 # ===========================================================================
772 # Cross-module: CRDT primitives satisfy lattice laws end-to-end
773 # ===========================================================================
774
775
776 class TestLatticeProperties:
777 """Property-style checks that every CRDT satisfies all three lattice laws."""
778
779 def test_vector_clock_lattice_laws(self) -> None:
780 a = VectorClock({"x": 1, "y": 2})
781 b = VectorClock({"x": 3, "z": 1})
782 c = VectorClock({"y": 5})
783
784 # Commutativity
785 assert a.merge(b).equivalent(b.merge(a))
786 # Associativity
787 assert a.merge(b).merge(c).equivalent(a.merge(b.merge(c)))
788 # Idempotency
789 assert a.merge(a).equivalent(a)
790
791 def test_g_counter_lattice_laws(self) -> None:
792 a = GCounter({"x": 1, "y": 2})
793 b = GCounter({"x": 3, "z": 1})
794 c = GCounter({"y": 5})
795
796 assert a.join(b).equivalent(b.join(a))
797 assert a.join(b).join(c).equivalent(a.join(b.join(c)))
798 assert a.join(a).equivalent(a)
799
800 def test_lww_register_lattice_laws(self) -> None:
801 def make(v: str, ts: float, author: str) -> LWWRegister:
802 return LWWRegister(v, ts, author)
803
804 a = make("val-a", 1.0, "agent-a")
805 b = make("val-b", 2.0, "agent-b")
806 c = make("val-c", 3.0, "agent-c")
807
808 assert a.join(b).equivalent(b.join(a))
809 assert a.join(b).join(c).equivalent(a.join(b.join(c)))
810 assert a.join(a).equivalent(a)
811
812 def test_or_set_lattice_laws(self) -> None:
813 s1 = ORSet()
814 s1, _ = s1.add("X")
815 s2 = ORSet()
816 s2, _ = s2.add("Y")
817 s3 = ORSet()
818 s3, _ = s3.add("Z")
819
820 assert s1.join(s2).elements() == s2.join(s1).elements()
821 assert s1.join(s2).join(s3).elements() == s1.join(s2.join(s3)).elements()
822 assert s1.join(s1).elements() == s1.elements()
823
824 def test_aw_map_lattice_laws(self) -> None:
825 m1 = AWMap().set("A", "1")
826 m2 = AWMap().set("B", "2")
827 m3 = AWMap().set("C", "3")
828
829 assert m1.join(m2).to_plain_dict() == m2.join(m1).to_plain_dict()
830 assert m1.join(m2).join(m3).to_plain_dict() == m1.join(m2.join(m3)).to_plain_dict()
831 assert m1.join(m1).to_plain_dict() == m1.to_plain_dict()