gabriel / muse public
semantic_cherry_pick.py python
264 lines 9.3 KB
b4e8aaf2 feat(code): Phase 1 — lineage, api-surface, codemap, clones, checkout-s… Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """muse semantic-cherry-pick — cherry-pick specific symbols, not files.
2
3 Extracts named symbols from a source commit and applies them to the current
4 working tree, replacing only those symbols. All other code is left untouched.
5
6 This is the semantic counterpart to ``git cherry-pick``, which operates at the
7 file-hunk level. ``muse semantic-cherry-pick`` operates at the symbol level:
8 you name the exact functions, classes, or methods you want to bring forward.
9
Multiple symbols can be cherry-picked in a single invocation. They are
applied left-to-right. A symbol that fails to apply is reported as failed,
and the remaining symbols are still attempted.
13
14 Usage::
15
16 muse semantic-cherry-pick "src/billing.py::compute_total" --from abc12345
17 muse semantic-cherry-pick \\
18 "src/auth.py::validate_token" \\
19 "src/auth.py::refresh_token" \\
20 --from feature-branch
21 muse semantic-cherry-pick "src/core.py::hash_content" --from HEAD~5 --dry-run
22 muse semantic-cherry-pick "src/billing.py::Invoice.pay" --from v1.0 --json
23
24 Output::
25
26 Semantic cherry-pick from commit abc12345
27 ──────────────────────────────────────────────────────────────
28
29 ✅ src/auth.py::validate_token applied (lines 12–34 → 12–29)
30 ✅ src/auth.py::refresh_token applied (lines 36–58 → 36–52)
31 ❌ src/billing.py::compute_total not found in source commit
32
33 2 applied, 1 failed
34
35 Flags:
36
37 ``--from REF``
38 Required. Commit or branch to cherry-pick from.
39
40 ``--dry-run``
41 Print what would change without writing anything.
42
43 ``--json``
44 Emit per-symbol results as JSON.
45 """
46 from __future__ import annotations
47
48 import json
49 import logging
50 import pathlib
51 from typing import Literal
52
53 import typer
54
55 from muse.core.errors import ExitCode
56 from muse.core.object_store import read_object
57 from muse.core.repo import require_repo
58 from muse.core.store import get_commit_snapshot_manifest, resolve_commit_ref
59 from muse.plugins.code.ast_parser import parse_symbols
60
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Typer application object; ``semantic_cherry_pick`` is registered on it as
# the callback that runs when no sub-command is given.
app = typer.Typer()

# Closed set of per-symbol outcome codes carried in ``_PickResult.status``.
ApplyStatus = Literal["applied", "not_found", "file_missing", "parse_error", "already_current"]
66
67
68 def _read_repo_id(root: pathlib.Path) -> str:
69 return str(json.loads((root / ".muse" / "repo.json").read_text())["repo_id"])
70
71
72 def _read_branch(root: pathlib.Path) -> str:
73 head_ref = (root / ".muse" / "HEAD").read_text().strip()
74 return head_ref.removeprefix("refs/heads/").strip()
75
76
77 class _PickResult:
78 def __init__(
79 self,
80 address: str,
81 status: ApplyStatus,
82 detail: str = "",
83 old_lines: int = 0,
84 new_lines: int = 0,
85 ) -> None:
86 self.address = address
87 self.status = status
88 self.detail = detail
89 self.old_lines = old_lines
90 self.new_lines = new_lines
91
92 def to_dict(self) -> dict[str, str | int]:
93 return {
94 "address": self.address,
95 "status": self.status,
96 "detail": self.detail,
97 "old_lines": self.old_lines,
98 "new_lines": self.new_lines,
99 }
100
101
def _terminate_last_line(lines: list[str]) -> list[str]:
    """Return *lines* with a newline added to the final entry if it lacks one."""
    if lines and not lines[-1].endswith(("\n", "\r")):
        return lines[:-1] + [lines[-1] + "\n"]
    return lines


def _apply_symbol(
    root: pathlib.Path,
    address: str,
    src_manifest: dict[str, str],
    dry_run: bool,
) -> _PickResult:
    """Apply one symbol from *src_manifest* to the working tree.

    Args:
        root: Repository root; the target file is ``root / <file part of address>``.
        address: Symbol address of the form ``"path/to/file.py::symbol"``.
        src_manifest: Mapping of file path to object id for the source snapshot.
        dry_run: When true, compute the result but do not write any file.

    Returns:
        A ``_PickResult`` whose status is one of:

        * ``"not_found"`` — the address has no ``::`` or the symbol is absent
          from the source file;
        * ``"file_missing"`` — the file is not in the manifest or its blob
          cannot be read;
        * ``"parse_error"`` — parsing the source or the current file raised;
        * ``"already_current"`` — the working-tree symbol has the same
          ``content_id`` as the source symbol;
        * ``"applied"`` — the symbol was spliced in, appended, or written to a
          newly created file (or would have been, under *dry_run*).
    """
    if "::" not in address:
        return _PickResult(address, "not_found", "address has no '::' separator")

    file_rel = address.split("::")[0]

    # Read historical blob.
    obj_id = src_manifest.get(file_rel)
    if obj_id is None:
        return _PickResult(address, "file_missing", f"'{file_rel}' not in source snapshot")

    src_raw = read_object(root, obj_id)
    if src_raw is None:
        return _PickResult(address, "file_missing", f"blob {obj_id[:8]} missing")

    try:
        src_tree = parse_symbols(src_raw, file_rel)
    except Exception as exc:
        return _PickResult(address, "parse_error", str(exc))

    src_rec = src_tree.get(address)
    if src_rec is None:
        return _PickResult(address, "not_found", "symbol not found in source commit")

    # ``lineno`` / ``end_lineno`` are treated as 1-based and inclusive.
    # NOTE(review): ``splitlines`` also breaks on form feed and a few Unicode
    # separators; presumably the parser counts lines the same way — confirm.
    src_lines_list = src_raw.decode("utf-8", errors="replace").splitlines(keepends=True)
    src_symbol_lines = src_lines_list[src_rec["lineno"] - 1:src_rec["end_lineno"]]
    # A symbol that ends the source file may lack a trailing newline; without
    # one its last line would fuse with whatever follows the splice point.
    src_symbol_lines = _terminate_last_line(src_symbol_lines)

    # Read current working tree.
    working_file = root / file_rel
    if not working_file.exists():
        # File doesn't exist in working tree — create it with just the symbol.
        if not dry_run:
            working_file.parent.mkdir(parents=True, exist_ok=True)
            working_file.write_text("".join(src_symbol_lines), encoding="utf-8")
        return _PickResult(address, "applied", "created file", 0, len(src_symbol_lines))

    current_text = working_file.read_text(encoding="utf-8", errors="replace")
    current_lines = current_text.splitlines(keepends=True)

    # Find the symbol in the current working tree.
    current_raw = current_text.encode("utf-8")
    try:
        current_tree = parse_symbols(current_raw, file_rel)
    except Exception as exc:
        return _PickResult(address, "parse_error", f"current file: {exc}")

    current_rec = current_tree.get(address)

    if current_rec is not None:
        # Check if already current (content_id matches).
        if current_rec["content_id"] == src_rec["content_id"]:
            return _PickResult(address, "already_current", "content identical", 0, 0)
        old_start = current_rec["lineno"] - 1
        old_end = current_rec["end_lineno"]
        old_count = old_end - old_start
        new_lines = current_lines[:old_start] + src_symbol_lines + current_lines[old_end:]
        detail = f"lines {current_rec['lineno']}–{current_rec['end_lineno']} → {len(src_symbol_lines)} lines"
    else:
        # Symbol not in current tree — append at end.  Terminate the file's
        # last line first so the "\n" below is a real blank separator line
        # rather than that line's missing terminator.
        new_lines = _terminate_last_line(current_lines) + ["\n"] + src_symbol_lines
        old_count = 0
        detail = "appended at end (symbol not found in current tree)"

    if not dry_run:
        working_file.write_text("".join(new_lines), encoding="utf-8")

    return _PickResult(address, "applied", detail, old_count, len(src_symbol_lines))
175
176
@app.callback(invoke_without_command=True)
def semantic_cherry_pick(
    ctx: typer.Context,
    addresses: list[str] = typer.Argument(
        ..., metavar="ADDRESS...",
        help='Symbol addresses to cherry-pick, e.g. "src/auth.py::validate_token".',
    ),
    from_ref: str = typer.Option(
        ..., "--from", metavar="REF",
        help="Commit or branch to cherry-pick symbols from (required).",
    ),
    dry_run: bool = typer.Option(
        False, "--dry-run",
        help="Print what would change without writing anything.",
    ),
    as_json: bool = typer.Option(False, "--json", help="Emit per-symbol results as JSON."),
) -> None:
    """Cherry-pick specific named symbols from a historical commit.

    Extracts each listed symbol from the source commit and splices it into
    the current working-tree file at the symbol's current location. Only
    the target symbol's lines change; all surrounding code is preserved.

    If the symbol does not exist in the current working tree, the historical
    version is appended to the end of the file.

    ``--dry-run`` shows what would change without writing anything.
    ``--json`` emits per-symbol results for machine consumption.
    """
    # Repository context is resolved first, before any argument validation.
    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)

    if not addresses:
        typer.echo("❌ At least one ADDRESS is required.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    from_commit = resolve_commit_ref(root, repo_id, branch, from_ref)
    if from_commit is None:
        typer.echo(f"❌ --from ref '{from_ref}' not found.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # A missing manifest is treated as an empty snapshot.
    manifest = get_commit_snapshot_manifest(root, from_commit.commit_id) or {}

    # Every address is attempted, in the order given on the command line.
    outcomes: list[_PickResult] = [
        _apply_symbol(root, addr, manifest, dry_run) for addr in addresses
    ]

    short_id = from_commit.commit_id[:8]
    n_applied = sum(1 for o in outcomes if o.status == "applied")
    n_current = sum(1 for o in outcomes if o.status == "already_current")
    # Anything that is neither applied nor already current counts as failed.
    n_failed = len(outcomes) - n_applied - n_current

    if as_json:
        payload = {
            "from_commit": short_id,
            "dry_run": dry_run,
            "results": [o.to_dict() for o in outcomes],
            "applied": n_applied,
            "failed": n_failed,
            "already_current": n_current,
        }
        typer.echo(json.dumps(payload, indent=2))
        return

    heading = "Dry-run" if dry_run else "Semantic cherry-pick"
    typer.echo(f"\n{heading} from commit {short_id}")
    typer.echo("─" * 62)

    # Pad addresses to a common width so the status labels line up.
    width = max(len(o.address) for o in outcomes)
    for o in outcomes:
        if o.status == "applied":
            icon, label = "✅", f"applied ({o.detail})"
        elif o.status == "already_current":
            icon, label = "ℹ️ ", "already current — no change needed"
        else:
            icon, label = "❌", f"{o.status} ({o.detail})"
        typer.echo(f"\n {icon} {o.address:<{width}} {label}")

    typer.echo(f"\n {n_applied} applied, {n_failed} failed")
    if dry_run:
        typer.echo(" (dry run — no files were written)")