gabriel / muse public
checkout.py python
220 lines 8.6 KB
00373ad0 feat: migrate CLI from typer to argparse (POSIX-compliant, order-independent) Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """muse checkout — switch branches or restore working tree from a commit.
2
3 Usage::
4
5 muse checkout <branch> — switch to existing branch
6 muse checkout -b <branch> — create and switch to new branch
7 muse checkout <commit-id> — detach HEAD at a specific commit
8 """
9
10 from __future__ import annotations
11
12 import argparse
13 import json
14 import logging
15 import pathlib
16 import sys
17
18 from muse.core.errors import ExitCode
19 from muse.core.object_store import restore_object
20 from muse.core.repo import require_repo
21 from muse.core.store import (
22 get_head_commit_id,
23 get_head_snapshot_id,
24 read_current_branch,
25 read_snapshot,
26 resolve_commit_ref,
27 write_head_branch,
28 write_head_commit,
29 )
30 from muse.core.reflog import append_reflog
31 from muse.core.validation import contain_path, sanitize_display, validate_branch_name
32 from muse.domain import SnapshotManifest
33 from muse.plugins.registry import read_domain, resolve_plugin
34
35 logger = logging.getLogger(__name__)
36
37
38 def _read_current_branch(root: pathlib.Path) -> str:
39 return read_current_branch(root)
40
41
42 def _read_repo_id(root: pathlib.Path) -> str:
43 return str(json.loads((root / ".muse" / "repo.json").read_text())["repo_id"])
44
45
46 def _checkout_snapshot(
47 root: pathlib.Path,
48 target_snapshot_id: str,
49 current_snapshot_id: str | None,
50 ) -> None:
51 """Incrementally update state/ from current to target snapshot.
52
53 Uses the domain plugin to compute the delta between the two snapshots and
54 only touches files that actually changed — removing deleted paths and
55 restoring added/modified ones from the object store. Calls
56 ``plugin.apply()`` as the domain-level post-checkout hook.
57 """
58 plugin = resolve_plugin(root)
59 domain = read_domain(root)
60
61 target_snap_rec = read_snapshot(root, target_snapshot_id)
62 if target_snap_rec is None:
63 print(f"❌ Snapshot {target_snapshot_id[:8]} not found in object store.")
64 raise SystemExit(ExitCode.INTERNAL_ERROR)
65
66 target_snap = SnapshotManifest(files=target_snap_rec.manifest, domain=domain)
67
68 if current_snapshot_id is not None:
69 cur_rec = read_snapshot(root, current_snapshot_id)
70 current_snap = (
71 SnapshotManifest(files=cur_rec.manifest, domain=domain)
72 if cur_rec else SnapshotManifest(files={}, domain=domain)
73 )
74 else:
75 current_snap = SnapshotManifest(files={}, domain=domain)
76
77 delta = plugin.diff(current_snap, target_snap)
78
79 # Remove files that no longer exist in the target snapshot.
80 removed = [op["address"] for op in delta["ops"] if op["op"] == "delete"]
81 for rel_path in removed:
82 fp = root / rel_path
83 if fp.exists():
84 fp.unlink()
85
86 # Restore added and modified files from the content-addressed store.
87 # InsertOp, ReplaceOp, and PatchOp all mean the file's content changed;
88 # the authoritative hash for each is in the target snapshot manifest.
89 to_restore = [
90 op["address"] for op in delta["ops"]
91 if op["op"] in ("insert", "replace", "patch")
92 ]
93 for rel_path in to_restore:
94 object_id = target_snap_rec.manifest[rel_path]
95 try:
96 safe_dest = contain_path(root, rel_path)
97 except ValueError as exc:
98 logger.warning("⚠️ Skipping unsafe manifest path %r: %s", rel_path, exc)
99 continue
100 if not restore_object(root, object_id, safe_dest):
101 print(f"⚠️ Object {object_id[:8]} for '{sanitize_display(rel_path)}' not in local store — skipped.")
102
103 # Domain-level post-checkout hook: rescan the workdir to confirm state.
104 plugin.apply(delta, root)
105
106
107 def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
108 """Register the checkout subcommand."""
109 parser = subparsers.add_parser(
110 "checkout",
111 help="Switch branches or restore working tree from a commit.",
112 description=__doc__,
113 )
114 parser.add_argument("target", help="Branch name or commit ID to check out.")
115 parser.add_argument("-b", "--create", action="store_true", help="Create a new branch.")
116 parser.add_argument("--force", "-f", action="store_true", help="Discard uncommitted changes.")
117 parser.add_argument("--format", default="text", dest="fmt", help="Output format: text or json.")
118 parser.set_defaults(func=run)
119
120
121 def run(args: argparse.Namespace) -> None:
122 """Switch branches or restore working tree from a commit.
123
124 Agents should pass ``--format json`` to get a machine-readable result::
125
126 {"action": "created|switched|detached|already_on", "branch": "<name>", "commit_id": "<sha256>"}
127 """
128 target: str = args.target
129 create: bool = args.create
130 force: bool = args.force
131 fmt: str = args.fmt
132
133 if fmt not in ("text", "json"):
134 from muse.core.validation import sanitize_display as _sd
135 print(f"❌ Unknown --format '{_sd(fmt)}'. Choose text or json.", file=sys.stderr)
136 raise SystemExit(ExitCode.USER_ERROR)
137 root = require_repo()
138 repo_id = _read_repo_id(root)
139 current_branch = _read_current_branch(root)
140 muse_dir = root / ".muse"
141
142 current_snapshot_id = get_head_snapshot_id(root, repo_id, current_branch)
143
144 if create:
145 try:
146 validate_branch_name(target)
147 except ValueError as exc:
148 print(f"❌ Invalid branch name: {exc}")
149 raise SystemExit(ExitCode.USER_ERROR)
150 ref_file = muse_dir / "refs" / "heads" / target
151 if ref_file.exists():
152 print(f"❌ Branch '{sanitize_display(target)}' already exists. Use 'muse checkout {sanitize_display(target)}' to switch to it.")
153 raise SystemExit(ExitCode.USER_ERROR)
154 current_commit = get_head_commit_id(root, current_branch) or ""
155 ref_file.parent.mkdir(parents=True, exist_ok=True)
156 ref_file.write_text(current_commit)
157 write_head_branch(root, target)
158 append_reflog(
159 root, target, old_id=None, new_id=current_commit or ("0" * 64),
160 author="user", operation=f"branch: created from {sanitize_display(current_branch)}",
161 )
162 if fmt == "json":
163 print(json.dumps({"action": "created", "branch": target, "commit_id": current_commit}))
164 else:
165 print(f"Switched to a new branch '{sanitize_display(target)}'")
166 return
167
168 # Check if target is a known branch — validate name before using as path component.
169 try:
170 validate_branch_name(target)
171 except ValueError as exc:
172 print(f"❌ Invalid branch name: {exc}", file=sys.stderr)
173 raise SystemExit(ExitCode.USER_ERROR)
174
175 ref_file = muse_dir / "refs" / "heads" / target
176 if ref_file.exists():
177 if target == current_branch:
178 if fmt == "json":
179 print(json.dumps({"action": "already_on", "branch": target,
180 "commit_id": get_head_commit_id(root, target) or ""}))
181 else:
182 print(f"Already on '{sanitize_display(target)}'")
183 return
184
185 target_commit_id = get_head_commit_id(root, target) or ""
186 current_commit_id = get_head_commit_id(root, current_branch) or ""
187 target_snapshot_id = get_head_snapshot_id(root, repo_id, target)
188 if target_snapshot_id:
189 _checkout_snapshot(root, target_snapshot_id, current_snapshot_id)
190
191 write_head_branch(root, target)
192 append_reflog(
193 root, target, old_id=current_commit_id or None, new_id=target_commit_id or ("0" * 64),
194 author="user",
195 operation=f"checkout: moving from {sanitize_display(current_branch)} to {sanitize_display(target)}",
196 )
197 if fmt == "json":
198 print(json.dumps({"action": "switched", "branch": target, "commit_id": target_commit_id}))
199 else:
200 print(f"Switched to branch '{sanitize_display(target)}'")
201 return
202
203 # Try as a commit ID (detached HEAD)
204 commit = resolve_commit_ref(root, repo_id, current_branch, target)
205 if commit is None:
206 print(f"❌ '{target}' is not a branch or commit ID.")
207 raise SystemExit(ExitCode.USER_ERROR)
208
209 current_commit_id = get_head_commit_id(root, current_branch) or ""
210 _checkout_snapshot(root, commit.snapshot_id, current_snapshot_id)
211 write_head_commit(root, commit.commit_id)
212 append_reflog(
213 root, current_branch, old_id=current_commit_id or None, new_id=commit.commit_id,
214 author="user",
215 operation=f"checkout: detaching HEAD at {commit.commit_id[:12]}",
216 )
217 if fmt == "json":
218 print(json.dumps({"action": "detached", "branch": None, "commit_id": commit.commit_id}))
219 else:
220 print(f"HEAD is now at {commit.commit_id[:8]} {sanitize_display(commit.message)}")