gabriel / muse public
codemap.py python
320 lines 10.8 KB
00373ad0 feat: migrate CLI from typer to argparse (POSIX-compliant, order-independent) Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """muse codemap — repository semantic topology.
2
3 Generates a structural map of the codebase from committed snapshot data:
4
5 * **Modules ranked by size** — symbol count and lines of code per file
6 * **Import in-degree** — how many other files import each module
7 * **Import cycles** — circular dependency chains detected via DFS
8 * **High-centrality symbols** — functions called from the most callers
9 * **Boundary files** — high fan-out (imports many) but low fan-in (few import it)
10
11 This is a semantic topology view, not a file-system listing. It reveals the
12 actual shape of a codebase — where the load-bearing columns are, where the
13 cycles hide, and where parallel agents can safely work without collision.
14
15 Usage::
16
17 muse codemap
18 muse codemap --commit HEAD~10
19 muse codemap --language Python
20 muse codemap --top 20
21 muse codemap --json
22
23 Output::
24
25 Semantic codemap — commit a1b2c3d4
26 ──────────────────────────────────────────────────────────────
27
28 Top modules by size:
29 src/billing.py 42 symbols (12 importers) ⬛ HIGH CENTRALITY
30 src/models.py 31 symbols (8 importers)
31 src/auth.py 18 symbols (5 importers)
32
33 Import cycles (2):
34 src/billing.py → src/utils.py → src/billing.py
35 src/api.py → src/auth.py → src/api.py
36
37 High-centrality symbols (most callers):
38 src/billing.py::compute_total 14 callers
39 src/auth.py::validate_token 9 callers
40
41 Boundary files (high fan-out, low fan-in):
42 src/cli.py imports 8 modules ← imported by 0
43
44 Flags:
45
46 ``--commit, -c REF``
47 Analyse a historical snapshot instead of HEAD.
48
49 ``--language LANG``
50 Restrict analysis to files of this language.
51
52 ``--top N``
53 Show top N entries in each section (default: 15).
54
55 ``--json``
56 Emit the full codemap as JSON.
57 """
58
59 from __future__ import annotations
60
61 import argparse
62 import json
63 import logging
64 import pathlib
65 import sys
66
67 from muse._version import __version__
68 from muse.core.errors import ExitCode
69 from muse.core.object_store import read_object
70 from muse.core.repo import require_repo
71 from muse.core.store import get_commit_snapshot_manifest, read_current_branch, resolve_commit_ref
72 from muse.plugins.code._callgraph import build_reverse_graph
73 from muse.plugins.code._query import language_of, symbols_for_snapshot
74 from muse.plugins.code.ast_parser import parse_symbols
75
# Module-level logger, named after this module per project convention.
logger = logging.getLogger(__name__)

# Recognised Python source-file suffixes.
# NOTE(review): not referenced anywhere in this module — confirm whether it
# is dead code or imported by another module before removing.
_PY_SUFFIXES: frozenset[str] = frozenset({".py", ".pyi"})
79
80
81 def _read_repo_id(root: pathlib.Path) -> str:
82 return str(json.loads((root / ".muse" / "repo.json").read_text())["repo_id"])
83
84
def _read_branch(root: pathlib.Path) -> str:
    """Return the branch currently checked out in the repository at *root*."""
    return read_current_branch(root)
87
88
89 def _file_stem(file_path: str) -> str:
90 return pathlib.PurePosixPath(file_path).stem
91
92
def _build_import_graph(
    root: pathlib.Path,
    manifest: dict[str, str],
    language_filter: str | None,
) -> tuple[dict[str, list[str]], dict[str, int]]:
    """Return ``(imports_out, import_in_degree)`` for all files in manifest.

    ``imports_out[file_path]`` is the list of *distinct* file_paths that
    *file_path* imports (best-effort heuristic matching by module stem).
    ``import_in_degree[file_path]`` counts how many distinct files import
    *file_path*.

    Fix: a file that imports the same module several times (e.g. both
    ``import x`` and ``from x import y`` yield separate import symbols) used
    to be counted once per import statement, inflating both fan-out and the
    "N importers" figure shown to users. Targets are now deduplicated per
    importing file, preserving first-seen order.
    """
    # Step 1: build stem → file_path map.
    # NOTE(review): on stem collisions (two files named e.g. utils.py in
    # different directories) the last one scanned wins — matching is
    # best-effort by design; confirm this is acceptable for large repos.
    stem_to_file: dict[str, str] = {}
    for fp in manifest:
        if language_filter and language_of(fp) != language_filter:
            continue
        stem_to_file[_file_stem(fp)] = fp

    # Step 2: scan import symbols in each file.
    imports_out: dict[str, list[str]] = {fp: [] for fp in manifest}
    in_degree: dict[str, int] = {fp: 0 for fp in manifest}

    for file_path, obj_id in sorted(manifest.items()):
        if language_filter and language_of(file_path) != language_filter:
            continue
        raw = read_object(root, obj_id)
        if raw is None:
            # Object missing from the store — skip rather than fail the map.
            continue
        tree = parse_symbols(raw, file_path)
        # Dedupe so counts reflect files, not individual import statements.
        seen_targets: set[str] = set()
        for rec in tree.values():
            if rec["kind"] != "import":
                continue
            # Match the imported module name against known stems.
            imported = rec["qualified_name"].split(".")[-1]
            target = stem_to_file.get(imported)
            if target and target != file_path and target not in seen_targets:
                seen_targets.add(target)
                imports_out[file_path].append(target)
                # target always comes from manifest keys, so it is pre-seeded.
                in_degree[target] += 1

    return imports_out, in_degree
133
134
135 def _find_cycles(imports_out: dict[str, list[str]]) -> list[list[str]]:
136 """Detect import cycles via iterative DFS. Returns cycle paths.
137
138 Uses an explicit stack instead of recursion so that deeply nested import
139 graphs (thousands of files in a chain) cannot exhaust Python's call stack.
140 O(V+E) — every node is visited at most once.
141 """
142 cycles: list[list[str]] = []
143 visited: set[str] = set()
144
145 for start in imports_out:
146 if start in visited:
147 continue
148 # Each stack frame: (node, path-so-far, in-stack set for this path)
149 stack: list[tuple[str, list[str], set[str]]] = [(start, [], set())]
150 while stack:
151 node, path, in_stack = stack.pop()
152 if node in in_stack:
153 idx = path.index(node)
154 cycles.append(path[idx:] + [node])
155 continue
156 if node in visited:
157 continue
158 visited.add(node)
159 new_in_stack = in_stack | {node}
160 for neighbour in imports_out.get(node, []):
161 stack.append((neighbour, path + [node], new_in_stack))
162
163 return cycles
164
165
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Attach the ``codemap`` subcommand and its flags to *subparsers*."""
    codemap = subparsers.add_parser(
        "codemap",
        help="Generate a semantic topology map of the repository.",
        description=__doc__,
    )
    codemap.add_argument(
        "--commit",
        "-c",
        dest="ref",
        default=None,
        metavar="REF",
        help="Analyse this commit instead of HEAD.",
    )
    codemap.add_argument(
        "--language",
        "-l",
        dest="language",
        default=None,
        metavar="LANG",
        help="Restrict analysis to this language.",
    )
    codemap.add_argument(
        "--top",
        "-n",
        dest="top",
        type=int,
        default=15,
        metavar="N",
        help="Number of entries to show in each ranked section.",
    )
    codemap.add_argument(
        "--json",
        dest="as_json",
        action="store_true",
        help="Emit results as JSON.",
    )
    codemap.set_defaults(func=run)
190
191
def run(args: argparse.Namespace) -> None:
    """Generate a semantic topology map of the repository.

    Ranks modules by size, detects import cycles, finds high-centrality
    symbols, and identifies boundary files (high fan-out, low fan-in).

    This reveals the structural shape of the codebase — load-bearing modules,
    hidden cycles, and safe parallel-work zones — without reading a single
    working-tree file.

    Exits with ``ExitCode.USER_ERROR`` when the requested commit ref cannot
    be resolved. Emits JSON when ``--json`` was passed, a text report
    otherwise.
    """
    ref: str | None = args.ref
    language: str | None = args.language
    top: int = args.top
    as_json: bool = args.as_json

    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)

    commit = resolve_commit_ref(root, repo_id, branch, ref)
    if commit is None:
        print(f"❌ Commit '{ref or 'HEAD'}' not found.", file=sys.stderr)
        raise SystemExit(ExitCode.USER_ERROR)

    # Snapshot may legitimately be empty (e.g. an initial commit).
    manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {}

    # Symbol counts per file.
    sym_map = symbols_for_snapshot(root, manifest, language_filter=language)
    file_sym_counts: dict[str, int] = {
        fp: len(tree) for fp, tree in sym_map.items()
    }

    # Import graph: who imports whom, and how many importers each file has.
    imports_out, in_degree = _build_import_graph(root, manifest, language)

    # Circular dependency chains.
    cycles = _find_cycles(imports_out)

    # High-centrality symbols (Python only — needs call graph).
    reverse = build_reverse_graph(root, manifest)
    centrality: list[tuple[str, int]] = sorted(
        [(name, len(callers)) for name, callers in reverse.items()],
        key=lambda t: t[1],
        reverse=True,
    )[:top]

    # Boundary files: import at least 3 modules but are imported by none.
    fan_out = {fp: len(targets) for fp, targets in imports_out.items() if targets}
    boundaries: list[tuple[str, int, int]] = sorted(
        [
            (fp, fan_out.get(fp, 0), in_degree.get(fp, 0))
            for fp in manifest
            if fan_out.get(fp, 0) >= 3 and in_degree.get(fp, 0) == 0
        ],
        key=lambda t: t[1],
        reverse=True,
    )[:top]

    # Modules ranked by symbol count, largest first.
    ranked = sorted(
        file_sym_counts.items(),
        key=lambda t: t[1],
        reverse=True,
    )[:top]

    if as_json:
        print(json.dumps(
            {
                "schema_version": __version__,
                "commit": commit.commit_id[:8],
                "language_filter": language,
                "modules": [
                    {
                        "file": fp,
                        "symbol_count": cnt,
                        "importers": in_degree.get(fp, 0),
                        "imports": len(imports_out.get(fp, [])),
                    }
                    for fp, cnt in ranked
                ],
                # All cycles are emitted in JSON; --top only limits text output.
                "import_cycles": cycles,
                "high_centrality": [
                    {"name": name, "callers": cnt}
                    for name, cnt in centrality
                ],
                "boundary_files": [
                    {"file": fp, "fan_out": fo, "fan_in": fi}
                    for fp, fo, fi in boundaries
                ],
            },
            indent=2,
        ))
        return

    print(f"\nSemantic codemap — commit {commit.commit_id[:8]}")
    if language:
        print(f" (language: {language})")
    print("─" * 62)

    print(f"\nTop modules by size (top {min(top, len(ranked))}):")
    if ranked:
        max_fp = max(len(fp) for fp, _ in ranked)
        for fp, cnt in ranked:
            imp = in_degree.get(fp, 0)
            imp_label = f"({imp} importers)" if imp else "(not imported)"
            print(f" {fp:<{max_fp}} {cnt:>3} symbols {imp_label}")
    else:
        print(" (no semantic files found)")

    print(f"\nImport cycles ({len(cycles)}):")
    if cycles:
        # Header shows the total; only the first ``top`` cycles are rendered.
        for cycle in cycles[:top]:
            print(" " + " → ".join(cycle))
    else:
        print(" ✅ No import cycles detected")

    # Plain strings here: these f-strings had no placeholders (ruff F541).
    print("\nHigh-centrality symbols — most callers (Python):")
    if centrality:
        for name, cnt in centrality:
            print(f" {name:<40} {cnt} caller(s)")
    else:
        print(" (no Python call graph available)")

    print("\nBoundary files — high fan-out, zero fan-in:")
    if boundaries:
        for fp, fo, fi in boundaries:
            print(f" {fp} imports {fo} ← imported by {fi}")
    else:
        print(" (none detected)")