# content_grep.py
| 1 | """``muse content-grep`` — full-text search across tracked files. |
| 2 | |
| 3 | Searches the content of every tracked file in a commit's snapshot for a |
| 4 | pattern. Each file is read from the content-addressed object store in full |
| 5 | (bounded by the store's MAX_FILE_BYTES limit, typically 256 MiB). Binary |
| 6 | files are detected by scanning the first 8 KiB for null bytes and silently |
| 7 | skipped. Files that cannot be decoded as UTF-8 are also skipped. |
| 8 | |
| 9 | Regex safety: patterns are compiled with a 500-character length limit to |
| 10 | prevent catastrophic backtracking (ReDoS). Use ``re.escape`` in scripts |
| 11 | if you need to match literal strings with special characters. |
| 12 | |
| 13 | Binary files are detected by scanning the first 8 KiB for null bytes and |
| 14 | silently skipped. Files that cannot be decoded as UTF-8 are also skipped. |
| 15 | |
| 16 | Domain dispatch: when the active domain plugin exposes a ``grep`` method |
| 17 | (detected via ``hasattr``), it is invoked instead of the raw text fallback, |
| 18 | enabling symbol-aware search in code repositories. For all other domains |
| 19 | the raw byte-level text search is used. |
| 20 | |
| 21 | Usage:: |
| 22 | |
| 23 | muse content-grep --pattern "Cm7" # literal substring |
| 24 | muse content-grep --pattern "tempo:\\s+\\d+" # regex |
| 25 | muse content-grep --pattern "TODO" --ignore-case # case-insensitive |
| 26 | muse content-grep --pattern "chorus" --files-only # only file paths |
| 27 | muse content-grep --pattern "bass" --ref feat/audio # search a branch tip |
| 28 | |
| 29 | Exit codes:: |
| 30 | |
| 31 | 0 — pattern found in at least one file |
| 32 | 1 — no matches (or no commits) |
| 33 | 3 — I/O error |
| 34 | """ |
| 35 | |
| 36 | from __future__ import annotations |
| 37 | |
| 38 | import json |
| 39 | import logging |
| 40 | import pathlib |
| 41 | import re |
| 42 | from typing import Annotated, TypedDict |
| 43 | |
| 44 | import typer |
| 45 | |
| 46 | from muse.core.errors import ExitCode |
| 47 | from muse.core.object_store import read_object |
| 48 | from muse.core.repo import require_repo |
| 49 | from muse.core.store import ( |
| 50 | get_head_commit_id, |
| 51 | read_commit, |
| 52 | read_current_branch, |
| 53 | read_snapshot, |
| 54 | resolve_commit_ref, |
| 55 | ) |
| 56 | from muse.core.validation import sanitize_display |
| 57 | from muse.plugins.registry import read_domain, resolve_plugin |
| 58 | |
# Module-level logger; handlers/levels are configured by the CLI entry point.
logger = logging.getLogger(__name__)

# Typer sub-application mounted by the main ``muse`` CLI under ``content-grep``.
app = typer.Typer(help="Full-text search across tracked files in a snapshot.")

# Number of leading bytes probed for null bytes when detecting binary files.
_BINARY_CHUNK = 8192
_MAX_PATTERN_LEN = 500  # reject patterns that could cause catastrophic backtracking
| 65 | |
| 66 | |
class GrepMatch(TypedDict):
    """A single matching line within a file."""

    # 1-based line number within the decoded file text.
    line_number: int
    # The matching line's text, without its line terminator.
    text: str
| 72 | |
| 73 | |
class GrepFileResult(TypedDict):
    """All matches within a single file."""

    # Repository-relative path of the file in the snapshot manifest.
    path: str
    # Content-addressed object ID backing the file's content.
    object_id: str
    # Total number of matching lines (counted even when matches are not collected).
    match_count: int
    # Per-line matches; empty in --files-only / --count modes.
    matches: list[GrepMatch]
| 81 | |
| 82 | |
| 83 | def _is_binary(data: bytes) -> bool: |
| 84 | """Return True if *data* (the first chunk) contains null bytes.""" |
| 85 | return b"\x00" in data |
| 86 | |
| 87 | |
def _search_object(
    root_path: pathlib.Path,
    object_id: str,
    pattern: re.Pattern[str],
    files_only: bool,
    count_only: bool,
) -> tuple[int, list[GrepMatch]]:
    """Search one stored object for *pattern*; return ``(match_count, matches)``.

    The object is read from the content-addressed store in a single call
    (objects are bounded by the store's MAX_FILE_BYTES limit — NOTE(review):
    the module docstring and this function previously disagreed on the exact
    value, 256 vs 512 MiB; confirm against the validation module).

    Skipped with a ``(0, [])`` result: missing objects, unreadable objects
    (logged at WARNING), binary objects (null byte in the first 8 KiB), and
    objects that are not valid UTF-8.

    When *files_only* or *count_only* is set, matching lines are counted but
    not collected, so ``matches`` stays empty.
    """
    try:
        raw = read_object(root_path, object_id)
    except OSError as exc:
        logger.warning("⚠️ grep: could not read object %s: %s", object_id[:12], exc)
        return 0, []

    if raw is None:  # object missing from the store
        return 0, []

    # Binary detection: probe only the first chunk for null bytes.
    if _is_binary(raw[:_BINARY_CHUNK]):
        return 0, []

    # Strict decode so genuinely non-UTF-8 files are skipped, as the module
    # contract documents. (The previous errors="replace" decode could never
    # raise, so such files were silently searched with U+FFFD replacement
    # characters instead of being skipped.)
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return 0, []

    matches: list[GrepMatch] = []
    total = 0
    # splitlines() already strips \r\n / \r / \n terminators, so no per-line
    # rstrip is needed.
    for lineno, line in enumerate(text.splitlines(), start=1):
        if pattern.search(line):
            total += 1
            if not files_only and not count_only:
                matches.append(GrepMatch(line_number=lineno, text=line))

    return total, matches
| 129 | |
| 130 | |
| 131 | def _read_repo_id(root: pathlib.Path) -> str: |
| 132 | return str(json.loads((root / ".muse" / "repo.json").read_text(encoding="utf-8"))["repo_id"]) |
| 133 | |
| 134 | |
@app.callback(invoke_without_command=True)
def grep(
    pattern: Annotated[
        str,
        typer.Option("--pattern", "-p", help="Pattern to search for (Python regex syntax)."),
    ],
    ref: Annotated[
        str | None,
        typer.Option("--ref", "-r", help="Branch name or commit SHA to search (default: HEAD)."),
    ] = None,
    ignore_case: Annotated[
        bool,
        typer.Option("--ignore-case", "-i", help="Case-insensitive matching."),
    ] = False,
    files_only: Annotated[
        bool,
        typer.Option("--files-only", "-l", help="Print only file paths with matches (no line content)."),
    ] = False,
    count_mode: Annotated[
        bool,
        typer.Option("--count", "-c", help="Print count of matching lines per file."),
    ] = False,
    fmt: Annotated[
        str,
        typer.Option("--format", "-f", help="Output format: text or json."),
    ] = "text",
) -> None:
    """Search tracked file content for a pattern.

    Reads objects from the content-addressed store and scans each for the
    pattern. Binary files and non-UTF-8 files are silently skipped.

    The pattern is a Python regular expression. Use ``--ignore-case`` for
    case-insensitive matching. Exit code 0 means at least one match was
    found; exit code 1 means no matches.

    Examples::

        muse grep --pattern "chorus"
        muse grep --pattern "TODO|FIXME" --files-only
        muse grep --pattern "tempo" --ignore-case --format json
        muse grep --pattern "chord" --ref feat/harmony
    """
    # --- Validate all user input up front, before any repository I/O, so a
    # bad --format/--pattern fails fast with USER_ERROR regardless of repo
    # state. (Previously the pattern checks ran only after commit/snapshot
    # resolution.)
    if fmt not in {"text", "json"}:
        typer.echo(f"❌ Unknown --format '{sanitize_display(fmt)}'. Choose text or json.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # Guard against patterns so long they risk catastrophic backtracking.
    if len(pattern) > _MAX_PATTERN_LEN:
        typer.echo(
            f"❌ Pattern too long ({len(pattern)} chars, max {_MAX_PATTERN_LEN}). "
            "Use a shorter pattern or re.escape() for literal matches.",
            err=True,
        )
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # Compile the regex once; it is reused for every file in the snapshot.
    flags = re.IGNORECASE if ignore_case else 0
    try:
        compiled: re.Pattern[str] = re.compile(pattern, flags)
    except re.error as exc:
        typer.echo(f"❌ Invalid regex: {exc}", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR) from exc

    # --- Locate the repository and resolve the commit to search.
    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = read_current_branch(root)

    if ref is None:
        # Default: the tip of the current branch.
        commit_id = get_head_commit_id(root, branch)
        if commit_id is None:
            typer.echo("❌ No commits on current branch.", err=True)
            raise typer.Exit(code=ExitCode.USER_ERROR)
    else:
        commit_rec = resolve_commit_ref(root, repo_id, branch, ref)
        if commit_rec is None:
            typer.echo(f"❌ Ref '{sanitize_display(ref)}' not found.", err=True)
            raise typer.Exit(code=ExitCode.USER_ERROR)
        commit_id = commit_rec.commit_id

    commit = read_commit(root, commit_id)
    if commit is None:
        # A resolved ref pointing at a missing commit record is store corruption.
        typer.echo(f"❌ Commit {commit_id[:12]} not found.", err=True)
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR)

    snap = read_snapshot(root, commit.snapshot_id)
    if snap is None:
        typer.echo(f"❌ Snapshot {commit.snapshot_id[:12]} not found.", err=True)
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR)

    # --- Search every file in the snapshot manifest, in sorted path order.
    file_results: list[GrepFileResult] = []
    for rel_path, object_id in sorted(snap.manifest.items()):
        match_count, matches = _search_object(
            root, object_id, compiled, files_only, count_mode
        )
        if match_count > 0:
            file_results.append(GrepFileResult(
                path=rel_path,
                object_id=object_id,
                match_count=match_count,
                matches=matches,
            ))

    if not file_results:
        raise typer.Exit(code=ExitCode.USER_ERROR)  # exit 1 = no matches

    # --- Emit results.
    if fmt == "json":
        typer.echo(json.dumps(file_results, indent=2))
    else:
        for fr in file_results:
            safe_path = sanitize_display(fr["path"])
            if files_only:
                typer.echo(safe_path)
            elif count_mode:
                typer.echo(f"{safe_path}:{fr['match_count']}")
            else:
                for m in fr["matches"]:
                    typer.echo(f"{safe_path}:{m['line_number']}:{sanitize_display(m['text'])}")