semantic_cherry_pick.py
python
| 1 | """muse semantic-cherry-pick — cherry-pick specific symbols, not files. |
| 2 | |
| 3 | Extracts named symbols from a source commit and applies them to the current |
| 4 | working tree, replacing only those symbols. All other code is left untouched. |
| 5 | |
| 6 | This is the semantic counterpart to ``git cherry-pick``, which operates at the |
| 7 | file-hunk level. ``muse semantic-cherry-pick`` operates at the symbol level: |
| 8 | you name the exact functions, classes, or methods you want to bring forward. |
| 9 | |
| 10 | Multiple symbols can be cherry-picked in a single invocation. They are |
| 11 | applied left-to-right. If a symbol fails to apply, its error is reported |
| 12 | and the remaining symbols are still attempted. |
| 13 | |
| 14 | Usage:: |
| 15 | |
| 16 | muse semantic-cherry-pick "src/billing.py::compute_total" --from abc12345 |
| 17 | muse semantic-cherry-pick \\ |
| 18 | "src/auth.py::validate_token" \\ |
| 19 | "src/auth.py::refresh_token" \\ |
| 20 | --from feature-branch |
| 21 | muse semantic-cherry-pick "src/core.py::hash_content" --from HEAD~5 --dry-run |
| 22 | muse semantic-cherry-pick "src/billing.py::Invoice.pay" --from v1.0 --json |
| 23 | |
| 24 | Output:: |
| 25 | |
| 26 | Semantic cherry-pick from commit abc12345 |
| 27 | ────────────────────────────────────────────────────────────── |
| 28 | |
| 29 | ✅ src/auth.py::validate_token applied (lines 12–34 → 18 lines) |
| 30 | ✅ src/auth.py::refresh_token applied (lines 36–58 → 17 lines) |
| 31 | ❌ src/billing.py::compute_total not_found (symbol not found in source commit) |
| 32 | |
| 33 | 2 applied, 1 failed |
| 34 | |
| 35 | Flags: |
| 36 | |
| 37 | ``--from REF`` |
| 38 | Required. Commit or branch to cherry-pick from. |
| 39 | |
| 40 | ``--dry-run`` |
| 41 | Print what would change without writing anything. |
| 42 | |
| 43 | ``--json`` |
| 44 | Emit per-symbol results as JSON. |
| 45 | """ |
| 46 | from __future__ import annotations |
| 47 | |
| 48 | import json |
| 49 | import logging |
| 50 | import pathlib |
| 51 | from typing import Literal |
| 52 | |
| 53 | import typer |
| 54 | |
| 55 | from muse.core.errors import ExitCode |
| 56 | from muse.core.object_store import read_object |
| 57 | from muse.core.repo import require_repo |
| 58 | from muse.core.store import get_commit_snapshot_manifest, resolve_commit_ref |
| 59 | from muse.plugins.code.ast_parser import parse_symbols |
| 60 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Typer application object; the command in this file is registered on it
# through ``@app.callback``.
app = typer.Typer()

# Closed set of per-symbol outcome codes carried by ``_PickResult.status``.
ApplyStatus = Literal["applied", "not_found", "file_missing", "parse_error", "already_current"]
| 66 | |
| 67 | |
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` stored in ``<root>/.muse/repo.json``.

    The file is decoded explicitly as UTF-8 rather than with the platform's
    locale encoding, so the result does not depend on the host configuration.

    Args:
        root: Repository root directory (the one containing ``.muse``).

    Returns:
        The ``repo_id`` value, coerced to ``str``.

    Raises:
        OSError: If ``repo.json`` cannot be read.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If the JSON object has no ``repo_id`` key.
    """
    repo_json = root / ".muse" / "repo.json"
    metadata = json.loads(repo_json.read_text(encoding="utf-8"))
    return str(metadata["repo_id"])
| 70 | |
| 71 | |
def _read_branch(root: pathlib.Path) -> str:
    """Return the branch name recorded in ``<root>/.muse/HEAD``.

    The file content is stripped of surrounding whitespace and a leading
    ``refs/heads/`` prefix, if present, is removed. Content without that
    prefix is returned as-is (after stripping).

    The file is decoded explicitly as UTF-8 so branch names containing
    non-ASCII characters are read the same way on every platform.

    Args:
        root: Repository root directory (the one containing ``.muse``).

    Returns:
        The HEAD content with any ``refs/heads/`` prefix removed.

    Raises:
        OSError: If the ``HEAD`` file cannot be read.
    """
    head_ref = (root / ".muse" / "HEAD").read_text(encoding="utf-8").strip()
    return head_ref.removeprefix("refs/heads/").strip()
| 75 | |
| 76 | |
class _PickResult:
    """Outcome of attempting to cherry-pick one symbol address.

    Attributes are plain public fields: the symbol ``address``, a ``status``
    code, a human-readable ``detail`` string, and the ``old_lines`` /
    ``new_lines`` counts supplied by the caller.
    """

    def __init__(
        self,
        address: str,
        status: ApplyStatus,
        detail: str = "",
        old_lines: int = 0,
        new_lines: int = 0,
    ) -> None:
        # Store every argument verbatim; no validation is performed here.
        self.address, self.status, self.detail = address, status, detail
        self.old_lines, self.new_lines = old_lines, new_lines

    def to_dict(self) -> dict[str, str | int]:
        """Return the fields as a dict, in declaration order, for JSON output."""
        field_names = ("address", "status", "detail", "old_lines", "new_lines")
        return {name: getattr(self, name) for name in field_names}
| 100 | |
| 101 | |
def _apply_symbol(
    root: pathlib.Path,
    address: str,
    src_manifest: dict[str, str],
    dry_run: bool,
) -> _PickResult:
    """Apply one symbol from *src_manifest* to the working tree.

    The symbol's lines are cut out of the historical blob and spliced over
    the same symbol in the working-tree file. If the working-tree file does
    not contain the symbol, the lines are appended at the end; if the file
    does not exist, it is created containing only the symbol.

    All slicing and file I/O is done on raw bytes. This keeps the line list
    aligned with physical lines (``bytes.splitlines`` only breaks on
    ``\\n``, ``\\r`` and ``\\r\\n``, whereas ``str.splitlines`` also breaks
    on form feeds and other separators) and leaves every byte outside the
    replaced symbol, including line endings and non-UTF-8 bytes, untouched.

    Args:
        root: Repository root; *address* paths are resolved against it.
        address: Symbol address of the form ``"<file>::<symbol>"``.
        src_manifest: Mapping of file path to object id for the source commit.
        dry_run: When true, compute the result but write nothing.

    Returns:
        A ``_PickResult`` whose status is ``"applied"``,
        ``"already_current"``, ``"not_found"``, ``"file_missing"`` or
        ``"parse_error"``. For ``"applied"``, ``old_lines`` is the number of
        lines replaced (0 when appended or created) and ``new_lines`` the
        number of lines taken from the source commit.
    """
    if "::" not in address:
        return _PickResult(address, "not_found", "address has no '::' separator")

    file_rel = address.split("::")[0]

    # Read historical blob.
    obj_id = src_manifest.get(file_rel)
    if obj_id is None:
        return _PickResult(address, "file_missing", f"'{file_rel}' not in source snapshot")

    src_raw = read_object(root, obj_id)
    if src_raw is None:
        return _PickResult(address, "file_missing", f"blob {obj_id[:8]} missing")

    try:
        src_tree = parse_symbols(src_raw, file_rel)
    except Exception as exc:
        # Best-effort boundary: any parser failure is reported, not raised.
        return _PickResult(address, "parse_error", str(exc))

    src_rec = src_tree.get(address)
    if src_rec is None:
        return _PickResult(address, "not_found", "symbol not found in source commit")

    # NOTE(review): assumes ``lineno``/``end_lineno`` are 1-based, inclusive
    # physical line numbers, as the slicing here has always assumed.
    src_lines_list = bytes(src_raw).splitlines(keepends=True)
    src_symbol_lines = src_lines_list[src_rec["lineno"] - 1:src_rec["end_lineno"]]

    # Read current working tree.
    working_file = root / file_rel
    if not working_file.exists():
        # File doesn't exist in working tree — create it with just the symbol.
        if not dry_run:
            working_file.parent.mkdir(parents=True, exist_ok=True)
            working_file.write_bytes(b"".join(src_symbol_lines))
        return _PickResult(address, "applied", "created file", 0, len(src_symbol_lines))

    current_raw = working_file.read_bytes()
    current_lines = current_raw.splitlines(keepends=True)

    # Find the symbol in the current working tree.
    try:
        current_tree = parse_symbols(current_raw, file_rel)
    except Exception as exc:
        return _PickResult(address, "parse_error", f"current file: {exc}")

    # Follow the working file's own line-ending convention for any newline
    # this function has to insert.
    uses_crlf = bool(current_lines) and current_lines[0].endswith(b"\r\n")
    newline = b"\r\n" if uses_crlf else b"\n"
    line_endings = (b"\n", b"\r")

    current_rec = current_tree.get(address)

    if current_rec is not None:
        # Check if already current (content_id matches).
        if current_rec["content_id"] == src_rec["content_id"]:
            return _PickResult(address, "already_current", "content identical", 0, 0)
        old_start = current_rec["lineno"] - 1
        old_end = current_rec["end_lineno"]
        old_count = old_end - old_start
        following = current_lines[old_end:]
        replacement = list(src_symbol_lines)
        # A symbol that sat at EOF without a trailing newline would otherwise
        # be glued onto the first line that follows it in the working file.
        if following and replacement and not replacement[-1].endswith(line_endings):
            replacement[-1] += newline
        new_lines = current_lines[:old_start] + replacement + following
        detail = f"lines {current_rec['lineno']}–{current_rec['end_lineno']} → {len(src_symbol_lines)} lines"
    else:
        # Symbol not in current tree — append at end.
        # NOTE(review): the lines are appended verbatim, so a nested symbol
        # (e.g. a method) keeps its original indentation — confirm intended.
        preceding = list(current_lines)
        # Terminate an unterminated last line so the blank separator below
        # really is a blank line.
        if preceding and not preceding[-1].endswith(line_endings):
            preceding[-1] += newline
        new_lines = preceding + [newline] + src_symbol_lines
        old_count = 0
        detail = "appended at end (symbol not found in current tree)"

    if not dry_run:
        working_file.write_bytes(b"".join(new_lines))

    return _PickResult(address, "applied", detail, old_count, len(src_symbol_lines))
| 175 | |
| 176 | |
@app.callback(invoke_without_command=True)
def semantic_cherry_pick(
    ctx: typer.Context,
    addresses: list[str] = typer.Argument(
        ...,
        metavar="ADDRESS...",
        help='Symbol addresses to cherry-pick, e.g. "src/auth.py::validate_token".',
    ),
    from_ref: str = typer.Option(
        ...,
        "--from",
        metavar="REF",
        help="Commit or branch to cherry-pick symbols from (required).",
    ),
    dry_run: bool = typer.Option(
        False,
        "--dry-run",
        help="Print what would change without writing anything.",
    ),
    as_json: bool = typer.Option(
        False,
        "--json",
        help="Emit per-symbol results as JSON.",
    ),
) -> None:
    """Cherry-pick specific named symbols from a historical commit.

    Extracts each listed symbol from the source commit and splices it into
    the current working-tree file at the symbol's current location. Only
    the target symbol's lines change; all surrounding code is preserved.

    If the symbol does not exist in the current working tree, the historical
    version is appended to the end of the file.

    ``--dry-run`` shows what would change without writing anything.
    ``--json`` emits per-symbol results for machine consumption.
    """
    # Repository context needed to resolve the --from reference.
    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)

    if not addresses:
        typer.echo("❌ At least one ADDRESS is required.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    source_commit = resolve_commit_ref(root, repo_id, branch, from_ref)
    if source_commit is None:
        typer.echo(f"❌ --from ref '{from_ref}' not found.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # A missing manifest is treated as an empty one, so every address then
    # reports "file_missing" instead of crashing.
    manifest = get_commit_snapshot_manifest(root, source_commit.commit_id) or {}

    # Every address is attempted, in the order given; failures do not stop
    # the remaining ones.
    outcomes: list[_PickResult] = [
        _apply_symbol(root, symbol_address, manifest, dry_run)
        for symbol_address in addresses
    ]

    if as_json:
        statuses = [outcome.status for outcome in outcomes]
        applied_total = statuses.count("applied")
        current_total = statuses.count("already_current")
        payload = {
            "from_commit": source_commit.commit_id[:8],
            "dry_run": dry_run,
            "results": [outcome.to_dict() for outcome in outcomes],
            "applied": applied_total,
            # Anything that is neither applied nor already current failed.
            "failed": len(statuses) - applied_total - current_total,
            "already_current": current_total,
        }
        typer.echo(json.dumps(payload, indent=2))
        return

    heading = "Semantic cherry-pick"
    if dry_run:
        heading = "Dry-run"
    typer.echo(f"\n{heading} from commit {source_commit.commit_id[:8]}")
    typer.echo("─" * 62)

    # Pad addresses to a common width so the status column lines up.
    column_width = max(len(outcome.address) for outcome in outcomes)
    applied_count = 0
    failed_count = 0

    for outcome in outcomes:
        if outcome.status == "already_current":
            marker = "ℹ️ "
            text = "already current — no change needed"
        elif outcome.status == "applied":
            marker = "✅"
            text = f"applied ({outcome.detail})"
            applied_count += 1
        else:
            marker = "❌"
            text = f"{outcome.status} ({outcome.detail})"
            failed_count += 1
        typer.echo(f"\n {marker} {outcome.address:<{column_width}} {text}")

    typer.echo(f"\n {applied_count} applied, {failed_count} failed")
    if dry_run:
        typer.echo(" (dry run — no files were written)")
| 263 | if dry_run: |
| 264 | typer.echo(" (dry run — no files were written)") |