gabriel / muse public
semantic_cherry_pick.py python
265 lines 9.3 KB
bda49bdb feat: redesign .museignore as TOML with domain-scoped sections (#100) Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """muse semantic-cherry-pick — cherry-pick specific symbols, not files.
2
3 Extracts named symbols from a source commit and applies them to the current
4 working tree, replacing only those symbols. All other code is left untouched.
5
6 This is the semantic counterpart to ``git cherry-pick``, which operates at the
7 file-hunk level. ``muse semantic-cherry-pick`` operates at the symbol level:
8 you name the exact functions, classes, or methods you want to bring forward.
9
10 Multiple symbols can be cherry-picked in a single invocation. They are
11 applied left-to-right. If a symbol fails to apply, its error is reported
12 and the remaining symbols are still attempted.
13
14 Usage::
15
16 muse semantic-cherry-pick "src/billing.py::compute_total" --from abc12345
17 muse semantic-cherry-pick \\
18 "src/auth.py::validate_token" \\
19 "src/auth.py::refresh_token" \\
20 --from feature-branch
21 muse semantic-cherry-pick "src/core.py::hash_content" --from HEAD~5 --dry-run
22 muse semantic-cherry-pick "src/billing.py::Invoice.pay" --from v1.0 --json
23
24 Output::
25
26 Semantic cherry-pick from commit abc12345
27 ──────────────────────────────────────────────────────────────
28
29 ✅ src/auth.py::validate_token applied (lines 12–34 → 18 lines)
30 ✅ src/auth.py::refresh_token applied (lines 36–58 → 17 lines)
31 ❌ src/billing.py::compute_total not_found (symbol not found in source commit)
32
33 2 applied, 1 failed
34
35 Flags:
36
37 ``--from REF``
38 Required. Commit or branch to cherry-pick from.
39
40 ``--dry-run``
41 Print what would change without writing anything.
42
43 ``--json``
44 Emit per-symbol results as JSON.
45 """
46
47 from __future__ import annotations
48
49 import json
50 import logging
51 import pathlib
52 from typing import Literal
53
54 import typer
55
56 from muse.core.errors import ExitCode
57 from muse.core.object_store import read_object
58 from muse.core.repo import require_repo
59 from muse.core.store import get_commit_snapshot_manifest, resolve_commit_ref
60 from muse.plugins.code.ast_parser import parse_symbols
61
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Typer application for this command; ``semantic_cherry_pick`` is registered
# on it as the callback.
app = typer.Typer()

# Closed set of outcome labels that a single symbol pick can report.
ApplyStatus = Literal["applied", "not_found", "file_missing", "parse_error", "already_current"]
67
68
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` recorded in ``<root>/.muse/repo.json``.

    The value is coerced to ``str`` regardless of its JSON type.  A missing
    file, malformed JSON, or a missing ``repo_id`` key propagates as the
    corresponding exception.
    """
    repo_json_path = root / ".muse" / "repo.json"
    repo_meta = json.loads(repo_json_path.read_text())
    return str(repo_meta["repo_id"])
71
72
def _read_branch(root: pathlib.Path) -> str:
    """Return the branch name recorded in ``<root>/.muse/HEAD``.

    Surrounding whitespace is stripped, a leading ``refs/heads/`` is removed
    when present, and the remainder is stripped once more.  Content without
    that prefix is returned as-is (stripped).
    """
    heads_prefix = "refs/heads/"
    ref = (root / ".muse" / "HEAD").read_text().strip()
    if ref.startswith(heads_prefix):
        ref = ref[len(heads_prefix):]
    return ref.strip()
76
77
class _PickResult:
    """Outcome of attempting to cherry-pick one symbol.

    Fields:
        address:   the symbol address that was requested.
        status:    one of the ``ApplyStatus`` labels.
        detail:    human-readable explanation shown next to the status.
        old_lines: number of working-tree lines that were replaced
                   (``0`` when nothing was replaced).
        new_lines: number of lines in the picked symbol
                   (``0`` when nothing was applied).
    """

    def __init__(
        self,
        address: str,
        status: ApplyStatus,
        detail: str = "",
        old_lines: int = 0,
        new_lines: int = 0,
    ) -> None:
        self.address, self.status, self.detail = address, status, detail
        self.old_lines, self.new_lines = old_lines, new_lines

    def to_dict(self) -> dict[str, str | int]:
        """Return the result as a plain mapping suitable for ``json.dumps``."""
        # Key order is part of the JSON output, so it is spelled out here.
        field_names = ("address", "status", "detail", "old_lines", "new_lines")
        return {name: getattr(self, name) for name in field_names}
101
102
def _ensure_trailing_newline(lines: list[bytes]) -> list[bytes]:
    """Return *lines* with a line terminator guaranteed on the final entry."""
    if lines and not lines[-1].endswith((b"\n", b"\r")):
        return lines[:-1] + [lines[-1] + b"\n"]
    return lines


def _apply_symbol(
    root: pathlib.Path,
    address: str,
    src_manifest: dict[str, str],
    dry_run: bool,
) -> _PickResult:
    """Apply one symbol from *src_manifest* to the working tree.

    The symbol's lines are cut out of the historical blob and spliced into the
    working-tree file in place of the symbol's current lines.  If the file has
    no such symbol the lines are appended; if the file does not exist it is
    created containing only the symbol.

    All splicing is done on raw bytes so that every byte outside the replaced
    range is written back exactly as it was read (no re-encoding, no newline
    translation, no U+FFFD substitution for undecodable bytes).

    Args:
        root: Repository root; *address*'s file part is resolved against it.
        address: Symbol address of the form ``"<file>::<symbol>"``.
        src_manifest: Mapping of file path to object id for the source commit.
        dry_run: When true, compute the result but write nothing.

    Returns:
        A ``_PickResult`` whose status is ``"applied"``, ``"already_current"``,
        ``"not_found"``, ``"file_missing"`` or ``"parse_error"``.  Failures are
        reported through the result; parser exceptions are not re-raised.
    """
    if "::" not in address:
        return _PickResult(address, "not_found", "address has no '::' separator")

    file_rel = address.partition("::")[0]

    # Locate and load the historical blob for the file.
    obj_id = src_manifest.get(file_rel)
    if obj_id is None:
        return _PickResult(address, "file_missing", f"'{file_rel}' not in source snapshot")

    src_raw = read_object(root, obj_id)
    if src_raw is None:
        return _PickResult(address, "file_missing", f"blob {obj_id[:8]} missing")

    try:
        src_tree = parse_symbols(src_raw, file_rel)
    except Exception as exc:  # any parser failure becomes a per-symbol result
        return _PickResult(address, "parse_error", str(exc))

    src_rec = src_tree.get(address)
    if src_rec is None:
        return _PickResult(address, "not_found", "symbol not found in source commit")

    # Split on bytes, not str: ``bytes.splitlines`` breaks only on \n, \r\n and
    # \r, whereas ``str.splitlines`` also breaks on form feed, \v, U+2028, etc.
    # NOTE(review): assumes parse_symbols counts lines by \n / \r\n / \r, as
    # Python's own parser does — confirm for non-Python grammars.
    src_lines_list = src_raw.splitlines(keepends=True)
    src_symbol_lines = src_lines_list[src_rec["lineno"] - 1:src_rec["end_lineno"]]

    working_file = root / file_rel
    if not working_file.exists():
        # File doesn't exist in working tree — create it with just the symbol.
        if not dry_run:
            working_file.parent.mkdir(parents=True, exist_ok=True)
            working_file.write_bytes(b"".join(src_symbol_lines))
        return _PickResult(address, "applied", "created file", 0, len(src_symbol_lines))

    # Parse the exact bytes on disk so reported line numbers index
    # ``current_lines`` directly.
    current_raw = working_file.read_bytes()
    current_lines = current_raw.splitlines(keepends=True)

    try:
        current_tree = parse_symbols(current_raw, file_rel)
    except Exception as exc:
        return _PickResult(address, "parse_error", f"current file: {exc}")

    current_rec = current_tree.get(address)

    if current_rec is not None:
        # Identical content ids mean there is nothing to do.
        if current_rec["content_id"] == src_rec["content_id"]:
            return _PickResult(address, "already_current", "content identical", 0, 0)
        old_start = current_rec["lineno"] - 1
        old_end = current_rec["end_lineno"]
        old_count = old_end - old_start
        tail = current_lines[old_end:]
        # A symbol taken from the very end of a blob with no final newline
        # would otherwise fuse with the first line that follows it.
        inserted = _ensure_trailing_newline(src_symbol_lines) if tail else src_symbol_lines
        new_lines = current_lines[:old_start] + inserted + tail
        detail = f"lines {current_rec['lineno']}–{current_rec['end_lineno']} → {len(src_symbol_lines)} lines"
    else:
        # Symbol not in current tree — append at end, after one blank line.
        # Terminate the existing last line first so the separator really is a
        # blank line rather than just that line's missing newline.
        # NOTE(review): a nested symbol (e.g. a method) is appended verbatim,
        # so it keeps its original indentation — confirm this is acceptable.
        new_lines = _ensure_trailing_newline(current_lines) + [b"\n"] + src_symbol_lines
        old_count = 0
        detail = "appended at end (symbol not found in current tree)"

    if not dry_run:
        working_file.write_bytes(b"".join(new_lines))

    return _PickResult(address, "applied", detail, old_count, len(src_symbol_lines))
176
177
@app.callback(invoke_without_command=True)
def semantic_cherry_pick(
    ctx: typer.Context,
    addresses: list[str] = typer.Argument(
        ...,
        metavar="ADDRESS...",
        help='Symbol addresses to cherry-pick, e.g. "src/auth.py::validate_token".',
    ),
    from_ref: str = typer.Option(
        ...,
        "--from",
        metavar="REF",
        help="Commit or branch to cherry-pick symbols from (required).",
    ),
    dry_run: bool = typer.Option(
        False,
        "--dry-run",
        help="Print what would change without writing anything.",
    ),
    as_json: bool = typer.Option(
        False,
        "--json",
        help="Emit per-symbol results as JSON.",
    ),
) -> None:
    """Cherry-pick specific named symbols from a historical commit.

    Extracts each listed symbol from the source commit and splices it into
    the current working-tree file at the symbol's current location. Only
    the target symbol's lines change; all surrounding code is preserved.

    If the symbol does not exist in the current working tree, the historical
    version is appended to the end of the file.

    ``--dry-run`` shows what would change without writing anything.
    ``--json`` emits per-symbol results for machine consumption.
    """
    # Repository context is resolved before any argument validation.
    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)

    if not addresses:
        typer.echo("❌ At least one ADDRESS is required.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    source_commit = resolve_commit_ref(root, repo_id, branch, from_ref)
    if source_commit is None:
        typer.echo(f"❌ --from ref '{from_ref}' not found.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # A missing manifest is treated as an empty snapshot; each symbol then
    # reports its own failure.
    manifest = get_commit_snapshot_manifest(root, source_commit.commit_id) or {}

    # Every address is attempted, in the order given, regardless of failures.
    outcomes: list[_PickResult] = [
        _apply_symbol(root, addr, manifest, dry_run) for addr in addresses
    ]

    short_id = source_commit.commit_id[:8]
    applied_count = sum(1 for o in outcomes if o.status == "applied")
    current_count = sum(1 for o in outcomes if o.status == "already_current")
    # Anything that is neither applied nor already current counts as failed.
    failed_count = len(outcomes) - applied_count - current_count

    if as_json:
        payload = {
            "from_commit": short_id,
            "dry_run": dry_run,
            "results": [o.to_dict() for o in outcomes],
            "applied": applied_count,
            "failed": failed_count,
            "already_current": current_count,
        }
        typer.echo(json.dumps(payload, indent=2))
        return

    heading = "Dry-run" if dry_run else "Semantic cherry-pick"
    typer.echo(f"\n{heading} from commit {short_id}")
    typer.echo("─" * 62)

    # Pad every address to the widest one so the labels line up.
    width = max(len(o.address) for o in outcomes)

    for o in outcomes:
        if o.status == "applied":
            icon, label = "✅", f"applied ({o.detail})"
        elif o.status == "already_current":
            icon, label = "ℹ️ ", "already current — no change needed"
        else:
            icon, label = "❌", f"{o.status} ({o.detail})"
        typer.echo(f"\n {icon} {o.address:<{width}} {label}")

    typer.echo(f"\n {applied_count} applied, {failed_count} failed")
    if dry_run:
        typer.echo(" (dry run — no files were written)")