verify_object.py
python
| 1 | """muse plumbing verify-object — verify the integrity of stored objects. |
| 2 | |
| 3 | Reads one or more objects from the content-addressed store and re-hashes each |
| 4 | one to confirm that its on-disk content still matches its claimed SHA-256 |
| 5 | identity. Reports the result per object and exits non-zero if any object |
| 6 | fails verification. |
| 7 | |
| 8 | This is the integrity primitive used by backup systems, replication agents, |
| 9 | and CI pipelines to detect silent data corruption without a full fsck. |
| 10 | |
| 11 | Output (JSON, default):: |
| 12 | |
| 13 | { |
| 14 | "results": [ |
| 15 | {"object_id": "<sha256>", "ok": true, "size_bytes": 4096}, |
| 16 | {"object_id": "<sha256>", "ok": false, "size_bytes": 512, |
| 17 | "error": "hash mismatch: stored <sha256a[:12]>… recomputed <sha256b[:12]>…"}, |
| 18 | {"object_id": "<sha256>", "ok": false, "size_bytes": null, |
| 19 | "error": "object not found in store"} |
| 20 | ], |
| 21 | "all_ok": false, |
| 22 | "checked": 3, |
| 23 | "failed": 2 |
| 24 | } |
| 25 | |
| 26 | Text output (``--format text``):: |
| 27 | |
| 28 | OK <sha256> (4096 bytes) |
| 29 | FAIL <sha256> (512 bytes) hash mismatch: stored <sha256a[:12]>… recomputed <sha256b[:12]>… |
| 30 | FAIL <sha256> object not found in store |
| 31 | |
| 32 | With ``--quiet``: no output; exits 0 if all pass, exits 1 otherwise. |
| 33 | |
| 34 | Plumbing contract |
| 35 | ----------------- |
| 36 | |
| 37 | - Exit 0: all objects verified successfully. |
| 38 | - Exit 1: one or more objects failed verification; object not found; bad args. |
| 39 | - Exit 3: unexpected I/O error (e.g. disk read failure). |
| 40 | """ |
| 41 | |
| 42 | from __future__ import annotations |
| 43 | |
| 44 | import argparse |
| 45 | import hashlib |
| 46 | import json |
| 47 | import logging |
| 48 | import pathlib |
| 49 | import sys |
| 50 | from typing import TypedDict |
| 51 | |
| 52 | from muse.core.errors import ExitCode |
| 53 | from muse.core.object_store import object_path |
| 54 | from muse.core.repo import require_repo |
| 55 | from muse.core.validation import validate_object_id |
| 56 | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Output formats accepted by ``--format``.
_FORMAT_CHOICES = ("json", "text")

# Size of each read while hashing an object: 64 KiB. Streaming in fixed-size
# chunks keeps memory use flat no matter how large the blob is.
_CHUNK = 64 * 1024
| 61 | |
| 62 | |
| 63 | class _ObjectResult(TypedDict): |
| 64 | object_id: str |
| 65 | ok: bool |
| 66 | size_bytes: int | None |
| 67 | error: str | None |
| 68 | |
| 69 | |
def _verify_one(root: pathlib.Path, object_id: str) -> _ObjectResult:
    """Integrity-check a single object and return its result record.

    The object file is opened directly and streamed through SHA-256 in
    ``_CHUNK``-sized reads, so a large blob is never held in memory in full.

    Args:
        root: Repository root, passed through to ``object_path``.
        object_id: Claimed SHA-256 identity of the object to check.

    Returns:
        An ``_ObjectResult``. ``ok`` is true only when the recomputed hex
        digest equals *object_id*. ``size_bytes`` is the number of bytes that
        were hashed, or ``None`` when the ID was rejected or the object could
        not be read. ``error`` is ``None`` on success and a short description
        otherwise.

    A ``ValueError`` from ``validate_object_id`` and any ``OSError`` raised
    while opening or reading the file are converted into failed results
    instead of propagating.
    """
    try:
        validate_object_id(object_id)
    except ValueError as exc:
        return {"object_id": object_id, "ok": False, "size_bytes": None, "error": str(exc)}

    dest = object_path(root, object_id)
    digest = hashlib.sha256()
    size = 0
    try:
        # Open without probing first: a separate exists() check can race with
        # a concurrent delete, and may itself raise (e.g. on a permission
        # error) outside any handler.
        with dest.open("rb") as fh:
            for chunk in iter(lambda: fh.read(_CHUNK), b""):
                digest.update(chunk)
                # Count what was actually hashed so size and digest always
                # describe the same bytes.
                size += len(chunk)
    except (FileNotFoundError, NotADirectoryError):
        # Missing file, or a missing/non-directory path component.
        return {
            "object_id": object_id,
            "ok": False,
            "size_bytes": None,
            "error": "object not found in store",
        }
    except OSError as exc:
        return {
            "object_id": object_id,
            "ok": False,
            "size_bytes": None,
            "error": f"I/O error: {exc}",
        }

    actual = digest.hexdigest()
    if actual != object_id:
        return {
            "object_id": object_id,
            "ok": False,
            "size_bytes": size,
            "error": (
                f"hash mismatch: stored {object_id[:12]}… "
                f"recomputed {actual[:12]}…"
            ),
        }

    return {"object_id": object_id, "ok": True, "size_bytes": size, "error": None}
| 117 | |
| 118 | |
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Attach the ``verify-object`` subcommand to *subparsers*.

    The subcommand takes one or more positional object IDs plus the
    ``--quiet``/``-q`` and ``--format``/``-f`` options, and dispatches to
    ``run`` through the ``func`` default. ``--format`` is deliberately left
    unconstrained here; ``run`` validates it.
    """
    cmd = subparsers.add_parser(
        "verify-object",
        description=__doc__,
        help="Re-hash stored objects to detect data corruption.",
    )

    cmd.add_argument(
        "object_ids", nargs="+", help="One or more SHA-256 object IDs to verify."
    )
    cmd.add_argument(
        "--quiet",
        "-q",
        help="No output. Exit 0 if all objects are intact, exit 1 otherwise.",
        action="store_true",
    )
    cmd.add_argument(
        "--format",
        "-f",
        help="Output format: json or text. (default: json)",
        metavar="FORMAT",
        default="json",
        dest="fmt",
    )

    cmd.set_defaults(func=run)
| 144 | |
| 145 | |
def run(args: argparse.Namespace) -> None:
    """Verify the integrity of one or more objects in the store.

    Each requested object is checked with ``_verify_one``; any object whose
    result is not ``ok`` counts as a failure.

    Args:
        args: Parsed arguments providing ``fmt``, ``object_ids`` and ``quiet``.

    Raises:
        SystemExit: With ``ExitCode.USER_ERROR`` when the format is unknown,
            no object IDs were given, or at least one object failed. With
            ``0`` in quiet mode when every object passed. In non-quiet mode a
            fully successful run returns normally.

    NOTE(review): the module docstring advertises exit 3 for I/O errors, but
    read failures are reported per object by ``_verify_one`` and surface here
    as ``ExitCode.USER_ERROR`` — confirm which behaviour is intended.
    """
    fmt: str = args.fmt
    object_ids: list[str] = args.object_ids
    quiet: bool = args.quiet

    # Argument problems are reported as a JSON object on stdout, format first.
    usage_error: str | None = None
    if fmt not in _FORMAT_CHOICES:
        usage_error = f"Unknown format {fmt!r}. Valid: {', '.join(_FORMAT_CHOICES)}"
    elif not object_ids:
        usage_error = "At least one object ID argument is required."
    if usage_error is not None:
        print(json.dumps({"error": usage_error}))
        raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()

    results: list[_ObjectResult] = []
    for oid in object_ids:
        results.append(_verify_one(root, oid))

    failed_count = len([rec for rec in results if not rec["ok"]])
    all_ok = failed_count == 0

    # Quiet mode communicates through the exit status alone.
    if quiet:
        raise SystemExit(0 if all_ok else ExitCode.USER_ERROR)

    if fmt == "text":
        for rec in results:
            line = ("OK " if rec["ok"] else "FAIL") + f" {rec['object_id']}"
            if rec["size_bytes"] is not None:
                line += f" ({rec['size_bytes']} bytes)"
            if not rec["ok"] and rec["error"]:
                line += f" {rec['error']}"
            print(line)
    else:
        summary = {
            "results": [dict(rec) for rec in results],
            "all_ok": all_ok,
            "checked": len(results),
            "failed": failed_count,
        }
        print(json.dumps(summary))

    # Both output formats share the same failure exit.
    if not all_ok:
        raise SystemExit(ExitCode.USER_ERROR)