gabriel / muse public
content_grep.py python
254 lines 8.4 KB
95b86799 feat: add --format json to all porcelain commands for agent-first output Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """``muse content-grep`` — full-text search across tracked files.
2
3 Searches the content of every tracked file in a commit's snapshot for a
4 pattern. Each file is read from the content-addressed object store in full
5 (bounded by the store's MAX_FILE_BYTES limit, typically 256 MiB). Binary
6 files are detected by scanning the first 8 KiB for null bytes and silently
7 skipped. Files that cannot be decoded as UTF-8 are also skipped.
8
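Regex safety: patterns longer than 500 characters are rejected before they
are compiled. The limit bounds pattern size; it does not by itself rule out
catastrophic backtracking (ReDoS), which short patterns can also trigger.
Pass the text through ``re.escape`` in scripts if you need to match literal
strings that contain regex metacharacters.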
12
Skipped files are never reported: they contribute no matches and do not
appear in the text or JSON output.
15
Domain dispatch: a domain plugin may expose a ``grep`` method that enables
symbol-aware search in code repositories. This module does not currently
perform that dispatch; every domain is searched with the raw line-by-line
text search described above.
20
21 Usage::
22
23 muse content-grep --pattern "Cm7" # literal substring
24 muse content-grep --pattern "tempo:\\s+\\d+" # regex
25 muse content-grep --pattern "TODO" --ignore-case # case-insensitive
26 muse content-grep --pattern "chorus" --files-only # only file paths
27 muse content-grep --pattern "bass" --ref feat/audio # search a branch tip
28
29 Exit codes::
30
31 0 — pattern found in at least one file
32 1 — no matches (or no commits)
33 3 — I/O error
34 """
35
36 from __future__ import annotations
37
38 import json
39 import logging
40 import pathlib
41 import re
42 from typing import Annotated, TypedDict
43
44 import typer
45
46 from muse.core.errors import ExitCode
47 from muse.core.object_store import read_object
48 from muse.core.repo import require_repo
49 from muse.core.store import (
50 get_head_commit_id,
51 read_commit,
52 read_current_branch,
53 read_snapshot,
54 resolve_commit_ref,
55 )
56 from muse.core.validation import sanitize_display
57 from muse.plugins.registry import read_domain, resolve_plugin
58
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Typer application object that the ``grep`` callback below is registered on.
app = typer.Typer(help="Full-text search across tracked files in a snapshot.")
62
# Number of leading bytes of an object inspected for NUL bytes (8 KiB).
_BINARY_CHUNK = 8 * 1024
# Longest accepted --pattern, in characters; longer patterns are rejected
# before compilation to bound the cost of a single search.
_MAX_PATTERN_LEN = 500
65
66
class GrepMatch(TypedDict, total=True):
    """One line of a file on which the search pattern was found."""

    # 1-based position of the line within the decoded file.
    line_number: int
    # Content of the matching line, without its line terminator.
    text: str
72
73
class GrepFileResult(TypedDict, total=True):
    """Search outcome for one tracked file that contains the pattern."""

    # Manifest path of the file within the searched snapshot.
    path: str
    # Identifier of the file's content in the object store.
    object_id: str
    # Number of lines in the file on which the pattern was found.
    match_count: int
    # Per-line details; left empty in --files-only and --count modes.
    matches: list[GrepMatch]
81
82
def _is_binary(data: bytes) -> bool:
    """Report whether *data* holds at least one NUL byte.

    Callers pass only the leading chunk of an object, so a NUL byte that
    appears later in the object is not seen by this check.
    """
    return data.find(b"\x00") != -1
86
87
def _search_object(
    root_path: "pathlib.Path",
    object_id: str,
    pattern: "re.Pattern[str]",
    files_only: bool,
    count_only: bool,
) -> tuple[int, list[GrepMatch]]:
    """Search one stored object for *pattern*, line by line.

    The whole object is loaded into memory with a single ``read_object``
    call. NOTE(review): the object size bound (MAX_FILE_BYTES) is defined
    outside this file — confirm its value before relying on it.

    Args:
        root_path: Repository root handed to ``read_object``.
        object_id: Identifier of the object to load.
        pattern: Compiled regex applied to every line with ``search``.
        files_only: When true, matching lines are counted but not collected.
        count_only: When true, matching lines are counted but not collected.

    Returns:
        ``(match_count, matches)`` where ``match_count`` is the number of
        matching lines and ``matches`` holds one entry per matching line
        (empty when either mode flag is set). ``(0, [])`` is returned when
        the object cannot be read, is missing, looks binary, or is not
        valid UTF-8.
    """
    try:
        raw = read_object(root_path, object_id)
    except OSError as exc:
        # Best effort: an unreadable object is logged and treated as "no match".
        logger.warning("⚠️ grep: could not read object %s: %s", object_id[:12], exc)
        return 0, []

    if raw is None:
        return 0, []

    # Binary detection: only the leading chunk is probed for NUL bytes.
    if _is_binary(raw[:_BINARY_CHUNK]):
        return 0, []

    # Strict decoding so that files which are not valid UTF-8 are skipped, as
    # documented. With errors="replace" the decode could never fail and such
    # files were searched with U+FFFD substitutions instead.
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return 0, []

    # Line details are only needed for the default (full) output mode.
    collect = not (files_only or count_only)

    matches: list[GrepMatch] = []
    total = 0
    # str.splitlines() already strips every line terminator (including "\r"),
    # so the lines need no further trimming.
    for lineno, line in enumerate(text.splitlines(), start=1):
        if pattern.search(line):
            total += 1
            if collect:
                matches.append(GrepMatch(line_number=lineno, text=line))

    return total, matches
129
130
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` stored in ``<root>/.muse/repo.json`` as a string.

    Errors from reading the file, parsing the JSON, or a missing ``repo_id``
    key are not caught here and propagate to the caller.
    """
    repo_json = root / ".muse" / "repo.json"
    metadata = json.loads(repo_json.read_text(encoding="utf-8"))
    return str(metadata["repo_id"])
133
134
@app.callback(invoke_without_command=True)
def grep(
    pattern: Annotated[
        str,
        typer.Option("--pattern", "-p", help="Pattern to search for (Python regex syntax)."),
    ],
    ref: Annotated[
        str | None,
        typer.Option("--ref", "-r", help="Branch name or commit SHA to search (default: HEAD)."),
    ] = None,
    ignore_case: Annotated[
        bool,
        typer.Option("--ignore-case", "-i", help="Case-insensitive matching."),
    ] = False,
    files_only: Annotated[
        bool,
        typer.Option("--files-only", "-l", help="Print only file paths with matches (no line content)."),
    ] = False,
    count_mode: Annotated[
        bool,
        typer.Option("--count", "-c", help="Print count of matching lines per file."),
    ] = False,
    fmt: Annotated[
        str,
        typer.Option("--format", "-f", help="Output format: text or json."),
    ] = "text",
) -> None:
    """Search tracked file content for a pattern.

    Reads objects from the content-addressed store and scans each for the
    pattern. Binary files and non-UTF-8 files are silently skipped.

    The pattern is a Python regular expression. Use ``--ignore-case`` for
    case-insensitive matching. Exit code 0 means at least one match was
    found; exit code 1 means no matches.

    Examples::

        muse grep --pattern "chorus"
        muse grep --pattern "TODO|FIXME" --files-only
        muse grep --pattern "tempo" --ignore-case --format json
        muse grep --pattern "chord" --ref feat/harmony
    """
    # NOTE: Typer surfaces the docstring above as the command's help text, so
    # it is kept verbatim.

    # 1. Reject an unsupported output format before touching the repository.
    if fmt not in ("text", "json"):
        typer.echo(f"❌ Unknown --format '{sanitize_display(fmt)}'. Choose text or json.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # 2. Locate the repository and the branch currently checked out.
    repo_root = require_repo()
    repo_id = _read_repo_id(repo_root)
    current_branch = read_current_branch(repo_root)

    # 3. Work out which commit to search: an explicit --ref, else the branch head.
    if ref is not None:
        resolved = resolve_commit_ref(repo_root, repo_id, current_branch, ref)
        if resolved is None:
            typer.echo(f"❌ Ref '{sanitize_display(ref)}' not found.", err=True)
            raise typer.Exit(code=ExitCode.USER_ERROR)
        commit_id = resolved.commit_id
    else:
        commit_id = get_head_commit_id(repo_root, current_branch)
        if commit_id is None:
            typer.echo("❌ No commits on current branch.", err=True)
            raise typer.Exit(code=ExitCode.USER_ERROR)

    # 4. Load the commit record and the snapshot it points at.
    commit = read_commit(repo_root, commit_id)
    if commit is None:
        typer.echo(f"❌ Commit {commit_id[:12]} not found.", err=True)
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR)

    snapshot = read_snapshot(repo_root, commit.snapshot_id)
    if snapshot is None:
        typer.echo(f"❌ Snapshot {commit.snapshot_id[:12]} not found.", err=True)
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR)

    # 5. Validate the pattern: enforce the length cap, then compile it.
    if len(pattern) > _MAX_PATTERN_LEN:
        typer.echo(
            f"❌ Pattern too long ({len(pattern)} chars, max {_MAX_PATTERN_LEN}). "
            "Use a shorter pattern or re.escape() for literal matches.",
            err=True,
        )
        raise typer.Exit(code=ExitCode.USER_ERROR)

    try:
        regex = re.compile(pattern, re.IGNORECASE if ignore_case else 0)
    except re.error as exc:
        typer.echo(f"❌ Invalid regex: {exc}", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR) from exc

    # 6. Scan every manifest entry in path order, keeping files with matches.
    hits: list[GrepFileResult] = []
    for rel_path, object_id in sorted(snapshot.manifest.items()):
        line_total, line_hits = _search_object(
            repo_root, object_id, regex, files_only, count_mode
        )
        if line_total <= 0:
            continue
        hits.append(
            GrepFileResult(
                path=rel_path,
                object_id=object_id,
                match_count=line_total,
                matches=line_hits,
            )
        )

    if not hits:
        raise typer.Exit(code=ExitCode.USER_ERROR)  # exit 1 = no matches

    # 7. Emit the results in the requested format.
    if fmt == "json":
        typer.echo(json.dumps(hits, indent=2))
        return

    for hit in hits:
        shown_path = sanitize_display(hit["path"])
        if files_only:
            typer.echo(shown_path)
            continue
        if count_mode:
            typer.echo(f"{shown_path}:{hit['match_count']}")
            continue
        for entry in hit["matches"]:
            typer.echo(f"{shown_path}:{entry['line_number']}:{sanitize_display(entry['text'])}")