gabriel / muse public
rga.py python
324 lines 12.4 KB
d1dfb412 feat: implement 3 missing plan items — property tests, format_version, … Gabriel Cardona <gabriel@tellurstori.com> 5d ago
1 """Replicated Growable Array (RGA) — CRDT for ordered sequences.
2
3 The RGA (Roh et al., 2011 "Replicated abstract data types") provides
4 Google-Docs-style collaborative editing semantics for any ordered sequence
5 domain. Every element carries a globally unique, immutable identifier
6 ``f"{timestamp}@{author}"``; this identifier determines insertion order when
7 two agents concurrently insert at the same position.
8
9 **Core invariant**: the visible sequence is the list of elements whose
10 ``deleted`` flag is ``False``, in the order determined by their identifiers.
11 Deletions are *tombstoned* (``deleted=True``) rather than physically removed
12 so that identifiers remain stable across replicas.
13
14 **Insertion semantics**: ``insert(after_id, value, element_id=...)`` inserts
15 the new element immediately after the element with ``id == after_id``
16 (``None`` means prepend). Concurrent inserts at the same position are
17 ordered by comparing element IDs lexicographically (descending) — the
18 "bigger" ID wins and is placed first, giving a deterministic outcome
19 independent of delivery order.
20
21 **Lattice laws satisfied by** :meth:`join`:
22 1. Commutativity: ``join(a, b) == join(b, a)``
23 2. Associativity: ``join(join(a, b), c) == join(a, join(b, c))``
24 3. Idempotency: ``join(a, a) == a``
25
26 Public API
27 ----------
28 - :class:`RGAElement` — ``TypedDict`` for one array element.
29 - :class:`RGA` — the array itself.
30 """
31
32 from __future__ import annotations
33
34 import logging
35 from typing import TypedDict
36
37 logger = logging.getLogger(__name__)
38
39
class RGAElement(TypedDict):
    """One slot of an :class:`RGA`, live or tombstoned.

    Every element is a plain dict with exactly four keys. The dicts are
    treated as immutable records: operations on :class:`RGA` build new dicts
    instead of editing existing ones.
    """

    # Stable unique identifier assigned at insertion time, conventionally
    # "{timestamp}@{author}". Compared as a plain string when ordering
    # concurrent inserts.
    id: str

    # Content hash of the element; it references the object store, where all
    # binary content lives.
    value: str

    # True once the element has been tombstoned; tombstoned elements stay in
    # the array but are excluded from the visible sequence.
    deleted: bool

    # ``id`` of the element this one was inserted after, or None when it was
    # prepended (inserted at the head). The join algorithm needs this link to
    # place concurrent inserts identically on every replica.
    parent_id: str | None
59
60
class RGA:
    """Replicated Growable Array — CRDT for ordered sequences.

    Provides ``insert``, ``delete``, ``join``, and ``to_sequence`` operations.
    All mutating methods return new :class:`RGA` instances; ``self`` is
    never modified.

    The internal representation is a list of :class:`RGAElement` dicts in
    document order with tombstones kept inline. The canonical document order
    is a depth-first, pre-order walk of the tree formed by the ``parent_id``
    links, visiting siblings in descending ``id`` order. Both :meth:`insert`
    and :meth:`join` produce this order.

    Example::

        rga = RGA()
        rga = rga.insert(None, "note-hash-A", element_id="1@alice")       # prepend
        rga = rga.insert("1@alice", "note-hash-B", element_id="2@alice")  # after A
        rga = rga.delete("1@alice")                                       # tombstone A
        assert rga.to_sequence() == ["note-hash-B"]
    """

    def __init__(self, elements: list[RGAElement] | None = None) -> None:
        """Construct an RGA, optionally pre-populated.

        Args:
            elements: Ordered list of :class:`RGAElement` dicts (may contain
                tombstones). The list itself is copied; the element dicts
                are shared with the caller, not copied.
        """
        self._elements: list[RGAElement] = list(elements) if elements else []

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _clone(elem: RGAElement) -> RGAElement:
        """Return a fresh dict holding the four :class:`RGAElement` keys of *elem*."""
        return {
            "id": elem["id"],
            "value": elem["value"],
            "deleted": elem["deleted"],
            "parent_id": elem["parent_id"],
        }

    @staticmethod
    def _pick_parent(left: str | None, right: str | None) -> str | None:
        """Choose one ``parent_id`` deterministically when two replicas disagree.

        The choice depends only on the pair of values, never on argument
        order, so :meth:`join` stays commutative. ``None`` ranks below every
        string; otherwise the lexicographically larger string wins.
        """
        if left == right:
            return left
        return max(left, right, key=lambda p: (p is not None, p or ""))

    @staticmethod
    def _linearize(merged: dict[str, RGAElement]) -> list[RGAElement]:
        """Return the elements of *merged* in canonical document order.

        The walk is iterative (explicit stack) so that long chains of
        sequential inserts cannot exhaust the interpreter's recursion limit.
        No element of *merged* is ever dropped:

        * head elements (``parent_id is None``) and their subtrees come first;
        * elements whose parent is not present in *merged* follow, each with
          its subtree, in descending ``id`` order;
        * anything still unvisited (only possible when ``parent_id`` links
          form a cycle) is appended last in descending ``id`` order.
        """
        children: dict[str | None, list[str]] = {}
        detached: list[str] = []
        for eid, elem in merged.items():
            parent = elem["parent_id"]
            if parent is not None and parent not in merged:
                detached.append(eid)
            else:
                children.setdefault(parent, []).append(eid)
        for group in children.values():
            group.sort(reverse=True)  # larger ID first
        detached.sort(reverse=True)

        # The stack is LIFO, so everything is pushed reversed: the largest
        # head ID sits on top, and detached roots wait at the bottom.
        stack: list[str] = detached[::-1] + children.get(None, [])[::-1]
        ordered: list[RGAElement] = []
        while stack:
            eid = stack.pop()
            ordered.append(merged[eid])
            # Children go on top of the remaining siblings → pre-order walk.
            stack.extend(reversed(children.get(eid, ())))

        if len(ordered) != len(merged):
            visited = {elem["id"] for elem in ordered}
            ordered.extend(
                merged[eid] for eid in sorted(merged.keys() - visited, reverse=True)
            )
        return ordered

    # ------------------------------------------------------------------
    # Mutations (return new RGA)
    # ------------------------------------------------------------------

    def insert(self, after_id: str | None, value: str, *, element_id: str) -> RGA:
        """Return a new RGA with *value* inserted after *after_id*.

        Concurrent inserts at the same position are resolved by placing the
        element with the lexicographically *larger* ``element_id`` first.
        A sibling that outranks the new element is skipped together with its
        whole subtree, so the result matches the order :meth:`join` produces.

        NOTE: IDs are compared as plain strings (``"10@a" < "9@a"``), so the
        tiebreak only follows timestamp order if callers format timestamps
        with a fixed width.

        Args:
            after_id: The ``id`` of the element to insert after, or ``None``
                to prepend (insert at the head).
            value: The content hash of the new element.
            element_id: The stable unique ID for the new element; callers
                should use ``f"{timestamp}@{author}"`` to ensure global
                uniqueness across agents.

        Returns:
            A new :class:`RGA` with the element inserted. If *after_id* is
            not present, a warning is logged and the element is appended at
            the end (its ``parent_id`` is still recorded as *after_id*).
        """
        new_elem: RGAElement = {
            "id": element_id,
            "value": value,
            "deleted": False,
            "parent_id": after_id,
        }
        elems = list(self._elements)

        if after_id is None:
            insert_pos = 0
        else:
            anchor_idx = next(
                (i for i, e in enumerate(elems) if e["id"] == after_id), None
            )
            if anchor_idx is None:
                # Unknown anchor — append at end (safe degradation).
                logger.warning("RGA.insert: unknown after_id=%r, appending at end", after_id)
                elems.append(new_elem)
                return RGA(elems)
            insert_pos = anchor_idx + 1

        total = len(elems)
        # Walk past every sibling (same parent) that carries a larger ID.
        while (
            insert_pos < total
            and elems[insert_pos]["parent_id"] == after_id
            and elems[insert_pos]["id"] > element_id
        ):
            # The sibling's descendants follow it contiguously in document
            # order; skip them too, otherwise the new element would be wedged
            # between the sibling and its own children.
            subtree = {elems[insert_pos]["id"]}
            insert_pos += 1
            while insert_pos < total and elems[insert_pos]["parent_id"] in subtree:
                subtree.add(elems[insert_pos]["id"])
                insert_pos += 1

        elems.insert(insert_pos, new_elem)
        return RGA(elems)

    def delete(self, element_id: str) -> RGA:
        """Return a new RGA with *element_id* tombstoned.

        Tombstoning is idempotent — deleting an already-deleted or unknown
        element yields an array with the same content.

        Args:
            element_id: The ``id`` of the element to tombstone.

        Returns:
            A new :class:`RGA` (built from fresh element dicts) with the
            matching element marked ``deleted=True``.
        """
        return RGA([
            {**self._clone(elem), "deleted": True}
            if elem["id"] == element_id
            else self._clone(elem)
            for elem in self._elements
        ])

    # ------------------------------------------------------------------
    # CRDT join
    # ------------------------------------------------------------------

    def join(self, other: RGA) -> RGA:
        """Return the lattice join — the union of both arrays.

        Elements are keyed by ``id``. The join:

        1. Takes the union of all element IDs from both replicas.
        2. For each ID present in both, marks the element ``deleted`` if
           *either* replica has it tombstoned (once deleted, always deleted),
           and resolves any disagreement on ``value`` or ``parent_id`` with
           an order-independent rule.
        3. Rebuilds the canonical document order from the ``parent_id``
           links (siblings in descending ``id`` order). The result depends
           only on the merged set of elements, not on which replica
           initiated the join. Elements whose parent is unknown to both
           replicas are kept and placed after the main sequence.

        Args:
            other: The RGA to merge with.

        Returns:
            A new :class:`RGA` that is the join of ``self`` and *other*.
        """
        self_map: dict[str, RGAElement] = {e["id"]: e for e in self._elements}
        other_map: dict[str, RGAElement] = {e["id"]: e for e in other._elements}

        merged: dict[str, RGAElement] = {}
        for eid in self_map.keys() | other_map.keys():
            mine = self_map.get(eid)
            theirs = other_map.get(eid)
            if mine is None:
                merged[eid] = self._clone(theirs)
            elif theirs is None:
                merged[eid] = self._clone(mine)
            else:
                # The same element_id normally carries the same value and
                # parent on every replica. If they differ (crafted or
                # corrupted input), pick winners that do not depend on the
                # argument order so the join remains commutative.
                merged[eid] = {
                    "id": eid,
                    "value": max(mine["value"], theirs["value"]),
                    "deleted": mine["deleted"] or theirs["deleted"],
                    "parent_id": self._pick_parent(
                        mine["parent_id"], theirs["parent_id"]
                    ),
                }

        return RGA(self._linearize(merged))

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------

    def to_sequence(self) -> list[str]:
        """Return the visible element values (excluding tombstones).

        Returns:
            List of ``value`` strings in document order, tombstones excluded.
        """
        return [e["value"] for e in self._elements if not e["deleted"]]

    def __len__(self) -> int:
        """Return the number of visible (non-tombstoned) elements."""
        return sum(1 for e in self._elements if not e["deleted"])

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> list[RGAElement]:
        """Return a JSON-serialisable list of :class:`RGAElement` dicts.

        Returns:
            Ordered list of fresh copies of all elements (including
            tombstones).
        """
        return [self._clone(e) for e in self._elements]

    @classmethod
    def from_dict(cls, data: list[RGAElement]) -> RGA:
        """Reconstruct an :class:`RGA` from its wire representation.

        Args:
            data: List of :class:`RGAElement` dicts as produced by
                :meth:`to_dict`. The order of *data* is kept as-is.

        Returns:
            A new :class:`RGA`.
        """
        return cls(list(data))

    # ------------------------------------------------------------------
    # Comparison and debugging helpers
    # ------------------------------------------------------------------

    def equivalent(self, other: RGA) -> bool:
        """Return ``True`` if both RGAs have identical element lists (including tombstones).

        Note: use :meth:`to_sequence` comparison when only visible content matters.

        Args:
            other: The RGA to compare against.

        Returns:
            ``True`` when the full internal element lists are equal.
        """
        return self._elements == other._elements

    def __repr__(self) -> str:
        return f"RGA(len={len(self)}, elements={self._elements!r})"