blame.py
python
| 1 | """muse blame — symbol-level attribution. |
| 2 | |
| 3 | ``git blame`` attributes every *line* to a commit — a 300-line class gives |
| 4 | you 300 attribution entries. ``muse blame`` attributes the *symbol* as a |
| 5 | semantic unit: one answer per function, class, or method, regardless of how |
| 6 | many lines it occupies. |
| 7 | |
| 8 | Usage:: |
| 9 | |
| 10 | muse blame "src/billing.py::compute_invoice_total" |
| 11 | muse blame "api/server.go::Server.HandleRequest" |
| 12 | muse blame "src/models.py::User.save" --json |
| 13 | |
| 14 | Output:: |
| 15 | |
| 16 | src/billing.py::compute_invoice_total |
| 17 | ────────────────────────────────────────────────────────────── |
| 18 | last touched: cb4afaed 2026-03-16 |
| 19 | author: alice |
| 20 | message: "Perf: optimise compute_invoice_total" |
| 21 | change: implementation changed |
| 22 | |
| 23 | previous: 1d2e3faa 2026-03-15 (renamed from calculate_total) |
| 24 | before that: a3f2c9e1 2026-03-14 (created) |
| 25 | """ |
| 26 | |
| 27 | import json |
| 28 | import logging |
| 29 | import pathlib |
| 30 | from dataclasses import dataclass |
| 31 | from typing import Literal |
| 32 | |
| 33 | import typer |
| 34 | |
| 35 | from muse.core.errors import ExitCode |
| 36 | from muse.core.repo import require_repo |
| 37 | from muse.core.store import CommitRecord, resolve_commit_ref |
| 38 | from muse.domain import DomainOp |
| 39 | from muse.plugins.code._query import walk_commits |
| 40 | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Typer application object; ``blame`` below is registered as its callback.
app = typer.Typer()

# The event categories ``_events_in_commit`` can produce.
# NOTE(review): this alias is not referenced elsewhere in the visible source
# (``_BlameEvent.kind`` is annotated as plain ``str``) — confirm whether it
# is meant to constrain that field.
_EventKind = Literal["created", "modified", "renamed", "moved", "deleted", "signature"]
| 46 | |
| 47 | |
| 48 | @dataclass |
| 49 | class _BlameEvent: |
| 50 | kind: str |
| 51 | commit: CommitRecord |
| 52 | address: str |
| 53 | detail: str |
| 54 | new_address: str | None = None |
| 55 | |
| 56 | def to_dict(self) -> dict[str, str | None]: |
| 57 | return { |
| 58 | "event": self.kind, |
| 59 | "commit_id": self.commit.commit_id, |
| 60 | "author": self.commit.author, |
| 61 | "message": self.commit.message, |
| 62 | "committed_at": self.commit.committed_at.isoformat(), |
| 63 | "address": self.address, |
| 64 | "detail": self.detail, |
| 65 | "new_address": self.new_address, |
| 66 | } |
| 67 | |
| 68 | |
| 69 | def _flat_ops(ops: list[DomainOp]) -> list[DomainOp]: |
| 70 | result: list[DomainOp] = [] |
| 71 | for op in ops: |
| 72 | if op["op"] == "patch": |
| 73 | result.extend(op["child_ops"]) |
| 74 | else: |
| 75 | result.append(op) |
| 76 | return result |
| 77 | |
| 78 | |
def _events_in_commit(
    commit: CommitRecord,
    address: str,
) -> tuple[list[_BlameEvent], str]:
    """Scan *commit* for events touching *address*; return ``(events, next_address)``.

    Ops are taken from ``commit.structured_delta["ops"]`` (flattened with
    ``_flat_ops``).  An op is relevant when:

    * its ``address`` equals *address* — classified as created / deleted /
      moved / renamed / signature / modified from the op type and its
      summary text; or
    * it is a ``replace`` op at a *different* address whose ``new_summary``
      is ``"renamed to <name>"`` and ``<its file>::<name>`` equals *address*.
      This is the commit that gave the symbol its tracked name; it is
      reported as ``"renamed from <old name>"`` and tracking continues under
      the old address.

    Args:
        commit: Commit to inspect.  A commit whose ``structured_delta`` is
            ``None`` yields no events.
        address: Symbol address currently being tracked, in the
            ``"<file>::<symbol>"`` form.

    Returns:
        ``(events, next_address)``: the events found, in op order, and the
        address to look for in the next commit of the walk.  *next_address*
        equals *address* unless a rename was seen.
    """
    events: list[_BlameEvent] = []
    next_address = address
    if commit.structured_delta is None:
        return events, next_address

    for op in _flat_ops(commit.structured_delta["ops"]):
        op_kind = op["op"]
        op_address: str = op["address"]

        # Only ``replace`` ops are consulted for a rename summary.
        ns: str = op.get("new_summary", "") if op_kind == "replace" else ""
        renamed_to: str | None = None
        if ns.startswith("renamed to "):
            renamed_to = ns.removeprefix("renamed to ").strip()

        if op_address != address:
            # Rename *into* the tracked name.  The op is recorded at the old
            # address, so an equality check alone would miss it and history
            # would stop at the rename.
            # NOTE(review): assumes the caller walks newest-first (it labels
            # the first event "last touched") — confirm against walk_commits.
            if renamed_to is not None:
                container = op_address.rsplit("::", 1)[0]
                if f"{container}::{renamed_to}" == address:
                    old_name = op_address.rsplit("::", 1)[-1]
                    events.append(
                        _BlameEvent(
                            "renamed", commit, address,
                            f"renamed from {old_name}", op_address,
                        )
                    )
                    next_address = op_address
            continue

        if op_kind == "insert":
            events.append(
                _BlameEvent("created", commit, address, op.get("content_summary", "created"))
            )
        elif op_kind == "delete":
            detail = op.get("content_summary", "deleted")
            # A delete whose summary mentions a destination is one half of a move.
            kind = "moved" if "moved to" in detail else "deleted"
            events.append(_BlameEvent(kind, commit, address, detail))
        elif op_kind == "replace":
            if renamed_to is not None:
                # Rename *away from* the tracked name: follow the new name.
                file_prefix = address.rsplit("::", 1)[0]
                new_addr = f"{file_prefix}::{renamed_to}"
                events.append(
                    _BlameEvent("renamed", commit, address, f"renamed to {renamed_to}", new_addr)
                )
                next_address = new_addr
            elif ns.startswith("moved to "):
                events.append(_BlameEvent("moved", commit, address, ns))
            elif "signature" in ns:
                # ``ns`` is necessarily non-empty here, so no fallback text is needed.
                events.append(_BlameEvent("signature", commit, address, ns))
            else:
                events.append(_BlameEvent("modified", commit, address, ns or "modified"))
    return events, next_address
| 112 | |
| 113 | |
| 114 | def _read_repo_id(root: pathlib.Path) -> str: |
| 115 | return str(json.loads((root / ".muse" / "repo.json").read_text())["repo_id"]) |
| 116 | |
| 117 | |
| 118 | def _read_branch(root: pathlib.Path) -> str: |
| 119 | head_ref = (root / ".muse" / "HEAD").read_text().strip() |
| 120 | return head_ref.removeprefix("refs/heads/").strip() |
| 121 | |
| 122 | |
@app.callback(invoke_without_command=True)
def blame(
    ctx: typer.Context,
    address: str = typer.Argument(
        ..., metavar="ADDRESS",
        help='Symbol address, e.g. "src/billing.py::compute_invoice_total".',
    ),
    from_ref: str | None = typer.Option(
        None, "--from", metavar="REF",
        help="Start walking from this commit / branch (default: HEAD).",
    ),
    show_all: bool = typer.Option(
        False, "--all", "-a",
        help="Show the full change history, not just the three most recent events.",
    ),
    as_json: bool = typer.Option(
        False, "--json", help="Emit attribution as JSON.",
    ),
) -> None:
    """Show which commit last touched a specific symbol.

    ``muse blame`` attributes the symbol as a semantic unit — one answer
    per function, class, or method, regardless of line count. The full
    chain of prior events (renames, signature changes, etc.) is available
    via ``--all``.

    Unlike ``git blame``, which gives per-line attribution across an entire
    file, ``muse blame`` gives a single clear answer: *this commit last
    changed this symbol, and this is what changed*.
    """
    # NOTE(review): the source this block was recovered from had lost its
    # indentation, and runs of spaces inside string literals may have been
    # collapsed.  Nesting below is reconstructed — confirm against the
    # original file, in particular the echo calls at the end of the loop.

    # Locate the repository and the identifiers needed to resolve a ref.
    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)

    # ``from_ref`` of None is passed through as-is; the error text below
    # describes that case as 'HEAD'.
    start_commit = resolve_commit_ref(root, repo_id, branch, from_ref)
    if start_commit is None:
        typer.echo(f"❌ Commit '{from_ref or 'HEAD'}' not found.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    commits = walk_commits(root, start_commit.commit_id)

    # Feed each commit's "next address" into the following commit so that
    # renames are followed across the walk.
    current_address = address
    all_events: list[_BlameEvent] = []
    for commit in commits:
        evs, current_address = _events_in_commit(commit, current_address)
        all_events.extend(evs)

    if as_json:
        # JSON output always contains every event (``--all`` has no effect
        # here) and lists them in the reverse of walk order.
        typer.echo(json.dumps(
            {"address": address, "events": [e.to_dict() for e in reversed(all_events)]},
            indent=2,
        ))
        return

    # Human-readable output: header, rule, then one stanza per event.
    typer.echo(f"\n{address}")
    typer.echo("─" * 62)

    if not all_events:
        typer.echo(" (no events found — symbol may not exist in this repository)")
        return

    # Without --all only the first three events of the walk are shown.
    events_to_show = all_events if show_all else all_events[:3]
    labels = ["last touched:", "previous: ", "before that: "]

    for idx, ev in enumerate(events_to_show):
        # Events past the third (only reachable with --all) get a bare ":" label.
        label = labels[idx] if idx < len(labels) else " :"
        date_str = ev.commit.committed_at.strftime("%Y-%m-%d")
        short_id = ev.commit.commit_id[:8]
        typer.echo(f"{label} {short_id} {date_str}")
        # Author and message are printed for the first event only.
        if idx == 0:
            typer.echo(f"author: {ev.commit.author or 'unknown'}")
            typer.echo(f'message: "{ev.commit.message}"')
        # NOTE(review): placed at loop level (printed for every event) as the
        # most plausible reading; the original nesting could not be recovered.
        typer.echo(f"change: {ev.detail}")
        if ev.new_address:
            typer.echo(f" (tracking continues as {ev.new_address})")
        # Blank line between stanzas.
        typer.echo("")