aw_map.py
python
| 1 | """Add-Wins Map (AW-Map) — CRDT map where adds win over concurrent removes. |
| 2 | |
| 3 | An AW-Map is a dictionary where keys map to arbitrary CRDT values (represented |
| 4 | as strings — content hashes referencing the object store). The "add-wins" |
| 5 | property means that if agent A sets key K and agent B concurrently removes |
| 6 | key K, the merged result still contains K with A's value. |
| 7 | |
| 8 | This is built using the same token-tag mechanism as :class:`~muse.core.crdts.or_set.ORSet`: |
| 9 | each key entry carries a set of unique tokens; removal tombstones all observed |
| 10 | tokens for that key. |
| 11 | |
| 12 | Use cases in Muse: |
| 13 | - File manifests (path → content hash) where agent A can add a file while |
| 14 | agent B removes a different file. |
| 15 | - Plugin configuration maps (dimension → value) for independent per-dimension |
| 16 | settings. |
| 17 | - Annotation maps (element_id → annotation blob hash). |
| 18 | |
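| 19 | **Lattice laws satisfied by** :meth:`join` (``==`` below means equality in the sense of :meth:`AWMap.equivalent`, i.e. same visible key/value pairs and same tombstones): |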
| 20 | 1. Commutativity: ``join(a, b) == join(b, a)`` |
| 21 | 2. Associativity: ``join(join(a, b), c) == join(a, join(b, c))`` |
| 22 | 3. Idempotency: ``join(a, a) == a`` |
| 23 | |
| 24 | Public API |
| 25 | ---------- |
| 26 | - :class:`AWMapEntry` — ``TypedDict`` for one map entry (key, value, token). |
| 27 | - :class:`AWMapDict` — ``TypedDict`` wire format for a complete AW-Map. |
| 28 | - :class:`AWMap` — the map itself. |
| 29 | """ |
| 30 | |
| 31 | |
| 32 | import logging |
| 33 | import uuid |
| 34 | from typing import TypedDict |
| 35 | |
# Module-scoped logger named after this module (not referenced by the code shown here).
logger: logging.Logger = logging.getLogger(name=__name__)
| 37 | |
| 38 | |
class AWMapEntry(TypedDict):
    """One serialised ``(key, value, token)`` record of an :class:`AWMap`.

    :meth:`AWMap.set` mints a brand-new ``token`` on every call, so two agents
    writing the same ``key`` concurrently yield entries that can be told apart
    (and tombstoned) independently.
    """

    key: str  # the map key, e.g. a file path or a dimension name
    value: str  # the associated value, e.g. a content hash
    token: str  # unique tag identifying this particular write of ``key``
| 52 | |
| 53 | |
class AWMapDict(TypedDict):
    """JSON-friendly wire form of a whole :class:`AWMap`.

    Emitted by :meth:`AWMap.to_dict` and accepted by :meth:`AWMap.from_dict`.
    Whether an entry is visible is decided by ``tombstones``: an entry whose
    token appears there is dead.
    """

    entries: list[AWMapEntry]  # every stored (key, value, token) triple
    tombstones: list[str]  # tokens that have been removed
| 63 | |
| 64 | |
| 65 | class AWMap: |
| 66 | """Add-Wins Map — an unordered map CRDT where adds win over concurrent removes. |
| 67 | |
| 68 | Keys and values are strings. Each logical key may temporarily have |
| 69 | multiple (value, token) pairs during concurrent writes; the visible value |
| 70 | for a key is resolved by taking the entry with the lexicographically |
| 71 | greatest token among all live entries for that key. This gives a |
| 72 | deterministic LWW-like resolution for concurrent writes to the same key |
| 73 | without requiring wall-clock timestamps. |
| 74 | |
| 75 | All mutating methods return new :class:`AWMap` instances; ``self`` is |
| 76 | never modified. |
| 77 | |
| 78 | Example:: |
| 79 | |
| 80 | m = AWMap() |
| 81 | m = m.set("tempo", "120bpm") |
| 82 | m = m.set("key", "C major") |
| 83 | assert m.get("tempo") == "120bpm" |
| 84 | assert m.get("key") == "C major" |
| 85 | assert m.remove("tempo").get("tempo") is None |
| 86 | """ |
| 87 | |
| 88 | def __init__( |
| 89 | self, |
| 90 | entries: set[tuple[str, str, str]] | None = None, |
| 91 | tombstones: set[str] | None = None, |
| 92 | ) -> None: |
| 93 | """Initialise an AW-Map, optionally pre-populated. |
| 94 | |
| 95 | Args: |
| 96 | entries: Set of ``(key, value, token)`` triples (live entries). |
| 97 | tombstones: Set of removed token strings. |
| 98 | """ |
| 99 | self._entries: set[tuple[str, str, str]] = set(entries) if entries else set() |
| 100 | self._tombstones: set[str] = set(tombstones) if tombstones else set() |
| 101 | |
| 102 | # ------------------------------------------------------------------ |
| 103 | # Mutations (return new AWMap) |
| 104 | # ------------------------------------------------------------------ |
| 105 | |
| 106 | def set(self, key: str, value: str) -> AWMap: |
| 107 | """Set *key* to *value*, replacing all existing live entries for *key*. |
| 108 | |
| 109 | Old tokens for *key* are tombstoned; a new token is generated for the |
| 110 | new value, giving the add-wins property for concurrent operations. |
| 111 | |
| 112 | Args: |
| 113 | key: The map key to set. |
| 114 | value: The new value to associate with *key*. |
| 115 | |
| 116 | Returns: |
| 117 | A new :class:`AWMap` with *key* updated to *value*. |
| 118 | """ |
| 119 | # Tombstone all existing live entries for key |
| 120 | existing_tokens = {t for k, v, t in self._entries if k == key and t not in self._tombstones} |
| 121 | new_tombstones = self._tombstones | existing_tokens |
| 122 | new_entries = {e for e in self._entries if not (e[0] == key and e[2] in existing_tokens)} |
| 123 | # Add new entry with fresh token |
| 124 | new_token = str(uuid.uuid4()) |
| 125 | new_entries.add((key, value, new_token)) |
| 126 | return AWMap(new_entries, new_tombstones) |
| 127 | |
| 128 | def remove(self, key: str) -> AWMap: |
| 129 | """Remove *key* by tombstoning all currently observed tokens for it. |
| 130 | |
| 131 | Concurrent adds with new tokens survive this remove. |
| 132 | |
| 133 | Args: |
| 134 | key: The map key to remove. |
| 135 | |
| 136 | Returns: |
| 137 | A new :class:`AWMap` with *key* removed. |
| 138 | """ |
| 139 | observed_tokens = {t for k, v, t in self._entries if k == key} |
| 140 | new_tombstones = self._tombstones | observed_tokens |
| 141 | new_entries = {e for e in self._entries if not (e[0] == key)} |
| 142 | return AWMap(new_entries, new_tombstones) |
| 143 | |
| 144 | # ------------------------------------------------------------------ |
| 145 | # CRDT join |
| 146 | # ------------------------------------------------------------------ |
| 147 | |
| 148 | def join(self, other: AWMap) -> AWMap: |
| 149 | """Return the lattice join — union of entries minus all tombstones. |
| 150 | |
| 151 | Args: |
| 152 | other: The AW-Map to merge with. |
| 153 | |
| 154 | Returns: |
| 155 | A new :class:`AWMap` that is the join of ``self`` and *other*. |
| 156 | """ |
| 157 | all_tombstones = self._tombstones | other._tombstones |
| 158 | all_raw_entries = self._entries | other._entries |
| 159 | live_entries = {e for e in all_raw_entries if e[2] not in all_tombstones} |
| 160 | return AWMap(live_entries, all_tombstones) |
| 161 | |
| 162 | # ------------------------------------------------------------------ |
| 163 | # Query |
| 164 | # ------------------------------------------------------------------ |
| 165 | |
| 166 | def get(self, key: str) -> str | None: |
| 167 | """Return the current value for *key*, or ``None`` if absent. |
| 168 | |
| 169 | When multiple live entries exist for *key* (due to concurrent un-joined |
| 170 | writes), the one with the lexicographically greatest token is returned. |
| 171 | This gives a deterministic, consistent result without wall-clock time. |
| 172 | |
| 173 | Args: |
| 174 | key: The map key to look up. |
| 175 | |
| 176 | Returns: |
| 177 | The value string, or ``None`` if *key* has no live entry. |
| 178 | """ |
| 179 | live = [(v, t) for k, v, t in self._entries if k == key and t not in self._tombstones] |
| 180 | if not live: |
| 181 | return None |
| 182 | return max(live, key=lambda pair: pair[1])[0] |
| 183 | |
| 184 | def keys(self) -> frozenset[str]: |
| 185 | """Return the set of keys with at least one live entry. |
| 186 | |
| 187 | Returns: |
| 188 | Frozenset of key strings currently in the map. |
| 189 | """ |
| 190 | return frozenset(k for k, v, t in self._entries if t not in self._tombstones) |
| 191 | |
| 192 | def to_plain_dict(self) -> dict[str, str]: |
| 193 | """Return a plain ``{key: value}`` dict of visible entries. |
| 194 | |
| 195 | Concurrent-write conflicts are resolved by lexicographic token order |
| 196 | (the same rule as :meth:`get`). |
| 197 | |
| 198 | Returns: |
| 199 | ``{key: resolved_value}`` for all live keys. |
| 200 | """ |
| 201 | result: dict[str, str] = {} |
| 202 | for k in self.keys(): |
| 203 | v = self.get(k) |
| 204 | if v is not None: |
| 205 | result[k] = v |
| 206 | return result |
| 207 | |
| 208 | def __contains__(self, key: str) -> bool: |
| 209 | return key in self.keys() |
| 210 | |
| 211 | # ------------------------------------------------------------------ |
| 212 | # Serialisation |
| 213 | # ------------------------------------------------------------------ |
| 214 | |
| 215 | def to_dict(self) -> AWMapDict: |
| 216 | """Return a JSON-serialisable :class:`AWMapDict`. |
| 217 | |
| 218 | Returns: |
| 219 | Dict with ``"entries"`` and ``"tombstones"`` lists. |
| 220 | """ |
| 221 | entries: list[AWMapEntry] = [ |
| 222 | {"key": k, "value": v, "token": t} |
| 223 | for k, v, t in sorted(self._entries) |
| 224 | ] |
| 225 | return {"entries": entries, "tombstones": sorted(self._tombstones)} |
| 226 | |
| 227 | @classmethod |
| 228 | def from_dict(cls, data: AWMapDict) -> AWMap: |
| 229 | """Reconstruct an :class:`AWMap` from its wire representation. |
| 230 | |
| 231 | Args: |
| 232 | data: Dict as produced by :meth:`to_dict`. |
| 233 | |
| 234 | Returns: |
| 235 | A new :class:`AWMap`. |
| 236 | """ |
| 237 | entries = {(e["key"], e["value"], e["token"]) for e in data["entries"]} |
| 238 | tombstones = set(data["tombstones"]) |
| 239 | return cls(entries, tombstones) |
| 240 | |
| 241 | # ------------------------------------------------------------------ |
| 242 | # Python dunder helpers |
| 243 | # ------------------------------------------------------------------ |
| 244 | |
| 245 | def equivalent(self, other: AWMap) -> bool: |
| 246 | """Return ``True`` if both AW-Maps have the same visible key-value pairs and tombstones. |
| 247 | |
| 248 | Args: |
| 249 | other: The AW-Map to compare against. |
| 250 | |
| 251 | Returns: |
| 252 | ``True`` when plain dict views and tombstone sets are identical. |
| 253 | """ |
| 254 | return self.to_plain_dict() == other.to_plain_dict() and self._tombstones == other._tombstones |
| 255 | |
| 256 | def __repr__(self) -> str: |
| 257 | return f"AWMap(keys={set(self.keys())!r})" |