gabriel / muse public
verify_pack.py python
342 lines 10.9 KB
00373ad0 feat: migrate CLI from typer to argparse (POSIX-compliant, order-independent) Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """muse plumbing verify-pack — verify the integrity of a PackBundle.
2
3 Reads a PackBundle JSON from stdin (or a file) and performs three levels of
4 integrity checking:
5
6 1. **Object integrity** — every ``objects`` entry is base64-decoded and its
7 SHA-256 is recomputed. The digest must match the declared ``object_id``.
8
9 2. **Snapshot consistency** — every snapshot in the bundle references only
10 object IDs that are either in the bundle itself or already present in the
11 local store. Orphaned manifest entries are reported as failures.
12
13 3. **Commit consistency** — every commit in the bundle references a
14 ``snapshot_id`` that is either in the bundle or already in the local store.
15
16 Pipe from ``pack-objects`` to validate before sending to a remote::
17
18 muse plumbing pack-objects main | muse plumbing verify-pack
19
20 Or verify a saved bundle file::
21
22 muse plumbing verify-pack --file bundle.json
23
24 Output (JSON, default)::
25
26 {
27 "objects_checked": 42,
28 "snapshots_checked": 5,
29 "commits_checked": 5,
30 "all_ok": true,
31 "failures": []
32 }
33
34 With failures::
35
36 {
37 "objects_checked": 42,
38 "snapshots_checked": 5,
39 "commits_checked": 5,
40 "all_ok": false,
41 "failures": [
42 {"kind": "object", "id": "<sha256>", "error": "hash mismatch: declared <prefix>… recomputed <prefix>…"},
43 {"kind": "snapshot", "id": "<sha256>", "error": "manifest path '<path>' references missing object <prefix>…"}
44 ]
45 }
46
47 Plumbing contract
48 -----------------
49
50 - Exit 0: bundle is fully intact.
51 - Exit 1: one or more integrity failures; malformed JSON input; missing args.
52 - Exit 3: I/O error reading stdin or the bundle file.
53 """
54
55 from __future__ import annotations
56
57 import argparse
58 import base64
59 import hashlib
60 import json
61 import logging
62 import pathlib
63 import sys
64 from typing import TypedDict
65
66 from muse.core.errors import ExitCode
67 from muse.core.object_store import has_object
68 from muse.core.repo import require_repo
69 from muse.core.store import read_snapshot
70
# Module-level logger named after this module.
# NOTE(review): nothing in this file currently logs through it.
logger = logging.getLogger(__name__)

# Output formats accepted by ``--format``; ``run`` rejects any other value
# with a JSON error before reading the bundle.
_FORMAT_CHOICES = ("json", "text")
# NOTE(review): not referenced anywhere in this file — each object payload is
# hashed in a single ``hashlib.sha256`` call rather than streamed in chunks.
_CHUNK = 65536  # 64 KiB for streaming hash
75
76
class _Failure(TypedDict):
    """One integrity problem found while verifying a bundle.

    Instances are collected by ``run`` and emitted under the ``failures`` key
    of the JSON result (or as ``FAIL`` lines in text mode).
    """

    # Which check produced the failure: "object", "snapshot" or "commit".
    kind: str
    # ID declared by the offending entry, or "(unknown)" when the entry
    # carries no usable ID.
    id: str
    # Human-readable description of what went wrong.
    error: str
81
82
class _VerifyPackResult(TypedDict):
    """Shape of the JSON document printed by ``run`` in ``json`` format."""

    # Number of entries in the bundle's ``objects`` list (valid or not).
    objects_checked: int
    # Number of entries in the bundle's ``snapshots`` list (valid or not).
    snapshots_checked: int
    # Number of entries in the bundle's ``commits`` list (valid or not).
    commits_checked: int
    # True only when ``failures`` is empty.
    all_ok: bool
    # Every problem found, in the order the checks ran
    # (objects, then snapshots, then commits).
    failures: list[_Failure]
89
90
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Register the ``verify-pack`` subcommand on *subparsers*.

    Adds a ``verify-pack`` parser whose description is this module's
    docstring and whose ``func`` default is :func:`run`, so the top-level
    dispatcher can call ``args.func(args)``.

    Args:
        subparsers: The sub-parser action returned by
            ``ArgumentParser.add_subparsers()`` on the parent parser.
    """
    parser = subparsers.add_parser(
        "verify-pack",
        help="Verify the integrity of a PackBundle JSON.",
        description=__doc__,
        # The module docstring contains indented shell examples and JSON
        # samples; the default formatter would re-wrap them into a single
        # paragraph, so keep the description verbatim.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--file", "-i",
        default="",
        dest="bundle_file",
        metavar="PATH",
        help="Path to a PackBundle JSON file. Reads from stdin when omitted.",
    )
    parser.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="No output. Exit 0 if all checks pass, exit 1 otherwise.",
    )
    parser.add_argument(
        "--no-local", "-L",
        action="store_true",
        dest="skip_local_check",
        help="Skip checking the local store for missing snapshot/commit refs.",
    )
    # No ``choices=`` here on purpose: ``run`` validates the value itself so
    # that an unknown format is reported as a JSON error document.
    parser.add_argument(
        "--format", "-f",
        dest="fmt",
        default="json",
        metavar="FORMAT",
        help="Output format: json or text. (default: json)",
    )
    parser.set_defaults(func=run)
124
125
def _failure_id(value: object) -> str:
    """Return a printable ID for a failure record, or "(unknown)" if absent."""
    if value is None or value == "":
        return "(unknown)"
    return str(value)


def _read_bundle_text(bundle_file: str) -> str:
    """Read the raw bundle text from *bundle_file*, or stdin when it is empty."""
    try:
        if bundle_file:
            with open(bundle_file, encoding="utf-8") as fh:
                return fh.read()
        return sys.stdin.read()
    except UnicodeDecodeError as exc:
        # Not an OSError: undecodable bytes are malformed input, not an I/O
        # failure, so report them like any other unparsable bundle.
        print(json.dumps({"error": f"Input is not valid UTF-8 text: {exc}"}))
        raise SystemExit(ExitCode.USER_ERROR)
    except OSError as exc:
        source = "file" if bundle_file else "stdin"
        print(json.dumps({"error": f"Cannot read {source}: {exc}"}))
        raise SystemExit(ExitCode.INTERNAL_ERROR)


def _require_list(bundle: dict[str, object], field: str) -> list[object]:
    """Return ``bundle[field]`` (default ``[]``), exiting if it is not a list."""
    value = bundle.get(field, [])
    if not isinstance(value, list):
        print(json.dumps({"error": f"'{field}' field must be a list."}))
        raise SystemExit(ExitCode.USER_ERROR)
    return value


def _verify_objects(entries: list[object], failures: list[_Failure]) -> set[str]:
    """Re-hash every object payload; return the IDs whose digest matched."""
    verified: set[str] = set()
    for entry in entries:
        if not isinstance(entry, dict):
            failures.append(
                _Failure(kind="object", id="(unknown)", error="entry is not a dict")
            )
            continue

        # No defaults: an absent key must fail validation instead of being
        # treated as an empty ID / empty payload.
        oid = entry.get("object_id")
        b64 = entry.get("content_b64")
        if not isinstance(oid, str) or not oid or not isinstance(b64, str):
            failures.append(
                _Failure(
                    kind="object",
                    id=_failure_id(oid),
                    error="missing or invalid object_id / content_b64 fields",
                )
            )
            continue

        try:
            payload = base64.b64decode(b64)
        except ValueError as exc:
            # binascii.Error (bad padding / characters) subclasses ValueError,
            # and non-ASCII text input raises ValueError directly.
            failures.append(
                _Failure(kind="object", id=oid, error=f"base64 decode failed: {exc}")
            )
            continue

        actual = hashlib.sha256(payload).hexdigest()
        if actual == oid:
            verified.add(oid)
        else:
            failures.append(
                _Failure(
                    kind="object",
                    id=oid,
                    error=f"hash mismatch: declared {oid[:12]}… recomputed {actual[:12]}…",
                )
            )
    return verified


def _verify_snapshots(
    entries: list[object],
    bundle_object_ids: set[str],
    root: pathlib.Path | None,
    failures: list[_Failure],
) -> set[str]:
    """Check each snapshot manifest's object refs; return the snapshot IDs seen."""
    snapshot_ids: set[str] = set()
    for entry in entries:
        if not isinstance(entry, dict):
            failures.append(
                _Failure(
                    kind="snapshot", id="(unknown)", error="snapshot entry is not a dict"
                )
            )
            continue

        snap_id = entry.get("snapshot_id")
        if not isinstance(snap_id, str) or not snap_id:
            failures.append(
                _Failure(kind="snapshot", id="(unknown)", error="missing snapshot_id")
            )
            continue

        snapshot_ids.add(snap_id)
        manifest = entry.get("manifest", {})
        # NOTE(review): a non-dict manifest and non-string manifest values are
        # skipped without a failure, as before — confirm against the
        # PackBundle schema whether they should be reported.
        if not isinstance(manifest, dict):
            continue

        for path, obj_id in manifest.items():
            if not isinstance(obj_id, str):
                continue
            if obj_id in bundle_object_ids:
                continue
            # Fall back to the local store only when a repo root was resolved.
            if root is not None and has_object(root, obj_id):
                continue
            failures.append(
                _Failure(
                    kind="snapshot",
                    id=snap_id,
                    error=f"manifest path {path!r} references missing object {obj_id[:12]}…",
                )
            )
    return snapshot_ids


def _verify_commits(
    entries: list[object],
    bundle_snapshot_ids: set[str],
    root: pathlib.Path | None,
    skip_local_check: bool,
    failures: list[_Failure],
) -> None:
    """Check that each commit's snapshot is in the bundle or the local store."""
    for entry in entries:
        if not isinstance(entry, dict):
            failures.append(
                _Failure(
                    kind="commit", id="(unknown)", error="commit entry is not a dict"
                )
            )
            continue

        commit_id = entry.get("commit_id")
        snap_id = entry.get("snapshot_id")
        if (
            not isinstance(commit_id, str)
            or not commit_id
            or not isinstance(snap_id, str)
            or not snap_id
        ):
            failures.append(
                _Failure(
                    kind="commit",
                    id=_failure_id(commit_id),
                    error="missing commit_id or snapshot_id",
                )
            )
            continue

        if snap_id in bundle_snapshot_ids:
            continue
        if root is not None and read_snapshot(root, snap_id) is not None:
            continue
        # With --no-local an unresolved snapshot reference is not reported:
        # the snapshot may legitimately live in a store we were told to skip.
        if not skip_local_check:
            failures.append(
                _Failure(
                    kind="commit",
                    id=commit_id,
                    error=f"references snapshot {snap_id[:12]}… not in bundle or local store",
                )
            )


def run(args: argparse.Namespace) -> None:
    """Verify the integrity of a PackBundle.

    Reads a PackBundle JSON from stdin or ``--file`` and checks:

    - Every object's payload decodes and hashes to its declared ID.
    - Every snapshot's manifest references objects present in the bundle or
      the local store.
    - Every commit's snapshot ID is present in the bundle or the local store.

    Entries that lack a required ID (``object_id``, ``content_b64``,
    ``snapshot_id``, ``commit_id``) are reported as failures rather than
    being treated as empty strings.

    Args:
        args: Parsed arguments carrying ``fmt``, ``bundle_file``, ``quiet``
            and ``skip_local_check`` as set up by :func:`register`.

    Raises:
        SystemExit: With ``ExitCode.USER_ERROR`` for an unknown format,
            undecodable or malformed input, a non-list section, or any
            integrity failure; with ``ExitCode.INTERNAL_ERROR`` when the
            input cannot be read; with ``0`` in quiet mode on success.
    """
    fmt: str = args.fmt
    bundle_file: str = args.bundle_file
    quiet: bool = args.quiet
    skip_local_check: bool = args.skip_local_check

    if fmt not in _FORMAT_CHOICES:
        print(
            json.dumps(
                {"error": f"Unknown format {fmt!r}. Valid: {', '.join(_FORMAT_CHOICES)}"}
            )
        )
        raise SystemExit(ExitCode.USER_ERROR)

    raw = _read_bundle_text(bundle_file)

    try:
        bundle = json.loads(raw)
    except json.JSONDecodeError as exc:
        print(json.dumps({"error": f"Invalid JSON: {exc}"}))
        raise SystemExit(ExitCode.USER_ERROR)

    if not isinstance(bundle, dict):
        print(json.dumps({"error": "PackBundle must be a JSON object."}))
        raise SystemExit(ExitCode.USER_ERROR)

    # The repo root is only needed for local-store lookups.
    root: pathlib.Path | None = require_repo() if not skip_local_check else None

    failures: list[_Failure] = []

    # 1. Object integrity — re-hash each base64 payload.
    objects_raw = _require_list(bundle, "objects")
    bundle_object_ids = _verify_objects(objects_raw, failures)

    # 2. Snapshot consistency — manifest entries must be present.
    snapshots_raw = _require_list(bundle, "snapshots")
    bundle_snapshot_ids = _verify_snapshots(
        snapshots_raw, bundle_object_ids, root, failures
    )

    # 3. Commit consistency — snapshot_id must be resolvable.
    commits_raw = _require_list(bundle, "commits")
    _verify_commits(commits_raw, bundle_snapshot_ids, root, skip_local_check, failures)

    # Counts reflect entries examined, including ones that failed.
    objects_checked = len(objects_raw)
    snapshots_checked = len(snapshots_raw)
    commits_checked = len(commits_raw)
    all_ok = not failures

    if quiet:
        raise SystemExit(0 if all_ok else ExitCode.USER_ERROR)

    if fmt == "text":
        print(
            f"objects={objects_checked} snapshots={snapshots_checked} "
            f"commits={commits_checked} all_ok={all_ok}"
        )
        for failure in failures:
            print(f" FAIL [{failure['kind']}] {failure['id'][:16]}… {failure['error']}")
    else:
        result: _VerifyPackResult = {
            "objects_checked": objects_checked,
            "snapshots_checked": snapshots_checked,
            "commits_checked": commits_checked,
            "all_ok": all_ok,
            "failures": failures,
        }
        print(json.dumps(result))

    if not all_ok:
        raise SystemExit(ExitCode.USER_ERROR)