cgcardona / muse public
pack.py python
278 lines 9.3 KB
189a2e45 feat: three-tier CLI architecture — plumbing, core porcelain, semantic … Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Muse pack format — bundle of commits, snapshots, and blobs for wire transfer.
2
3 A :class:`PackBundle` is the unit of exchange between the Muse CLI and a remote
4 (e.g. MuseHub). It carries everything needed to reconstruct a slice of commit
5 history locally:
6
7 - :class:`CommitDict` records (full metadata)
8 - :class:`SnapshotDict` records (file manifests)
9 - :class:`ObjectPayload` entries (raw blob bytes, base64-encoded for JSON)
10 - ``branch_heads`` mapping (branch name → commit ID, reflecting remote state)
11
12 :func:`build_pack` collects all data reachable from a set of commit IDs.
13 :func:`apply_pack` writes a bundle into a local ``.muse/`` directory.
14
15 JSON + base64 encoding trades some efficiency for universal debuggability and
16 zero external dependencies. A binary-encoded transport can be plugged in later
17 by swapping the ``HttpTransport`` implementation behind the ``MuseTransport``
18 Protocol in :mod:`muse.core.transport`.
19 """
20
21 from __future__ import annotations
22
import base64
import logging
import pathlib
from collections import deque
from typing import TypedDict

from muse.core.object_store import read_object, write_object
from muse.core.store import (
    CommitDict,
    CommitRecord,
    SnapshotDict,
    SnapshotRecord,
    read_commit,
    read_snapshot,
    write_commit,
    write_snapshot,
)
39
40 logger = logging.getLogger(__name__)
41
42
43 # ---------------------------------------------------------------------------
44 # Wire-format TypedDicts
45 # ---------------------------------------------------------------------------
46
47
class ObjectPayload(TypedDict):
    """A single content-addressed blob, base64-encoded for JSON transport."""

    #: Content-address of the blob in the object store.
    object_id: str
    #: Raw blob bytes, base64-encoded as ASCII so the entry is JSON-safe.
    content_b64: str
53
54
class PackBundle(TypedDict, total=False):
    """The unit of exchange between the Muse CLI and a remote.

    All fields are optional so that partial bundles (fetch-only, objects-only)
    are valid wire messages. Callers check for presence before consuming.
    """

    #: Full commit metadata records (``CommitRecord.to_dict`` output).
    commits: list[CommitDict]
    #: Snapshot manifests referenced by the commits in this bundle.
    snapshots: list[SnapshotDict]
    #: Base64-encoded blobs referenced by the snapshot manifests.
    objects: list[ObjectPayload]
    #: Remote branch heads at the time the bundle was produced.
    branch_heads: dict[str, str]
67
68
class RemoteInfo(TypedDict):
    """Repository metadata returned by ``GET {url}/refs``."""

    #: Unique identifier of the repository on the remote.
    repo_id: str
    #: Host/domain the repository lives on.
    domain: str
    #: Maps branch name → commit ID for every branch on the remote.
    branch_heads: dict[str, str]
    #: Name of the branch the remote considers its default.
    default_branch: str
77
78
class PushResult(TypedDict):
    """Server response after a push attempt."""

    #: True when the push was accepted by the remote.
    ok: bool
    #: Human-readable status/diagnostic message from the server.
    message: str
    #: Updated branch heads on the remote after the push (if successful).
    branch_heads: dict[str, str]
86
87
class FetchRequest(TypedDict, total=False):
    """Body of ``POST {url}/fetch`` — negotiates which commits to transfer.

    ``want`` lists commit IDs the client wants to receive.
    ``have`` lists commit IDs already present locally, allowing the server
    to send only the commits the client lacks (delta negotiation).
    """

    #: Commit IDs the client wants to receive.
    want: list[str]
    #: Commit IDs already present locally (server may omit these).
    have: list[str]
98
99
class ApplyResult(TypedDict):
    """Counts returned by :func:`apply_pack` describing what was written.

    ``objects_skipped`` counts blobs already present in the store (not
    rewritten, idempotent). All other counts reflect *new* writes only.
    """

    #: Number of commit records newly written.
    commits_written: int
    #: Number of snapshot records newly written.
    snapshots_written: int
    #: Number of blobs newly written to the object store.
    objects_written: int
    #: Number of blobs already present and therefore skipped.
    objects_skipped: int
111
112
113 # ---------------------------------------------------------------------------
114 # Pack building
115 # ---------------------------------------------------------------------------
116
117
def build_pack(
    repo_root: pathlib.Path,
    commit_ids: list[str],
    *,
    have: list[str] | None = None,
) -> PackBundle:
    """Assemble a :class:`PackBundle` from *commit_ids*, excluding commits in *have*.

    Performs a BFS walk of the commit graph from every ID in *commit_ids*,
    stopping at any commit already in *have*. Collects all snapshot manifests
    and object blobs reachable from the selected commits. Object bytes are
    base64-encoded for JSON transport.

    Missing objects or snapshots are logged and skipped — the caller decides
    whether that constitutes an error.

    Args:
        repo_root: Root of the Muse repository.
        commit_ids: Tip commit IDs to include (e.g. current branch HEAD).
        have: Commit IDs already known to the receiver. The BFS stops
            at these, reducing bundle size. Pass ``None`` or ``[]``
            to send the full history.

    Returns:
        A :class:`PackBundle` ready for serialisation and transfer.
    """
    have_set: set[str] = set(have or [])

    # BFS walk from every tip, treating have_set as already-visited.
    # A deque gives O(1) popleft; list.pop(0) would shift every element
    # and make the walk O(n^2) over deep histories.
    commits_to_send: list[CommitRecord] = []
    seen: set[str] = set(have_set)
    queue: deque[str] = deque(cid for cid in commit_ids if cid not in seen)

    while queue:
        cid = queue.popleft()
        if cid in seen:
            # A commit can be enqueued twice (e.g. shared merge ancestor)
            # before its first visit marks it seen.
            continue
        seen.add(cid)
        commit = read_commit(repo_root, cid)
        if commit is None:
            logger.warning("⚠️ build_pack: commit %s not found — skipping", cid[:8])
            continue
        commits_to_send.append(commit)
        # Follow both parent links; the second parent exists for merges.
        if commit.parent_commit_id and commit.parent_commit_id not in seen:
            queue.append(commit.parent_commit_id)
        if commit.parent2_commit_id and commit.parent2_commit_id not in seen:
            queue.append(commit.parent2_commit_id)

    # Unique snapshot IDs referenced by selected commits.
    snapshot_ids: set[str] = {c.snapshot_id for c in commits_to_send}

    snapshot_dicts: list[SnapshotDict] = []
    all_object_ids: set[str] = set()
    # sorted() keeps bundle layout deterministic for identical inputs.
    for sid in sorted(snapshot_ids):
        snap = read_snapshot(repo_root, sid)
        if snap is None:
            logger.warning("⚠️ build_pack: snapshot %s not found — skipping", sid[:8])
            continue
        snapshot_dicts.append(snap.to_dict())
        all_object_ids.update(snap.manifest.values())

    object_payloads: list[ObjectPayload] = []
    for oid in sorted(all_object_ids):
        raw = read_object(repo_root, oid)
        if raw is None:
            logger.warning("⚠️ build_pack: blob %s absent from store — skipping", oid[:8])
            continue
        object_payloads.append(
            ObjectPayload(
                object_id=oid,
                content_b64=base64.b64encode(raw).decode("ascii"),
            )
        )

    bundle: PackBundle = {
        "commits": [c.to_dict() for c in commits_to_send],
        "snapshots": snapshot_dicts,
        "objects": object_payloads,
    }
    logger.info(
        "✅ Built pack: %d commits, %d snapshots, %d objects",
        len(commits_to_send),
        len(snapshot_dicts),
        len(object_payloads),
    )
    return bundle
204
205
206 # ---------------------------------------------------------------------------
207 # Pack applying
208 # ---------------------------------------------------------------------------
209
210
def apply_pack(repo_root: pathlib.Path, bundle: PackBundle) -> ApplyResult:
    """Write the contents of *bundle* into a local ``.muse/`` directory.

    Writes in dependency order: objects first (blobs), then snapshots (which
    reference object IDs), then commits (which reference snapshot IDs). All
    writes are idempotent — already-present items are silently skipped.

    Malformed entries (missing fields, bad base64, records that fail
    ``from_dict``) are logged and skipped rather than aborting the apply,
    mirroring the best-effort policy of :func:`build_pack`.

    Args:
        repo_root: Root of the Muse repository to write into.
        bundle: :class:`PackBundle` received from the remote.

    Returns:
        :class:`ApplyResult` with counts of newly written and skipped items.
    """
    objects_written = 0
    objects_skipped = 0
    snapshots_written = 0
    commits_written = 0

    # Blobs first: snapshots reference object IDs, so the bytes must land
    # before any manifest that points at them.
    for obj in bundle.get("objects") or []:
        oid = obj.get("object_id", "")
        b64 = obj.get("content_b64", "")
        if not oid or not b64:
            logger.warning("⚠️ apply_pack: blob entry missing fields — skipped")
            continue
        try:
            raw = base64.b64decode(b64)
        except (TypeError, ValueError) as exc:
            # b64decode raises binascii.Error (a ValueError subclass) on bad
            # padding/characters, TypeError on non-string payloads from
            # untrusted JSON. Anything else is a bug and should propagate.
            logger.warning("⚠️ apply_pack: bad base64 for %s: %s", oid[:8], exc)
            continue
        if write_object(repo_root, oid, raw):
            objects_written += 1
        else:
            objects_skipped += 1

    for snap_dict in bundle.get("snapshots") or []:
        try:
            snap = SnapshotRecord.from_dict(snap_dict)
            # Probe first so the count reflects *new* records only;
            # write_snapshot itself is idempotent.
            is_new = read_snapshot(repo_root, snap.snapshot_id) is None
            write_snapshot(repo_root, snap)
            if is_new:
                snapshots_written += 1
        except (KeyError, ValueError) as exc:
            logger.warning("⚠️ apply_pack: malformed snapshot — skipped: %s", exc)

    for commit_dict in bundle.get("commits") or []:
        try:
            commit = CommitRecord.from_dict(commit_dict)
            is_new = read_commit(repo_root, commit.commit_id) is None
            write_commit(repo_root, commit)
            if is_new:
                commits_written += 1
        except (KeyError, ValueError) as exc:
            logger.warning("⚠️ apply_pack: malformed commit — skipped: %s", exc)

    logger.info(
        "✅ Applied pack: %d new blobs, %d new snapshots, %d new commits (%d blobs skipped)",
        objects_written,
        snapshots_written,
        commits_written,
        objects_skipped,
    )
    return ApplyResult(
        commits_written=commits_written,
        snapshots_written=snapshots_written,
        objects_written=objects_written,
        objects_skipped=objects_skipped,
    )