cgcardona / muse public
g_counter.py python
183 lines 6.3 KB
4e224376 feat(phase-4): CRDT semantics for convergent multi-agent writes (691 te… Gabriel Cardona <cgcardona@gmail.com> 2d ago
1 """Grow-only Counter (G-Counter) — CRDT for monotonically increasing counts.
2
3 A G-Counter assigns one slot per agent. Each agent may only increment its own
4 slot. The global value is the sum of all slots. ``join`` takes the per-slot
5 maximum (matching the Vector Clock pattern), which is commutative, associative,
6 and idempotent.
7
8 Use cases in Muse:
9 - Commit-count metrics per agent (how many commits has each agent contributed?).
10 - Replay counters (how many times has this element been touched?).
11 - Any monotonically increasing quantity across a distributed agent fleet.
12
13 **Lattice laws satisfied by** :meth:`join`:
14 1. Commutativity: ``join(a, b) == join(b, a)``
15 2. Associativity: ``join(join(a, b), c) == join(a, join(b, c))``
16 3. Idempotency: ``join(a, a) == a``
17
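Proof sketch: ``join`` takes the per-slot ``max``, and ``max`` over the
non-negative integers is commutative, associative, and idempotent, so the three
laws hold slot by slot. ``value`` is the ``sum`` of the slots, so it can never
decrease under ``join``. Here ``==`` means slot-wise equality as tested by
:meth:`GCounter.equivalent`; the class does not define ``__eq__``.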
20
21 Public API
22 ----------
23 - :class:`GCounterDict` — ``TypedDict`` wire format ``{agent_id: count}``.
24 - :class:`GCounter` — the counter itself.
25 """
26
27 from __future__ import annotations
28
29 import logging
30 from typing import TypedDict
31
# Module-level logger named after this module. Nothing in the code visible
# here emits through it.
logger = logging.getLogger(__name__)
33
34
class GCounterDict(TypedDict, total=False):
    """Serialised shape of a :class:`GCounter`: ``{agent_id: count}``.

    No keys are declared on this ``TypedDict``; the keys of the wire mapping
    are agent IDs. ``total=False`` reflects that a missing agent is treated
    the same as an agent whose count is ``0``.

    Produce the mapping with :meth:`GCounter.to_dict` and turn it back into a
    counter with :meth:`GCounter.from_dict`.
    """
41
42
class GCounter:
    """Grow-only Counter — a CRDT counter that only ever increases.

    Each agent increments its own private slot; the global value is the sum of
    all slots. Only the owning agent may increment a slot (this is enforced by
    convention — not cryptographically).

    All mutating methods return new :class:`GCounter` instances; ``self`` is
    never modified.

    Slot values are validated on construction and on increment, so a counter
    can never hold a negative or non-integer slot.

    Example::

        c1 = GCounter().increment("agent-1")
        c2 = GCounter().increment("agent-2").increment("agent-2")
        merged = c1.join(c2)
        assert merged.value() == 3  # 1 from agent-1 + 2 from agent-2
        assert merged.join(c1).value() == 3  # idempotent
    """

    def __init__(self, counts: dict[str, int] | None = None) -> None:
        """Construct a G-Counter, optionally pre-populated.

        Args:
            counts: Initial ``{agent_id: count}`` mapping. Copied defensively.
                All values must be non-negative integers.

        Raises:
            ValueError: If any count is negative or is not an integer.
        """
        copied: dict[str, int] = dict(counts) if counts else {}
        for agent_id, count in copied.items():
            # A negative or fractional slot would make ``value()`` wrong and
            # would contradict the "absent agent == 0" rule used by ``join``
            # and ``equivalent``, so reject it at the boundary.
            if not isinstance(count, int) or count < 0:
                raise ValueError(
                    f"GCounter: count for agent {agent_id!r} must be a "
                    f"non-negative integer, got {count!r}"
                )
        self._counts: dict[str, int] = copied

    # ------------------------------------------------------------------
    # Mutation (returns new GCounter)
    # ------------------------------------------------------------------

    def increment(self, agent_id: str, by: int = 1) -> GCounter:
        """Return a new counter with *agent_id*'s slot incremented by *by*.

        Args:
            agent_id: The agent performing the increment (must be the caller's
                own agent ID to maintain CRDT invariants).
            by: Amount to increment; must be a positive integer.

        Returns:
            A new :class:`GCounter` with the updated slot.

        Raises:
            ValueError: If *by* is not a positive integer.
        """
        # Reject non-integers first: a float such as 1.5 would pass the
        # positivity check below and silently corrupt the slot.
        if not isinstance(by, int):
            raise ValueError(
                f"GCounter.increment: 'by' must be a positive integer, got {by!r}"
            )
        if by <= 0:
            raise ValueError(f"GCounter.increment: 'by' must be positive, got {by}")
        new_counts = dict(self._counts)
        new_counts[agent_id] = new_counts.get(agent_id, 0) + by
        return GCounter(new_counts)

    # ------------------------------------------------------------------
    # CRDT join
    # ------------------------------------------------------------------

    def join(self, other: GCounter) -> GCounter:
        """Return the lattice join — per-slot maximum of ``self`` and *other*.

        The result contains every agent present in either counter. Slots keep
        a deterministic order: ``self``'s agents first, then any agents that
        appear only in *other*.

        Args:
            other: The counter to merge with.

        Returns:
            A new :class:`GCounter` holding the per-agent maximum counts.
        """
        # Start from a copy of our own slots and fold the other side in, so
        # the key order does not depend on set iteration order.
        merged = dict(self._counts)
        for agent, count in other._counts.items():
            merged[agent] = max(merged.get(agent, 0), count)
        return GCounter(merged)

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------

    def value(self) -> int:
        """Return the global counter value — the sum of all agent slots.

        Returns:
            Non-negative integer.
        """
        return sum(self._counts.values())

    def value_for(self, agent_id: str) -> int:
        """Return the count for a specific agent.

        Args:
            agent_id: The agent to query.

        Returns:
            The agent's slot value, or ``0`` if the agent has not incremented.
        """
        return self._counts.get(agent_id, 0)

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, int]:
        """Return a JSON-serialisable ``{agent_id: count}`` mapping.

        Returns:
            A shallow copy of the internal counts dictionary.
        """
        return dict(self._counts)

    @classmethod
    def from_dict(cls, data: dict[str, int]) -> GCounter:
        """Reconstruct a :class:`GCounter` from its wire representation.

        Args:
            data: ``{agent_id: count}`` mapping as produced by :meth:`to_dict`.

        Returns:
            A new :class:`GCounter`.

        Raises:
            ValueError: If any count in *data* is negative or not an integer.
        """
        return cls(data)

    # ------------------------------------------------------------------
    # Comparison and representation
    # ------------------------------------------------------------------

    def equivalent(self, other: GCounter) -> bool:
        """Return ``True`` if both counters hold identical per-agent counts.

        Args:
            other: The G-Counter to compare against.

        Returns:
            ``True`` when every agent slot has the same value in both counters
            (treating absent agents as count 0).
        """
        all_agents = self._counts.keys() | other._counts.keys()
        return all(
            self._counts.get(agent, 0) == other._counts.get(agent, 0)
            for agent in all_agents
        )

    def __repr__(self) -> str:
        """Return a debugging representation with the total and the raw slots."""
        return f"GCounter(value={self.value()}, slots={self._counts!r})"