gabriel / muse public
verify_object.py python
200 lines 6.0 KB
00373ad0 feat: migrate CLI from typer to argparse (POSIX-compliant, order-independent) Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """muse plumbing verify-object — verify the integrity of stored objects.
2
3 Reads one or more objects from the content-addressed store and re-hashes each
4 one to confirm that its on-disk content still matches its claimed SHA-256
5 identity. Reports the result per object and exits non-zero if any object
6 fails verification.
7
8 This is the integrity primitive used by backup systems, replication agents,
9 and CI pipelines to detect silent data corruption without a full fsck.
10
11 Output (JSON, default)::
12
13 {
14 "results": [
15 {"object_id": "<sha256>", "ok": true, "size_bytes": 4096, "error": null},
16 {"object_id": "<sha256>", "ok": false, "size_bytes": 512,
17 "error": "hash mismatch: stored <first 12 hex>… recomputed <first 12 hex>…"},
18 {"object_id": "<sha256>", "ok": false, "size_bytes": null,
19 "error": "object not found in store"}
20 ],
21 "all_ok": false,
22 "checked": 3,
23 "failed": 2
24 }
25
26 Text output (``--format text``)::
27
28 OK <sha256> (4096 bytes)
29 FAIL <sha256> hash mismatch
30 FAIL <sha256> object not found in store
31
32 With ``--quiet``: no output; exits 0 if all pass, exits 1 otherwise.
33
34 Plumbing contract
35 -----------------
36
37 - Exit 0: all objects verified successfully.
38 - Exit 1: one or more objects failed verification; object not found; bad args.
39 - Exit 3: unexpected I/O error (e.g. disk read failure).
40 """
41
42 from __future__ import annotations
43
44 import argparse
45 import hashlib
46 import json
47 import logging
48 import pathlib
49 import sys
50 from typing import TypedDict
51
52 from muse.core.errors import ExitCode
53 from muse.core.object_store import object_path
54 from muse.core.repo import require_repo
55 from muse.core.validation import validate_object_id
56
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Output formats accepted by ``--format``; ``run`` rejects any other value.
_FORMAT_CHOICES = ("json", "text")
_CHUNK = 65536  # 64 KiB read chunks — keeps the heap clean for large blobs
61
62
class _ObjectResult(TypedDict):
    """Verification record for one object; one entry of the ``results`` list."""

    # Object ID exactly as it was supplied on the command line.
    object_id: str
    # True only when the recomputed SHA-256 digest equals ``object_id``.
    ok: bool
    # Size of the object in bytes; None when the ID is invalid, the object is
    # missing, or reading it failed.
    size_bytes: int | None
    # Human-readable failure description; None when ``ok`` is true.
    error: str | None
68
69
def _verify_one(root: pathlib.Path, object_id: str) -> _ObjectResult:
    """Integrity-check a single object and return its result record.

    The object file is opened directly instead of being probed with
    ``exists()`` first. That removes the check-then-use race (an object
    deleted between the probe and the read) and guarantees that every
    filesystem error — including permission errors on the probe itself — is
    folded into the result record rather than escaping as an exception.

    Content is streamed in ``_CHUNK``-sized reads so large blobs are never
    held in memory at once.

    Args:
        root: Repository root handed to ``object_path`` to locate the object.
        object_id: Claimed SHA-256 hex identity of the object.

    Returns:
        An :class:`_ObjectResult`. ``ok`` is true only when the recomputed
        digest equals ``object_id``. ``size_bytes`` is the number of bytes
        that were hashed, or ``None`` when the ID is invalid, the object is
        missing, or reading failed. ``error`` is ``None`` on success.
    """

    def _failure(message: str, size: int | None = None) -> _ObjectResult:
        # Single place that shapes every failed record.
        return {
            "object_id": object_id,
            "ok": False,
            "size_bytes": size,
            "error": message,
        }

    try:
        validate_object_id(object_id)
    except ValueError as exc:
        return _failure(str(exc))

    dest = object_path(root, object_id)

    digest = hashlib.sha256()
    size = 0
    try:
        with dest.open("rb") as fh:
            for chunk in iter(lambda: fh.read(_CHUNK), b""):
                digest.update(chunk)
                # Count what was actually hashed so the reported size always
                # describes the same bytes as the digest.
                size += len(chunk)
    except (FileNotFoundError, NotADirectoryError):
        # Same cases ``Path.exists()`` reports as "does not exist".
        return _failure("object not found in store")
    except OSError as exc:
        return _failure(f"I/O error: {exc}")

    actual = digest.hexdigest()
    if actual != object_id:
        return _failure(
            f"hash mismatch: stored {object_id[:12]}… "
            f"recomputed {actual[:12]}…",
            size,
        )

    return {"object_id": object_id, "ok": True, "size_bytes": size, "error": None}
117
118
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Attach the ``verify-object`` subcommand to *subparsers*.

    Declares the positional object IDs together with the ``--quiet`` and
    ``--format`` options, and binds :func:`run` as the command handler through
    ``set_defaults``. ``--format`` is not restricted with ``choices`` here;
    ``run`` checks the value itself.
    """
    command = subparsers.add_parser(
        "verify-object",
        description=__doc__,
        help="Re-hash stored objects to detect data corruption.",
    )

    # Positional: at least one ID is required (nargs="+").
    command.add_argument(
        "object_ids", nargs="+", help="One or more SHA-256 object IDs to verify."
    )

    command.add_argument(
        "--quiet",
        "-q",
        action="store_true",
        help="No output. Exit 0 if all objects are intact, exit 1 otherwise.",
    )

    format_options = {
        "dest": "fmt",
        "default": "json",
        "metavar": "FORMAT",
        "help": "Output format: json or text. (default: json)",
    }
    command.add_argument("--format", "-f", **format_options)

    command.set_defaults(func=run)
144
145
def run(args: argparse.Namespace) -> None:
    """Verify one or more stored objects and report the outcome.

    Reads ``fmt``, ``object_ids`` and ``quiet`` from *args*, verifies every
    object with ``_verify_one``, and then either stays silent (``--quiet``),
    prints one line per object (``text``), or prints a single JSON document
    (``json``).

    Raises:
        SystemExit: with ``ExitCode.USER_ERROR`` when the format is unknown,
            no object IDs were given, or any object failed verification.
            In quiet mode ``SystemExit(0)`` is raised on success as well.
    """
    fmt: str = args.fmt
    object_ids: list[str] = args.object_ids
    quiet: bool = args.quiet

    # Argument problems are reported as a JSON error object on stdout,
    # regardless of --quiet or the requested format.
    usage_error: str | None = None
    if fmt not in _FORMAT_CHOICES:
        usage_error = f"Unknown format {fmt!r}. Valid: {', '.join(_FORMAT_CHOICES)}"
    elif not object_ids:
        usage_error = "At least one object ID argument is required."
    if usage_error is not None:
        print(json.dumps({"error": usage_error}))
        raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()

    results: list[_ObjectResult] = []
    failed_count = 0
    for oid in object_ids:
        record = _verify_one(root, oid)
        results.append(record)
        if not record["ok"]:
            failed_count += 1
    all_ok = failed_count == 0

    # NOTE(review): the module docstring lists exit 3 for unexpected I/O
    # errors, but per-object I/O failures arrive here as ordinary failed
    # records and exit with USER_ERROR — confirm which is intended.
    if quiet:
        raise SystemExit(0 if all_ok else ExitCode.USER_ERROR)

    if fmt == "text":
        for record in results:
            status = "OK " if record["ok"] else "FAIL"
            line = f"{status} {record['object_id']}"
            if record["size_bytes"] is not None:
                line += f" ({record['size_bytes']} bytes)"
            if record["error"] and not record["ok"]:
                line += f" {record['error']}"
            print(line)
    else:
        report = {
            "results": list(map(dict, results)),
            "all_ok": all_ok,
            "checked": len(results),
            "failed": failed_count,
        }
        print(json.dumps(report))

    if not all_ok:
        raise SystemExit(ExitCode.USER_ERROR)