rga.py
python
| 1 | """Replicated Growable Array (RGA) — CRDT for ordered sequences. |
| 2 | |
| 3 | The RGA (Roh et al., 2011 "Replicated abstract data types") provides |
| 4 | Google-Docs-style collaborative editing semantics for any ordered sequence |
| 5 | domain. Every element carries a globally unique, immutable identifier |
| 6 | ``f"{timestamp}@{author}"``; this identifier determines insertion order when |
| 7 | two agents concurrently insert at the same position. |
| 8 | |
| 9 | **Core invariant**: the visible sequence is the list of elements whose |
| 10 | ``deleted`` flag is ``False``, in the order determined by their identifiers. |
| 11 | Deletions are *tombstoned* (``deleted=True``) rather than physically removed |
| 12 | so that identifiers remain stable across replicas. |
| 13 | |
| 14 | **Insertion semantics**: ``insert(after_id, value, element_id=...)`` inserts |
| 15 | *value* immediately after the element with ``id == after_id`` (``None`` means |
| 16 | prepend). Concurrent inserts at the same position are resolved by comparing |
| 17 | the element IDs lexicographically (descending) — the "bigger" ID wins |
| 18 | and is placed first, giving a deterministic outcome independent of delivery |
| 19 | order. |
| 20 | |
| 21 | **Lattice laws satisfied by** :meth:`join`: |
| 22 | 1. Commutativity: ``join(a, b) == join(b, a)`` |
| 23 | 2. Associativity: ``join(join(a, b), c) == join(a, join(b, c))`` |
| 24 | 3. Idempotency: ``join(a, a) == a`` |
| 25 | |
| 26 | Public API |
| 27 | ---------- |
| 28 | - :class:`RGAElement` — ``TypedDict`` for one array element. |
| 29 | - :class:`RGA` — the array itself. |
| 30 | """ |
| 31 | |
| 32 | |
| 33 | from __future__ import annotations |
| 34 | |
| 35 | import logging |
| 36 | from typing import TypedDict |
| 37 | |
| 38 | logger = logging.getLogger(__name__) |
| 39 | |
| 40 | |
class RGAElement(TypedDict):
    """One slot of an :class:`RGA` — either live or tombstoned.

    Every element is a plain dict with exactly four keys:

    * ``id`` — stable unique identifier of the form
      ``"{timestamp}@{author}"``, fixed at insertion time.
    * ``value`` — content hash of the element; the hash points into the
      object store, which is where all binary content lives.
    * ``deleted`` — ``True`` once the element has been tombstoned and must
      no longer show up in the visible sequence.
    * ``parent_id`` — ``id`` of the element this one was inserted after, or
      ``None`` when it was prepended at the head. The commutative join
      needs this link to place concurrent inserts identically no matter
      which replica starts the join.
    """

    # Stable unique identifier, "{timestamp}@{author}".
    id: str
    # Content hash referencing the object store.
    value: str
    # Tombstone flag; True hides the element from the visible sequence.
    deleted: bool
    # Anchor this element was inserted after (None = inserted at the head).
    parent_id: str | None
| 60 | |
| 61 | |
class RGA:
    """Replicated Growable Array — CRDT for ordered sequences.

    Provides ``insert``, ``delete``, ``join``, and ``to_sequence`` operations.
    All mutating methods return new :class:`RGA` instances; ``self`` is
    never modified.

    The internal representation is a flat list of :class:`RGAElement` dicts
    in document order with tombstones kept inline. The order is the
    pre-order walk of the ``parent_id`` tree: each element is followed by
    its own descendants, and siblings (same ``parent_id``) are ordered by
    ``id`` descending. ``insert`` and ``join`` both maintain this order.

    Example::

        rga = RGA()
        rga = rga.insert(None, "note-hash-A", element_id="1@alice")       # prepend
        rga = rga.insert("1@alice", "note-hash-B", element_id="2@alice")  # after A
        rga = rga.delete("1@alice")                                       # tombstone A
        assert rga.to_sequence() == ["note-hash-B"]
    """

    def __init__(self, elements: list[RGAElement] | None = None) -> None:
        """Construct an RGA, optionally pre-populated.

        Args:
            elements: Ordered list of :class:`RGAElement` dicts (may contain
                tombstones). The list itself is copied; the element dicts
                are shared with the caller.
        """
        self._elements: list[RGAElement] = list(elements) if elements else []

    # ------------------------------------------------------------------
    # Mutations (return new RGA)
    # ------------------------------------------------------------------

    @staticmethod
    def _sibling_slot(
        elems: list[RGAElement],
        start: int,
        parent_id: str | None,
        element_id: str,
    ) -> int:
        """Return the index at which a new child of *parent_id* belongs.

        Scanning from *start*, skip every sibling whose ID is larger than
        *element_id* together with that sibling's whole subtree, so the new
        element never lands between an existing element and its descendants.
        """
        pos = start
        skipped: set[str] = set()  # IDs of skipped siblings and their descendants
        while pos < len(elems):
            cur = elems[pos]
            outranks = cur["parent_id"] == parent_id and cur["id"] > element_id
            if not outranks and cur["parent_id"] not in skipped:
                break
            skipped.add(cur["id"])
            pos += 1
        return pos

    @staticmethod
    def _pick_parent(a: str | None, b: str | None) -> str | None:
        """Choose one parent link symmetrically when two replicas disagree."""
        if a == b or b is None:
            return a
        if a is None:
            return b
        return max(a, b)

    def insert(self, after_id: str | None, value: str, *, element_id: str) -> RGA:
        """Return a new RGA with *value* inserted after *after_id*.

        Concurrent inserts at the same position are resolved by placing the
        element with the lexicographically *larger* ``element_id`` first.
        Larger siblings are skipped together with their descendants, so the
        resulting order matches the one :meth:`join` rebuilds.

        Inserting an ``element_id`` that is already present is a no-op
        (re-delivery of the same operation must not create a duplicate).

        Args:
            after_id: The ``id`` of the element to insert after, or ``None``
                to prepend (insert at the head of the sequence).
            value: The content hash of the new element.
            element_id: The stable unique ID for the new element; callers
                should use ``f"{timestamp}@{author}"`` to ensure global
                uniqueness across agents.

        Returns:
            A new :class:`RGA` with the element inserted at the correct
            position. If *after_id* is unknown, a warning is logged and the
            element is appended at the end.
        """
        elems = list(self._elements)

        # IDs are globally unique: seeing one again means the same insert
        # was delivered twice, so there is nothing to do.
        if any(e["id"] == element_id for e in elems):
            return RGA(elems)

        new_elem: RGAElement = {
            "id": element_id,
            "value": value,
            "deleted": False,
            "parent_id": after_id,
        }

        if after_id is None:
            start = 0
        else:
            anchor_idx = next(
                (i for i, e in enumerate(elems) if e["id"] == after_id), None
            )
            if anchor_idx is None:
                # Unknown anchor — append at end (safe degradation).
                logger.warning("RGA.insert: unknown after_id=%r, appending at end", after_id)
                elems.append(new_elem)
                return RGA(elems)
            start = anchor_idx + 1

        elems.insert(self._sibling_slot(elems, start, after_id, element_id), new_elem)
        return RGA(elems)

    def delete(self, element_id: str) -> RGA:
        """Return a new RGA with *element_id* tombstoned.

        Tombstoning is idempotent — deleting an already-deleted or unknown
        element is a no-op.

        Args:
            element_id: The ``id`` of the element to tombstone.

        Returns:
            A new :class:`RGA` (with freshly copied element dicts) in which
            the matching element is marked ``deleted=True``.
        """
        return RGA([
            {
                "id": elem["id"],
                "value": elem["value"],
                "deleted": True if elem["id"] == element_id else elem["deleted"],
                "parent_id": elem["parent_id"],
            }
            for elem in self._elements
        ])

    # ------------------------------------------------------------------
    # CRDT join
    # ------------------------------------------------------------------

    def join(self, other: RGA) -> RGA:
        """Return the lattice join — the union of both arrays.

        Elements are keyed by ``id``. The join:

        1. Takes the union of all element IDs from both replicas.
        2. For each ID, marks the element ``deleted`` if *either* replica has
           it tombstoned (once deleted, always deleted — monotone).
        3. Rebuilds the order from the ``parent_id`` links: children of
           ``None`` first, each element followed by its descendants,
           siblings sorted by ID descending.
        4. Keeps elements that are unreachable from the head (their
           ``parent_id`` is not in the merged set, or they sit in a parent
           cycle) by appending them after the reachable ones in a
           deterministic order, rather than dropping them.

        Args:
            other: The RGA to merge with.

        Returns:
            A new :class:`RGA` that is the join of ``self`` and *other*.
        """
        mine: dict[str, RGAElement] = {e["id"]: e for e in self._elements}
        theirs: dict[str, RGAElement] = {e["id"]: e for e in other._elements}

        merged: dict[str, RGAElement] = {}
        for eid in mine.keys() | theirs.keys():
            if eid in mine and eid in theirs:
                s = mine[eid]
                o = theirs[eid]
                # The same ID normally carries the same value and parent.
                # If they differ, choose symmetrically so that
                # join(a, b) == join(b, a) still holds.
                merged[eid] = {
                    "id": eid,
                    "value": max(s["value"], o["value"]),
                    "deleted": s["deleted"] or o["deleted"],
                    "parent_id": self._pick_parent(s["parent_id"], o["parent_id"]),
                }
            else:
                src = mine[eid] if eid in mine else theirs[eid]
                merged[eid] = {
                    "id": src["id"],
                    "value": src["value"],
                    "deleted": src["deleted"],
                    "parent_id": src["parent_id"],
                }

        # Group children under their parent; larger ID → leftmost.
        children: dict[str | None, list[str]] = {}
        for eid, elem in merged.items():
            children.setdefault(elem["parent_id"], []).append(eid)
        for group in children.values():
            group.sort(reverse=True)

        ordered: list[RGAElement] = []
        seen: set[str] = set()

        def _walk(root_ids: list[str]) -> None:
            # Iterative pre-order walk: sequential edits form parent chains
            # far deeper than Python's recursion limit.
            stack = root_ids[::-1]
            while stack:
                eid = stack.pop()
                if eid in seen:
                    continue
                seen.add(eid)
                ordered.append(merged[eid])
                # Reversed so the largest-ID child is popped first.
                stack.extend(reversed(children.get(eid, ())))

        _walk(children.get(None, []))

        # Elements anchored on an ID neither replica knows about.
        orphan_roots = sorted(
            (
                eid
                for eid, elem in merged.items()
                if elem["parent_id"] is not None and elem["parent_id"] not in merged
            ),
            reverse=True,
        )
        _walk(orphan_roots)

        # Anything still unvisited sits in a parent cycle (malformed input).
        for eid in sorted(merged.keys() - seen, reverse=True):
            _walk([eid])

        return RGA(ordered)

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------

    def to_sequence(self) -> list[str]:
        """Return the visible element values (excluding tombstones).

        Returns:
            List of ``value`` strings in document order, tombstones excluded.
        """
        return [e["value"] for e in self._elements if not e["deleted"]]

    def __len__(self) -> int:
        """Return the number of visible (non-tombstoned) elements."""
        return sum(1 for e in self._elements if not e["deleted"])

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> list[RGAElement]:
        """Return a JSON-serialisable list of :class:`RGAElement` dicts.

        Returns:
            Ordered list of copies of all elements (including tombstones).
        """
        return [
            {"id": e["id"], "value": e["value"], "deleted": e["deleted"], "parent_id": e["parent_id"]}
            for e in self._elements
        ]

    @classmethod
    def from_dict(cls, data: list[RGAElement]) -> RGA:
        """Reconstruct an :class:`RGA` from its wire representation.

        Args:
            data: List of :class:`RGAElement` dicts as produced by
                :meth:`to_dict`.

        Returns:
            A new :class:`RGA`.
        """
        return cls(list(data))

    # ------------------------------------------------------------------
    # Comparison / debugging helpers
    # ------------------------------------------------------------------

    def equivalent(self, other: RGA) -> bool:
        """Return ``True`` if both RGAs have identical element lists (including tombstones).

        Note: use :meth:`to_sequence` comparison when only visible content matters.

        Args:
            other: The RGA to compare against.

        Returns:
            ``True`` when the full internal element lists are equal.
        """
        return self._elements == other._elements

    def __repr__(self) -> str:
        return f"RGA(len={len(self)}, elements={self._elements!r})"