cgcardona / muse public
test_stress_crdts_exhaustive.py python
536 lines 19.3 KB
119290fc Add mission-critical stress test suite (9 new files, 1716 tests total) (#76) Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Exhaustive CRDT stress tests — lattice laws, concurrent writes, adversarial inputs.
2
3 For each CRDT primitive, verifies:
4 1. Commutativity: join(a, b) == join(b, a)
5 2. Associativity: join(join(a, b), c) == join(a, join(b, c))
6 3. Idempotency: join(a, a) == a
7 4. Monotonicity: join never loses information (|join(a,b)| >= |a|)
8 5. Serialisation: from_dict(to_dict(x)) == x (round-trip)
9
10 Additional adversarial cases:
11 - 100-agent GCounter: concurrent increments from all agents.
12 - VectorClock causal ordering under partition / reconnect.
13 - ORSet: add → remove semantics, concurrent add wins over remove.
14 - RGA: concurrent inserts from N agents produce a deterministic total order.
15 - AWMap: nested concurrent updates converge.
16 - LWWRegister: last writer wins, with ties broken by agent_id.
17 """
18 from __future__ import annotations
19
20 import datetime
21 import itertools
22
23 import pytest
24
25 from muse.core.crdts import AWMap, GCounter, LWWRegister, ORSet, RGA, VectorClock
26 from muse.core.crdts.lww_register import LWWValue
27 from muse.core.crdts.or_set import ORSetDict
28 from muse.core.crdts.rga import RGAElement
29
30
31 # ===========================================================================
32 # VectorClock — exhaustive
33 # ===========================================================================
34
35
class TestVectorClockExhaustive:
    """VectorClock: lattice laws, causal ordering and serialisation round-trip."""

    # --- lattice laws ---

    def _clock(self, **counts: int) -> VectorClock:
        """Shorthand constructor: ``self._clock(a=1, b=2)`` builds VectorClock({"a": 1, "b": 2})."""
        return VectorClock(counts)

    def test_commutativity_many_agents(self) -> None:
        for size in range(1, 11):
            ascending = VectorClock({f"agent-{k}": k for k in range(size)})
            descending = VectorClock({f"agent-{k}": size - k for k in range(size)})
            assert ascending.merge(descending).equivalent(descending.merge(ascending))

    def test_associativity_three_replicas(self) -> None:
        first = self._clock(a=3, b=1)
        second = self._clock(a=1, b=5, c=2)
        third = self._clock(b=3, c=4, d=1)
        grouped_left = first.merge(second).merge(third)
        grouped_right = first.merge(second.merge(third))
        assert grouped_left.equivalent(grouped_right)

    def test_idempotency(self) -> None:
        for size in (1, 5, 20):
            clock = VectorClock({f"a{k}": 3 * k for k in range(size)})
            assert clock.merge(clock).equivalent(clock)

    def test_merge_is_monotone(self) -> None:
        left = VectorClock({"x": 1, "y": 2})
        right = VectorClock({"x": 3, "z": 1})
        merged = left.merge(right).to_dict()
        left_slots = left.to_dict()
        right_slots = right.to_dict()
        # Every slot of the merge must be >= the same slot of either input
        # (a slot missing from a clock counts as 0).
        for agent in ("x", "y", "z"):
            slot = merged.get(agent, 0)
            assert slot >= left_slots.get(agent, 0)
            assert slot >= right_slots.get(agent, 0)

    # --- causal ordering ---

    def test_happens_before_empty_clocks(self) -> None:
        blank = VectorClock()
        # A clock never strictly precedes itself, even when empty.
        assert not blank.happens_before(blank)

    def test_happens_before_empty_before_non_empty(self) -> None:
        blank = VectorClock()
        ticked = VectorClock({"a": 1})
        assert blank.happens_before(ticked)
        assert not ticked.happens_before(blank)

    def test_transitivity_of_causal_order(self) -> None:
        earliest = VectorClock({"a": 1})
        middle = VectorClock({"a": 2, "b": 1})
        latest = VectorClock({"a": 3, "b": 2, "c": 1})
        assert earliest.happens_before(middle)
        assert middle.happens_before(latest)
        assert earliest.happens_before(latest)

    def test_concurrent_detection_both_ways(self) -> None:
        x_heavy = VectorClock({"agent-X": 5, "agent-Y": 1})
        y_heavy = VectorClock({"agent-X": 1, "agent-Y": 5})
        assert x_heavy.concurrent_with(y_heavy)
        assert y_heavy.concurrent_with(x_heavy)

    def test_not_concurrent_with_itself(self) -> None:
        clock = VectorClock({"a": 3, "b": 2})
        assert not clock.concurrent_with(clock)

    def test_not_concurrent_with_ancestor(self) -> None:
        parent = VectorClock({"a": 1})
        child = VectorClock({"a": 2})
        assert not parent.concurrent_with(child)

    def test_partition_and_reconnect_converges(self) -> None:
        """Two partitioned replicas tick independently; the healed merge keeps both histories."""
        shared = VectorClock({"base": 5})
        # Both sides of the partition start from the same clock and each
        # records 10 local events.
        side_a = side_b = shared
        for _ in range(10):
            side_a = side_a.increment("agent-A")
            side_b = side_b.increment("agent-B")
        # Reconnect.
        healed = side_a.merge(side_b).to_dict()
        assert healed["base"] == 5
        assert healed["agent-A"] == 10
        assert healed["agent-B"] == 10

    def test_round_trip(self) -> None:
        clock = VectorClock({"x": 7, "y": 3, "z": 0})
        assert VectorClock.from_dict(clock.to_dict()).equivalent(clock)
124
125
126 # ===========================================================================
127 # GCounter — exhaustive
128 # ===========================================================================
129
130
class TestGCounterExhaustive:
    """GCounter: lattice laws, per-slot max join and large-scale increments."""

    def test_commutativity(self) -> None:
        first = GCounter({"a1": 5, "a2": 3})
        second = GCounter({"a1": 2, "a3": 7})
        assert first.join(second).equivalent(second.join(first))

    def test_associativity(self) -> None:
        p = GCounter({"x": 1})
        q = GCounter({"y": 2})
        r = GCounter({"z": 3, "x": 5})
        assert p.join(q).join(r).equivalent(p.join(q.join(r)))

    def test_idempotency(self) -> None:
        counter = GCounter({"a": 5, "b": 3, "c": 1})
        assert counter.join(counter).equivalent(counter)

    def test_100_agents_concurrent_increment(self) -> None:
        """Each of 100 agents bumps its own slot once; the joined total is 100."""
        merged, *others = [GCounter().increment(f"agent-{n}") for n in range(100)]
        for other in others:
            merged = merged.join(other)
        assert merged.value() == 100

    def test_increment_by_multiple(self) -> None:
        after_ten = GCounter().increment("agent", by=10)
        assert after_ten.value_for("agent") == 10
        after_fifteen = after_ten.increment("agent", by=5)
        assert after_fifteen.value_for("agent") == 15

    def test_join_takes_max_per_slot(self) -> None:
        joined = GCounter({"x": 3, "y": 1}).join(GCounter({"x": 1, "y": 5}))
        assert joined.value_for("x") == 3
        assert joined.value_for("y") == 5

    def test_monotone_join(self) -> None:
        low = GCounter({"a": 10})
        high = GCounter({"a": 20})
        joined = low.join(high)
        assert joined.value() >= low.value()
        assert joined.value() >= high.value()

    def test_round_trip(self) -> None:
        counter = GCounter({"x": 7, "y": 3})
        assert GCounter.from_dict(counter.to_dict()).equivalent(counter)

    def test_empty_counter_join(self) -> None:
        # The empty counter must act as the identity of join, on either side.
        identity = GCounter()
        counter = GCounter({"a": 5})
        assert identity.join(counter).equivalent(counter)
        assert counter.join(identity).equivalent(counter)

    def test_value_is_sum_of_slots(self) -> None:
        assert GCounter({"a": 3, "b": 4, "c": 5}).value() == 12

    def test_stress_many_increments_single_agent(self) -> None:
        counter = GCounter()
        for _ in range(1000):
            counter = counter.increment("solo")
        assert counter.value() == 1000
197
198
199 # ===========================================================================
200 # ORSet — exhaustive
201 # ===========================================================================
202
203
class TestORSetExhaustive:
    """ORSet: lattice laws, observed-remove semantics and serialisation."""

    def test_commutativity(self) -> None:
        s1, _ = ORSet().add("apple")
        s2, _ = ORSet().add("banana")
        assert s1.join(s2).elements() == s2.join(s1).elements()

    def test_associativity(self) -> None:
        s1, _ = ORSet().add("a")
        s2, _ = ORSet().add("b")
        s3, _ = ORSet().add("c")
        left = s1.join(s2).join(s3)
        right = s1.join(s2.join(s3))
        assert left.elements() == right.elements()

    def test_idempotency(self) -> None:
        s, _ = ORSet().add("x")
        s2, _ = s.add("y")
        assert s2.join(s2).elements() == s2.elements()

    def test_add_then_remove(self) -> None:
        s, tok = ORSet().add("element")
        s2 = s.remove("element", tok)
        assert "element" not in s2.elements()

    def test_concurrent_add_wins_over_remove(self) -> None:
        """ORSet semantics: an add concurrent with a remove survives the join.

        Both replicas start from a shared state in which "item" was added
        with token t1.  Replica B removes "item" using the token it observed
        (t1), while replica A concurrently re-adds "item" and gets a fresh
        token that B never saw.  The remove covers only t1, so A's add wins
        whichever way the join is performed.
        """
        shared, t1 = ORSet().add("item")
        # Replica B: observed-remove of the shared add.
        replica_b = shared.remove("item", t1)
        assert "item" not in replica_b.elements()
        # Replica A: concurrent re-add, producing a token unknown to B.
        replica_a, _ = shared.add("item")
        assert "item" in replica_a.join(replica_b).elements()
        assert "item" in replica_b.join(replica_a).elements()

    def test_remove_of_unobserved_token_keeps_independent_add(self) -> None:
        """Removing a token a replica never observed must not erase that replica's own add."""
        a, t1 = ORSet().add("item")
        b, _ = ORSet().add("item")
        # B removes A's token t1; B's own add (a different token) is untouched.
        b_removed = b.remove("item", t1)
        merged = a.join(b_removed)
        assert "item" in merged.elements()

    def test_remove_nonexistent_is_noop(self) -> None:
        s = ORSet()
        # Removing something that was never added should not crash.
        s2 = s.remove("ghost", "fake-token")
        assert "ghost" not in s2.elements()

    def test_100_elements_all_present(self) -> None:
        s = ORSet()
        for i in range(100):
            s, _ = s.add(f"item-{i}")
        elems = s.elements()
        for i in range(100):
            assert f"item-{i}" in elems

    def test_round_trip_serialisation(self) -> None:
        s = ORSet()
        tokens: list[str] = []
        for label in ["alpha", "beta", "gamma"]:
            s, tok = s.add(label)
            tokens.append(tok)
        d: ORSetDict = s.to_dict()
        restored = ORSet.from_dict(d)
        assert restored.elements() == s.elements()
        # The add tokens must survive the round trip too: removing "alpha"
        # with the token issued before serialisation must still work on the
        # restored replica, and must leave the other elements alone.
        pruned = restored.remove("alpha", tokens[0])
        assert "alpha" not in pruned.elements()
        assert "beta" in pruned.elements()
        assert "gamma" in pruned.elements()

    def test_join_preserves_all_adds(self) -> None:
        s1, _ = ORSet().add("from-s1")
        s2, _ = ORSet().add("from-s2")
        merged = s1.join(s2)
        assert "from-s1" in merged.elements()
        assert "from-s2" in merged.elements()

    def test_many_add_remove_cycles(self) -> None:
        """Rapidly add and remove the same element; final state must be empty."""
        s = ORSet()
        for _ in range(10):
            s, tok = s.add("target")
            s = s.remove("target", tok)
        assert "target" not in s.elements()
280
281
282 # ===========================================================================
283 # LWWRegister — exhaustive
284 # ===========================================================================
285
286
class TestLWWRegisterExhaustive:
    """LWWRegister: timestamp ordering, author tie-break, lattice laws and round-trip."""

    def test_later_timestamp_wins(self) -> None:
        r1 = LWWRegister("old", 1.0, "agent-A")
        r2 = LWWRegister("new", 2.0, "agent-A")
        merged = r1.join(r2)
        assert merged.read() == "new"

    def test_earlier_timestamp_does_not_overwrite(self) -> None:
        current = LWWRegister("current", 5.0, "agent-A")
        stale = LWWRegister("stale", 3.0, "agent-A")
        merged = current.join(stale)
        assert merged.read() == "current"

    def test_concurrent_write_tie_broken_by_agent_id(self) -> None:
        """When timestamps are equal, lexicographically larger author wins."""
        ts = 10.0
        r1 = LWWRegister("value-Z", ts, "agent-Z")
        r2 = LWWRegister("value-A", ts, "agent-A")
        merged = r1.join(r2)
        assert merged.read() == "value-Z"  # "agent-Z" > "agent-A"

    def test_join_commutativity(self) -> None:
        ts = 1.0
        r1 = LWWRegister("alpha", ts, "agent-A")
        r2 = LWWRegister("beta", ts, "agent-B")
        m1 = r1.join(r2)
        m2 = r2.join(r1)
        assert m1.read() == m2.read()

    def test_idempotency(self) -> None:
        r = LWWRegister("val", 1.0, "agent-A")
        assert r.join(r).read() == r.read()

    def test_write_method_updates_register(self) -> None:
        r = LWWRegister("old", 1.0, "agent-A")
        r2 = r.write("new", 2.0, "agent-A")
        assert r2.read() == "new"

    def test_round_trip(self) -> None:
        r = LWWRegister("test-value", 42.0, "agent-X")
        d = r.to_dict()
        restored = LWWRegister.from_dict(d)
        assert restored.read() == "test-value"
        assert restored.to_dict() == d
        # The timestamp must survive too, not just the value: an older write
        # still loses to the restored register and a newer one still wins.
        assert restored.join(LWWRegister("older", 41.0, "agent-Z")).read() == "test-value"
        assert restored.join(LWWRegister("newer", 43.0, "agent-A")).read() == "newer"

    def test_100_concurrent_agents_settle_deterministically(self) -> None:
        """100 agents write at the same timestamp; every merge order picks the same winner.

        Equal timestamps are decided by the lexicographically larger author
        (as pinned by test_concurrent_write_tie_broken_by_agent_id), so
        "agent-099" and its value "val-099" must win in every order.
        """
        import random

        ts = 0.0
        registers = [LWWRegister(f"val-{i:03d}", ts, f"agent-{i:03d}") for i in range(100)]

        def merge_all(order: list[LWWRegister]) -> LWWRegister:
            """Left-fold join over *order*."""
            merged = order[0]
            for r in order[1:]:
                merged = merged.join(r)
            return merged

        assert merge_all(registers).read() == "val-099"
        assert merge_all(registers[::-1]).read() == "val-099"
        # Fixed seed so any failing merge order can be reproduced.
        rng = random.Random(20240101)
        for _ in range(5):
            shuffled = list(registers)
            rng.shuffle(shuffled)
            assert merge_all(shuffled).read() == "val-099"

    def test_higher_timestamp_beats_larger_author(self) -> None:
        """Even if agent-A would lose tiebreaker, newer timestamp wins."""
        r_early_z = LWWRegister("early-z", 1.0, "agent-Z")
        r_late_a = LWWRegister("late-a", 2.0, "agent-A")
        assert r_early_z.join(r_late_a).read() == "late-a"
354
355
356 # ===========================================================================
357 # RGA — exhaustive
358 # ===========================================================================
359
360
class TestRGAExhaustive:
    """RGA: insert/delete, lattice laws and deterministic ordering of concurrent inserts."""

    def _eid(self, label: str) -> str:
        """Generate a deterministic element_id of the form ``1.0@<label>``."""
        return f"1.0@{label}"

    def test_single_insert(self) -> None:
        rga = RGA()
        rga2 = rga.insert(None, "hello", element_id=self._eid("agent-A"))
        assert rga2.to_sequence() == ["hello"]

    def test_insert_order_preserved(self) -> None:
        rga = RGA()
        rga = rga.insert(None, "a", element_id="1.0@a")
        rga = rga.insert("1.0@a", "b", element_id="2.0@b")
        rga = rga.insert("2.0@b", "c", element_id="3.0@c")
        assert rga.to_sequence() == ["a", "b", "c"]

    def test_delete_removes_element(self) -> None:
        rga = RGA()
        rga = rga.insert(None, "x", element_id=self._eid("a"))
        rga = rga.delete(self._eid("a"))
        assert "x" not in rga.to_sequence()

    def test_commutativity(self) -> None:
        rga1 = RGA().insert(None, "A", element_id=self._eid("agent-1"))
        rga2 = RGA().insert(None, "B", element_id=self._eid("agent-2"))
        m1 = rga1.join(rga2)
        m2 = rga2.join(rga1)
        assert m1.to_sequence() == m2.to_sequence()

    def test_idempotency(self) -> None:
        rga = RGA().insert(None, "elem", element_id=self._eid("agent"))
        assert rga.join(rga).to_sequence() == rga.to_sequence()

    def test_concurrent_inserts_deterministic_ordering(self) -> None:
        """5 agents insert concurrently at the head; every merge order yields one total order.

        All 5! = 120 merge orders are checked, not just forward and reverse.
        """
        agents = [f"agent-{i}" for i in range(5)]
        replicas = [RGA().insert(None, a, element_id=self._eid(a)) for a in agents]

        def merge_all(order: tuple[RGA, ...]) -> RGA:
            """Left-fold join over *order*."""
            merged = order[0]
            for r in order[1:]:
                merged = merged.join(r)
            return merged

        expected = merge_all(tuple(replicas)).to_sequence()
        # Every agent's insert must be present exactly once.
        assert sorted(expected) == sorted(agents)
        for order in itertools.permutations(replicas):
            assert merge_all(order).to_sequence() == expected

    def test_tombstone_not_revived_after_join(self) -> None:
        rga1 = RGA().insert(None, "deleted", element_id=self._eid("a"))
        rga1 = rga1.delete(self._eid("a"))

        rga2 = RGA().insert(None, "other", element_id=self._eid("b"))

        # The deletion must hold regardless of which side initiates the join.
        for merged in (rga1.join(rga2), rga2.join(rga1)):
            assert "deleted" not in merged.to_sequence()
            assert "other" in merged.to_sequence()

    def test_large_sequence_sequential_insert(self) -> None:
        rga = RGA()
        prev_id: str | None = None
        for i in range(50):
            eid = f"{i}.0@agent"
            # Each element is inserted directly after the previous one, so the
            # resulting sequence must be exactly insertion order.
            rga = rga.insert(prev_id, str(i), element_id=eid)
            prev_id = eid
        assert rga.to_sequence() == [str(i) for i in range(50)]
443
444
445 # ===========================================================================
446 # AWMap — exhaustive
447 # ===========================================================================
448
449
class TestAWMapExhaustive:
    """AWMap: set/get/remove, convergence of concurrent writes and add-wins joins."""

    def test_set_and_get(self) -> None:
        mapping = AWMap().set("key", "value")
        assert mapping.get("key") == "value"

    def test_remove_entry(self) -> None:
        mapping = AWMap().set("key", "value").remove("key")
        assert mapping.get("key") is None

    def test_concurrent_set_same_key_converges(self) -> None:
        """Two replicas write the same key independently; join order must not matter."""
        replica_a = AWMap().set("k", "from-A")
        replica_z = AWMap().set("k", "from-Z")
        forward = replica_a.join(replica_z).get("k")
        backward = replica_z.join(replica_a).get("k")
        assert forward == backward
        assert forward is not None

    def test_commutativity(self) -> None:
        only_x = AWMap().set("x", "1")
        only_y = AWMap().set("y", "2")
        forward = only_x.join(only_y)
        backward = only_y.join(only_x)
        for key in ("x", "y"):
            assert forward.get(key) == backward.get(key)

    def test_idempotency(self) -> None:
        mapping = AWMap().set("a", "alpha")
        assert mapping.join(mapping).get("a") == mapping.get("a")

    def test_add_wins_over_remove_from_empty(self) -> None:
        """A key held by one replica survives a join with a replica that never had it."""
        with_key = AWMap().set("key", "added")
        # Removing from an empty map has no token to tombstone, so it is a no-op.
        without_key = AWMap().remove("key")
        assert with_key.join(without_key).get("key") == "added"

    def test_multiple_keys_independent(self) -> None:
        mapping = AWMap()
        for n in range(20):
            mapping = mapping.set(f"key-{n}", f"val-{n}")
        for n in range(20):
            assert mapping.get(f"key-{n}") == f"val-{n}"

    def test_remove_absent_key_is_noop(self) -> None:
        assert AWMap().remove("ghost").get("ghost") is None

    def test_update_replaces_value(self) -> None:
        mapping = AWMap().set("k", "v1").set("k", "v2")
        assert mapping.get("k") == "v2"
506
507
508 # ===========================================================================
509 # Cross-CRDT: vector-clock-guided merge
510 # ===========================================================================
511
512
class TestCrossTypeConsistency:
    """Invariants that must agree across different CRDT types."""

    def test_gcounter_and_vclock_agree_on_agent_counts(self) -> None:
        """GCounter and VectorClock track the same 'per-agent count' invariant."""
        gc = GCounter()
        vc = VectorClock()
        agents = ["agent-A", "agent-B", "agent-C"]
        for agent in agents:
            for _ in range(3):
                gc = gc.increment(agent)
                vc = vc.increment(agent)
        for agent in agents:
            assert gc.value_for(agent) == vc.to_dict()[agent]

    def test_lattice_join_never_reduces_information(self) -> None:
        """join(a, b) must contain at least as much information as a and as b."""
        import random

        # Fixed seed: the cases are still varied, but a failure is reproducible.
        rng = random.Random(1729)
        agents = [f"a{i}" for i in range(5)]
        for _ in range(20):
            a = GCounter({agent: rng.randint(0, 10) for agent in agents})
            b = GCounter({agent: rng.randint(0, 5) for agent in agents})
            merged = a.join(b)
            for agent in agents:
                assert merged.value_for(agent) >= a.value_for(agent)
                assert merged.value_for(agent) >= b.value_for(agent)
            assert merged.value() >= max(a.value(), b.value())