vclock.py
python
"""Vector clock for causal ordering in distributed multi-agent writes.

A vector clock (Fidge, 1988 / Mattern, 1988 — a generalisation of Lamport's
1978 scalar logical clocks) tracks how many events each agent has observed.
Two clocks can be compared to determine whether one *causally precedes* the
other or whether they are *concurrent* (neither dominates).

This is the foundational primitive for all CRDT coordination in Muse:

- :class:`LWWRegister` uses vector clock comparison to break same-timestamp
  ties deterministically.
- :class:`RGA` uses ``agent_id`` for deterministic concurrent-insert ordering.
- The ``CRDTPlugin.join()`` protocol uses causal ordering to detect which
  writes truly conflict vs. which are simply out-of-delivery-order.

Public API
----------
- :class:`VClockDict` — ``TypedDict`` wire format ``{agent_id: count}``.
- :class:`VectorClock` — the clock itself, with ``increment``, ``merge``,
  ``happens_before``, ``concurrent_with``, ``equivalent``, ``to_dict``,
  ``from_dict``.
"""
| 22 | |
| 23 | from __future__ import annotations |
| 24 | |
| 25 | from typing import TypedDict |
| 26 | |
| 27 | |
class VClockDict(TypedDict, total=False):
    """Wire format for a vector clock — ``{agent_id: event_count}``.

    An absent agent key is equivalent to a count of ``0``, so a serialised
    clock need not list every agent. Serialise with
    :meth:`VectorClock.to_dict` and deserialise with
    :meth:`VectorClock.from_dict`.

    NOTE(review): ``TypedDict`` describes a *fixed* set of declared keys, and
    this class declares none. Static type checkers will therefore not accept
    arbitrary ``{agent_id: count}`` literals as ``VClockDict``. If this type
    is meant for annotations, a ``dict[str, int]`` alias is likely the
    intended shape — confirm against callers before changing it.
    """
| 35 | |
class VectorClock:
    """Causal clock for distributed agent writes.

    Stores a mapping from agent identifiers (arbitrary strings) to the number
    of events that agent has performed. An absent agent is equivalent to
    count ``0``.

    Instances are **immutable** from the outside: every mutating method returns
    a new :class:`VectorClock` rather than modifying ``self``. This makes
    clocks safe to store as dict values without defensive copying.

    This class does not define ``__eq__``, so ``==`` falls back to object
    identity: two separately constructed clocks with identical counts compare
    unequal. Use :meth:`equivalent` for causal-state equality.

    Lattice laws satisfied by :meth:`merge` (stated with :meth:`equivalent`):

    - **Commutativity**: ``a.merge(b).equivalent(b.merge(a))``
    - **Associativity**: ``a.merge(b).merge(c).equivalent(a.merge(b.merge(c)))``
    - **Idempotency**: ``a.merge(a).equivalent(a)``
    """

    def __init__(self, counts: dict[str, int] | None = None) -> None:
        """Create a vector clock, optionally pre-populated from *counts*.

        Args:
            counts: Initial ``{agent_id: count}`` mapping. Copied defensively,
                so later changes to the caller's dict do not affect the clock.
        """
        self._counts: dict[str, int] = dict(counts) if counts else {}

    # ------------------------------------------------------------------
    # Mutation (returns new clock)
    # ------------------------------------------------------------------

    def increment(self, agent_id: str) -> VectorClock:
        """Return a new clock with ``agent_id``'s counter incremented by 1.

        Args:
            agent_id: The agent performing an event.

        Returns:
            A new :class:`VectorClock` with the updated count; ``self`` is
            left unchanged.
        """
        new_counts = dict(self._counts)
        new_counts[agent_id] = new_counts.get(agent_id, 0) + 1
        return VectorClock(new_counts)

    def merge(self, other: VectorClock) -> VectorClock:
        """Return the least-upper-bound of ``self`` and *other*.

        For each agent, the result holds the *maximum* count seen in either
        clock. This is the lattice join operation; it satisfies
        commutativity, associativity, and idempotency (up to
        :meth:`equivalent`).

        Args:
            other: The clock to merge with.

        Returns:
            A new :class:`VectorClock` holding per-agent maximums.
        """
        all_agents = self._counts.keys() | other._counts.keys()
        merged = {
            agent: max(self._counts.get(agent, 0), other._counts.get(agent, 0))
            for agent in all_agents
        }
        return VectorClock(merged)

    # ------------------------------------------------------------------
    # Comparison
    # ------------------------------------------------------------------

    def _compare(self, other: VectorClock) -> tuple[bool, bool]:
        """Compare two clocks agent-by-agent in a single pass.

        Absent agents count as ``0`` on either side.

        Args:
            other: The clock to compare against.

        Returns:
            ``(any_less, any_greater)``. ``any_less`` is ``True`` if some
            counter in ``self`` is strictly below *other*'s. ``any_greater``
            is ``True`` if some counter in ``self`` is strictly above it.
        """
        any_less = False
        any_greater = False
        for agent in self._counts.keys() | other._counts.keys():
            mine = self._counts.get(agent, 0)
            theirs = other._counts.get(agent, 0)
            if mine < theirs:
                any_less = True
            elif mine > theirs:
                any_greater = True
            if any_less and any_greater:
                # Already incomparable; further agents cannot change the result.
                break
        return any_less, any_greater

    def happens_before(self, other: VectorClock) -> bool:
        """Return ``True`` if ``self`` causally precedes *other*.

        ``a`` happens before ``b`` iff every agent counter in ``a`` is
        ≤ the corresponding counter in ``b``, and at least one counter is
        strictly less (i.e. ``a`` and ``b`` are not equivalent).

        Args:
            other: The clock to compare against.

        Returns:
            ``True`` when ``self < other`` in causal order.
        """
        any_less, any_greater = self._compare(other)
        return any_less and not any_greater

    def concurrent_with(self, other: VectorClock) -> bool:
        """Return ``True`` if neither clock causally precedes the other.

        Two clocks are concurrent when each has at least one counter strictly
        greater than the other's corresponding counter. This is the condition
        that a CRDT ``join`` must handle: there is no causal order between the
        two writes, so neither can be simply discarded.

        Args:
            other: The clock to compare against.

        Returns:
            ``True`` when ``self`` and *other* are incomparable. Equivalent
            clocks are not concurrent.
        """
        any_less, any_greater = self._compare(other)
        return any_less and any_greater

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, int]:
        """Return a JSON-serialisable ``{agent_id: count}`` mapping.

        Returns:
            A shallow copy of the internal counts dictionary; mutating it does
            not affect the clock.
        """
        return dict(self._counts)

    @classmethod
    def from_dict(cls, data: dict[str, int]) -> VectorClock:
        """Reconstruct a :class:`VectorClock` from its wire representation.

        Args:
            data: ``{agent_id: count}`` mapping as produced by :meth:`to_dict`.
                NOTE(review): values are not validated here; if *data* comes
                from an untrusted peer, confirm counts are non-negative ints.

        Returns:
            A new clock (of type ``cls``) with the given counts.
        """
        return cls(data)

    # ------------------------------------------------------------------
    # Equality and representation
    # ------------------------------------------------------------------

    def equivalent(self, other: VectorClock) -> bool:
        """Return ``True`` if both clocks represent identical causal state.

        Two clocks are equivalent when every agent's count is the same in both,
        treating absent agents as count 0. This is a stricter check than
        ``happens_before``: it requires exact equality, not domination.

        Args:
            other: The vector clock to compare against.

        Returns:
            ``True`` when ``self`` and *other* are causally identical.
        """
        any_less, any_greater = self._compare(other)
        return not (any_less or any_greater)

    def __repr__(self) -> str:
        return f"VectorClock({self._counts!r})"