query.py
python
| 1 | """muse query — symbol graph predicate query (v2). |
| 2 | |
| 3 | SQL for your codebase. A full predicate DSL over the typed, content-addressed |
| 4 | symbol graph — with OR, NOT, grouping, and an expanded field set. |
| 5 | |
| 6 | v2 grammar:: |
| 7 | |
| 8 | expr = or_expr |
| 9 | or_expr = and_expr ( OR and_expr )* |
| 10 | and_expr = not_expr ( [AND] not_expr )* # implicit AND |
| 11 | not_expr = NOT primary | primary |
| 12 | primary = "(" expr ")" | atom |
| 13 | atom = KEY OP VALUE |
| 14 | |
| 15 | Supported operators:: |
| 16 | |
| 17 | = exact match |
| 18 | ~= contains (case-insensitive) |
| 19 | ^= starts with (case-insensitive) |
| 20 | $= ends with (case-insensitive) |
| 21 | != not equal |
| 22 | |
| 23 | Supported keys:: |
| 24 | |
| 25 | kind function | class | method | variable | import | … |
| 26 | language Python | Go | Rust | TypeScript | … |
| 27 | name bare symbol name |
| 28 | qualified_name dotted name (User.save) |
| 29 | file file path |
| 30 | hash content_id prefix (exact-body match) |
| 31 | body_hash body_hash prefix |
| 32 | signature_id signature_id prefix |
| 33 | lineno_gt symbol starts after line N |
| 34 | lineno_lt symbol starts before line N |
| 35 | |
| 36 | Usage:: |
| 37 | |
| 38 | muse query "kind=function" "language=Python" "name~=validate" |
| 39 | muse query "(kind=function OR kind=method) name^=_" |
| 40 | muse query "NOT kind=import" "file~=billing" |
| 41 | muse query "hash=a3f2c9" |
| 42 | muse query "kind=function" "name$=_test" --commit HEAD~10 |
| 43 | muse query "kind=function" "name~=validate" --all-commits |
| 44 | """ |
| 45 | from __future__ import annotations |
| 46 | |
| 47 | import json |
| 48 | import logging |
| 49 | import pathlib |
| 50 | |
| 51 | import typer |
| 52 | |
| 53 | from muse.core.errors import ExitCode |
| 54 | from muse.core.repo import require_repo |
| 55 | from muse.core.store import CommitRecord, get_all_commits, get_commit_snapshot_manifest, resolve_commit_ref |
| 56 | from muse.plugins.code._predicate import Predicate, PredicateError, parse_query |
| 57 | from muse.plugins.code._query import language_of, symbols_for_snapshot |
| 58 | from muse.plugins.code.ast_parser import SymbolRecord # used in _query_all_commits signature |
| 59 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Typer application object; the ``query`` callback is registered on it.
app = typer.Typer()

# Short display label for each symbol kind, used in the plain-text listings.
# Lookups fall back to the raw kind string for kinds that are not listed here.
_KIND_ICON: dict[str, str] = {
    "function": "fn",
    "async_function": "fn~",
    "class": "class",
    "method": "method",
    "async_method": "method~",
    "variable": "var",
    "import": "import",
}
| 73 | |
| 74 | |
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` stored in ``<root>/.muse/repo.json`` as a string."""
    repo_file = root / ".muse" / "repo.json"
    metadata = json.loads(repo_file.read_text())
    return str(metadata["repo_id"])
| 77 | |
| 78 | |
def _read_branch(root: pathlib.Path) -> str:
    """Return the contents of ``<root>/.muse/HEAD`` with a leading ``refs/heads/`` removed.

    Surrounding whitespace is stripped both before and after the prefix is
    removed.
    """
    head_file = root / ".muse" / "HEAD"
    ref_text = head_file.read_text().strip()
    branch_name = ref_text.removeprefix("refs/heads/")
    return branch_name.strip()
| 82 | |
| 83 | |
| 84 | # Predicate parsing is handled by muse.plugins.code._predicate (v2 grammar). |
| 85 | |
| 86 | |
class _HistoricalMatch:
    """One symbol match located in a specific commit (``--all-commits`` mode)."""

    def __init__(
        self,
        address: str,
        rec: SymbolRecord,
        commit: CommitRecord,
        first_seen: bool,
    ) -> None:
        # The symbol's address together with its record from the snapshot.
        self.address, self.rec = address, rec
        # The commit the match was found in; ``first_seen`` is True when this
        # is the oldest appearance (the caller decides that, not this class).
        self.commit, self.first_seen = commit, first_seen

    def to_dict(self) -> dict[str, str | int | bool]:
        """Flatten the match into a JSON-serialisable dict.

        Key order is fixed: symbol fields first, then ``first_seen``, then
        the commit fields (``committed_at`` is rendered with ``isoformat()``).
        """
        symbol = self.rec
        origin = self.commit
        payload: dict[str, str | int | bool] = {"address": self.address}
        for key in ("kind", "name", "content_id"):
            payload[key] = symbol[key]
        payload["first_seen"] = self.first_seen
        payload["commit_id"] = origin.commit_id
        payload["commit_message"] = origin.message
        payload["committed_at"] = origin.committed_at.isoformat()
        payload["branch"] = origin.branch
        return payload
| 114 | |
| 115 | |
def _query_all_commits(
    root: pathlib.Path,
    filters: list[Predicate],
) -> list[_HistoricalMatch]:
    """Walk every commit oldest-first, apply predicates against each snapshot.

    Commits are taken from ``get_all_commits(root)`` and ordered by
    ``committed_at``. A commit whose manifest comes back empty is skipped.
    Within a commit, files are visited in sorted path order and symbols in
    ``lineno`` order; a symbol matches when every callable in *filters*
    accepts ``(file_path, rec)``.

    Returns one entry per (address, commit) pair that matches. The
    ``first_seen`` flag is True only on the first match, in walk order, that
    carries a given ``content_id``; later matches with the same ``content_id``
    (at any address, in any commit) get False.
    """
    all_commits = get_all_commits(root)
    if not all_commits:
        return []

    results: list[_HistoricalMatch] = []
    # Only membership is ever tested, so a set of content_ids is sufficient.
    seen_content_ids: set[str] = set()

    for commit in sorted(all_commits, key=lambda c: c.committed_at):
        manifest = _manifest_for_commit(root, commit)
        if not manifest:
            continue
        symbol_map = symbols_for_snapshot(root, manifest)
        for file_path, tree in sorted(symbol_map.items()):
            for addr, rec in sorted(tree.items(), key=lambda kv: kv[1]["lineno"]):
                if not all(f(file_path, rec) for f in filters):
                    continue
                cid = rec["content_id"]
                is_first = cid not in seen_content_ids
                seen_content_ids.add(cid)
                results.append(_HistoricalMatch(addr, rec, commit, is_first))

    return results
| 151 | |
| 152 | |
def _manifest_for_commit(
    root: pathlib.Path,
    commit: CommitRecord,
) -> dict[str, str]:
    """Load the snapshot manifest for *commit*, returning empty dict on failure.

    The manifest is read from
    ``<root>/.muse/snapshots/<commit.snapshot_id>.json`` under the
    ``"manifest"`` key. "Failure" covers a missing or unreadable file,
    undecodable or invalid JSON, a top-level value that is not an object,
    and a ``"manifest"`` entry that is absent or not an object.
    """
    snap_path = root / ".muse" / "snapshots" / f"{commit.snapshot_id}.json"
    try:
        payload = json.loads(snap_path.read_text())
    except (OSError, ValueError):
        # OSError: file missing or unreadable (no separate exists() check, so
        # a file removed in between cannot slip through).
        # ValueError: json.JSONDecodeError and UnicodeDecodeError both derive
        # from it.
        return {}

    if not isinstance(payload, dict):
        return {}
    manifest = payload.get("manifest")
    if not isinstance(manifest, dict):
        return {}
    # Return a copy so callers never share the parsed JSON object.
    return dict(manifest)
| 165 | |
| 166 | |
@app.callback(invoke_without_command=True)
def query(
    ctx: typer.Context,
    predicates: list[str] = typer.Argument(
        ..., metavar="PREDICATE...",
        help="One or more predicates, e.g. \"kind=function\" \"name~=validate\".",
    ),
    ref: str | None = typer.Option(
        None, "--commit", "-c", metavar="REF",
        help="Query a historical snapshot instead of HEAD.",
    ),
    all_commits: bool = typer.Option(
        False, "--all-commits",
        help=(
            "Search across ALL commits (every branch). "
            "Enables temporal hash= queries: find when a function body first appeared. "
            "Mutually exclusive with --commit."
        ),
    ),
    show_hashes: bool = typer.Option(
        False, "--hashes", help="Include content hashes in output.",
    ),
    as_json: bool = typer.Option(
        False, "--json", help="Emit results as JSON.",
    ),
) -> None:
    """Query the symbol graph with a predicate DSL.

    ``muse query`` is SQL for your codebase. Every predicate is evaluated
    against the typed, content-addressed symbol graph — not raw text.

    Predicate syntax: ``key=value`` (exact), ``key~=value`` (contains),
    ``key^=value`` (starts with), ``key$=value`` (ends with),
    ``key!=value`` (not equal). Separate arguments are ANDed together; a
    single argument may combine atoms with OR, NOT and parentheses.

    The ``hash`` predicate finds every symbol whose normalized AST matches
    that content hash — duplicate function detection, clone tracking, and
    cross-module copy detection in one query.

    With ``--all-commits``, the query searches every commit ever recorded
    (across all branches), ordered oldest-first. The first time each unique
    ``content_id`` appears is marked. This enables temporal queries:
    "when did this function body first enter the repository?"

    \\b
    Examples::

        muse query "kind=function" "language=Python"
        muse query "hash=a3f2c9"
        muse query "hash=a3f2c9" --all-commits # when did it first appear?
        muse query "name~=validate" --all-commits --json
    """
    # ``ctx`` is not used in the body; it stays in the signature so the
    # registered callback interface is unchanged.
    root = require_repo()

    if not predicates:
        typer.echo("❌ At least one predicate is required.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    if all_commits and ref is not None:
        typer.echo("❌ --all-commits and --commit are mutually exclusive.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # Parse predicates using the v2 grammar (OR / NOT / grouping supported).
    # Each CLI argument is joined with implicit AND; a single argument may
    # contain OR/NOT/parentheses.
    try:
        combined_predicate: Predicate = parse_query(predicates)
    except PredicateError as exc:
        typer.echo(f"❌ {exc}", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR) from exc
    filters: list[Predicate] = [combined_predicate]

    # Human-readable echo of the query, shared by every text-mode message.
    pred_display = " AND ".join(predicates)

    # ----------------------------------------------------------------
    # --all-commits mode: temporal search across every recorded commit
    # ----------------------------------------------------------------
    if all_commits:
        historical = _query_all_commits(root, filters)
        if as_json:
            # JSON mode reports every (address, commit) match, not just the
            # de-duplicated view printed in text mode.
            typer.echo(json.dumps(
                {
                    "schema_version": 2,
                    "mode": "all-commits",
                    "results": [h.to_dict() for h in historical],
                },
                indent=2,
            ))
            return
        if not historical:
            typer.echo(f" (no symbols matching: {pred_display} [searched all commits])")
            return
        # Deduplicate for display: show unique addresses with their first-seen commit.
        seen_addrs: set[str] = set()
        unique: list[_HistoricalMatch] = []
        for h in historical:
            if h.first_seen and h.address not in seen_addrs:
                seen_addrs.add(h.address)
                unique.append(h)
        typer.echo(f"\n{len(unique)} unique symbol(s) matching [{pred_display}] across all commits\n")
        for h in unique:
            date_str = h.commit.committed_at.strftime("%Y-%m-%d")
            short_id = h.commit.commit_id[:8]
            icon = _KIND_ICON.get(h.rec["kind"], h.rec["kind"])
            hash_part = f" {h.rec['content_id'][:8]}.." if show_hashes else ""
            branch_label = f" [{h.commit.branch}]" if h.commit.branch else ""
            typer.echo(
                f" {h.address:<60} {icon:<8}"
                f" first seen {short_id} {date_str}{branch_label}{hash_part}"
            )
        return

    # ----------------------------------------------------------------
    # Single-snapshot mode (default)
    # ----------------------------------------------------------------
    # The repo id and current branch are only needed to resolve a ref, so
    # they are read here rather than before the --all-commits branch.
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)
    commit = resolve_commit_ref(root, repo_id, branch, ref)
    if commit is None:
        typer.echo(f"❌ Commit '{ref or 'HEAD'}' not found.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {}
    symbol_map = symbols_for_snapshot(root, manifest)

    # Apply all predicates; files in sorted path order, symbols by start line.
    matches: list[tuple[str, str, SymbolRecord]] = []
    for file_path, tree in sorted(symbol_map.items()):
        for addr, rec in sorted(tree.items(), key=lambda kv: kv[1]["lineno"]):
            if all(f(file_path, rec) for f in filters):
                matches.append((file_path, addr, rec))

    if as_json:
        out: list[dict[str, str | int]] = [
            {
                "address": addr,
                "kind": rec["kind"],
                "name": rec["name"],
                "qualified_name": rec["qualified_name"],
                "file": fp,
                "lineno": rec["lineno"],
                "end_lineno": rec["end_lineno"],
                "language": language_of(fp),
                "content_id": rec["content_id"],
                "body_hash": rec["body_hash"],
                "signature_id": rec["signature_id"],
            }
            for fp, addr, rec in matches
        ]
        typer.echo(json.dumps(
            {"schema_version": 2, "commit": commit.commit_id[:8], "results": out},
            indent=2,
        ))
        return

    if not matches:
        typer.echo(f" (no symbols matching: {pred_display})")
        return

    files_seen: set[str] = set()
    for fp, addr, rec in matches:
        files_seen.add(fp)
        icon = _KIND_ICON.get(rec["kind"], rec["kind"])
        line = rec["lineno"]
        hash_part = f" {rec['content_id'][:8]}.." if show_hashes else ""
        typer.echo(f" {addr:<60} {icon:<10} line {line:>4}{hash_part}")

    typer.echo(f"\n{len(matches)} match(es) across {len(files_seen)} file(s) [{pred_display}]")