cgcardona / muse public
cherry_pick.py python
141 lines 5.0 KB
8d5137ed fix(security): full surface hardening — validation, path containment, p… Gabriel Cardona <cgcardona@gmail.com> 10h ago
1 """muse cherry-pick — apply a specific commit's changes on top of HEAD."""
2
3 from __future__ import annotations
4
5 import datetime
6 import json
7 import logging
8 import pathlib
9 import shutil
10
11 import typer
12
13 from muse.core.errors import ExitCode
14 from muse.core.merge_engine import write_merge_state
15 from muse.core.object_store import restore_object
16 from muse.core.repo import require_repo
17 from muse.core.snapshot import compute_commit_id, compute_snapshot_id
18 from muse.core.store import (
19 CommitRecord,
20 SnapshotRecord,
21 get_head_commit_id,
22 get_head_snapshot_manifest,
23 read_commit,
24 read_snapshot,
25 resolve_commit_ref,
26 write_commit,
27 write_snapshot,
28 )
29 from muse.core.validation import contain_path, sanitize_display
30 from muse.domain import SnapshotManifest
31 from muse.plugins.registry import read_domain, resolve_plugin
32
33 logger = logging.getLogger(__name__)
34
35 app = typer.Typer()
36
37
def _read_branch(root: pathlib.Path) -> str:
    """Return the branch name recorded in ``.muse/HEAD`` under *root*.

    The file's text is stripped of surrounding whitespace, a leading
    ``refs/heads/`` prefix is removed when present, and the remainder is
    stripped once more before being returned.
    """
    head_file = root / ".muse" / "HEAD"
    ref_text = head_file.read_text().strip()
    branch_name = ref_text.removeprefix("refs/heads/")
    return branch_name.strip()
41
42
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` entry of ``.muse/repo.json`` under *root* as a string."""
    config_path = root / ".muse" / "repo.json"
    config = json.loads(config_path.read_text())
    return str(config["repo_id"])
45
46
@app.callback(invoke_without_command=True)
def cherry_pick(
    ctx: typer.Context,
    ref: str = typer.Argument(..., help="Commit ID to apply."),
    no_commit: bool = typer.Option(False, "-n", "--no-commit", help="Apply but do not commit."),
) -> None:
    """Apply a specific commit's changes on top of HEAD.

    The target commit's delta (target vs. its parent) is applied to the
    current branch through the domain plugin's three-way ``merge``:
    base = the target's parent snapshot, ours = HEAD, theirs = the target.

    On conflict, merge state is written via ``write_merge_state`` and the
    command exits with ``ExitCode.USER_ERROR``. On a clean merge the
    ``muse-work`` directory is replaced with the merged manifest's objects
    and, unless ``no_commit`` is set, a new snapshot and commit are written
    and the branch ref under ``.muse/refs/heads`` is updated.

    Args:
        ctx: Typer context (unused here; required by the callback signature).
        ref: Commit reference to cherry-pick.
        no_commit: When true, populate ``muse-work`` but do not record a commit.

    Raises:
        typer.Exit: With ``ExitCode.USER_ERROR`` when the ref cannot be
            resolved, the target's snapshot record is missing, or the merge
            reports conflicts.
    """
    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)
    domain = read_domain(root)
    plugin = resolve_plugin(root)

    target = resolve_commit_ref(root, repo_id, branch, ref)
    if target is None:
        # `ref` is raw CLI input; sanitize it like every other echoed value.
        typer.echo(f"❌ Commit '{sanitize_display(ref)}' not found.")
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # The delta for this cherry-pick is: target vs its parent.
    # Applying that delta on top of HEAD is a three-way merge where the
    # base is the target's parent, left is HEAD, and right is the target.
    # A root commit (no parent) legitimately merges against an empty base.
    base_manifest: dict[str, str] = {}
    if target.parent_commit_id:
        parent_commit = read_commit(root, target.parent_commit_id)
        if parent_commit:
            parent_snap = read_snapshot(root, parent_commit.snapshot_id)
            if parent_snap:
                base_manifest = parent_snap.manifest

    target_snap_rec = read_snapshot(root, target.snapshot_id)
    if not target_snap_rec:
        # Treating a missing snapshot as an empty manifest would make the
        # merge see every file as deleted by the target and wipe muse-work.
        # Abort before anything destructive happens.
        # NOTE(review): USER_ERROR is the only exit code referenced in this
        # module; switch to a more specific code if ExitCode defines one.
        typer.echo(
            f"❌ Snapshot for commit {sanitize_display(target.commit_id[:8])} "
            "is missing; cannot cherry-pick."
        )
        raise typer.Exit(code=ExitCode.USER_ERROR)
    target_manifest = target_snap_rec.manifest
    ours_manifest = get_head_snapshot_manifest(root, repo_id, branch) or {}

    base_snap = SnapshotManifest(files=base_manifest, domain=domain)
    ours_snap = SnapshotManifest(files=ours_manifest, domain=domain)
    target_snap = SnapshotManifest(files=target_manifest, domain=domain)

    result = plugin.merge(base_snap, ours_snap, target_snap)

    if not result.is_clean:
        write_merge_state(
            root,
            base_commit=target.parent_commit_id or "",
            ours_commit=get_head_commit_id(root, branch) or "",
            theirs_commit=target.commit_id,
            conflict_paths=result.conflicts,
        )
        typer.echo(f"❌ Cherry-pick conflict in {len(result.conflicts)} file(s):")
        for p in sorted(result.conflicts):
            # Paths come from stored manifests; sanitize before printing.
            typer.echo(f"  CONFLICT (both modified): {sanitize_display(str(p))}")
        raise typer.Exit(code=ExitCode.USER_ERROR)

    merged_manifest = result.merged["files"]

    # Rebuild the working directory from scratch out of the merged manifest.
    workdir = root / "muse-work"
    if workdir.exists():
        shutil.rmtree(workdir)
    workdir.mkdir()
    for rel_path, object_id in merged_manifest.items():
        try:
            safe_dest = contain_path(workdir, rel_path)
        except ValueError as exc:
            # Paths that contain_path rejects are skipped, not fatal.
            logger.warning("⚠️ Skipping unsafe manifest path %r: %s", rel_path, exc)
            continue
        restore_object(root, object_id, safe_dest)

    if no_commit:
        typer.echo(f"Applied {target.commit_id[:8]} to muse-work/. Run 'muse commit' to record.")
        return

    head_commit_id = get_head_commit_id(root, branch)
    # merged_manifest contains only object IDs already in the store
    # (sourced from base, ours, or theirs — all previously committed).
    # No re-scan or object re-write is needed.
    snapshot_id = compute_snapshot_id(merged_manifest)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[head_commit_id] if head_commit_id else [],
        snapshot_id=snapshot_id,
        message=target.message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snapshot_id, manifest=merged_manifest))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        repo_id=repo_id,
        branch=branch,
        snapshot_id=snapshot_id,
        message=target.message,
        committed_at=committed_at,
        parent_commit_id=head_commit_id,
    ))
    # Advance the branch ref to the new commit.
    (root / ".muse" / "refs" / "heads" / branch).write_text(commit_id)
    typer.echo(f"[{sanitize_display(branch)} {commit_id[:8]}] {sanitize_display(target.message)}")