cgcardona / muse public
rga.py python
325 lines 12.4 KB
bda49bdb feat: redesign .museignore as TOML with domain-scoped sections (#100) Gabriel Cardona <cgcardona@gmail.com> 1d ago
"""Replicated Growable Array (RGA) — CRDT for ordered sequences.

The RGA (Roh et al., 2011, "Replicated abstract data types: Building blocks
for collaborative applications") provides Google-Docs-style collaborative
editing semantics for any ordered sequence domain. Every element carries a
globally unique, immutable identifier ``f"{timestamp}@{author}"``; this
identifier determines insertion order when two agents concurrently insert at
the same position.

**Core invariant**: the visible sequence is the list of elements whose
``deleted`` flag is ``False``, in document order. Document order is derived
from each element's ``parent_id`` (the element it was inserted after) and its
identifier. Deletions are *tombstoned* (``deleted=True``) rather than
physically removed, so that identifiers (and therefore insertion anchors)
remain stable across replicas.

**Insertion semantics**: ``insert(after_id, value, element_id=...)`` inserts
*value* immediately after the element with ``id == after_id`` (``None`` means
prepend). Concurrent inserts at the same position are resolved by comparing
element IDs as plain strings. The lexicographically larger ID is placed
first, giving a deterministic outcome independent of delivery order. Because
the comparison is on strings, ``"10@a"`` sorts before ``"9@a"``; zero-pad
timestamps if numeric ordering is intended.

**Lattice laws targeted by** :meth:`RGA.join`. Here "≡" means
:meth:`RGA.equivalent`; ``RGA`` does not define ``__eq__``, so ``==`` compares
identity.

    1. Commutativity: ``join(a, b)`` ≡ ``join(b, a)``
    2. Associativity: ``join(join(a, b), c)`` ≡ ``join(a, join(b, c))``
    3. Idempotency: ``join(a, a)`` ≡ ``a``, for arrays already in the
       canonical order that ``join`` itself produces.

These laws assume that all replicas agree on each ID's ``value`` and
``parent_id``, which holds whenever IDs are globally unique.

Public API
----------
- :class:`RGAElement` — ``TypedDict`` for one array element.
- :class:`RGA` — the array itself.
"""


from __future__ import annotations

import logging
from typing import TypedDict

# Module logger; RGA.insert uses it to warn when the requested anchor id is
# not present.
logger = logging.getLogger(__name__)


class RGAElement(TypedDict):
    """A single element in an :class:`RGA`.

    ``id`` is the stable unique identifier ``"{timestamp}@{author}"`` assigned
    at insertion time. ``value`` is the content hash of the element (it
    references the object store — all binary content lives there).
    ``deleted`` is ``True`` for tombstoned elements that no longer appear in
    the visible sequence.

    ``parent_id`` is the ``id`` of the element this one was inserted after
    (``None`` means it was prepended — inserted at the head). The join uses
    these links to rebuild document order, so concurrent inserts land in the
    same place regardless of which replica initiates the join.

    All four keys are required; the dict is treated as immutable by
    :class:`RGA` (every operation builds new dicts instead of editing these).
    """

    # Stable identifier, conventionally "{timestamp}@{author}". Also the
    # tie-break key for concurrent inserts (compared as a plain string).
    id: str
    # Content hash of the element's payload (the payload itself lives in the
    # object store).
    value: str
    # True once tombstoned; tombstones stay in the list but are hidden from
    # the visible sequence.
    deleted: bool
    # ``id`` of the element this one was inserted after; None = prepended.
    parent_id: str | None


class RGA:
    """Replicated Growable Array — CRDT for ordered sequences.

    Provides ``insert``, ``delete``, ``join``, and ``to_sequence`` operations.
    All mutating methods return new :class:`RGA` instances; ``self`` is
    never modified.

    The internal representation is a flat list of :class:`RGAElement` dicts
    in document order, with tombstones kept inline.

    The canonical document order is a depth-first, pre-order walk of the
    ``parent_id`` tree:

    - each element is followed by its children, larger ``id`` first;
    - each child is followed by its own subtree.

    :meth:`insert` places new elements exactly where that walk would put
    them. A replica that edits locally and a replica that receives the same
    elements through :meth:`join` therefore see the same sequence.

    Example::

        rga = RGA()
        rga = rga.insert(None, "note-hash-A", element_id="1@alice")  # prepend
        rga = rga.insert("1@alice", "note-hash-B", element_id="2@alice")  # after A
        rga = rga.delete("1@alice")  # tombstone A
        assert rga.to_sequence() == ["note-hash-B"]
    """

    def __init__(self, elements: list[RGAElement] | None = None) -> None:
        """Construct an RGA, optionally pre-populated.

        Args:
            elements: Ordered list of :class:`RGAElement` dicts (may contain
                tombstones). The list is copied. The element dicts themselves
                are shared, which is safe because this class never mutates
                them.
        """
        self._elements: list[RGAElement] = list(elements) if elements else []

    # ------------------------------------------------------------------
    # Mutations (return new RGA)
    # ------------------------------------------------------------------

    def insert(self, after_id: str | None, value: str, *, element_id: str) -> RGA:
        """Return a new RGA with *value* inserted after *after_id*.

        Concurrent inserts at the same position (same *after_id*) are
        resolved by placing the element with the lexicographically *larger*
        ``element_id`` first. The new element goes after every such larger
        sibling *and that sibling's whole subtree*, which matches the order
        :meth:`join` reconstructs. IDs are compared as plain strings.

        Args:
            after_id: The ``id`` of the element to insert after, or ``None``
                to prepend (insert before all existing elements).
            value: The content hash of the new element.
            element_id: The stable unique ID for the new element; callers
                should use ``f"{timestamp}@{author}"`` to ensure global
                uniqueness across agents.

        Returns:
            A new :class:`RGA` with the element inserted at the correct
            position. If *after_id* is not present, the element is appended
            at the end and a warning is logged.
        """
        new_elem: RGAElement = {
            "id": element_id,
            "value": value,
            "deleted": False,
            "parent_id": after_id,
        }
        elems = list(self._elements)

        if after_id is None:
            # Prepend: the new element is a root of the parent_id tree.
            start = 0
        else:
            anchor_idx = next(
                (i for i, e in enumerate(elems) if e["id"] == after_id), None
            )
            if anchor_idx is None:
                # Unknown anchor — append at end (safe degradation).
                logger.warning("RGA.insert: unknown after_id=%r, appending at end", after_id)
                elems.append(new_elem)
                return RGA(elems)
            start = anchor_idx + 1

        elems.insert(self._insertion_index(elems, start, after_id, element_id), new_elem)
        return RGA(elems)

    @staticmethod
    def _insertion_index(
        elems: list[RGAElement],
        start: int,
        parent_id: str | None,
        element_id: str,
    ) -> int:
        """Return the list index for a new child of *parent_id*.

        The scan starts at *start*: the slot right after the parent, or 0 for
        a prepend. It moves past every sibling whose ``id`` is larger than
        *element_id*, together with that sibling's descendants. This matches
        the depth-first order built by :meth:`join`, in which a sibling's
        whole subtree precedes the next sibling.

        *elems* is assumed to already be in that canonical order. For other
        orders the result is best effort.
        """
        skipped: set[str] = set()
        pos = start
        while pos < len(elems):
            cur = elems[pos]
            if cur["parent_id"] == parent_id and cur["id"] > element_id:
                skipped.add(cur["id"])  # concurrent sibling that wins the slot
            elif cur["parent_id"] in skipped:
                skipped.add(cur["id"])  # descendant of a skipped sibling
            else:
                break
            pos += 1
        return pos

    def delete(self, element_id: str) -> RGA:
        """Return a new RGA with *element_id* tombstoned.

        Tombstoning is idempotent — deleting an already-deleted or unknown
        element is a no-op.

        Args:
            element_id: The ``id`` of the element to tombstone.

        Returns:
            A new :class:`RGA` with the element marked ``deleted=True``.
        """
        new_elems: list[RGAElement] = [
            {
                "id": e["id"],
                "value": e["value"],
                "deleted": True if e["id"] == element_id else e["deleted"],
                "parent_id": e["parent_id"],
            }
            for e in self._elements
        ]
        return RGA(new_elems)

    # ------------------------------------------------------------------
    # CRDT join
    # ------------------------------------------------------------------

    def join(self, other: RGA) -> RGA:
        """Return the lattice join — the union of both arrays.

        Elements are keyed by ``id``. The join:

        1. Takes the union of all element IDs from both replicas.
        2. For each ID, marks the element ``deleted`` if *either* replica has
           it tombstoned (once deleted, always deleted — monotone). If the
           replicas disagree on ``value`` or ``parent_id`` (only possible
           with hand-crafted data), a deterministic choice keeps the join
           commutative.
        3. Rebuilds the canonical order from ``parent_id`` links. This is a
           depth-first walk starting at the prepended elements
           (``parent_id is None``) that visits siblings in descending ``id``
           order. The result depends only on the merged element set, never
           on which replica is ``self``.
        4. Keeps elements whose parent is absent from both replicas, e.g. an
           element inserted after an unknown anchor. Their subtrees are
           appended after the main sequence, in descending ``id`` order, so
           no element is silently dropped.

        The walk is iterative, so long insertion chains (each element
        inserted after the previous one) do not hit Python's recursion limit.

        Args:
            other: The RGA to merge with.

        Returns:
            A new :class:`RGA` that is the join of ``self`` and *other*.
        """
        from collections import defaultdict

        self_map: dict[str, RGAElement] = {e["id"]: e for e in self._elements}
        other_map: dict[str, RGAElement] = {e["id"]: e for e in other._elements}

        merged_map: dict[str, RGAElement] = {}
        for eid in self_map.keys() | other_map.keys():
            s = self_map.get(eid)
            o = other_map.get(eid)
            if s is not None and o is not None:
                # Normally identical (an id names exactly one write); if not,
                # take the max of each field so the result is order-free.
                merged_map[eid] = {
                    "id": eid,
                    "value": max(s["value"], o["value"]),
                    "deleted": s["deleted"] or o["deleted"],
                    "parent_id": max(
                        (s["parent_id"], o["parent_id"]),
                        key=lambda p: (p is not None, p or ""),
                    ),
                }
            else:
                src = s if s is not None else other_map[eid]
                merged_map[eid] = {
                    "id": src["id"],
                    "value": src["value"],
                    "deleted": src["deleted"],
                    "parent_id": src["parent_id"],
                }

        # Group ids by parent; within a group, larger ID goes leftmost
        # (the concurrent-insert tie-break rule).
        children: dict[str | None, list[str]] = defaultdict(list)
        for eid, elem in merged_map.items():
            children[elem["parent_id"]].append(eid)
        for group in children.values():
            group.sort(reverse=True)

        ordered: list[RGAElement] = []
        visited: set[str] = set()

        def _walk(roots: list[str]) -> None:
            """Append each root and its subtree in pre-order, iteratively."""
            stack = roots[::-1]
            while stack:
                eid = stack.pop()
                if eid in visited:
                    continue  # guards against malformed parent_id cycles
                visited.add(eid)
                ordered.append(merged_map[eid])
                stack.extend(reversed(children.get(eid, [])))

        _walk(children.get(None, []))
        # Subtrees hanging off a parent that neither replica has.
        orphans = sorted(
            (
                eid
                for eid, elem in merged_map.items()
                if elem["parent_id"] is not None and elem["parent_id"] not in merged_map
            ),
            reverse=True,
        )
        _walk(orphans)
        # Anything still unvisited sits on a parent_id cycle (hand-crafted
        # data only); keep it rather than lose it.
        _walk(sorted(merged_map.keys() - visited, reverse=True))
        return RGA(ordered)

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------

    def to_sequence(self) -> list[str]:
        """Return the visible element values (excluding tombstones).

        Returns:
            List of ``value`` strings in document order, tombstones excluded.
        """
        return [e["value"] for e in self._elements if not e["deleted"]]

    def __len__(self) -> int:
        """Return the number of visible (non-tombstoned) elements."""
        return sum(1 for e in self._elements if not e["deleted"])

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> list[RGAElement]:
        """Return a JSON-serialisable list of :class:`RGAElement` dicts.

        Returns:
            Ordered list of all elements (including tombstones), as fresh
            dict copies.
        """
        return [
            {"id": e["id"], "value": e["value"], "deleted": e["deleted"], "parent_id": e["parent_id"]}
            for e in self._elements
        ]

    @classmethod
    def from_dict(cls, data: list[RGAElement]) -> RGA:
        """Reconstruct an :class:`RGA` from its wire representation.

        Args:
            data: List of :class:`RGAElement` dicts as produced by
                :meth:`to_dict`.

        Returns:
            A new :class:`RGA`.
        """
        return cls(list(data))

    # ------------------------------------------------------------------
    # Comparison and debugging helpers
    # ------------------------------------------------------------------

    def equivalent(self, other: RGA) -> bool:
        """Return ``True`` if both RGAs have identical element lists (including tombstones).

        Note: use :meth:`to_sequence` comparison when only visible content matters.

        Args:
            other: The RGA to compare against.

        Returns:
            ``True`` when the full internal element lists are equal.
        """
        return self._elements == other._elements

    def __repr__(self) -> str:
        return f"RGA(len={len(self)}, elements={self._elements!r})"