breakage.py
python
| 1 | """muse breakage — detect symbol-level breakage in the working tree. |
| 2 | |
| 3 | Checks the current working tree against the HEAD snapshot for structural |
| 4 | breakage that would fail at runtime or import time: |
| 5 | |
| 6 | 1. **Unresolved imports** — a ``from X import Y`` where Y no longer exists |
| 7 | in the committed version of X (detected via symbol graph). |
| 8 | 2. **Stale imports** — imports of symbols that were deleted from the source |
| 9 | in the current working tree's own edits (not yet committed). |
| 10 | 3. **Broken interface obligations** — a class promises a method (e.g. via |
| 11 | an inherited abstract method) that no longer exists in its body. |
| 12 | |
| 13 | This runs on committed snapshot data + the current working-tree parse. It |
| 14 | does not execute code, install dependencies, or run a type checker. It is a |
| 15 | pure structural analysis using the symbol graph. |
| 16 | |
| 17 | Usage:: |
| 18 | |
| 19 | muse breakage |
| 20 | muse breakage --language Python |
| 21 | muse breakage --json |
| 22 | |
| 23 | Output:: |
| 24 | |
| 25 | Breakage check — working tree vs HEAD (a1b2c3d4) |
| 26 | ────────────────────────────────────────────────────────────── |
| 27 | |
| 28 | 🔴 stale_import |
| 29 | src/billing.py imports compute_total from src/utils.py |
| 30 | but compute_total was removed in HEAD snapshot |
| 31 | |
| 32 | ⚠️ missing_interface_method |
| 33 | src/billing.py::Invoice inherits abstract method pay() |
| 34 | but pay() is not in the current class body |
| 35 | |
| 36 | 2 issue(s) found |
| 37 | |
| 38 | Flags: |
| 39 | |
| 40 | ``--language LANG`` |
| 41 | Restrict analysis to files of this language. |
| 42 | |
| 43 | ``--json`` |
| 44 | Emit results as JSON. |
| 45 | """ |
| 46 | |
| 47 | from __future__ import annotations |
| 48 | |
| 49 | import json |
| 50 | import logging |
| 51 | import pathlib |
| 52 | |
| 53 | import typer |
| 54 | |
| 55 | from muse.core.repo import require_repo |
| 56 | from muse.core.store import get_commit_snapshot_manifest, resolve_commit_ref |
| 57 | from muse.plugins.code._query import is_semantic, language_of, symbols_for_snapshot |
| 58 | from muse.plugins.code.ast_parser import parse_symbols |
| 59 | |
| 60 | logger = logging.getLogger(__name__) |
| 61 | |
| 62 | app = typer.Typer() |
| 63 | |
| 64 | |
| 65 | def _read_repo_id(root: pathlib.Path) -> str: |
| 66 | return str(json.loads((root / ".muse" / "repo.json").read_text())["repo_id"]) |
| 67 | |
| 68 | |
| 69 | def _read_branch(root: pathlib.Path) -> str: |
| 70 | head_ref = (root / ".muse" / "HEAD").read_text().strip() |
| 71 | return head_ref.removeprefix("refs/heads/").strip() |
| 72 | |
| 73 | |
| 74 | class _BreakageIssue: |
| 75 | def __init__( |
| 76 | self, |
| 77 | issue_type: str, |
| 78 | file_path: str, |
| 79 | description: str, |
| 80 | severity: str = "error", |
| 81 | ) -> None: |
| 82 | self.issue_type = issue_type |
| 83 | self.file_path = file_path |
| 84 | self.description = description |
| 85 | self.severity = severity |
| 86 | |
| 87 | def to_dict(self) -> dict[str, str]: |
| 88 | return { |
| 89 | "issue_type": self.issue_type, |
| 90 | "file_path": self.file_path, |
| 91 | "description": self.description, |
| 92 | "severity": self.severity, |
| 93 | } |
| 94 | |
| 95 | |
| 96 | def _check_file( |
| 97 | root: pathlib.Path, |
| 98 | file_path: str, |
| 99 | head_symbols_flat: dict[str, str], # address → content_id from HEAD |
| 100 | language_filter: str | None, |
| 101 | ) -> list[_BreakageIssue]: |
| 102 | """Check one working-tree file for breakage.""" |
| 103 | if language_filter and language_of(file_path) != language_filter: |
| 104 | return [] |
| 105 | if not is_semantic(file_path): |
| 106 | return [] |
| 107 | |
| 108 | working_file = root / file_path |
| 109 | if not working_file.exists(): |
| 110 | return [] |
| 111 | |
| 112 | issues: list[_BreakageIssue] = [] |
| 113 | raw = working_file.read_bytes() |
| 114 | tree = parse_symbols(raw, file_path) |
| 115 | |
| 116 | # Check 1: stale imports — symbols imported that no longer exist in HEAD. |
| 117 | for addr, rec in tree.items(): |
| 118 | if rec["kind"] != "import": |
| 119 | continue |
| 120 | imported_name = rec["name"] |
| 121 | # Check if this name appears as a symbol anywhere in HEAD. |
| 122 | # Simple heuristic: if the imported name was in HEAD's flat symbol set |
| 123 | # and is now gone, it's a stale import. |
| 124 | found_in_head = any( |
| 125 | a.split("::")[-1] == imported_name |
| 126 | for a in head_symbols_flat |
| 127 | ) |
| 128 | found_in_working = any( |
| 129 | a.split("::")[-1] == imported_name |
| 130 | for a in tree |
| 131 | if tree[a]["kind"] != "import" |
| 132 | ) |
| 133 | if not found_in_head and not found_in_working: |
| 134 | issues.append(_BreakageIssue( |
| 135 | issue_type="stale_import", |
| 136 | file_path=file_path, |
| 137 | description=f"imports '{imported_name}' but it is not found in the HEAD snapshot", |
| 138 | severity="warning", |
| 139 | )) |
| 140 | |
| 141 | # Check 2: broken interface obligations — class missing expected methods. |
| 142 | # Python only: look at base class names; if they are in the same file and |
| 143 | # have methods not present in the subclass, flag it. |
| 144 | suffix = pathlib.PurePosixPath(file_path).suffix.lower() |
| 145 | if suffix in {".py", ".pyi"}: |
| 146 | for addr, rec in tree.items(): |
| 147 | if rec["kind"] != "class": |
| 148 | continue |
| 149 | class_name = rec["name"] |
| 150 | # Find base class methods from HEAD. |
| 151 | head_base_methods: set[str] = set() |
| 152 | for head_addr in head_symbols_flat: |
| 153 | # Look for methods of other classes that share a name with |
| 154 | # this class's qualified bases (heuristic: base class in same file). |
| 155 | parts = head_addr.split("::") |
| 156 | if len(parts) == 2: |
| 157 | sym_name = parts[1] |
| 158 | # Does the working tree's class body have this method? |
| 159 | if "." in sym_name: |
| 160 | parent, method = sym_name.split(".", 1) |
| 161 | if parent != class_name and not method.startswith("_"): |
| 162 | head_base_methods.add(method) |
| 163 | |
| 164 | # Check that all expected methods exist in the working class. |
| 165 | working_class_methods = { |
| 166 | a.split("::")[-1].split(".")[-1] |
| 167 | for a in tree |
| 168 | if f"::{class_name}." in a |
| 169 | } |
| 170 | for method in sorted(head_base_methods): |
| 171 | if method not in working_class_methods: |
| 172 | issues.append(_BreakageIssue( |
| 173 | issue_type="missing_interface_method", |
| 174 | file_path=file_path, |
| 175 | description=( |
| 176 | f"class {class_name!r} does not implement expected method " |
| 177 | f"'{method}' (found in HEAD snapshot)" |
| 178 | ), |
| 179 | severity="warning", |
| 180 | )) |
| 181 | |
| 182 | return issues |
| 183 | |
| 184 | |
| 185 | @app.callback(invoke_without_command=True) |
| 186 | def breakage( |
| 187 | ctx: typer.Context, |
| 188 | language: str | None = typer.Option( |
| 189 | None, "--language", "-l", metavar="LANG", |
| 190 | help="Restrict to files of this language.", |
| 191 | ), |
| 192 | as_json: bool = typer.Option(False, "--json", help="Emit results as JSON."), |
| 193 | ) -> None: |
| 194 | """Detect symbol-level breakage in the working tree vs HEAD snapshot. |
| 195 | |
| 196 | Checks for: |
| 197 | - **stale_import**: imports of symbols that no longer exist in HEAD |
| 198 | - **missing_interface_method**: class body missing expected methods |
| 199 | |
| 200 | Purely structural analysis — no code execution, no type checking. |
| 201 | Operates on the committed symbol graph + current working-tree parse. |
| 202 | """ |
| 203 | root = require_repo() |
| 204 | repo_id = _read_repo_id(root) |
| 205 | branch = _read_branch(root) |
| 206 | |
| 207 | commit = resolve_commit_ref(root, repo_id, branch, None) |
| 208 | if commit is None: |
| 209 | typer.echo("❌ No HEAD commit found.", err=True) |
| 210 | raise typer.Exit(code=1) |
| 211 | |
| 212 | manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {} |
| 213 | head_sym_map = symbols_for_snapshot(root, manifest) |
| 214 | |
| 215 | # Build flat address → content_id map from HEAD. |
| 216 | head_flat: dict[str, str] = {} |
| 217 | for _fp, tree in head_sym_map.items(): |
| 218 | for addr, rec in tree.items(): |
| 219 | head_flat[addr] = rec["content_id"] |
| 220 | |
| 221 | all_issues: list[_BreakageIssue] = [] |
| 222 | |
| 223 | # Find all semantic files in the working tree. |
| 224 | for file_path in sorted(manifest.keys()): |
| 225 | issues = _check_file(root, file_path, head_flat, language) |
| 226 | all_issues.extend(issues) |
| 227 | |
| 228 | if as_json: |
| 229 | typer.echo(json.dumps( |
| 230 | { |
| 231 | "schema_version": 1, |
| 232 | "commit": commit.commit_id[:8], |
| 233 | "language_filter": language, |
| 234 | "issues": [i.to_dict() for i in all_issues], |
| 235 | "total": len(all_issues), |
| 236 | "errors": sum(1 for i in all_issues if i.severity == "error"), |
| 237 | "warnings": sum(1 for i in all_issues if i.severity == "warning"), |
| 238 | }, |
| 239 | indent=2, |
| 240 | )) |
| 241 | return |
| 242 | |
| 243 | typer.echo( |
| 244 | f"\nBreakage check — working tree vs HEAD ({commit.commit_id[:8]})" |
| 245 | ) |
| 246 | if language: |
| 247 | typer.echo(f" (language: {language})") |
| 248 | typer.echo("─" * 62) |
| 249 | |
| 250 | if not all_issues: |
| 251 | typer.echo("\n ✅ No structural breakage detected.") |
| 252 | return |
| 253 | |
| 254 | for issue in all_issues: |
| 255 | icon = "🔴" if issue.severity == "error" else "⚠️ " |
| 256 | typer.echo(f"\n{icon} {issue.issue_type}") |
| 257 | typer.echo(f" {issue.file_path}") |
| 258 | typer.echo(f" {issue.description}") |
| 259 | |
| 260 | errors = sum(1 for i in all_issues if i.severity == "error") |
| 261 | warnings = sum(1 for i in all_issues if i.severity == "warning") |
| 262 | typer.echo(f"\n {errors} error(s), {warnings} warning(s)") |