hash_utils.py
python
| 1 | """Deterministic contract hashing for execution lineage verification. |
| 2 | |
| 3 | Rules: |
| 4 | - Structural fields participate in hashes. |
| 5 | - Advisory / meta fields are excluded. |
| 6 | - Serialization is canonical: sorted keys, no whitespace, json.dumps. |
| 7 | - Hash is SHA-256, truncated to 16 hex chars (64-bit short hash). |
| 8 | - No MD5, no pickle, no repr(). |
| 9 | |
| 10 | Excluded fields (advisory / meta / visual / runtime): |
| 11 | contract_version, contract_hash, parent_contract_hash, execution_hash, |
| 12 | l2_generate_prompt, region_name, gm_guidance, |
| 13 | assigned_color, existing_track_id. |
| 14 | """ |
| 15 | |
| 16 | |
| 17 | import dataclasses |
| 18 | import hashlib |
| 19 | import json |
| 20 | |
| 21 | from musehub.contracts.json_types import JSONObject, JSONValue |
| 22 | |
| 23 | |
# Names of dataclass fields that never take part in a contract hash
# (the advisory / meta / visual / runtime fields listed in the module
# docstring).  Stored as a frozenset so membership checks are O(1) and the
# set cannot be mutated at runtime.
_HASH_EXCLUDED_FIELDS = frozenset(
    (
        "contract_version",
        "contract_hash",
        "parent_contract_hash",
        "execution_hash",
        "l2_generate_prompt",
        "region_name",
        "gm_guidance",
        "assigned_color",
        "existing_track_id",
    )
)
| 35 | |
| 36 | |
def _normalize_value(value: object) -> JSONValue:
    """Recursively convert *value* into a canonical, JSON-compatible form.

    Rules, checked in this order:

    - dataclass instance -> ``canonical_contract_dict(value)`` (dataclass
      *classes* are deliberately not treated as instances);
    - ``dict`` -> new dict whose entries are inserted in sorted-key order,
      with every value normalized;
    - ``list`` / ``tuple`` -> list of normalized items, original order kept;
    - ``set`` / ``frozenset`` -> list of normalized items in a canonical
      order (see the comment in that branch);
    - ``int`` / ``float`` / ``str`` / ``bool`` / ``None`` -> returned as-is;
    - anything else -> ``str(value)``.

    Returns:
        A value built only from dicts, lists, and JSON primitives (plus
        whatever ``str()`` yields for unrecognized types).
    """
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return canonical_contract_dict(value)
    if isinstance(value, dict):
        return {k: _normalize_value(v) for k, v in sorted(value.items())}
    if isinstance(value, (list, tuple)):
        return [_normalize_value(item) for item in value]
    if isinstance(value, (set, frozenset)):
        # Sets have no defined iteration order, and for string members the
        # order changes between interpreter runs (hash randomization).
        # Falling through to ``str(value)`` would therefore make the hash
        # non-deterministic.  Emit a list ordered by each member's compact
        # JSON encoding instead: that key is total even for mixed-type or
        # nested members, so the result never depends on iteration order.
        members = [_normalize_value(item) for item in value]
        members.sort(
            key=lambda member: json.dumps(
                member, sort_keys=True, separators=(",", ":")
            )
        )
        return members
    if isinstance(value, (int, float, str, bool, type(None))):
        return value
    # Last resort for types with no JSON equivalent.
    return str(value)
| 48 | |
| 49 | |
def canonical_contract_dict(obj: object) -> JSONObject:
    """Convert a dataclass instance to a canonical, key-sorted dict for hashing.

    Fields named in ``_HASH_EXCLUDED_FIELDS`` are skipped.  Every other field
    value is passed through ``_normalize_value``, which recurses into nested
    dataclasses and collections.  The returned dict has its keys in sorted
    order.

    Special case: when the object's class is named ``CompositionContract``,
    its ``sections`` field is replaced by the sorted list of each section's
    ``contract_hash`` attribute (``""`` for a section that has none), which
    keeps the root hash compact and independent of section order.

    Args:
        obj: A dataclass *instance*.

    Returns:
        A dict mapping field name to normalized value, keys sorted.

    Raises:
        TypeError: If ``obj`` is a class (including a dataclass class) or is
            not a dataclass instance at all.
    """
    # ``dataclasses.is_dataclass`` is also true for the dataclass *class*
    # itself.  Reading fields off a class would silently hash its defaults
    # (or fail with AttributeError), so reject classes explicitly.
    if isinstance(obj, type):
        raise TypeError(
            f"Expected a dataclass instance, got the class {obj.__name__}"
        )
    if not dataclasses.is_dataclass(obj):
        raise TypeError(f"Expected a dataclass, got {type(obj).__name__}")

    # Matched by class name so this module does not have to import the
    # contract types.
    _is_composition = type(obj).__name__ == "CompositionContract"

    result: JSONObject = {}
    for f in dataclasses.fields(obj):
        if f.name in _HASH_EXCLUDED_FIELDS:
            continue
        value = getattr(obj, f.name)
        if _is_composition and f.name == "sections":
            # Only the child hashes take part, sorted so that the order of
            # the sections does not affect the parent hash.
            result["sections"] = sorted(
                getattr(s, "contract_hash", "") for s in value
            )
        else:
            result[f.name] = _normalize_value(value)

    return dict(sorted(result.items()))
| 79 | |
| 80 | |
def compute_contract_hash(obj: object) -> str:
    """Return the short structural hash of a contract dataclass.

    The object is reduced to its canonical dict via
    ``canonical_contract_dict``, serialized as compact JSON (sorted keys,
    ``","`` / ``":"`` separators), encoded as UTF-8 and hashed with SHA-256.

    Returns:
        The first 16 hexadecimal characters of the digest (64 bits).

    Raises:
        TypeError: Propagated from ``canonical_contract_dict`` when ``obj``
            is not a dataclass.
    """
    payload = json.dumps(
        canonical_contract_dict(obj),
        sort_keys=True,
        separators=(",", ":"),
    ).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]
| 90 | |
| 91 | |
def seal_contract(obj: object, parent_hash: str = "") -> None:
    """Stamp ``contract_hash`` (and optionally the parent link) onto *obj*.

    Attributes are written through ``object.__setattr__`` so that the call
    also works on dataclasses declared with ``frozen=True``.

    Args:
        obj: The contract to seal; mutated in place.
        parent_hash: When non-empty, stored as ``parent_contract_hash``
            before the hash is computed.  An empty string leaves the
            attribute untouched.
    """
    assign = object.__setattr__
    if parent_hash:
        assign(obj, "parent_contract_hash", parent_hash)
    assign(obj, "contract_hash", compute_contract_hash(obj))
| 102 | |
| 103 | |
def set_parent_hash(obj: object, parent_hash: str) -> None:
    """Record *parent_hash* as ``parent_contract_hash`` on *obj*.

    The write goes through ``object.__setattr__`` so it succeeds even when
    the dataclass was declared with ``frozen=True``.  Intended for linking a
    child contract to its parent before the child is sealed with
    ``seal_contract``.

    Args:
        obj: The child contract; mutated in place.
        parent_hash: Hash of the parent contract.  Stored unconditionally,
            including when it is an empty string.
    """
    attr_name = "parent_contract_hash"
    object.__setattr__(obj, attr_name, parent_hash)
| 112 | |
| 113 | |
def verify_contract_hash(obj: object) -> bool:
    """Check that the ``contract_hash`` stored on *obj* is still valid.

    Returns:
        ``False`` when *obj* has no ``contract_hash`` attribute or the
        stored value is empty; otherwise whether a freshly computed
        ``compute_contract_hash(obj)`` equals the stored value.
    """
    recorded = getattr(obj, "contract_hash", "")
    # Short-circuit: an unsealed object is never valid and is not re-hashed.
    return bool(recorded) and compute_contract_hash(obj) == recorded
| 123 | |
| 124 | |
def hash_list_canonical(items: list[str]) -> str:
    """Derive a parent hash from a collection of child hashes.

    The items are sorted lexicographically (so the result does not depend on
    input order), encoded as a JSON array with ``json.dumps`` defaults, and
    hashed with SHA-256.  JSON quoting keeps item boundaries unambiguous,
    unlike the older ``SHA256("hashA:hashB")`` scheme, which could collide
    when an item contained the delimiter.

    Args:
        items: Child hashes; the caller's list is not modified.

    Returns:
        The first 16 hexadecimal characters of the digest.
    """
    ordered = list(items)
    ordered.sort()
    encoded = json.dumps(ordered).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()[:16]
| 136 | |
| 137 | |
def compute_execution_hash(contract_hash: str, trace_id: str) -> str:
    """Derive a hash tying one contract to one execution session.

    The same contract executed under a different ``trace_id`` yields a
    different value, which is what guards against replaying an execution
    in another session.

    NOTE(review): the two inputs are concatenated with no delimiter, so the
    split between them is only unambiguous while ``contract_hash`` has a
    fixed length -- confirm that callers always pass a 16-character hash.

    Args:
        contract_hash: Hash of the sealed contract.
        trace_id: Identifier of the session / trace.

    Returns:
        The first 16 hexadecimal characters of the SHA-256 digest of the
        UTF-8 encoded concatenation.
    """
    combined = contract_hash + trace_id
    hasher = hashlib.sha256()
    hasher.update(combined.encode("utf-8"))
    return hasher.hexdigest()[:16]