gabriel / muse public
op_log.py python
412 lines 14.4 KB
bda49bdb feat: redesign .museignore as TOML with domain-scoped sections (#100) Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """Append-only operation log for Muse live collaboration.
2
3 The op log is the bridge between real-time collaborative editing and the
4 immutable commit DAG. During a live session, operations are appended to
5 the log as they occur. At commit time the log is collapsed into a
6 :class:`~muse.domain.StructuredDelta` and stored with the commit record.
7
8 Design principles
9 -----------------
10 - **Append-only** — entries are never modified or deleted; the file grows
11 monotonically. Compaction happens through checkpoints (see below).
12 - **Lamport-clocked** — every entry carries a logical Lamport timestamp
13 that imposes a total order across concurrent actors without wall-clock
14 coordination.
15 - **Causally linked** — ``parent_op_ids`` lets any entry declare the ops it
16 depends on, enabling causal replay and CRDT join operations downstream.
17 - **Domain-neutral** — the log stores :class:`~muse.domain.DomainOp` values
18 unchanged; the core engine has no opinion about what those ops mean.
19 - **Checkpoint / compaction** — when a live session crystallises into a Muse
20 commit, a checkpoint record is written that marks the current snapshot.
21 Subsequent reads return only ops that arrived after the checkpoint.
22
23 Layout::
24
25 .muse/op_log/<session_id>/
26 ops.jsonl — one JSON line per OpEntry (append-only)
27 checkpoint.json — most recent checkpoint (snapshot_id + lamport_ts)
28
29 Relationship to the commit DAG
30 -------------------------------
31 The op log does **not** replace the commit DAG. It is a staging area:
32
33 live edits → OpLog.append() → ops.jsonl
34 session end → OpLog.checkpoint(snapshot_id) → commit record
35 commit record → normal Muse commit DAG
36
37 Replaying the log from a checkpoint reproduces the snapshot deterministically,
38 giving the same guarantee as re-running ``git apply`` from a patch file.
39
40 Usage::
41
42 from muse.core.op_log import OpLog, make_op_entry
43
44 log = OpLog(repo_root, session_id="session-abc")
45 entry = make_op_entry(
46 actor_id="counterpoint-bot",
47 domain="midi",
48 domain_op=my_insert_op,
49 lamport_ts=log.next_lamport_ts(),
50 )
51 log.append(entry)
52
53 delta = log.to_structured_delta("midi") # collapse for commit
54 ckpt = log.checkpoint(snapshot_id) # crystallise
55 """
56
57 from __future__ import annotations
58
59 import datetime
60 import json
61 import logging
62 import pathlib
63 import uuid as _uuid_mod
64 from typing import TypedDict
65
66 from muse.domain import DomainOp, StructuredDelta
67
68 logger = logging.getLogger(__name__)
69
70 _OP_LOG_DIR = ".muse/op_log"
71
72
73 # ---------------------------------------------------------------------------
74 # Wire-format TypedDicts
75 # ---------------------------------------------------------------------------
76
77
class OpEntry(TypedDict):
    """One record in the append-only op log.

    Keys:
        op_id: Stable UUID4 identifying this entry. Consumers use it to
            deduplicate on replay; CRDT join uses it for causal identity.
        actor_id: The agent or human identity that produced this op.
        lamport_ts: Logical Lamport timestamp — monotonically increasing
            within a session. Establishes total ordering when wall-clock
            times are unavailable or unreliable.
        parent_op_ids: Causal parents — op IDs this entry depends on. An
            empty list marks a root entry with no explicit causal
            dependency. Used by CRDT merge and causal replay.
        domain: Tag of the :class:`~muse.domain.MuseDomainPlugin` that
            produced this op (e.g. ``"midi"``, ``"code"``).
        domain_op: The actual typed domain operation, stored verbatim.
        created_at: ISO 8601 UTC wall-clock timestamp of the append.
            Informational only — order by ``lamport_ts``, not this.
        intent_id: Links this op to a coordination intent (from
            :mod:`muse.core.coordination`). Empty string if not applicable.
        reservation_id: Links this op to a coordination reservation. Empty
            string if not applicable.
    """

    op_id: str
    actor_id: str
    lamport_ts: int
    parent_op_ids: list[str]
    domain: str
    domain_op: DomainOp
    created_at: str
    intent_id: str
    reservation_id: str
119
120
class OpLogCheckpoint(TypedDict):
    """Snapshot of the op log state taken at commit time.

    :meth:`OpLog.checkpoint` writes one of these when a live session
    crystallises into a Muse commit; afterwards
    :meth:`OpLog.replay_since_checkpoint` yields only ops that arrived
    later.

    Keys:
        session_id: The session this checkpoint belongs to.
        snapshot_id: The commit snapshot ID this checkpoint materialises.
            Every op up to and including ``lamport_ts`` is captured by it.
        lamport_ts: Lamport timestamp of the last op covered by this
            checkpoint.
        op_count: Number of op entries in the log when the checkpoint was
            taken.
        created_at: ISO 8601 UTC timestamp.
    """

    session_id: str
    snapshot_id: str
    lamport_ts: int
    op_count: int
    created_at: str
146
147
148 # ---------------------------------------------------------------------------
149 # Factory
150 # ---------------------------------------------------------------------------
151
152
def make_op_entry(
    actor_id: str,
    domain: str,
    domain_op: DomainOp,
    lamport_ts: int,
    *,
    parent_op_ids: list[str] | None = None,
    intent_id: str = "",
    reservation_id: str = "",
) -> OpEntry:
    """Construct a fresh :class:`OpEntry` with a newly minted UUID4 op_id.

    Args:
        actor_id: Agent or human identity string.
        domain: Domain tag (e.g. ``"midi"``).
        domain_op: The typed domain operation to log.
        lamport_ts: Logical Lamport timestamp for this entry.
        parent_op_ids: Causal dependencies. Defaults to empty list.
        intent_id: Optional coordination intent linkage.
        reservation_id: Optional coordination reservation linkage.

    Returns:
        A fully populated :class:`OpEntry`.
    """
    # Copy the caller's parent list so the entry owns its own mutable state.
    parents: list[str] = list(parent_op_ids) if parent_op_ids else []
    stamped_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    return OpEntry(
        op_id=str(_uuid_mod.uuid4()),
        actor_id=actor_id,
        lamport_ts=lamport_ts,
        parent_op_ids=parents,
        domain=domain,
        domain_op=domain_op,
        created_at=stamped_at,
        intent_id=intent_id,
        reservation_id=reservation_id,
    )
188
189
190 # ---------------------------------------------------------------------------
191 # OpLog
192 # ---------------------------------------------------------------------------
193
194
class OpLog:
    """Append-only operation log for a single live collaboration session.

    Each session gets its own directory under ``.muse/op_log/<session_id>/``.
    The log file is JSON-lines: one :class:`OpEntry` per line. The checkpoint
    file is a single JSON object written atomically when a session is committed.

    Args:
        repo_root: Repository root (the directory containing ``.muse/``).
        session_id: Stable identifier for this collaboration session. Use a
            UUID, a branch name, or any stable string. The session
            directory is created on first :meth:`append`.
    """

    def __init__(self, repo_root: pathlib.Path, session_id: str) -> None:
        self._repo_root = repo_root
        self._session_id = session_id
        self._session_dir = repo_root / _OP_LOG_DIR / session_id
        self._ops_path = self._session_dir / "ops.jsonl"
        self._checkpoint_path = self._session_dir / "checkpoint.json"
        # Local Lamport clock. The on-disk log is folded into it lazily,
        # exactly once (see _sync_lamport), so a reopened session resumes
        # from where it left off.
        self._lamport: int = 0
        self._lamport_loaded: bool = False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _ensure_dir(self) -> None:
        """Create the session directory (and parents) if it does not exist."""
        self._session_dir.mkdir(parents=True, exist_ok=True)

    def _load_lamport(self) -> int:
        """Return the highest lamport_ts seen in the log so far.

        Blank and corrupt lines are skipped so a torn tail (e.g. after a
        crash mid-append) does not prevent the session from reopening.
        """
        if not self._ops_path.exists():
            return 0
        highest = 0
        with self._ops_path.open() as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry: OpEntry = json.loads(line)
                    highest = max(highest, entry.get("lamport_ts", 0))
                except json.JSONDecodeError:
                    continue
        return highest

    def _sync_lamport(self) -> None:
        """Fold the on-disk log into the in-memory Lamport counter once.

        Uses an explicit flag rather than a ``== 0`` sentinel so the scan
        happens exactly once even when :meth:`append` has already advanced
        the in-memory counter.
        """
        if not self._lamport_loaded:
            self._lamport = max(self._lamport, self._load_lamport())
            self._lamport_loaded = True

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def next_lamport_ts(self) -> int:
        """Return the next Lamport timestamp to use, advancing the counter.

        The counter is initialised lazily from the highest value found in the
        log on first use (so that a reopened session continues from where it
        left off) and is also advanced by :meth:`append`, so timestamps
        issued here always sort after every op already in the log.

        Returns:
            Monotonically increasing integer.
        """
        self._sync_lamport()
        self._lamport += 1
        return self._lamport

    def append(self, entry: OpEntry) -> None:
        """Append *entry* to the op log.

        The entry is serialised as a single JSON line and flushed to disk.
        This is the only write operation on the log file; entries are never
        modified or deleted.

        Args:
            entry: A fully populated :class:`OpEntry`.
        """
        self._ensure_dir()
        line = json.dumps(entry, separators=(",", ":")) + "\n"
        with self._ops_path.open("a") as fh:
            fh.write(line)
        # Lamport receive rule: fold the appended timestamp into the local
        # clock so next_lamport_ts() stays ahead of ops produced by remote
        # actors, preserving the total-order guarantee.
        self._lamport = max(self._lamport, entry["lamport_ts"])
        logger.debug(
            "✅ OpLog append: actor=%r domain=%r ts=%d",
            entry["actor_id"],
            entry["domain"],
            entry["lamport_ts"],
        )

    def read_all(self) -> list[OpEntry]:
        """Return all entries in the log, in append order.

        Corrupt lines are logged and skipped rather than raising, so one
        bad line does not make the whole session unreadable.

        Returns:
            List of :class:`OpEntry` dicts, oldest first.
        """
        if not self._ops_path.exists():
            return []
        entries: list[OpEntry] = []
        with self._ops_path.open() as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as exc:
                    logger.warning("⚠️ Corrupt op log line in %s: %s", self._ops_path, exc)
        return entries

    def replay_since_checkpoint(self) -> list[OpEntry]:
        """Return entries that arrived after the last checkpoint.

        If no checkpoint exists, returns all entries (equivalent to
        :meth:`read_all`).

        Returns:
            List of :class:`OpEntry` dicts since last checkpoint, oldest first.
        """
        checkpoint = self.read_checkpoint()
        all_entries = self.read_all()
        if checkpoint is None:
            return all_entries
        cutoff = checkpoint["lamport_ts"]
        # Strictly greater: ops at the cutoff itself are already captured
        # by the checkpoint's snapshot.
        return [e for e in all_entries if e["lamport_ts"] > cutoff]

    def to_structured_delta(self, domain: str) -> StructuredDelta:
        """Collapse all entries since the last checkpoint into a StructuredDelta.

        Ops are ordered by Lamport timestamp. Ops from domains other than
        *domain* are filtered out (a session may carry cross-domain ops from
        coordinated agents; each domain collapses its own slice).

        Args:
            domain: Domain tag to filter by (e.g. ``"midi"``).

        Returns:
            A :class:`~muse.domain.StructuredDelta` with the ordered op list
            and a simple count summary.
        """
        entries = self.replay_since_checkpoint()
        entries.sort(key=lambda e: e["lamport_ts"])
        ops = [e["domain_op"] for e in entries if e["domain"] == domain]

        # Summary like "2 insert, 1 remove" — counts per op kind, sorted
        # alphabetically for a deterministic string.
        counts: dict[str, int] = {}
        for op in ops:
            kind = op.get("op", "unknown")
            counts[kind] = counts.get(kind, 0) + 1
        parts = [f"{v} {k}" for k, v in sorted(counts.items())]
        summary = ", ".join(parts) if parts else "no ops"

        return StructuredDelta(domain=domain, ops=ops, summary=summary)

    def checkpoint(self, snapshot_id: str) -> OpLogCheckpoint:
        """Write a checkpoint recording that all current ops are in *snapshot_id*.

        After a checkpoint, :meth:`replay_since_checkpoint` will only return
        ops that arrive after this call. The op log file itself is never
        truncated — the checkpoint is a logical marker.

        Args:
            snapshot_id: The Muse snapshot ID that captured all ops to date.

        Returns:
            The written :class:`OpLogCheckpoint`.
        """
        all_entries = self.read_all()
        highest_ts = max((e["lamport_ts"] for e in all_entries), default=0)
        ckpt = OpLogCheckpoint(
            session_id=self._session_id,
            snapshot_id=snapshot_id,
            lamport_ts=highest_ts,
            op_count=len(all_entries),
            created_at=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        )
        self._ensure_dir()
        # Write-then-rename so readers never observe a torn checkpoint file
        # (this is what the class docstring promises). Path.replace is an
        # atomic rename on the same filesystem.
        tmp_path = self._checkpoint_path.with_name("checkpoint.json.tmp")
        tmp_path.write_text(json.dumps(ckpt, indent=2) + "\n")
        tmp_path.replace(self._checkpoint_path)
        logger.info(
            "✅ OpLog checkpoint: session=%r snapshot=%s ts=%d ops=%d",
            self._session_id,
            snapshot_id[:8],
            highest_ts,
            len(all_entries),
        )
        return ckpt

    def read_checkpoint(self) -> OpLogCheckpoint | None:
        """Load the most recent checkpoint, or ``None`` if none exists.

        A corrupt checkpoint file is logged and treated as absent rather
        than raising.
        """
        if not self._checkpoint_path.exists():
            return None
        try:
            raw: OpLogCheckpoint = json.loads(self._checkpoint_path.read_text())
            return raw
        except (json.JSONDecodeError, KeyError) as exc:
            logger.warning("⚠️ Corrupt checkpoint file %s: %s", self._checkpoint_path, exc)
            return None

    def session_id(self) -> str:
        """Return the session ID for this log."""
        return self._session_id
393
394
395 # ---------------------------------------------------------------------------
396 # Session listing
397 # ---------------------------------------------------------------------------
398
399
def list_sessions(repo_root: pathlib.Path) -> list[str]:
    """Enumerate session IDs that have op log directories under *repo_root*.

    Args:
        repo_root: Repository root.

    Returns:
        Sorted list of session ID strings.
    """
    log_dir = repo_root / _OP_LOG_DIR
    if not log_dir.exists():
        return []
    # Only directories count as sessions; stray files are ignored.
    sessions = [child.name for child in log_dir.iterdir() if child.is_dir()]
    return sorted(sessions)