gabriel / muse public
store.py python
955 lines 34.2 KB
f7645c07 feat(store): self-describing HEAD format with typed read/write API (#163) Gabriel Cardona <cgcardona@gmail.com> 3d ago
1 """File-based commit and snapshot store for the Muse VCS.
2
3 All commit and snapshot metadata is stored as JSON files under ``.muse/`` —
4 no external database required.
5
6 Layout
7 ------
8
9 .muse/
10 commits/<commit_id>.json — one JSON file per commit
11 snapshots/<snapshot_id>.json — one JSON file per snapshot manifest
12 tags/<repo_id>/<tag_id>.json — tag records
13 objects/<sha2>/<sha62> — content-addressed blobs (via object_store.py)
14 refs/heads/<branch> — branch HEAD pointers (plain text commit IDs)
15 HEAD — "ref: refs/heads/main" | "commit: <sha256>"
16 repo.json — repository identity
17
18 Commit JSON schema
19 ------------------
20
21 {
22 "commit_id": "<sha256>",
23 "repo_id": "<uuid>",
24 "branch": "main",
25 "parent_commit_id": null | "<sha256>",
26 "parent2_commit_id": null | "<sha256>",
27 "snapshot_id": "<sha256>",
28 "message": "Add verse melody",
29 "author": "gabriel",
30 "committed_at": "2026-03-16T12:00:00+00:00",
31 "metadata": {}
32 }
33
34 Snapshot JSON schema
35 --------------------
36
37 {
38 "snapshot_id": "<sha256>",
39 "manifest": {"tracks/drums.mid": "<sha256>", ...},
40 "created_at": "2026-03-16T12:00:00+00:00"
41 }
42
43 All functions are synchronous — file I/O on a local ``.muse/`` directory
44 does not require async. This removes the SQLAlchemy/asyncpg dependency from
45 the CLI entirely.
46 """
47
48 from __future__ import annotations
49
50 import datetime
51 import json
52 import logging
53 import pathlib
54 import re
55 import uuid
56 from dataclasses import dataclass, field
57 from typing import Literal, TypedDict
58
59 from muse.core.validation import (
60 sanitize_glob_prefix,
61 validate_branch_name,
62 validate_ref_id,
63 validate_repo_id,
64 )
65 from muse.domain import SemVerBump, StructuredDelta
66
# Module-level logger used by the store functions below.
logger = logging.getLogger(__name__)

# Sub-directory names under ``.muse/`` for each kind of JSON record
# (see the layout in the module docstring).
_COMMITS_DIR = "commits"
_SNAPSHOTS_DIR = "snapshots"
_TAGS_DIR = "tags"
72
73 # ---------------------------------------------------------------------------
74 # HEAD file — typed I/O
75 # ---------------------------------------------------------------------------
76 #
77 # Muse HEAD format
78 # ----------------
79 # The ``.muse/HEAD`` file is always one of two self-describing forms:
80 #
81 # ref: refs/heads/<branch> — symbolic ref; HEAD points to a branch
82 # commit: <sha256> — detached HEAD; HEAD points to a commit
83 #
84 # The ``ref:`` prefix is adopted from Git because it is the right design:
85 # a file that can hold two semantically different things should say which
86 # one it holds. The ``commit:`` prefix for detached HEAD is a Muse
87 # extension — Git uses a bare SHA, which is ambiguous (SHA-1? SHA-256?).
88 # Muse makes the hash algorithm implicit in the prefix, leaving the door
89 # open for future algorithm identifiers without changing the parsing rule.
90 #
91 # There is no backward-compatibility layer; every write site uses
92 # ``write_head_branch`` / ``write_head_commit`` and every read site uses
93 # ``read_head`` / ``read_current_branch``.
94
95
class SymbolicHead(TypedDict):
    """HEAD points to a named branch.

    Returned by :func:`read_head` when ``.muse/HEAD`` holds
    ``ref: refs/heads/<branch>``; ``branch`` is the text after that prefix.
    """

    kind: Literal["branch"]  # discriminator tag of the HeadState union
    branch: str


class DetachedHead(TypedDict):
    """HEAD points directly to a commit (detached HEAD state).

    Returned by :func:`read_head` when ``.muse/HEAD`` holds
    ``commit: <sha256>``; ``commit_id`` is 64 lowercase hex characters.
    """

    kind: Literal["commit"]  # discriminator tag of the HeadState union
    commit_id: str


# Tagged union returned by read_head(); narrow it by checking state["kind"].
HeadState = SymbolicHead | DetachedHead
111
112
def read_head(repo_root: pathlib.Path) -> HeadState:
    """Read ``.muse/HEAD`` and decode it into a :data:`HeadState`.

    Two encodings are recognised:

    * ``ref: refs/heads/<branch>`` -> :class:`SymbolicHead`
    * ``commit: <sha256>``        -> :class:`DetachedHead`

    Raises:
        ValueError: if the content matches neither form, or if the commit ID
            is not exactly 64 lowercase hex characters. An invalid branch
            name is rejected by :func:`validate_branch_name`.
    """
    head_text = (repo_root / ".muse" / "HEAD").read_text().strip()

    branch_prefix = "ref: refs/heads/"
    if head_text.startswith(branch_prefix):
        name = head_text[len(branch_prefix):].strip()
        validate_branch_name(name)
        return SymbolicHead(kind="branch", branch=name)

    commit_prefix = "commit: "
    if not head_text.startswith(commit_prefix):
        raise ValueError(
            f"Malformed HEAD: {head_text!r}. "
            "Expected 'ref: refs/heads/<branch>' or 'commit: <sha256>'."
        )

    sha = head_text[len(commit_prefix):].strip()
    if re.fullmatch(r"[0-9a-f]{64}", sha) is None:
        raise ValueError(f"Malformed commit ID in HEAD: {sha!r}")
    return DetachedHead(kind="commit", commit_id=sha)
133
134
def read_current_branch(repo_root: pathlib.Path) -> str:
    """Return the name of the branch HEAD currently points to.

    In detached HEAD state this raises :exc:`ValueError` with an actionable
    hint, so commands that need a branch never mistake a commit ID for a
    branch name. Errors raised by :func:`read_head` propagate unchanged.
    """
    head = read_head(repo_root)
    if head["kind"] == "branch":
        return head["branch"]
    raise ValueError(
        "Repository is in detached HEAD state. "
        "Run 'muse checkout <branch>' to return to a branch."
    )
149
150
def write_head_branch(repo_root: pathlib.Path, branch: str) -> None:
    """Point ``.muse/HEAD`` at *branch* through a symbolic ref.

    The file receives ``ref: refs/heads/<branch>`` plus a trailing newline.
    *branch* goes through :func:`validate_branch_name` first, so a rejected
    name never reaches disk.
    """
    validate_branch_name(branch)
    head_file = repo_root / ".muse" / "HEAD"
    head_file.write_text("ref: refs/heads/" + branch + "\n")
159
160
def write_head_commit(repo_root: pathlib.Path, commit_id: str) -> None:
    """Detach HEAD by writing ``commit: <sha256>`` to ``.muse/HEAD``.

    *commit_id* must be exactly 64 lowercase hex characters. Anything else
    raises :exc:`ValueError` and leaves the HEAD file untouched. The
    ``commit:`` prefix keeps the file self-describing in detached state too.
    """
    is_sha256_hex = re.fullmatch(r"[0-9a-f]{64}", commit_id) is not None
    if not is_sha256_hex:
        raise ValueError(f"commit_id must be a 64-char hex string, got: {commit_id!r}")
    head_file = repo_root / ".muse" / "HEAD"
    head_file.write_text("commit: " + commit_id + "\n")
172
173
174 # ---------------------------------------------------------------------------
175 # Wire-format TypedDicts (JSON-serialisable, used by to_dict / from_dict)
176 # ---------------------------------------------------------------------------
177
178
class CommitDict(TypedDict, total=False):
    """JSON-serialisable representation of a CommitRecord.

    Declared with ``total=False``, so no key is required at the type level.
    :meth:`CommitRecord.from_dict` nevertheless requires ``commit_id``,
    ``snapshot_id`` and ``branch`` and falls back to empty/neutral values for
    the remaining keys when they are absent. ``committed_at`` holds an
    ISO-8601 timestamp string (``datetime.isoformat()``).

    ``structured_delta`` is the typed delta produced by the domain plugin's
    ``diff()`` at commit time. ``None`` on the initial commit (no parent to
    diff against).

    ``sem_ver_bump`` and ``breaking_changes`` are semantic versioning
    metadata. Absent (treated as ``"none"`` / ``[]``) for older records and
    non-code domains.

    Agent provenance fields (all optional, default ``""`` for older records):

    ``agent_id``        Stable identity string for the committing agent or human
                        (e.g. ``"counterpoint-bot"`` or ``"gabriel"``).
    ``model_id``        Model identifier when the author is an AI agent
                        (e.g. ``"claude-opus-4"``). Empty for human authors.
    ``toolchain_id``    Toolchain that produced the commit
                        (e.g. ``"cursor-agent-v2"``).
    ``prompt_hash``     SHA-256 of the instruction/prompt that triggered this
                        commit. Privacy-preserving: the hash identifies the
                        prompt without storing its content.
    ``signature``       HMAC-SHA256 hex digest of ``commit_id`` using the
                        agent's shared key. Verifiable with
                        :func:`muse.core.provenance.verify_commit_hmac`.
    ``signer_key_id``   Fingerprint of the signing key
                        (SHA-256[:16] of the raw key bytes).
    ``format_version``  Schema evolution counter. Each phase of the Muse
                        supercharge plan that extends the commit record bumps
                        this value. Readers use it to know which optional
                        fields are present:

                        - ``1`` — base record (commit_id, snapshot_id, parent,
                          message, author)
                        - ``2`` — adds ``structured_delta`` (Phase 1: Typed
                          Delta Algebra)
                        - ``3`` — adds ``sem_ver_bump``, ``breaking_changes``
                          (Phase 2: Domain Schema)
                        - ``4`` — adds agent provenance: ``agent_id``,
                          ``model_id``, ``toolchain_id``, ``prompt_hash``,
                          ``signature``, ``signer_key_id`` (Phase 4: Agent
                          Identity)
                        - ``5`` — adds CRDT annotation fields:
                          ``reviewed_by`` (ORSet of reviewer IDs),
                          ``test_runs`` (GCounter of test-run events)

                        Old records without this field default to ``1``.
    """

    commit_id: str
    repo_id: str
    branch: str
    snapshot_id: str
    message: str
    committed_at: str
    parent_commit_id: str | None
    parent2_commit_id: str | None
    author: str
    metadata: dict[str, str]
    structured_delta: StructuredDelta | None
    sem_ver_bump: SemVerBump
    breaking_changes: list[str]
    agent_id: str
    model_id: str
    toolchain_id: str
    prompt_hash: str
    signature: str
    signer_key_id: str
    format_version: int
    # CRDT-backed annotation fields (format_version >= 5).
    # ``reviewed_by`` is the logical state of an ORSet: a list of unique
    # reviewer identifiers. Merging two records takes the union (set join).
    # ``test_runs`` is a GCounter: monotonically increasing test-run count.
    # Both fields are absent in older records and default to [] / 0.
    reviewed_by: list[str]
    test_runs: int
252
253
class SnapshotDict(TypedDict):
    """JSON-serialisable representation of a SnapshotRecord."""

    snapshot_id: str
    # Maps each tracked file path to the object ID of its content,
    # e.g. {"tracks/drums.mid": "<sha256>"}.
    manifest: dict[str, str]
    # ISO-8601 timestamp as produced by datetime.isoformat().
    created_at: str


class TagDict(TypedDict):
    """JSON-serialisable representation of a TagRecord."""

    tag_id: str
    repo_id: str
    commit_id: str  # the commit this tag is attached to
    tag: str  # the tag text itself
    created_at: str  # ISO-8601 timestamp as produced by datetime.isoformat()


class RemoteCommitPayload(TypedDict, total=False):
    """Wire format received from a remote during push/pull.

    All fields are optional because the payload may omit fields that are
    unknown to older protocol versions. Callers validate required fields
    before constructing a CommitRecord from this payload.

    Unlike :class:`CommitDict`, the payload also carries ``manifest`` (the
    commit's snapshot manifest). ``store_pulled_commit`` uses it to create a
    stub snapshot record when that snapshot is not yet stored locally.
    """

    commit_id: str
    repo_id: str
    branch: str
    snapshot_id: str
    message: str
    committed_at: str
    parent_commit_id: str | None
    parent2_commit_id: str | None
    author: str
    metadata: dict[str, str]
    manifest: dict[str, str]
291
292
293 # ---------------------------------------------------------------------------
294 # Data classes
295 # ---------------------------------------------------------------------------
296
297
@dataclass
class CommitRecord:
    """A commit record stored as a JSON file under .muse/commits/.

    History fields (IDs, parents, snapshot, message) are written once and
    never changed. The dataclass itself is *not* frozen: annotation fields
    (``reviewed_by``, ``test_runs``) and ``metadata`` may be updated in place
    and re-persisted (see :func:`overwrite_commit` and
    :func:`update_commit_metadata`).

    ``sem_ver_bump`` and ``breaking_changes`` are populated by the commit command
    when a code-domain delta is available. They default to ``"none"`` and ``[]``
    for older records and non-code domains.

    Agent provenance fields default to ``""`` so that existing JSON without
    them deserialises without error. See :class:`CommitDict` for field semantics.
    """

    commit_id: str
    repo_id: str
    branch: str
    snapshot_id: str
    message: str
    committed_at: datetime.datetime
    parent_commit_id: str | None = None
    parent2_commit_id: str | None = None
    author: str = ""
    metadata: dict[str, str] = field(default_factory=dict)
    structured_delta: StructuredDelta | None = None
    sem_ver_bump: SemVerBump = "none"
    breaking_changes: list[str] = field(default_factory=list)
    agent_id: str = ""
    model_id: str = ""
    toolchain_id: str = ""
    prompt_hash: str = ""
    signature: str = ""
    signer_key_id: str = ""
    #: Schema evolution counter — see :class:`CommitDict` for the version table.
    #: Version 5 adds ``reviewed_by`` (ORSet) and ``test_runs`` (GCounter).
    format_version: int = 5
    reviewed_by: list[str] = field(default_factory=list)
    test_runs: int = 0

    def to_dict(self) -> CommitDict:
        """Serialise to the JSON wire format.

        ``committed_at`` is emitted as an ISO-8601 string. The mutable
        containers (``metadata``, ``breaking_changes``, ``reviewed_by``) are
        shallow-copied, so editing the returned dict cannot alter this record.
        """
        return CommitDict(
            commit_id=self.commit_id,
            repo_id=self.repo_id,
            branch=self.branch,
            snapshot_id=self.snapshot_id,
            message=self.message,
            committed_at=self.committed_at.isoformat(),
            parent_commit_id=self.parent_commit_id,
            parent2_commit_id=self.parent2_commit_id,
            author=self.author,
            metadata=dict(self.metadata),
            structured_delta=self.structured_delta,
            sem_ver_bump=self.sem_ver_bump,
            breaking_changes=list(self.breaking_changes),
            agent_id=self.agent_id,
            model_id=self.model_id,
            toolchain_id=self.toolchain_id,
            prompt_hash=self.prompt_hash,
            signature=self.signature,
            signer_key_id=self.signer_key_id,
            format_version=self.format_version,
            reviewed_by=list(self.reviewed_by),
            test_runs=self.test_runs,
        )

    @classmethod
    def from_dict(cls, d: CommitDict) -> "CommitRecord":
        """Rebuild a record from its JSON representation.

        ``commit_id``, ``snapshot_id`` and ``branch`` are required and must be
        strings. If ``committed_at`` is missing or is an unparseable string,
        it falls back to the current UTC time and a warning is logged. Other
        absent fields fall back to empty/neutral values (``""``, ``None``,
        ``{}``, ``[]``, ``"none"``, ``0``). ``format_version`` defaults to
        ``1`` for records written before that field existed.

        Raises:
            KeyError: if ``commit_id``, ``snapshot_id`` or ``branch`` is absent.
            TypeError: if one of those is not a string, if ``committed_at`` is
                not a string, or if ``test_runs`` is not integer-like. Readers
                such as :func:`read_commit` treat ``TypeError`` as "corrupt
                record".
        """
        try:
            committed_at = datetime.datetime.fromisoformat(d["committed_at"])
        except (ValueError, KeyError):
            logger.warning(
                "⚠️ Commit record has missing or unparseable committed_at; "
                "substituting current time. The record may have been tampered with."
            )
            committed_at = datetime.datetime.now(datetime.timezone.utc)

        # Runtime type guards — JSON can contain anything; fail loud rather than
        # silently carrying non-string IDs into path construction.
        commit_id = d["commit_id"]
        if not isinstance(commit_id, str):
            raise TypeError(f"commit_id must be str, got {type(commit_id).__name__}")
        snapshot_id = d["snapshot_id"]
        if not isinstance(snapshot_id, str):
            raise TypeError(f"snapshot_id must be str, got {type(snapshot_id).__name__}")
        branch = d["branch"]
        if not isinstance(branch, str):
            raise TypeError(f"branch must be str, got {type(branch).__name__}")

        # int() raises ValueError/OverflowError for values like "abc" or
        # Infinity. Surface these as TypeError so callers that already treat
        # TypeError as a corrupt record skip the file instead of crashing.
        raw_test_runs = d.get("test_runs") or 0
        try:
            test_runs = int(raw_test_runs)
        except (TypeError, ValueError, OverflowError) as exc:
            raise TypeError(
                f"test_runs must be an integer, got {raw_test_runs!r}"
            ) from exc

        # Same str guard as repo_id/message: a JSON null author becomes "".
        author = d.get("author")

        return cls(
            commit_id=commit_id,
            repo_id=d["repo_id"] if isinstance(d.get("repo_id"), str) else "",
            branch=branch,
            snapshot_id=snapshot_id,
            message=d["message"] if isinstance(d.get("message"), str) else "",
            committed_at=committed_at,
            parent_commit_id=d.get("parent_commit_id"),
            parent2_commit_id=d.get("parent2_commit_id"),
            author=author if isinstance(author, str) else "",
            metadata=dict(d.get("metadata") or {}),
            structured_delta=d.get("structured_delta"),
            sem_ver_bump=d.get("sem_ver_bump", "none"),
            breaking_changes=list(d.get("breaking_changes") or []),
            agent_id=d.get("agent_id", ""),
            model_id=d.get("model_id", ""),
            toolchain_id=d.get("toolchain_id", ""),
            prompt_hash=d.get("prompt_hash", ""),
            signature=d.get("signature", ""),
            signer_key_id=d.get("signer_key_id", ""),
            format_version=d.get("format_version", 1),
            reviewed_by=list(d.get("reviewed_by") or []),
            test_runs=test_runs,
        )
408
409
@dataclass
class SnapshotRecord:
    """A snapshot manifest stored as a JSON file under .muse/snapshots/.

    ``manifest`` maps each tracked path to the object ID of its content.
    ``created_at`` defaults to the current UTC time.
    """

    snapshot_id: str
    manifest: dict[str, str]
    created_at: datetime.datetime = field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
    )

    def to_dict(self) -> SnapshotDict:
        """Serialise to the JSON wire format.

        The manifest is copied, as ``CommitRecord.to_dict`` copies its
        containers, so mutating the returned dict cannot silently alter this
        record.
        """
        return SnapshotDict(
            snapshot_id=self.snapshot_id,
            manifest=dict(self.manifest),
            created_at=self.created_at.isoformat(),
        )

    @classmethod
    def from_dict(cls, d: SnapshotDict) -> "SnapshotRecord":
        """Rebuild a snapshot from its JSON representation.

        A missing or unparseable ``created_at`` falls back to the current UTC
        time; a missing ``manifest`` becomes ``{}``.

        Raises:
            KeyError: if ``snapshot_id`` is absent.
            TypeError: if ``snapshot_id`` is not a string (it is later used to
                build file paths), or if ``created_at`` is not a string.
        """
        try:
            created_at = datetime.datetime.fromisoformat(d["created_at"])
        except (ValueError, KeyError):
            created_at = datetime.datetime.now(datetime.timezone.utc)
        snapshot_id = d["snapshot_id"]
        if not isinstance(snapshot_id, str):
            raise TypeError(f"snapshot_id must be str, got {type(snapshot_id).__name__}")
        return cls(
            snapshot_id=snapshot_id,
            manifest=dict(d.get("manifest") or {}),
            created_at=created_at,
        )
438
439
@dataclass
class TagRecord:
    """A named tag pinned to a commit.

    Persisted as ``.muse/tags/<repo_id>/<tag_id>.json``; ``created_at``
    defaults to the current UTC time.
    """

    tag_id: str
    repo_id: str
    commit_id: str
    tag: str
    created_at: datetime.datetime = field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
    )

    def to_dict(self) -> TagDict:
        """Return the JSON-ready form, with ``created_at`` as ISO-8601 text."""
        stamp = self.created_at.isoformat()
        return TagDict(
            tag_id=self.tag_id,
            repo_id=self.repo_id,
            commit_id=self.commit_id,
            tag=self.tag,
            created_at=stamp,
        )

    @classmethod
    def from_dict(cls, d: TagDict) -> "TagRecord":
        """Rebuild a tag from JSON.

        A missing or unparseable ``created_at`` becomes "now" (UTC), and a
        missing ``tag_id`` is replaced by a fresh random UUID string.
        ``repo_id``, ``commit_id`` and ``tag`` are required (``KeyError``).
        """
        try:
            stamp = datetime.datetime.fromisoformat(d["created_at"])
        except (KeyError, ValueError):
            stamp = datetime.datetime.now(datetime.timezone.utc)
        # Only mint a UUID when the stored record has no ID of its own.
        tag_id = d["tag_id"] if "tag_id" in d else str(uuid.uuid4())
        return cls(
            tag_id=tag_id,
            repo_id=d["repo_id"],
            commit_id=d["commit_id"],
            tag=d["tag"],
            created_at=stamp,
        )
474
475
476 # ---------------------------------------------------------------------------
477 # Path helpers
478 # ---------------------------------------------------------------------------
479
480
def _commits_dir(repo_root: pathlib.Path) -> pathlib.Path:
    """Directory holding one ``<commit_id>.json`` file per commit."""
    return repo_root.joinpath(".muse", _COMMITS_DIR)


def _snapshots_dir(repo_root: pathlib.Path) -> pathlib.Path:
    """Directory holding one ``<snapshot_id>.json`` file per snapshot."""
    return repo_root.joinpath(".muse", _SNAPSHOTS_DIR)
487
488
489 def _tags_dir(repo_root: pathlib.Path, repo_id: str) -> pathlib.Path:
490 # Validate repo_id to prevent path traversal via crafted IDs from remote data.
491 # Uses a best-effort guard (no path separators or dot-sequences).
492 if "/" in repo_id or "\\" in repo_id or ".." in repo_id or not repo_id:
493 raise ValueError(f"repo_id {repo_id!r} contains unsafe path components.")
494 return repo_root / ".muse" / _TAGS_DIR / repo_id
495
496
def _commit_path(repo_root: pathlib.Path, commit_id: str) -> pathlib.Path:
    """Location of the JSON file for *commit_id* (the ID is not validated here)."""
    return _commits_dir(repo_root).joinpath(f"{commit_id}.json")


def _snapshot_path(repo_root: pathlib.Path, snapshot_id: str) -> pathlib.Path:
    """Location of the JSON file for *snapshot_id* (the ID is not validated here)."""
    return _snapshots_dir(repo_root).joinpath(f"{snapshot_id}.json")
503
504
505 # ---------------------------------------------------------------------------
506 # Commit operations
507 # ---------------------------------------------------------------------------
508
509
def write_commit(repo_root: pathlib.Path, commit: CommitRecord) -> None:
    """Store *commit* as ``.muse/commits/<commit_id>.json``.

    Commits are write-once. If the file already exists it is left untouched
    and the call is a logged no-op; :func:`overwrite_commit` is the explicit
    way to replace a record.
    """
    _commits_dir(repo_root).mkdir(parents=True, exist_ok=True)
    target = _commit_path(repo_root, commit.commit_id)
    short_id = commit.commit_id[:8]
    if target.exists():
        logger.debug("⚠️ Commit %s already exists — skipped", short_id)
        return
    payload = json.dumps(commit.to_dict(), indent=2)
    target.write_text(payload + "\n")
    logger.debug("✅ Stored commit %s branch=%r", short_id, commit.branch)
519
520
def read_commit(repo_root: pathlib.Path, commit_id: str) -> CommitRecord | None:
    """Load a commit record by ID, or ``None`` if it is missing or corrupt.

    Callers that accept user-supplied or remote-supplied commit IDs should
    validate the ID with :func:`~muse.core.validation.validate_ref_id` before
    calling this function. This function itself accepts any string to support
    internal uses with computed IDs.

    A file that cannot be decoded (bad text encoding, malformed JSON) or
    that does not describe a valid commit is logged as corrupt. In that case
    the function returns ``None`` instead of raising.
    """
    path = _commit_path(repo_root, commit_id)
    if not path.exists():
        return None
    try:
        return CommitRecord.from_dict(json.loads(path.read_text()))
    except (ValueError, KeyError, TypeError) as exc:
        # ValueError also covers json.JSONDecodeError and UnicodeDecodeError
        # (both subclasses), plus bad values rejected while parsing fields.
        logger.warning("⚠️ Corrupt commit file %s: %s", path, exc)
        return None
537
538
def overwrite_commit(repo_root: pathlib.Path, commit: CommitRecord) -> None:
    """Write *commit* to disk, replacing any existing file for its ID.

    This is the deliberate counterpart to :func:`write_commit`, which never
    replaces an existing record. Reserve it for additive annotation fields
    (``reviewed_by``, ``test_runs``). Never use it to rewrite history such as
    ``commit_id``, parents, snapshot or message.

    Args:
        repo_root: Repository root.
        commit: The updated commit record to persist.
    """
    _commits_dir(repo_root).mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(commit.to_dict(), indent=2) + "\n"
    _commit_path(repo_root, commit.commit_id).write_text(serialized)
    logger.debug("✅ Updated annotation on commit %s", commit.commit_id[:8])
555
556
def update_commit_metadata(
    repo_root: pathlib.Path,
    commit_id: str,
    key: str,
    value: str,
) -> bool:
    """Set ``metadata[key] = value`` on an existing commit and persist it.

    Returns ``True`` once the record has been rewritten. Returns ``False``,
    after logging a warning, when :func:`read_commit` yields nothing for
    *commit_id*.
    """
    record = read_commit(repo_root, commit_id)
    if record is None:
        logger.warning("⚠️ Commit %s not found — cannot update metadata", commit_id[:8])
        return False

    record.metadata[key] = value
    serialized = json.dumps(record.to_dict(), indent=2) + "\n"
    _commit_path(repo_root, commit_id).write_text(serialized)
    logger.debug("✅ Set %s=%r on commit %s", key, value, commit_id[:8])
    return True
576
577
def get_head_commit_id(repo_root: pathlib.Path, branch: str) -> str | None:
    """Read the tip commit of *branch* from ``.muse/refs/heads/<branch>``.

    The branch name is validated first. Returns ``None`` when the ref file
    does not exist or holds only whitespace (a branch with no commits).
    """
    validate_branch_name(branch)
    ref_file = repo_root.joinpath(".muse", "refs", "heads", branch)
    if not ref_file.exists():
        return None
    tip = ref_file.read_text().strip()
    return tip or None
586
587
def get_head_snapshot_id(
    repo_root: pathlib.Path,
    repo_id: str,
    branch: str,
) -> str | None:
    """Return the snapshot ID recorded by the tip commit of *branch*.

    Returns ``None`` for an empty branch or when the tip commit cannot be
    loaded. *repo_id* is accepted for interface compatibility but unused.
    """
    tip_id = get_head_commit_id(repo_root, branch)
    tip = None if tip_id is None else read_commit(repo_root, tip_id)
    return None if tip is None else tip.snapshot_id
601
602
def resolve_commit_ref(
    repo_root: pathlib.Path,
    repo_id: str,
    branch: str,
    ref: str | None,
) -> CommitRecord | None:
    """Resolve a commit reference to a ``CommitRecord``.

    *ref* may be:
    - ``None`` / ``"HEAD"`` (case-insensitive) — the most recent commit on *branch*.
    - A full or abbreviated commit SHA — resolved by prefix scan.

    Glob metacharacters are stripped from *ref* before it touches the
    filesystem, so user-supplied references cannot glob the whole commits
    directory. A reference that is empty after sanitisation resolves to
    ``None`` rather than to an arbitrary commit.

    *repo_id* is accepted for interface compatibility but unused.
    """
    if ref is None or ref.upper() == "HEAD":
        commit_id = get_head_commit_id(repo_root, branch)
        if commit_id is None:
            return None
        return read_commit(repo_root, commit_id)

    # Sanitize user-supplied ref before using it in any filesystem operation.
    safe_ref = sanitize_glob_prefix(ref)
    if not safe_ref:
        # "" (or a ref made only of glob metacharacters) would otherwise
        # become the pattern "*.json" and match every commit.
        return None

    # Fast path for a full 64-char hex ID. Only the validation call sits in
    # the try, so a ValueError raised while reading a record is not mistaken
    # for "this is not a full ID".
    try:
        validate_ref_id(safe_ref)
    except ValueError:
        pass  # Not a full hex ID — fall through to prefix scan.
    else:
        commit = read_commit(repo_root, safe_ref)
        if commit is not None:
            return commit

    # Prefix scan with sanitized prefix.
    return _find_commit_by_prefix(repo_root, safe_ref)
638
639
def _find_commit_by_prefix(
    repo_root: pathlib.Path, prefix: str
) -> CommitRecord | None:
    """Return the unique commit whose ID starts with *prefix*, else ``None``.

    Glob metacharacters are stripped from *prefix* before use to prevent
    callers from turning a targeted lookup into an arbitrary directory scan.

    ``None`` is returned in three cases:

    * the sanitised prefix is empty,
    * nothing matches,
    * the prefix is ambiguous (more than one readable commit matches).

    Directory iteration order is arbitrary, so returning "the first" match of
    an ambiguous prefix would be non-deterministic. Unreadable or corrupt
    commit files are skipped.
    """
    commits_dir = _commits_dir(repo_root)
    if not commits_dir.exists():
        return None
    safe_prefix = sanitize_glob_prefix(prefix)
    if not safe_prefix:
        return None

    found: CommitRecord | None = None
    for path in commits_dir.glob(f"{safe_prefix}*.json"):
        try:
            record = CommitRecord.from_dict(json.loads(path.read_text()))
        except (ValueError, KeyError, TypeError):
            # ValueError also covers JSONDecodeError and UnicodeDecodeError.
            continue
        if found is not None:
            logger.warning(
                "⚠️ Commit prefix %r is ambiguous — specify more characters",
                safe_prefix,
            )
            return None
        found = record
    return found
658
659
def find_commits_by_prefix(
    repo_root: pathlib.Path, prefix: str
) -> list[CommitRecord]:
    """Return every readable commit whose ID starts with *prefix*.

    Glob metacharacters are stripped from *prefix* first, so an empty prefix
    matches all commits. Files that cannot be decoded or parsed are skipped.
    Result order follows directory iteration and is not guaranteed.
    """
    commits_dir = _commits_dir(repo_root)
    if not commits_dir.exists():
        return []
    safe_prefix = sanitize_glob_prefix(prefix)
    results: list[CommitRecord] = []
    for path in commits_dir.glob(f"{safe_prefix}*.json"):
        try:
            results.append(CommitRecord.from_dict(json.loads(path.read_text())))
        except (ValueError, KeyError, TypeError):
            # ValueError also covers JSONDecodeError and UnicodeDecodeError.
            continue
    return results
675
676
def get_commits_for_branch(
    repo_root: pathlib.Path,
    repo_id: str,
    branch: str,
) -> list[CommitRecord]:
    """List the commits on *branch*, newest first.

    The walk starts at the branch tip and follows ``parent_commit_id`` links
    only, so second parents of merges are not visited. It stops at a root
    commit, at an ID that cannot be loaded, or on a repeated ID (cycle
    guard). *repo_id* is unused.
    """
    history: list[CommitRecord] = []
    visited: set[str] = set()
    cursor = get_head_commit_id(repo_root, branch)
    while cursor and cursor not in visited:
        visited.add(cursor)
        if (record := read_commit(repo_root, cursor)) is None:
            break
        history.append(record)
        cursor = record.parent_commit_id
    return history
694
695
def get_all_commits(repo_root: pathlib.Path) -> list[CommitRecord]:
    """Return every readable commit in the store (order not guaranteed).

    Files that cannot be decoded or parsed are skipped silently.
    """
    commits_dir = _commits_dir(repo_root)
    if not commits_dir.exists():
        return []
    results: list[CommitRecord] = []
    for path in commits_dir.glob("*.json"):
        try:
            results.append(CommitRecord.from_dict(json.loads(path.read_text())))
        except (ValueError, KeyError, TypeError):
            # ValueError also covers JSONDecodeError and UnicodeDecodeError.
            continue
    return results
708
709
def walk_commits_between(
    repo_root: pathlib.Path,
    to_commit_id: str,
    from_commit_id: str | None = None,
    max_commits: int = 10_000,
) -> list[CommitRecord]:
    """Collect first-parent history from *to_commit_id* back to *from_commit_id*.

    The walk starts at *to_commit_id* (included) and follows
    ``parent_commit_id``. It stops at the first of:

    * *from_commit_id* (excluded),
    * a root commit,
    * a commit that cannot be loaded,
    * a repeated ID,
    * *max_commits* collected records.

    Args:
        repo_root: Repository root.
        to_commit_id: Inclusive end of the range.
        from_commit_id: Exclusive start; ``None`` means walk to the initial commit.
        max_commits: Safety cap on the number of records returned.

    Returns:
        List of ``CommitRecord`` objects, newest first.
    """
    collected: list[CommitRecord] = []
    visited: set[str] = set()
    cursor: str | None = to_commit_id
    while cursor and cursor not in visited and len(collected) < max_commits:
        visited.add(cursor)
        if cursor == from_commit_id:
            break
        record = read_commit(repo_root, cursor)
        if record is None:
            break
        collected.append(record)
        cursor = record.parent_commit_id
    return collected
743
744
745 # ---------------------------------------------------------------------------
746 # Snapshot operations
747 # ---------------------------------------------------------------------------
748
749
def write_snapshot(repo_root: pathlib.Path, snapshot: SnapshotRecord) -> None:
    """Store *snapshot* as ``.muse/snapshots/<snapshot_id>.json`` (write-once).

    An existing file for the same ID is never replaced; in that case the call
    only logs and returns.
    """
    _snapshots_dir(repo_root).mkdir(parents=True, exist_ok=True)
    target = _snapshot_path(repo_root, snapshot.snapshot_id)
    short_id = snapshot.snapshot_id[:8]
    if target.exists():
        logger.debug("⚠️ Snapshot %s already exists — skipped", short_id)
        return
    target.write_text(json.dumps(snapshot.to_dict(), indent=2) + "\n")
    logger.debug("✅ Stored snapshot %s (%d files)", short_id, len(snapshot.manifest))
759
760
def read_snapshot(repo_root: pathlib.Path, snapshot_id: str) -> SnapshotRecord | None:
    """Load a snapshot record by ID, or ``None`` if it is missing or corrupt.

    Callers that accept user-supplied or remote-supplied snapshot IDs should
    validate the ID with :func:`~muse.core.validation.validate_ref_id` before
    calling this function. This function itself accepts any string to support
    internal uses with computed IDs.

    A file that cannot be decoded or parsed is logged as corrupt, and the
    function returns ``None`` instead of raising.
    """
    path = _snapshot_path(repo_root, snapshot_id)
    if not path.exists():
        return None
    try:
        return SnapshotRecord.from_dict(json.loads(path.read_text()))
    except (ValueError, KeyError, TypeError) as exc:
        # ValueError also covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("⚠️ Corrupt snapshot file %s: %s", path, exc)
        return None
777
778
def get_commit_snapshot_manifest(
    repo_root: pathlib.Path, commit_id: str
) -> dict[str, str] | None:
    """Return a copy of the manifest of the snapshot *commit_id* points at.

    Logs a warning and returns ``None`` when either the commit or its
    snapshot cannot be loaded.
    """
    commit = read_commit(repo_root, commit_id)
    if commit is None:
        logger.warning("⚠️ Commit %s not found", commit_id[:8])
        return None
    snapshot = read_snapshot(repo_root, commit.snapshot_id)
    if snapshot is not None:
        return dict(snapshot.manifest)
    logger.warning(
        "⚠️ Snapshot %s referenced by commit %s not found",
        commit.snapshot_id[:8],
        commit_id[:8],
    )
    return None
796
797
def get_head_snapshot_manifest(
    repo_root: pathlib.Path, repo_id: str, branch: str
) -> dict[str, str] | None:
    """Return a copy of the manifest at the tip of *branch*, or ``None``.

    ``None`` covers an empty branch, as well as a tip commit or snapshot that
    cannot be loaded.
    """
    snapshot_id = get_head_snapshot_id(repo_root, repo_id, branch)
    snapshot = None if snapshot_id is None else read_snapshot(repo_root, snapshot_id)
    return None if snapshot is None else dict(snapshot.manifest)
809
810
def get_all_object_ids(repo_root: pathlib.Path, repo_id: str) -> list[str]:
    """Return the sorted, de-duplicated object IDs referenced by committed snapshots.

    Only snapshots referenced by some commit's ``snapshot_id`` are scanned.
    Each distinct snapshot is read once, even when several commits share it.
    Snapshots that cannot be loaded are skipped. *repo_id* is unused.
    """
    # Collect distinct snapshot IDs first so shared snapshots are not re-read
    # (and re-parsed) once per commit.
    snapshot_ids = {commit.snapshot_id for commit in get_all_commits(repo_root)}
    object_ids: set[str] = set()
    for snapshot_id in snapshot_ids:
        snapshot = read_snapshot(repo_root, snapshot_id)
        if snapshot is not None:
            object_ids.update(snapshot.manifest.values())
    return sorted(object_ids)
819
820
821 # ---------------------------------------------------------------------------
822 # Tag operations
823 # ---------------------------------------------------------------------------
824
825
def write_tag(repo_root: pathlib.Path, tag: TagRecord) -> None:
    """Save *tag* to ``.muse/tags/<repo_id>/<tag_id>.json``, replacing any prior file.

    ``tag.repo_id`` is screened by :func:`_tags_dir`, which rejects unsafe
    values before any directory is created.
    """
    target_dir = _tags_dir(repo_root, tag.repo_id)
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(tag.to_dict(), indent=2) + "\n"
    (target_dir / f"{tag.tag_id}.json").write_text(serialized)
    logger.debug("✅ Stored tag %r on commit %s", tag.tag, tag.commit_id[:8])
833
834
def get_tags_for_commit(
    repo_root: pathlib.Path, repo_id: str, commit_id: str
) -> list[TagRecord]:
    """Return all readable tags attached to *commit_id*.

    Tag files that cannot be decoded or parsed are skipped.
    """
    tags_dir = _tags_dir(repo_root, repo_id)
    if not tags_dir.exists():
        return []
    results: list[TagRecord] = []
    for path in tags_dir.glob("*.json"):
        try:
            record = TagRecord.from_dict(json.loads(path.read_text()))
        except (ValueError, KeyError, TypeError):
            # ValueError also covers JSONDecodeError and UnicodeDecodeError.
            continue
        if record.commit_id == commit_id:
            results.append(record)
    return results
851
852
def get_all_tags(repo_root: pathlib.Path, repo_id: str) -> list[TagRecord]:
    """Return all readable tags in this repository.

    Tag files that cannot be decoded or parsed are skipped.
    """
    tags_dir = _tags_dir(repo_root, repo_id)
    if not tags_dir.exists():
        return []
    results: list[TagRecord] = []
    for path in tags_dir.glob("*.json"):
        try:
            results.append(TagRecord.from_dict(json.loads(path.read_text())))
        except (ValueError, KeyError, TypeError):
            # ValueError also covers JSONDecodeError and UnicodeDecodeError.
            continue
    return results
865
866
867 # ---------------------------------------------------------------------------
868 # Remote sync helpers (push/pull)
869 # ---------------------------------------------------------------------------
870
871
def store_pulled_commit(
    repo_root: pathlib.Path, commit_data: RemoteCommitPayload
) -> bool:
    """Persist a commit received from a remote into local storage.

    Idempotent — silently skips if the commit already exists. Returns
    ``True`` if the record was newly written. Returns ``False`` if it already
    existed or the payload was rejected.

    The ID fields of the remote payload (``commit_id``, ``snapshot_id``,
    ``parent_commit_id``, ``parent2_commit_id``) and the branch name are
    validated before any filesystem operation, to prevent path-traversal
    attacks via crafted remote responses. Parent IDs matter as well: history
    walks later pass them to :func:`read_commit`, which turns them into file
    paths.
    """
    commit_id = commit_data.get("commit_id") or ""
    if not commit_id:
        logger.warning("⚠️ store_pulled_commit: missing commit_id — skipping")
        return False

    try:
        validate_ref_id(commit_id)
    except ValueError as exc:
        logger.warning("⚠️ store_pulled_commit: invalid commit_id %r — %s", commit_id, exc)
        return False

    snapshot_id = commit_data.get("snapshot_id") or ""
    if snapshot_id:
        try:
            validate_ref_id(snapshot_id)
        except ValueError as exc:
            logger.warning(
                "⚠️ store_pulled_commit: invalid snapshot_id %r — %s", snapshot_id, exc
            )
            return False

    branch = commit_data.get("branch") or ""
    if branch:
        try:
            validate_branch_name(branch)
        except ValueError as exc:
            logger.warning(
                "⚠️ store_pulled_commit: invalid branch %r — %s", branch, exc
            )
            return False

    # Parent IDs are optional (None/"" for root commits), but any non-empty
    # value must be a well-formed ref ID before it is stored.
    parent_commit_id = commit_data.get("parent_commit_id")
    parent2_commit_id = commit_data.get("parent2_commit_id")
    for field_name, parent_id in (
        ("parent_commit_id", parent_commit_id),
        ("parent2_commit_id", parent2_commit_id),
    ):
        if not parent_id:
            continue
        try:
            if not isinstance(parent_id, str):
                raise ValueError(f"expected str, got {type(parent_id).__name__}")
            validate_ref_id(parent_id)
        except ValueError as exc:
            logger.warning(
                "⚠️ store_pulled_commit: invalid %s %r — %s", field_name, parent_id, exc
            )
            return False

    if read_commit(repo_root, commit_id) is not None:
        logger.debug("⚠️ Pulled commit %s already exists — skipped", commit_id[:8])
        return False

    commit_dict = CommitDict(
        commit_id=commit_id,
        repo_id=commit_data.get("repo_id") or "",
        branch=branch,
        snapshot_id=snapshot_id,
        message=commit_data.get("message") or "",
        committed_at=commit_data.get("committed_at") or "",
        parent_commit_id=parent_commit_id,
        parent2_commit_id=parent2_commit_id,
        author=commit_data.get("author") or "",
        metadata=dict(commit_data.get("metadata") or {}),
        structured_delta=None,
    )
    write_commit(repo_root, CommitRecord.from_dict(commit_dict))

    # Ensure a (possibly stub) snapshot record exists for this snapshot_id.
    if snapshot_id and read_snapshot(repo_root, snapshot_id) is None:
        manifest: dict[str, str] = dict(commit_data.get("manifest") or {})
        write_snapshot(repo_root, SnapshotRecord(
            snapshot_id=snapshot_id,
            manifest=manifest,
        ))

    return True
943
944
def store_pulled_object_metadata(
    repo_root: pathlib.Path, object_data: dict[str, str]
) -> bool:
    """Acknowledge an object descriptor received from a remote.

    Blob bytes are written separately by ``object_store.write_object``.
    Objects are content-addressed files, so their presence under
    ``.muse/objects/`` already records that they are known, and nothing is
    stored here.

    Returns:
        Always ``True``.
    """
    del repo_root, object_data  # intentionally unused; signature kept stable
    return True