semantic_cherry_pick.py
python
| 1 | """muse semantic-cherry-pick — cherry-pick specific symbols, not files. |
| 2 | |
| 3 | Extracts named symbols from a source commit and applies them to the current |
| 4 | working tree, replacing only those symbols. All other code is left untouched. |
| 5 | |
| 6 | This is the semantic counterpart to ``git cherry-pick``, which operates at the |
| 7 | file-hunk level. ``muse semantic-cherry-pick`` operates at the symbol level: |
| 8 | you name the exact functions, classes, or methods you want to bring forward. |
| 9 | |
| 10 | Multiple symbols can be cherry-picked in a single invocation. They are |
| 11 | applied left-to-right and independently: a symbol that fails to apply is |
| 12 | reported, and the remaining symbols are still attempted. |
| 13 | |
| 14 | Usage:: |
| 15 | |
| 16 | muse semantic-cherry-pick "src/billing.py::compute_total" --from abc12345 |
| 17 | muse semantic-cherry-pick \\ |
| 18 | "src/auth.py::validate_token" \\ |
| 19 | "src/auth.py::refresh_token" \\ |
| 20 | --from feature-branch |
| 21 | muse semantic-cherry-pick "src/core.py::hash_content" --from HEAD~5 --dry-run |
| 22 | muse semantic-cherry-pick "src/billing.py::Invoice.pay" --from v1.0 --json |
| 23 | |
| 24 | Output:: |
| 25 | |
| 26 | Semantic cherry-pick from commit abc12345 |
| 27 | ────────────────────────────────────────────────────────────── |
| 28 | |
| 29 | ✅ src/auth.py::validate_token applied (lines 12–34 → 12–29) |
| 30 | ✅ src/auth.py::refresh_token applied (lines 36–58 → 36–52) |
| 31 | ❌ src/billing.py::compute_total not found in source commit |
| 32 | |
| 33 | 2 applied, 1 failed |
| 34 | |
| 35 | Flags: |
| 36 | |
| 37 | ``--from REF`` |
| 38 | Required. Commit or branch to cherry-pick from. |
| 39 | |
| 40 | ``--dry-run`` |
| 41 | Print what would change without writing anything. |
| 42 | |
| 43 | ``--json`` |
| 44 | Emit per-symbol results as JSON. |
| 45 | """ |
| 46 | |
| 47 | from __future__ import annotations |
| 48 | |
| 49 | import json |
| 50 | import logging |
| 51 | import pathlib |
| 52 | from typing import Literal |
| 53 | |
| 54 | import typer |
| 55 | |
| 56 | from muse.core.errors import ExitCode |
| 57 | from muse.core.object_store import read_object |
| 58 | from muse.core.repo import require_repo |
| 59 | from muse.core.store import get_commit_snapshot_manifest, resolve_commit_ref |
| 60 | from muse.plugins.code.ast_parser import parse_symbols |
| 61 | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Typer application object; the command function in this file registers
# itself on it via ``@app.callback``.
app = typer.Typer()

# Closed set of outcome labels that a single symbol pick can report.
ApplyStatus = Literal["applied", "not_found", "file_missing", "parse_error", "already_current"]
| 67 | |
| 68 | |
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the repository id recorded in ``<root>/.muse/repo.json``.

    The file is decoded explicitly as UTF-8 instead of the platform's locale
    encoding, so the result does not depend on the machine the command runs on.

    Args:
        root: Repository root directory containing the ``.muse`` folder.

    Returns:
        The value stored under the ``repo_id`` key, coerced to ``str``.

    Raises:
        OSError: If the metadata file cannot be read.
        ValueError: If the file is not valid UTF-8 encoded JSON.
        KeyError: If the JSON object has no ``repo_id`` key.
    """
    metadata_path = root / ".muse" / "repo.json"
    metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
    return str(metadata["repo_id"])
| 71 | |
| 72 | |
def _read_branch(root: pathlib.Path) -> str:
    """Return the branch name stored in ``<root>/.muse/HEAD``.

    The file content is stripped of surrounding whitespace and a leading
    ``refs/heads/`` prefix, if present, is removed. The file is decoded
    explicitly as UTF-8 so that non-ASCII branch names are read the same way
    on every platform.

    Args:
        root: Repository root directory containing the ``.muse`` folder.

    Returns:
        The HEAD content without the ``refs/heads/`` prefix.

    Raises:
        OSError: If the HEAD file cannot be read.
    """
    head_ref = (root / ".muse" / "HEAD").read_text(encoding="utf-8").strip()
    return head_ref.removeprefix("refs/heads/").strip()
| 76 | |
| 77 | |
class _PickResult:
    """Outcome of attempting to cherry-pick one symbol address.

    Holds the requested address, the resulting status label, a free-form
    detail message, and the old/new line counts reported for the pick.
    """

    def __init__(
        self,
        address: str,
        status: ApplyStatus,
        detail: str = "",
        old_lines: int = 0,
        new_lines: int = 0,
    ) -> None:
        self.address, self.status, self.detail = address, status, detail
        self.old_lines, self.new_lines = old_lines, new_lines

    def to_dict(self) -> dict[str, str | int]:
        """Return the result as a plain dict suitable for JSON output."""
        # Tuple order fixes the key order of the emitted mapping.
        exported = ("address", "status", "detail", "old_lines", "new_lines")
        return {field: getattr(self, field) for field in exported}
| 101 | |
| 102 | |
def _apply_symbol(
    root: pathlib.Path,
    address: str,
    src_manifest: dict[str, str],
    dry_run: bool,
) -> _PickResult:
    """Apply one symbol from *src_manifest* to the working tree.

    The symbol's lines are extracted from the historical blob and then:

    * written as a new file when the target file does not exist,
    * spliced over the symbol's current line range when the symbol exists in
      the working-tree file, or
    * appended to the end of the file when the symbol is absent from it.

    Args:
        root: Repository root; working-tree paths are resolved against it.
        address: Symbol address of the form ``"<file>::<symbol>"``.
        src_manifest: Mapping of file path to object id for the source commit.
        dry_run: When true, compute the result but write nothing.

    Returns:
        A ``_PickResult`` whose status is ``"applied"``, ``"already_current"``,
        ``"not_found"``, ``"file_missing"`` or ``"parse_error"``. Failures are
        reported through the status; parser exceptions are caught and turned
        into ``"parse_error"``.
    """
    if "::" not in address:
        return _PickResult(address, "not_found", "address has no '::' separator")

    # Everything before the first "::" is the file path.
    file_rel = address.partition("::")[0]

    # Read historical blob.
    obj_id = src_manifest.get(file_rel)
    if obj_id is None:
        return _PickResult(address, "file_missing", f"'{file_rel}' not in source snapshot")

    src_raw = read_object(root, obj_id)
    if src_raw is None:
        return _PickResult(address, "file_missing", f"blob {obj_id[:8]} missing")

    try:
        src_tree = parse_symbols(src_raw, file_rel)
    except Exception as exc:  # parser failure modes are not visible here
        return _PickResult(address, "parse_error", str(exc))

    src_rec = src_tree.get(address)
    if src_rec is None:
        return _PickResult(address, "not_found", "symbol not found in source commit")

    # ``lineno``/``end_lineno`` are used as a 1-based, inclusive line range.
    src_lines_list = src_raw.decode("utf-8", errors="replace").splitlines(keepends=True)
    src_symbol_lines = src_lines_list[src_rec["lineno"] - 1:src_rec["end_lineno"]]

    # A symbol that ends the source file may have an unterminated last line.
    # Terminate it so that splicing it in front of other code cannot glue it
    # onto the following line.
    if src_symbol_lines and not src_symbol_lines[-1].endswith(("\n", "\r")):
        src_symbol_lines[-1] += "\n"

    # Read current working tree.
    working_file = root / file_rel
    if not working_file.exists():
        # File doesn't exist in working tree — create it with just the symbol.
        if not dry_run:
            working_file.parent.mkdir(parents=True, exist_ok=True)
            working_file.write_text("".join(src_symbol_lines), encoding="utf-8")
        return _PickResult(address, "applied", "created file", 0, len(src_symbol_lines))

    current_text = working_file.read_text(encoding="utf-8", errors="replace")
    current_lines = current_text.splitlines(keepends=True)

    # Find the symbol in the current working tree.
    current_raw = current_text.encode("utf-8")
    try:
        current_tree = parse_symbols(current_raw, file_rel)
    except Exception as exc:
        return _PickResult(address, "parse_error", f"current file: {exc}")

    current_rec = current_tree.get(address)

    if current_rec is not None:
        # Check if already current (content_id matches).
        if current_rec["content_id"] == src_rec["content_id"]:
            return _PickResult(address, "already_current", "content identical", 0, 0)
        old_start = current_rec["lineno"] - 1
        old_end = current_rec["end_lineno"]
        old_count = old_end - old_start
        new_lines = current_lines[:old_start] + src_symbol_lines + current_lines[old_end:]
        detail = f"lines {current_rec['lineno']}–{current_rec['end_lineno']} → {len(src_symbol_lines)} lines"
    else:
        # Symbol not in current tree — append at end.
        # NOTE(review): nested symbols (e.g. ``Class.method``) are also
        # appended at the end of the file, outside their enclosing scope.
        if current_lines and not current_lines[-1].endswith("\n"):
            # Terminate the last line first so the "\n" below is a real
            # blank separator line rather than just its line ending.
            current_lines[-1] += "\n"
        new_lines = current_lines + ["\n"] + src_symbol_lines
        old_count = 0
        detail = "appended at end (symbol not found in current tree)"

    if not dry_run:
        working_file.write_text("".join(new_lines), encoding="utf-8")

    return _PickResult(address, "applied", detail, old_count, len(src_symbol_lines))
| 176 | |
| 177 | |
@app.callback(invoke_without_command=True)
def semantic_cherry_pick(
    ctx: typer.Context,
    addresses: list[str] = typer.Argument(
        ..., metavar="ADDRESS...",
        help='Symbol addresses to cherry-pick, e.g. "src/auth.py::validate_token".',
    ),
    from_ref: str = typer.Option(
        ..., "--from", metavar="REF",
        help="Commit or branch to cherry-pick symbols from (required).",
    ),
    dry_run: bool = typer.Option(
        False, "--dry-run",
        help="Print what would change without writing anything.",
    ),
    as_json: bool = typer.Option(False, "--json", help="Emit per-symbol results as JSON."),
) -> None:
    """Cherry-pick specific named symbols from a historical commit.

    Extracts each listed symbol from the source commit and splices it into
    the current working-tree file at the symbol's current location. Only
    the target symbol's lines change; all surrounding code is preserved.

    If the symbol does not exist in the current working tree, the historical
    version is appended to the end of the file.

    ``--dry-run`` shows what would change without writing anything.
    ``--json`` emits per-symbol results for machine consumption.
    """
    # Repository context is resolved before any argument validation.
    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)

    if not addresses:
        typer.echo("❌ At least one ADDRESS is required.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    from_commit = resolve_commit_ref(root, repo_id, branch, from_ref)
    if from_commit is None:
        typer.echo(f"❌ --from ref '{from_ref}' not found.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # A missing manifest is treated as an empty snapshot.
    src_manifest = get_commit_snapshot_manifest(root, from_commit.commit_id) or {}

    # Every address is attempted, in the order given on the command line.
    results: list[_PickResult] = [
        _apply_symbol(root, address, src_manifest, dry_run) for address in addresses
    ]

    short_id = from_commit.commit_id[:8]
    # Each result has exactly one status, so "failed" is whatever remains
    # after the applied and already-current results are counted.
    applied = sum(r.status == "applied" for r in results)
    unchanged = sum(r.status == "already_current" for r in results)
    failed = len(results) - applied - unchanged

    if as_json:
        payload = {
            "from_commit": short_id,
            "dry_run": dry_run,
            "results": [r.to_dict() for r in results],
            "applied": applied,
            "failed": failed,
            "already_current": unchanged,
        }
        typer.echo(json.dumps(payload, indent=2))
        return

    action = "Dry-run" if dry_run else "Semantic cherry-pick"
    typer.echo(f"\n{action} from commit {short_id}")
    typer.echo("─" * 62)

    # Pad addresses to a common width so the labels line up.
    max_addr = max(len(r.address) for r in results)

    for r in results:
        if r.status == "applied":
            icon, label = "✅", f"applied ({r.detail})"
        elif r.status == "already_current":
            icon, label = "ℹ️ ", "already current — no change needed"
        else:
            icon, label = "❌", f"{r.status} ({r.detail})"
        typer.echo(f"\n {icon} {r.address:<{max_addr}} {label}")

    typer.echo(f"\n {applied} applied, {failed} failed")
    if dry_run:
        typer.echo(" (dry run — no files were written)")