gabriel / musehub public
hash_utils.py python
145 lines 4.9 KB
6b53f1af feat: supercharge all pages, full SOC refactor, and Python 3.14 upgrade (#7) Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """Deterministic contract hashing for execution lineage verification.
2
3 Rules:
4 - Structural fields participate in hashes.
5 - Advisory / meta fields are excluded.
6 - Serialization is canonical: sorted keys, no whitespace, json.dumps.
7 - Hash is SHA-256, truncated to 16 hex chars (64-bit short hash).
8 - No MD5, no pickle, no repr().
9
10 Excluded fields (advisory / meta / visual / runtime):
11 contract_version, contract_hash, parent_contract_hash, execution_hash,
12 l2_generate_prompt, region_name, gm_guidance,
13 assigned_color, existing_track_id.
14 """
15
16
17 import dataclasses
18 import hashlib
19 import json
20
21 from musehub.contracts.json_types import JSONObject, JSONValue
22
23
# Field names that never participate in a contract hash: advisory, meta,
# visual, and runtime-only attributes.
_HASH_EXCLUDED_FIELDS = frozenset({
    "contract_version",
    "contract_hash",
    "parent_contract_hash",
    "execution_hash",
    "l2_generate_prompt",
    "region_name",
    "gm_guidance",
    "assigned_color",
    "existing_track_id",
})


def _normalize_value(value: object) -> JSONValue:
    """Recursively normalize a value for canonical serialization.

    Conversion rules, checked in this order:

    - Dataclass instances are converted with ``canonical_contract_dict``.
    - Dicts are rebuilt in sorted key order with normalized values.
    - Lists and tuples become lists of normalized items, order preserved.
    - Sets and frozensets become lists of normalized items ordered by each
      item's compact JSON encoding, so the result never depends on set
      iteration order.
    - ``int``, ``float``, ``str``, ``bool`` and ``None`` pass through as-is.
    - Anything else (including dataclass *classes*) falls back to
      ``str(value)``.
    """
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return canonical_contract_dict(value)
    if isinstance(value, dict):
        return {k: _normalize_value(v) for k, v in sorted(value.items())}
    if isinstance(value, (list, tuple)):
        return [_normalize_value(item) for item in value]
    if isinstance(value, (set, frozenset)):
        # str() of a set follows its iteration order, which for string
        # members varies between processes (hash randomization).  Emit a
        # list with a content-derived order instead.  Sorting by the JSON
        # encoding also works for members that are not mutually comparable.
        members = [_normalize_value(item) for item in value]
        return sorted(
            members,
            key=lambda m: json.dumps(m, separators=(",", ":"), sort_keys=True),
        )
    if isinstance(value, (int, float, str, bool, type(None))):
        return value
    return str(value)


def canonical_contract_dict(obj: object) -> JSONObject:
    """Convert a dataclass instance to a canonical, key-sorted dict for hashing.

    Fields named in ``_HASH_EXCLUDED_FIELDS`` are skipped.  Every other field
    value is passed through ``_normalize_value``, which recurses into nested
    dataclasses and collections.  The returned dict has its keys in sorted
    order.

    Special case: when the class of ``obj`` is named ``CompositionContract``,
    its ``sections`` field is serialized as a sorted list of each section's
    ``contract_hash`` attribute rather than the full section objects, which
    keeps the root hash compact and independent of section order.  A section
    without a ``contract_hash`` attribute contributes an empty string.

    Raises:
        TypeError: if ``obj`` is not a dataclass, or is a dataclass class
            rather than an instance.
    """
    if not dataclasses.is_dataclass(obj):
        raise TypeError(f"Expected a dataclass, got {type(obj).__name__}")
    if isinstance(obj, type):
        # is_dataclass() is also true for dataclass classes.  Reading field
        # values off a class would hash its defaults or fail with an
        # AttributeError, so reject it explicitly.
        raise TypeError(
            f"Expected a dataclass instance, got the class {obj.__name__}"
        )

    is_composition = type(obj).__name__ == "CompositionContract"

    result: JSONObject = {}
    for f in dataclasses.fields(obj):
        if f.name in _HASH_EXCLUDED_FIELDS:
            continue
        value = getattr(obj, f.name)
        if is_composition and f.name == "sections":
            # Only the child hashes participate; sorting makes the result
            # independent of the order the sections are stored in.
            result["sections"] = sorted(
                getattr(s, "contract_hash", "") for s in value
            )
        else:
            result[f.name] = _normalize_value(value)

    return dict(sorted(result.items()))
79
80
def compute_contract_hash(obj: object) -> str:
    """Return the 16-hex-character short hash of a contract's structural fields.

    The object is reduced with ``canonical_contract_dict``, serialized as
    compact JSON (sorted keys, no whitespace), encoded as UTF-8 and digested
    with SHA-256.  Only the first 16 hex characters (64 bits) of the digest
    are returned.
    """
    payload = json.dumps(
        canonical_contract_dict(obj),
        sort_keys=True,
        separators=(",", ":"),
    ).encode("utf-8")
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()[:16]
90
91
def seal_contract(obj: object, parent_hash: str = "") -> None:
    """Stamp ``contract_hash`` onto a frozen dataclass, in place.

    When ``parent_hash`` is non-empty it is first stored as
    ``parent_contract_hash``.  The hash is then computed with
    ``compute_contract_hash`` and written to ``contract_hash``.

    Both writes go through ``object.__setattr__`` so that they succeed on
    ``frozen=True`` dataclasses.
    """
    force_set = object.__setattr__  # bypasses the frozen-dataclass guard
    if parent_hash:
        force_set(obj, "parent_contract_hash", parent_hash)
    force_set(obj, "contract_hash", compute_contract_hash(obj))
102
103
def set_parent_hash(obj: object, parent_hash: str) -> None:
    """Store ``parent_hash`` as ``parent_contract_hash`` on a frozen dataclass.

    The write goes through ``object.__setattr__`` so that it succeeds even
    when the dataclass was declared with ``frozen=True``.  Use this to link a
    child contract to its parent before sealing the child with
    ``seal_contract``.
    """
    force_set = object.__setattr__  # bypasses the frozen-dataclass guard
    force_set(obj, "parent_contract_hash", parent_hash)
112
113
def verify_contract_hash(obj: object) -> bool:
    """Check the stored ``contract_hash`` against a freshly computed one.

    Returns ``False`` when ``obj`` has no ``contract_hash`` attribute or the
    stored value is empty; otherwise returns whether
    ``compute_contract_hash(obj)`` equals the stored value.
    """
    recorded = getattr(obj, "contract_hash", "")
    # Short-circuit: an unsealed object is never hashed, it simply fails.
    return bool(recorded) and compute_contract_hash(obj) == recorded
123
124
def hash_list_canonical(items: list[str]) -> str:
    """Derive an order-independent short hash from a list of child hashes.

    A sorted copy of ``items`` is JSON-encoded (default ``json.dumps``
    formatting), encoded as UTF-8 and digested with SHA-256.  The first 16
    hex characters of the digest are returned.  The input list is not
    modified.

    JSON-encoding the list quotes each element, which avoids the delimiter
    ambiguity of the older ``SHA256("hashA:hashB")`` pattern this replaces.
    """
    ordered = list(items)
    ordered.sort()
    encoded = json.dumps(ordered).encode("utf-8")
    hasher = hashlib.sha256()
    hasher.update(encoded)
    return hasher.hexdigest()[:16]
136
137
def compute_execution_hash(contract_hash: str, trace_id: str) -> str:
    """Bind an execution to a specific contract and session.

    The SHA-256 digest is taken over ``contract_hash`` immediately followed
    by ``trace_id`` (UTF-8 encoded), and its first 16 hex characters are
    returned.  The same contract under a different ``trace_id`` therefore
    hashes a different payload, which is what guards against replaying an
    execution in another session.

    NOTE(review): the two inputs are joined with no separator, so
    ``("ab", "c")`` and ``("a", "bc")`` produce the same result.  This is
    unambiguous only while ``contract_hash`` has a fixed length -- confirm
    that callers always pass a 16-character hash.
    """
    combined = "".join((contract_hash, trace_id))
    hasher = hashlib.sha256()
    hasher.update(combined.encode("utf-8"))
    return hasher.hexdigest()[:16]