cgcardona / muse public
breakage.py python
262 lines 9.1 KB
bda49bdb feat: redesign .museignore as TOML with domain-scoped sections (#100) Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """muse breakage — detect symbol-level breakage in the working tree.
2
3 Checks the current working tree against the HEAD snapshot for structural
4 breakage that would fail at runtime or import time:
5
6 1. **Unresolved imports** — a ``from X import Y`` where Y no longer exists
7 in the committed version of X (detected via symbol graph).
8 2. **Stale imports** — imports of symbols that were deleted from the source
9 in the current working tree's own edits (not yet committed).
10 3. **Broken interface obligations** — a class promises a method (e.g. via
11 an inherited abstract method) that no longer exists in its body.
12
13 This runs on committed snapshot data + the current working-tree parse. It
14 does not execute code, install dependencies, or run a type checker. It is a
15 pure structural analysis using the symbol graph.
16
17 Usage::
18
19 muse breakage
20 muse breakage --language Python
21 muse breakage --json
22
23 Output::
24
    Breakage check — working tree vs HEAD (a1b2c3d4)
    ──────────────────────────────────────────────────────────────

    ⚠️  stale_import
     src/billing.py
     imports 'compute_total' but it is not found in the HEAD snapshot

    ⚠️  missing_interface_method
     src/billing.py
     class 'Invoice' does not implement expected method 'pay' (found in HEAD snapshot)

     0 error(s), 2 warning(s)
37
38 Flags:
39
40 ``--language LANG``
41 Restrict analysis to files of this language.
42
43 ``--json``
44 Emit results as JSON.
45 """
46
47 from __future__ import annotations
48
49 import json
50 import logging
51 import pathlib
52
53 import typer
54
55 from muse.core.repo import require_repo
56 from muse.core.store import get_commit_snapshot_manifest, resolve_commit_ref
57 from muse.plugins.code._query import is_semantic, language_of, symbols_for_snapshot
58 from muse.plugins.code.ast_parser import parse_symbols
59
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Typer application for this command; the ``breakage`` function in this file
# is registered on it through ``@app.callback(invoke_without_command=True)``.
app = typer.Typer()
63
64
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` stored in ``<root>/.muse/repo.json`` as a string.

    The file is handed to ``json.loads`` as raw bytes so the JSON decoder
    detects the encoding itself (UTF-8/16/32, with or without a BOM) instead
    of relying on the platform's locale encoding.

    Args:
        root: Repository root directory (the one containing ``.muse``).

    Returns:
        The ``repo_id`` value, coerced with ``str()``.

    Raises:
        OSError: If ``repo.json`` cannot be read.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If the document has no ``"repo_id"`` key.
    """
    repo_json = root / ".muse" / "repo.json"
    metadata = json.loads(repo_json.read_bytes())
    return str(metadata["repo_id"])
67
68
def _read_branch(root: pathlib.Path) -> str:
    """Return the branch name recorded in ``<root>/.muse/HEAD``.

    The file content is stripped of surrounding whitespace and a leading
    ``refs/heads/`` prefix, if present, is removed. The file is decoded as
    UTF-8 explicitly so that non-ASCII branch names read the same way on
    every platform, independent of the locale encoding.

    Args:
        root: Repository root directory (the one containing ``.muse``).

    Returns:
        The branch name without the ``refs/heads/`` prefix.

    Raises:
        OSError: If the ``HEAD`` file cannot be read.
        UnicodeDecodeError: If the ``HEAD`` file is not valid UTF-8.
    """
    head_ref = (root / ".muse" / "HEAD").read_text(encoding="utf-8").strip()
    # Second strip covers whitespace left between the prefix and the name.
    return head_ref.removeprefix("refs/heads/").strip()
72
73
class _BreakageIssue:
    """One finding reported by the breakage checks.

    A plain record with four string attributes: ``issue_type``,
    ``file_path``, ``description`` and ``severity``. ``severity`` defaults
    to ``"error"`` when not supplied.
    """

    def __init__(
        self,
        issue_type: str,
        file_path: str,
        description: str,
        severity: str = "error",
    ) -> None:
        self.issue_type, self.file_path = issue_type, file_path
        self.description, self.severity = description, severity

    def to_dict(self) -> dict[str, str]:
        """Return the four attributes as a new dict keyed by attribute name."""
        return dict(
            issue_type=self.issue_type,
            file_path=self.file_path,
            description=self.description,
            severity=self.severity,
        )
94
95
def _check_file(
    root: pathlib.Path,
    file_path: str,
    head_symbols_flat: dict[str, str],  # address → content_id from HEAD
    language_filter: str | None,
) -> list[_BreakageIssue]:
    """Check one working-tree file for structural breakage.

    The file is skipped (empty result) when it does not match
    ``language_filter``, is not a semantic file according to
    ``is_semantic``, or is not a regular file in the working tree.

    Two heuristics are applied to the symbols parsed from the working copy:

    1. **stale_import** — an imported name that matches no symbol name in
       ``head_symbols_flat`` and no non-import symbol of this file.
    2. **missing_interface_method** (``.py`` / ``.pyi`` only) — a public
       method name that some *other* class declares in HEAD and that the
       working-tree class does not define.

    Args:
        root: Repository root directory.
        file_path: Path of the file relative to ``root``.
        head_symbols_flat: Symbol addresses from HEAD; only the keys are
            inspected here. Addresses are split on ``"::"`` and the last
            component is treated as the symbol name.
        language_filter: When truthy, only files whose ``language_of`` value
            equals it are analysed.

    Returns:
        The issues found, stale imports first, then interface issues grouped
        by class with method names in sorted order. All issues produced here
        have severity ``"warning"``.
    """
    if language_filter and language_of(file_path) != language_filter:
        return []
    if not is_semantic(file_path):
        return []

    working_file = root / file_path
    # ``is_file`` rather than ``exists``: a directory that now occupies a
    # tracked path cannot be read as bytes and has nothing to parse.
    if not working_file.is_file():
        return []

    issues: list[_BreakageIssue] = []
    tree = parse_symbols(working_file.read_bytes(), file_path)

    # Check 1: stale imports — imported names unknown to HEAD and to this file.
    imported_names = [
        rec["name"] for rec in tree.values() if rec["kind"] == "import"
    ]
    if imported_names:
        # Build both name sets once per file instead of re-splitting every
        # address for every import.
        known_names = {addr.split("::")[-1] for addr in head_symbols_flat}
        known_names.update(
            addr.split("::")[-1]
            for addr, rec in tree.items()
            if rec["kind"] != "import"
        )
        for imported_name in imported_names:
            if imported_name in known_names:
                continue
            issues.append(_BreakageIssue(
                issue_type="stale_import",
                file_path=file_path,
                description=f"imports '{imported_name}' but it is not found in the HEAD snapshot",
                severity="warning",
            ))

    # Check 2: broken interface obligations — Python sources only.
    suffix = pathlib.PurePosixPath(file_path).suffix.lower()
    if suffix not in {".py", ".pyi"}:
        return issues

    class_names = [
        rec["name"] for rec in tree.values() if rec["kind"] == "class"
    ]
    if not class_names:
        return issues

    # Map each public method name in HEAD to the set of class names that
    # declare it. Only two-part addresses (``<file>::<Class>.<method>``) count.
    # NOTE(review): obligations are drawn from every other class present in
    # ``head_symbols_flat``, not from the class's actual bases; this is broader
    # than a "base class in same file" heuristic — confirm that is intended.
    owners_by_method: dict[str, set[str]] = {}
    for head_addr in head_symbols_flat:
        parts = head_addr.split("::")
        if len(parts) != 2 or "." not in parts[1]:
            continue
        parent, method = parts[1].split(".", 1)
        if not method.startswith("_"):
            owners_by_method.setdefault(method, set()).add(parent)
    candidate_methods = sorted(owners_by_method)

    for class_name in class_names:
        member_marker = f"::{class_name}."
        working_class_methods = {
            addr.split("::")[-1].split(".")[-1]
            for addr in tree
            if member_marker in addr
        }
        for method in candidate_methods:
            # A method declared in HEAD only by this same class is not an
            # obligation imposed by another class.
            if owners_by_method[method] == {class_name}:
                continue
            if method in working_class_methods:
                continue
            issues.append(_BreakageIssue(
                issue_type="missing_interface_method",
                file_path=file_path,
                description=(
                    f"class {class_name!r} does not implement expected method "
                    f"'{method}' (found in HEAD snapshot)"
                ),
                severity="warning",
            ))

    return issues
183
184
@app.callback(invoke_without_command=True)
def breakage(
    ctx: typer.Context,
    language: str | None = typer.Option(
        None, "--language", "-l", metavar="LANG",
        help="Restrict to files of this language.",
    ),
    as_json: bool = typer.Option(False, "--json", help="Emit results as JSON."),
) -> None:
    """Detect symbol-level breakage in the working tree vs HEAD snapshot.

    Checks for:
    - **stale_import**: imports of symbols that no longer exist in HEAD
    - **missing_interface_method**: class body missing expected methods

    Purely structural analysis — no code execution, no type checking.
    Operates on the committed symbol graph + current working-tree parse.
    """
    # Resolve the HEAD commit of the current branch; bail out with exit
    # code 1 when the branch has no commit yet.
    root = require_repo()
    head_commit = resolve_commit_ref(
        root, _read_repo_id(root), _read_branch(root), None
    )
    if head_commit is None:
        typer.echo("❌ No HEAD commit found.", err=True)
        raise typer.Exit(code=1)

    manifest = get_commit_snapshot_manifest(root, head_commit.commit_id) or {}
    head_sym_map = symbols_for_snapshot(root, manifest)

    # Flatten the per-file symbol trees into one address → content_id map.
    head_flat: dict[str, str] = {
        addr: rec["content_id"]
        for file_tree in head_sym_map.values()
        for addr, rec in file_tree.items()
    }

    # Run the per-file checks over every path tracked in HEAD, in sorted order.
    findings: list[_BreakageIssue] = [
        finding
        for tracked_path in sorted(manifest)
        for finding in _check_file(root, tracked_path, head_flat, language)
    ]

    severities = [finding.severity for finding in findings]
    error_count = severities.count("error")
    warning_count = severities.count("warning")
    short_id = head_commit.commit_id[:8]

    if as_json:
        payload = {
            "schema_version": 1,
            "commit": short_id,
            "language_filter": language,
            "issues": [finding.to_dict() for finding in findings],
            "total": len(findings),
            "errors": error_count,
            "warnings": warning_count,
        }
        typer.echo(json.dumps(payload, indent=2))
        return

    # Human-readable report: header, one stanza per finding, summary line.
    typer.echo(f"\nBreakage check — working tree vs HEAD ({short_id})")
    if language:
        typer.echo(f" (language: {language})")
    typer.echo("─" * 62)

    if not findings:
        typer.echo("\n ✅ No structural breakage detected.")
        return

    for finding in findings:
        if finding.severity == "error":
            icon = "🔴"
        else:
            icon = "⚠️ "
        typer.echo(f"\n{icon} {finding.issue_type}")
        typer.echo(f" {finding.file_path}")
        typer.echo(f" {finding.description}")

    typer.echo(f"\n {error_count} error(s), {warning_count} warning(s)")