gabriel / muse public
aw_map.py python
257 lines 9.1 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """Add-Wins Map (AW-Map) — CRDT map where adds win over concurrent removes.
2
3 An AW-Map is a dictionary where keys map to arbitrary CRDT values (represented
4 as strings — content hashes referencing the object store). The "add-wins"
5 property means that if agent A sets key K and agent B concurrently removes
6 key K, the merged result still contains K with A's value.
7
8 This is built using the same token-tag mechanism as :class:`~muse.core.crdts.or_set.ORSet`:
9 each key entry carries a set of unique tokens; removal tombstones all observed
10 tokens for that key.
11
12 Use cases in Muse:
13 - File manifests (path → content hash) where agent A can add a file while
14 agent B removes a different file.
15 - Plugin configuration maps (dimension → value) for independent per-dimension
16 settings.
17 - Annotation maps (element_id → annotation blob hash).
18
**Lattice laws satisfied by** :meth:`join` (``==`` denotes equivalence as tested
by :meth:`AWMap.equivalent`; the class defines no ``__eq__``):
1. Commutativity: ``join(a, b) == join(b, a)``
2. Associativity: ``join(join(a, b), c) == join(a, join(b, c))``
3. Idempotency: ``join(a, a) == a``
23
24 Public API
25 ----------
26 - :class:`AWMapEntry` — ``TypedDict`` for one map entry (key, value, token).
27 - :class:`AWMapDict` — ``TypedDict`` wire format for a complete AW-Map.
28 - :class:`AWMap` — the map itself.
29 """
30
31
32 import logging
33 import uuid
34 from typing import TypedDict
35
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
37
38
class AWMapEntry(TypedDict):
    """Wire shape of one ``(key, value, token)`` record held by an :class:`AWMap`.

    A fresh ``token`` is minted by every ``AWMap.set`` call, so two agents
    that set the same key concurrently produce two distinguishable records
    rather than silently overwriting each other.
    """

    # Map key, e.g. a file path or a dimension name.
    key: str
    # Value bound to ``key``, e.g. a content hash.
    value: str
    # Unique identifier of this particular write of ``key``.
    token: str
52
53
class AWMapDict(TypedDict):
    """JSON-friendly wire shape of a whole :class:`AWMap`.

    This is what ``AWMap.to_dict`` produces and ``AWMap.from_dict`` consumes.
    """

    # The map's stored ``(key, value, token)`` records, one dict per record.
    entries: list[AWMapEntry]
    # Tokens that have been removed (tombstoned).
    tombstones: list[str]
63
64
65 class AWMap:
66 """Add-Wins Map — an unordered map CRDT where adds win over concurrent removes.
67
68 Keys and values are strings. Each logical key may temporarily have
69 multiple (value, token) pairs during concurrent writes; the visible value
70 for a key is resolved by taking the entry with the lexicographically
71 greatest token among all live entries for that key. This gives a
72 deterministic LWW-like resolution for concurrent writes to the same key
73 without requiring wall-clock timestamps.
74
75 All mutating methods return new :class:`AWMap` instances; ``self`` is
76 never modified.
77
78 Example::
79
80 m = AWMap()
81 m = m.set("tempo", "120bpm")
82 m = m.set("key", "C major")
83 assert m.get("tempo") == "120bpm"
84 assert m.get("key") == "C major"
85 assert m.remove("tempo").get("tempo") is None
86 """
87
88 def __init__(
89 self,
90 entries: set[tuple[str, str, str]] | None = None,
91 tombstones: set[str] | None = None,
92 ) -> None:
93 """Initialise an AW-Map, optionally pre-populated.
94
95 Args:
96 entries: Set of ``(key, value, token)`` triples (live entries).
97 tombstones: Set of removed token strings.
98 """
99 self._entries: set[tuple[str, str, str]] = set(entries) if entries else set()
100 self._tombstones: set[str] = set(tombstones) if tombstones else set()
101
102 # ------------------------------------------------------------------
103 # Mutations (return new AWMap)
104 # ------------------------------------------------------------------
105
106 def set(self, key: str, value: str) -> AWMap:
107 """Set *key* to *value*, replacing all existing live entries for *key*.
108
109 Old tokens for *key* are tombstoned; a new token is generated for the
110 new value, giving the add-wins property for concurrent operations.
111
112 Args:
113 key: The map key to set.
114 value: The new value to associate with *key*.
115
116 Returns:
117 A new :class:`AWMap` with *key* updated to *value*.
118 """
119 # Tombstone all existing live entries for key
120 existing_tokens = {t for k, v, t in self._entries if k == key and t not in self._tombstones}
121 new_tombstones = self._tombstones | existing_tokens
122 new_entries = {e for e in self._entries if not (e[0] == key and e[2] in existing_tokens)}
123 # Add new entry with fresh token
124 new_token = str(uuid.uuid4())
125 new_entries.add((key, value, new_token))
126 return AWMap(new_entries, new_tombstones)
127
128 def remove(self, key: str) -> AWMap:
129 """Remove *key* by tombstoning all currently observed tokens for it.
130
131 Concurrent adds with new tokens survive this remove.
132
133 Args:
134 key: The map key to remove.
135
136 Returns:
137 A new :class:`AWMap` with *key* removed.
138 """
139 observed_tokens = {t for k, v, t in self._entries if k == key}
140 new_tombstones = self._tombstones | observed_tokens
141 new_entries = {e for e in self._entries if not (e[0] == key)}
142 return AWMap(new_entries, new_tombstones)
143
144 # ------------------------------------------------------------------
145 # CRDT join
146 # ------------------------------------------------------------------
147
148 def join(self, other: AWMap) -> AWMap:
149 """Return the lattice join — union of entries minus all tombstones.
150
151 Args:
152 other: The AW-Map to merge with.
153
154 Returns:
155 A new :class:`AWMap` that is the join of ``self`` and *other*.
156 """
157 all_tombstones = self._tombstones | other._tombstones
158 all_raw_entries = self._entries | other._entries
159 live_entries = {e for e in all_raw_entries if e[2] not in all_tombstones}
160 return AWMap(live_entries, all_tombstones)
161
162 # ------------------------------------------------------------------
163 # Query
164 # ------------------------------------------------------------------
165
166 def get(self, key: str) -> str | None:
167 """Return the current value for *key*, or ``None`` if absent.
168
169 When multiple live entries exist for *key* (due to concurrent un-joined
170 writes), the one with the lexicographically greatest token is returned.
171 This gives a deterministic, consistent result without wall-clock time.
172
173 Args:
174 key: The map key to look up.
175
176 Returns:
177 The value string, or ``None`` if *key* has no live entry.
178 """
179 live = [(v, t) for k, v, t in self._entries if k == key and t not in self._tombstones]
180 if not live:
181 return None
182 return max(live, key=lambda pair: pair[1])[0]
183
184 def keys(self) -> frozenset[str]:
185 """Return the set of keys with at least one live entry.
186
187 Returns:
188 Frozenset of key strings currently in the map.
189 """
190 return frozenset(k for k, v, t in self._entries if t not in self._tombstones)
191
192 def to_plain_dict(self) -> dict[str, str]:
193 """Return a plain ``{key: value}`` dict of visible entries.
194
195 Concurrent-write conflicts are resolved by lexicographic token order
196 (the same rule as :meth:`get`).
197
198 Returns:
199 ``{key: resolved_value}`` for all live keys.
200 """
201 result: dict[str, str] = {}
202 for k in self.keys():
203 v = self.get(k)
204 if v is not None:
205 result[k] = v
206 return result
207
208 def __contains__(self, key: str) -> bool:
209 return key in self.keys()
210
211 # ------------------------------------------------------------------
212 # Serialisation
213 # ------------------------------------------------------------------
214
215 def to_dict(self) -> AWMapDict:
216 """Return a JSON-serialisable :class:`AWMapDict`.
217
218 Returns:
219 Dict with ``"entries"`` and ``"tombstones"`` lists.
220 """
221 entries: list[AWMapEntry] = [
222 {"key": k, "value": v, "token": t}
223 for k, v, t in sorted(self._entries)
224 ]
225 return {"entries": entries, "tombstones": sorted(self._tombstones)}
226
227 @classmethod
228 def from_dict(cls, data: AWMapDict) -> AWMap:
229 """Reconstruct an :class:`AWMap` from its wire representation.
230
231 Args:
232 data: Dict as produced by :meth:`to_dict`.
233
234 Returns:
235 A new :class:`AWMap`.
236 """
237 entries = {(e["key"], e["value"], e["token"]) for e in data["entries"]}
238 tombstones = set(data["tombstones"])
239 return cls(entries, tombstones)
240
241 # ------------------------------------------------------------------
242 # Python dunder helpers
243 # ------------------------------------------------------------------
244
245 def equivalent(self, other: AWMap) -> bool:
246 """Return ``True`` if both AW-Maps have the same visible key-value pairs and tombstones.
247
248 Args:
249 other: The AW-Map to compare against.
250
251 Returns:
252 ``True`` when plain dict views and tombstone sets are identical.
253 """
254 return self.to_plain_dict() == other.to_plain_dict() and self._tombstones == other._tombstones
255
256 def __repr__(self) -> str:
257 return f"AWMap(keys={set(self.keys())!r})"