# verify_pack.py
| 1 | """muse plumbing verify-pack — verify the integrity of a PackBundle. |
| 2 | |
| 3 | Reads a PackBundle JSON from stdin (or a file) and performs three levels of |
| 4 | integrity checking: |
| 5 | |
| 6 | 1. **Object integrity** — every ``objects`` entry is base64-decoded and its |
| 7 | SHA-256 is recomputed. The digest must match the declared ``object_id``. |
| 8 | |
| 9 | 2. **Snapshot consistency** — every snapshot in the bundle references only |
| 10 | object IDs that are either in the bundle itself or already present in the |
| 11 | local store. Orphaned manifest entries are reported as failures. |
| 12 | |
| 13 | 3. **Commit consistency** — every commit in the bundle references a |
| 14 | ``snapshot_id`` that is either in the bundle or already in the local store. |
| 15 | |
| 16 | Pipe from ``pack-objects`` to validate before sending to a remote:: |
| 17 | |
| 18 | muse plumbing pack-objects main | muse plumbing verify-pack |
| 19 | |
| 20 | Or verify a saved bundle file:: |
| 21 | |
| 22 | muse plumbing verify-pack --file bundle.json |
| 23 | |
| 24 | Output (JSON, default):: |
| 25 | |
| 26 | { |
| 27 | "objects_checked": 42, |
| 28 | "snapshots_checked": 5, |
| 29 | "commits_checked": 5, |
| 30 | "all_ok": true, |
| 31 | "failures": [] |
| 32 | } |
| 33 | |
| 34 | With failures:: |
| 35 | |
| 36 | { |
| 37 | "objects_checked": 42, |
| 38 | "snapshots_checked": 5, |
| 39 | "commits_checked": 5, |
| 40 | "all_ok": false, |
| 41 | "failures": [ |
| 42 | {"kind": "object", "id": "<sha256>", "error": "hash mismatch"}, |
| 43 | {"kind": "snapshot", "id": "<sha256>", "error": "missing object: <sha256>"} |
| 44 | ] |
| 45 | } |
| 46 | |
| 47 | Plumbing contract |
| 48 | ----------------- |
| 49 | |
| 50 | - Exit 0: bundle is fully intact. |
| 51 | - Exit 1: one or more integrity failures; malformed JSON input; missing args. |
| 52 | - Exit 3: I/O error reading stdin or the bundle file. |
| 53 | """ |
| 54 | |
| 55 | from __future__ import annotations |
| 56 | |
| 57 | import argparse |
| 58 | import base64 |
| 59 | import hashlib |
| 60 | import json |
| 61 | import logging |
| 62 | import pathlib |
| 63 | import sys |
| 64 | from typing import TypedDict |
| 65 | |
| 66 | from muse.core.errors import ExitCode |
| 67 | from muse.core.object_store import has_object |
| 68 | from muse.core.repo import require_repo |
| 69 | from muse.core.store import read_snapshot |
| 70 | |
| 71 | logger = logging.getLogger(__name__) |
| 72 | |
| 73 | _FORMAT_CHOICES = ("json", "text") |
| 74 | _CHUNK = 65536 # 64 KiB for streaming hash |
| 75 | |
| 76 | |
| 77 | class _Failure(TypedDict): |
| 78 | kind: str |
| 79 | id: str |
| 80 | error: str |
| 81 | |
| 82 | |
| 83 | class _VerifyPackResult(TypedDict): |
| 84 | objects_checked: int |
| 85 | snapshots_checked: int |
| 86 | commits_checked: int |
| 87 | all_ok: bool |
| 88 | failures: list[_Failure] |
| 89 | |
| 90 | |
| 91 | def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: |
| 92 | """Register the verify-pack subcommand.""" |
| 93 | parser = subparsers.add_parser( |
| 94 | "verify-pack", |
| 95 | help="Verify the integrity of a PackBundle JSON.", |
| 96 | description=__doc__, |
| 97 | ) |
| 98 | parser.add_argument( |
| 99 | "--file", "-i", |
| 100 | default="", |
| 101 | dest="bundle_file", |
| 102 | metavar="PATH", |
| 103 | help="Path to a PackBundle JSON file. Reads from stdin when omitted.", |
| 104 | ) |
| 105 | parser.add_argument( |
| 106 | "--quiet", "-q", |
| 107 | action="store_true", |
| 108 | help="No output. Exit 0 if all checks pass, exit 1 otherwise.", |
| 109 | ) |
| 110 | parser.add_argument( |
| 111 | "--no-local", "-L", |
| 112 | action="store_true", |
| 113 | dest="skip_local_check", |
| 114 | help="Skip checking the local store for missing snapshot/commit refs.", |
| 115 | ) |
| 116 | parser.add_argument( |
| 117 | "--format", "-f", |
| 118 | dest="fmt", |
| 119 | default="json", |
| 120 | metavar="FORMAT", |
| 121 | help="Output format: json or text. (default: json)", |
| 122 | ) |
| 123 | parser.set_defaults(func=run) |
| 124 | |
| 125 | |
| 126 | def run(args: argparse.Namespace) -> None: |
| 127 | """Verify the integrity of a PackBundle. |
| 128 | |
| 129 | Reads a PackBundle JSON from stdin or ``--file`` and checks: |
| 130 | |
| 131 | - Every object's payload decodes and hashes to its declared ID. |
| 132 | - Every snapshot's manifest references objects present in the bundle or |
| 133 | the local store. |
| 134 | - Every commit's snapshot ID is present in the bundle or the local store. |
| 135 | """ |
| 136 | fmt: str = args.fmt |
| 137 | bundle_file: str = args.bundle_file |
| 138 | quiet: bool = args.quiet |
| 139 | skip_local_check: bool = args.skip_local_check |
| 140 | |
| 141 | if fmt not in _FORMAT_CHOICES: |
| 142 | print( |
| 143 | json.dumps( |
| 144 | {"error": f"Unknown format {fmt!r}. Valid: {', '.join(_FORMAT_CHOICES)}"} |
| 145 | ) |
| 146 | ) |
| 147 | raise SystemExit(ExitCode.USER_ERROR) |
| 148 | |
| 149 | # Read input. |
| 150 | if bundle_file: |
| 151 | try: |
| 152 | with open(bundle_file, encoding="utf-8") as fh: |
| 153 | raw = fh.read() |
| 154 | except OSError as exc: |
| 155 | print(json.dumps({"error": f"Cannot read file: {exc}"})) |
| 156 | raise SystemExit(ExitCode.INTERNAL_ERROR) |
| 157 | else: |
| 158 | try: |
| 159 | raw = sys.stdin.read() |
| 160 | except OSError as exc: |
| 161 | print(json.dumps({"error": f"Cannot read stdin: {exc}"})) |
| 162 | raise SystemExit(ExitCode.INTERNAL_ERROR) |
| 163 | |
| 164 | try: |
| 165 | bundle = json.loads(raw) |
| 166 | except json.JSONDecodeError as exc: |
| 167 | print(json.dumps({"error": f"Invalid JSON: {exc}"})) |
| 168 | raise SystemExit(ExitCode.USER_ERROR) |
| 169 | |
| 170 | if not isinstance(bundle, dict): |
| 171 | print(json.dumps({"error": "PackBundle must be a JSON object."})) |
| 172 | raise SystemExit(ExitCode.USER_ERROR) |
| 173 | |
| 174 | # We need the repo root for local-store checks (optional). |
| 175 | root: pathlib.Path | None = require_repo() if not skip_local_check else None |
| 176 | |
| 177 | failures: list[_Failure] = [] |
| 178 | |
| 179 | # ----------------------------------------------------------------------- |
| 180 | # 1. Object integrity — re-hash each base64 payload. |
| 181 | # ----------------------------------------------------------------------- |
| 182 | bundle_object_ids: set[str] = set() |
| 183 | objects_raw = bundle.get("objects", []) |
| 184 | if not isinstance(objects_raw, list): |
| 185 | print(json.dumps({"error": "'objects' field must be a list."})) |
| 186 | raise SystemExit(ExitCode.USER_ERROR) |
| 187 | |
| 188 | for entry in objects_raw: |
| 189 | if not isinstance(entry, dict): |
| 190 | failures.append( |
| 191 | _Failure(kind="object", id="(unknown)", error="entry is not a dict") |
| 192 | ) |
| 193 | continue |
| 194 | oid = entry.get("object_id", "") |
| 195 | b64 = entry.get("content_b64", "") |
| 196 | if not isinstance(oid, str) or not isinstance(b64, str): |
| 197 | failures.append( |
| 198 | _Failure( |
| 199 | kind="object", |
| 200 | id=str(oid), |
| 201 | error="missing or invalid object_id / content_b64 fields", |
| 202 | ) |
| 203 | ) |
| 204 | continue |
| 205 | |
| 206 | try: |
| 207 | raw_bytes = base64.b64decode(b64) |
| 208 | except Exception as exc: |
| 209 | failures.append( |
| 210 | _Failure(kind="object", id=oid, error=f"base64 decode failed: {exc}") |
| 211 | ) |
| 212 | continue |
| 213 | |
| 214 | actual = hashlib.sha256(raw_bytes).hexdigest() |
| 215 | if actual != oid: |
| 216 | failures.append( |
| 217 | _Failure( |
| 218 | kind="object", |
| 219 | id=oid, |
| 220 | error=f"hash mismatch: declared {oid[:12]}… recomputed {actual[:12]}…", |
| 221 | ) |
| 222 | ) |
| 223 | else: |
| 224 | bundle_object_ids.add(oid) |
| 225 | |
| 226 | objects_checked = len(objects_raw) |
| 227 | |
| 228 | # ----------------------------------------------------------------------- |
| 229 | # 2. Snapshot consistency — manifest entries must be present. |
| 230 | # ----------------------------------------------------------------------- |
| 231 | bundle_snapshot_ids: set[str] = set() |
| 232 | snapshots_raw = bundle.get("snapshots", []) |
| 233 | if not isinstance(snapshots_raw, list): |
| 234 | print(json.dumps({"error": "'snapshots' field must be a list."})) |
| 235 | raise SystemExit(ExitCode.USER_ERROR) |
| 236 | |
| 237 | for snap_entry in snapshots_raw: |
| 238 | if not isinstance(snap_entry, dict): |
| 239 | failures.append( |
| 240 | _Failure( |
| 241 | kind="snapshot", id="(unknown)", error="snapshot entry is not a dict" |
| 242 | ) |
| 243 | ) |
| 244 | continue |
| 245 | snap_id = snap_entry.get("snapshot_id", "") |
| 246 | if not isinstance(snap_id, str): |
| 247 | failures.append( |
| 248 | _Failure(kind="snapshot", id="(unknown)", error="missing snapshot_id") |
| 249 | ) |
| 250 | continue |
| 251 | |
| 252 | bundle_snapshot_ids.add(snap_id) |
| 253 | manifest = snap_entry.get("manifest", {}) |
| 254 | if not isinstance(manifest, dict): |
| 255 | continue |
| 256 | |
| 257 | for path, obj_id in manifest.items(): |
| 258 | if not isinstance(obj_id, str): |
| 259 | continue |
| 260 | if obj_id in bundle_object_ids: |
| 261 | continue |
| 262 | # Check local store if allowed. |
| 263 | if root is not None and has_object(root, obj_id): |
| 264 | continue |
| 265 | failures.append( |
| 266 | _Failure( |
| 267 | kind="snapshot", |
| 268 | id=snap_id, |
| 269 | error=f"manifest path {path!r} references missing object {obj_id[:12]}…", |
| 270 | ) |
| 271 | ) |
| 272 | |
| 273 | snapshots_checked = len(snapshots_raw) |
| 274 | |
| 275 | # ----------------------------------------------------------------------- |
| 276 | # 3. Commit consistency — snapshot_id must be resolvable. |
| 277 | # ----------------------------------------------------------------------- |
| 278 | commits_raw = bundle.get("commits", []) |
| 279 | if not isinstance(commits_raw, list): |
| 280 | print(json.dumps({"error": "'commits' field must be a list."})) |
| 281 | raise SystemExit(ExitCode.USER_ERROR) |
| 282 | |
| 283 | for commit_entry in commits_raw: |
| 284 | if not isinstance(commit_entry, dict): |
| 285 | failures.append( |
| 286 | _Failure( |
| 287 | kind="commit", id="(unknown)", error="commit entry is not a dict" |
| 288 | ) |
| 289 | ) |
| 290 | continue |
| 291 | commit_id = commit_entry.get("commit_id", "") |
| 292 | snap_id = commit_entry.get("snapshot_id", "") |
| 293 | if not isinstance(commit_id, str) or not isinstance(snap_id, str): |
| 294 | failures.append( |
| 295 | _Failure( |
| 296 | kind="commit", |
| 297 | id=str(commit_id), |
| 298 | error="missing commit_id or snapshot_id", |
| 299 | ) |
| 300 | ) |
| 301 | continue |
| 302 | |
| 303 | if snap_id in bundle_snapshot_ids: |
| 304 | continue |
| 305 | if root is not None and read_snapshot(root, snap_id) is not None: |
| 306 | continue |
| 307 | if not skip_local_check: |
| 308 | failures.append( |
| 309 | _Failure( |
| 310 | kind="commit", |
| 311 | id=commit_id, |
| 312 | error=f"references snapshot {snap_id[:12]}… not in bundle or local store", |
| 313 | ) |
| 314 | ) |
| 315 | |
| 316 | commits_checked = len(commits_raw) |
| 317 | all_ok = len(failures) == 0 |
| 318 | |
| 319 | if quiet: |
| 320 | raise SystemExit(0 if all_ok else ExitCode.USER_ERROR) |
| 321 | |
| 322 | if fmt == "text": |
| 323 | print( |
| 324 | f"objects={objects_checked} snapshots={snapshots_checked} " |
| 325 | f"commits={commits_checked} all_ok={all_ok}" |
| 326 | ) |
| 327 | for f in failures: |
| 328 | print(f" FAIL [{f['kind']}] {f['id'][:16]}… {f['error']}") |
| 329 | if not all_ok: |
| 330 | raise SystemExit(ExitCode.USER_ERROR) |
| 331 | return |
| 332 | |
| 333 | result: _VerifyPackResult = { |
| 334 | "objects_checked": objects_checked, |
| 335 | "snapshots_checked": snapshots_checked, |
| 336 | "commits_checked": commits_checked, |
| 337 | "all_ok": all_ok, |
| 338 | "failures": failures, |
| 339 | } |
| 340 | print(json.dumps(result)) |
| 341 | if not all_ok: |
| 342 | raise SystemExit(ExitCode.USER_ERROR) |