breakage.py
python
| 1 | """muse breakage — detect symbol-level breakage in the working tree. |
| 2 | |
| 3 | Checks the current working tree against the HEAD snapshot for structural |
| 4 | breakage that would fail at runtime or import time: |
| 5 | |
| 6 | 1. **Unresolved imports** — a ``from X import Y`` where Y no longer exists |
| 7 | in the committed version of X (detected via symbol graph). |
| 8 | 2. **Stale imports** — imports of symbols that were deleted from the source |
| 9 | in the current working tree's own edits (not yet committed). |
| 10 | 3. **Broken interface obligations** — a class promises a method (e.g. via |
| 11 | an inherited abstract method) that no longer exists in its body. |
| 12 | |
| 13 | This runs on committed snapshot data + the current working-tree parse. It |
| 14 | does not execute code, install dependencies, or run a type checker. It is a |
| 15 | pure structural analysis using the symbol graph. |
| 16 | |
| 17 | Usage:: |
| 18 | |
| 19 | muse breakage |
| 20 | muse breakage --language Python |
| 21 | muse breakage --json |
| 22 | |
| 23 | Output:: |
| 24 | |
| 25 | Breakage check — working tree vs HEAD (a1b2c3d4) |
| 26 | ────────────────────────────────────────────────────────────── |
| 27 | |
| 28 | ⚠️ stale_import |
| 29 | src/billing.py |
| 30 | imports 'compute_total' but it is not found in the HEAD snapshot |
| 31 | |
| 32 | ⚠️ missing_interface_method |
| 33 | src/billing.py |
| 34 | class 'Invoice' does not implement expected method 'pay' (found in HEAD snapshot) |
| 35 | |
| 36 | 0 error(s), 2 warning(s) |
| 37 | |
| 38 | Flags: |
| 39 | |
| 40 | ``--language LANG`` |
| 41 | Restrict analysis to files of this language. |
| 42 | |
| 43 | ``--json`` |
| 44 | Emit results as JSON. |
| 45 | """ |
| 46 | |
| 47 | from __future__ import annotations |
| 48 | |
| 49 | import argparse |
| 50 | import json |
| 51 | import logging |
| 52 | import pathlib |
| 53 | import sys |
| 54 | |
| 55 | from muse._version import __version__ |
| 56 | from muse.core.repo import require_repo |
| 57 | from muse.core.store import get_commit_snapshot_manifest, read_current_branch, resolve_commit_ref |
| 58 | from muse.plugins.code._query import is_semantic, language_of, symbols_for_snapshot |
| 59 | from muse.plugins.code.ast_parser import parse_symbols |
| 60 | |
| 61 | logger = logging.getLogger(__name__) |
| 62 | |
| 63 | |
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` stored in ``<root>/.muse/repo.json``.

    The file is decoded explicitly as UTF-8 rather than with the platform's
    locale encoding, so the result does not depend on the machine the
    command runs on.

    Args:
        root: Repository root directory (the parent of ``.muse``).

    Returns:
        The ``repo_id`` value, coerced to ``str``.

    Raises:
        OSError: If ``repo.json`` cannot be read.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If the document has no ``repo_id`` key.
    """
    repo_json = root / ".muse" / "repo.json"
    document = json.loads(repo_json.read_text(encoding="utf-8"))
    return str(document["repo_id"])
| 66 | |
| 67 | |
def _read_branch(root: pathlib.Path) -> str:
    """Return the current branch for the repository at *root*.

    Thin wrapper that forwards to ``read_current_branch`` and returns its
    result unchanged.
    """
    current_branch = read_current_branch(root)
    return current_branch
| 70 | |
| 71 | |
class _BreakageIssue:
    """One finding produced by the breakage check.

    Holds four string fields: the issue category, the file it concerns, a
    human-readable description, and a severity label.
    """

    def __init__(
        self,
        issue_type: str,
        file_path: str,
        description: str,
        severity: str = "error",
    ) -> None:
        """Store the given fields on the instance; severity defaults to "error"."""
        self.issue_type, self.file_path = issue_type, file_path
        self.description, self.severity = description, severity

    def to_dict(self) -> dict[str, str]:
        """Return the four fields as a dict, in a fixed key order."""
        field_names = ("issue_type", "file_path", "description", "severity")
        return {field: getattr(self, field) for field in field_names}
| 92 | |
| 93 | |
def _check_file(
    root: pathlib.Path,
    file_path: str,
    head_symbols_flat: dict[str, str],  # address → content_id from HEAD
    language_filter: str | None,
) -> list[_BreakageIssue]:
    """Check one working-tree file for structural breakage against HEAD.

    Two heuristic checks run on the symbols parsed from the working-tree
    copy of ``file_path``:

    1. **stale_import** — an import record whose name matches no symbol name
       in the HEAD snapshot and no non-import symbol defined in this file.
    2. **missing_interface_method** (``.py`` / ``.pyi`` only) — a class that
       lacks a public method which another class *in the same file* had in
       HEAD.

    Args:
        root: Repository root; ``file_path`` is resolved relative to it.
        file_path: Repo-relative path of the file to check.
        head_symbols_flat: Symbol addresses from the HEAD snapshot. Only the
            keys are read; they are split on ``"::"`` as ``<file>::<symbol>``.
        language_filter: When truthy, files for which ``language_of`` returns
            a different value are skipped.

    Returns:
        The issues found, each with severity ``"warning"``. Empty when the
        file is filtered out, is not semantic, or is not a regular file in
        the working tree.
    """
    if language_filter and language_of(file_path) != language_filter:
        return []
    if not is_semantic(file_path):
        return []

    working_file = root / file_path
    # is_file() instead of exists(): a tracked path that is now a directory
    # would otherwise reach read_bytes() and raise.
    if not working_file.is_file():
        return []

    issues: list[_BreakageIssue] = []
    tree = parse_symbols(working_file.read_bytes(), file_path)

    # Check 1: stale imports.
    # Both name sets are loop-invariant, so build them once instead of
    # rescanning every HEAD address for each import record.
    head_names = {addr.split("::")[-1] for addr in head_symbols_flat}
    working_names = {
        addr.split("::")[-1]
        for addr, rec in tree.items()
        if rec["kind"] != "import"
    }
    for rec in tree.values():
        if rec["kind"] != "import":
            continue
        imported_name = rec["name"]
        if imported_name not in head_names and imported_name not in working_names:
            issues.append(_BreakageIssue(
                issue_type="stale_import",
                file_path=file_path,
                description=f"imports '{imported_name}' but it is not found in the HEAD snapshot",
                severity="warning",
            ))

    # Check 2: broken interface obligations (Python only).
    suffix = pathlib.PurePosixPath(file_path).suffix.lower()
    if suffix in {".py", ".pyi"}:
        # Public methods recorded in HEAD, grouped by owning class, limited to
        # this file. Without the same-file restriction every class would be
        # compared against the methods of every class in the repository.
        # Assumes the address prefix equals ``file_path`` — TODO confirm
        # against the address format produced by the symbol graph.
        # NOTE(review): no base-class information is read here, so every
        # sibling class in the file is still treated as a potential base.
        head_methods_by_class: dict[str, set[str]] = {}
        for head_addr in head_symbols_flat:
            parts = head_addr.split("::")
            if len(parts) != 2 or parts[0] != file_path:
                continue
            sym_name = parts[1]
            if "." not in sym_name:
                continue
            parent, method = sym_name.split(".", 1)
            if not method.startswith("_"):
                head_methods_by_class.setdefault(parent, set()).add(method)

        for rec in tree.values():
            if rec["kind"] != "class":
                continue
            class_name = rec["name"]
            expected_methods: set[str] = set()
            for parent, methods in head_methods_by_class.items():
                if parent != class_name:
                    expected_methods |= methods

            # Last dotted component of every working-tree symbol that lives
            # under this class.
            class_marker = f"::{class_name}."
            working_class_methods = {
                addr.split("::")[-1].split(".")[-1]
                for addr in tree
                if class_marker in addr
            }
            for method in sorted(expected_methods - working_class_methods):
                issues.append(_BreakageIssue(
                    issue_type="missing_interface_method",
                    file_path=file_path,
                    description=(
                        f"class {class_name!r} does not implement expected method "
                        f"'{method}' (found in HEAD snapshot)"
                    ),
                    severity="warning",
                ))

    return issues
| 181 | |
| 182 | |
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Attach the ``breakage`` subcommand to *subparsers*.

    The subcommand uses this module's docstring as its description (shown
    verbatim via ``RawDescriptionHelpFormatter``), accepts ``--language/-l``
    and ``--json``, and dispatches to ``run`` through the ``func`` default.
    """
    breakage_cmd = subparsers.add_parser(
        "breakage",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        help="Detect symbol-level breakage in the working tree vs HEAD snapshot.",
    )

    language_options = {
        "default": None,
        "metavar": "LANG",
        "dest": "language",
        "help": "Restrict to files of this language.",
    }
    breakage_cmd.add_argument("--language", "-l", **language_options)

    breakage_cmd.add_argument(
        "--json",
        dest="as_json",
        action="store_true",
        help="Emit results as JSON.",
    )

    breakage_cmd.set_defaults(func=run)
| 200 | |
| 201 | |
def run(args: argparse.Namespace) -> None:
    """Run the breakage check and print a report as text or JSON.

    Resolves the HEAD commit of the current branch, flattens the symbols of
    its snapshot into a single address → content_id mapping, runs
    ``_check_file`` over every path in the snapshot manifest (in sorted
    order), and prints the findings.

    Issue types reported:
    - **stale_import**: imports of symbols that no longer exist in HEAD
    - **missing_interface_method**: class body missing expected methods

    Purely structural analysis — no code execution, no type checking.

    Raises:
        SystemExit: With status 1 when no HEAD commit can be resolved.
    """
    language_filter: str | None = args.language
    emit_json: bool = args.as_json

    repo_root = require_repo()
    head = resolve_commit_ref(
        repo_root,
        _read_repo_id(repo_root),
        _read_branch(repo_root),
        None,
    )
    if head is None:
        print("❌ No HEAD commit found.", file=sys.stderr)
        raise SystemExit(1)

    manifest = get_commit_snapshot_manifest(repo_root, head.commit_id) or {}
    snapshot_symbols = symbols_for_snapshot(repo_root, manifest)

    # Flat address → content_id map across every file in the HEAD snapshot.
    head_flat: dict[str, str] = {
        address: record["content_id"]
        for file_symbols in snapshot_symbols.values()
        for address, record in file_symbols.items()
    }

    # Only paths listed in the HEAD manifest are checked.
    findings: list[_BreakageIssue] = [
        issue
        for tracked_path in sorted(manifest)
        for issue in _check_file(repo_root, tracked_path, head_flat, language_filter)
    ]

    def tally(level: str) -> int:
        """Count findings whose severity equals *level*."""
        return sum(1 for issue in findings if issue.severity == level)

    short_id = head.commit_id[:8]

    if emit_json:
        report = {
            "schema_version": __version__,
            "commit": short_id,
            "language_filter": language_filter,
            "issues": [issue.to_dict() for issue in findings],
            "total": len(findings),
            "errors": tally("error"),
            "warnings": tally("warning"),
        }
        print(json.dumps(report, indent=2))
        return

    print(f"\nBreakage check — working tree vs HEAD ({short_id})")
    if language_filter:
        print(f" (language: {language_filter})")
    print("─" * 62)

    if not findings:
        print("\n ✅ No structural breakage detected.")
        return

    for issue in findings:
        if issue.severity == "error":
            icon = "🔴"
        else:
            icon = "⚠️ "
        print(f"\n{icon} {issue.issue_type}")
        print(f" {issue.file_path}")
        print(f" {issue.description}")

    print(f"\n {tally('error')} error(s), {tally('warning')} warning(s)")