gabriel / muse public
checkout.py python
153 lines 5.5 KB
bda49bdb feat: redesign .museignore as TOML with domain-scoped sections (#100) Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """muse checkout — switch branches or restore working tree from a commit.
2
3 Usage::
4
5 muse checkout <branch> — switch to existing branch
6 muse checkout -b <branch> — create and switch to new branch
7 muse checkout <commit-id> — detach HEAD at a specific commit
8 """
9
10 from __future__ import annotations
11
12 import json
13 import logging
14 import pathlib
15
16 import typer
17
18 from muse.core.errors import ExitCode
19 from muse.core.object_store import restore_object
20 from muse.core.repo import require_repo
21 from muse.core.store import (
22 get_head_commit_id,
23 get_head_snapshot_id,
24 read_snapshot,
25 resolve_commit_ref,
26 )
27 from muse.domain import SnapshotManifest
28 from muse.plugins.registry import read_domain, resolve_plugin
29
30 logger = logging.getLogger(__name__)
31
32 app = typer.Typer()
33
34
def _read_current_branch(root: pathlib.Path) -> str:
    """Return the ref name stored in ``<root>/.muse/HEAD``.

    The file contents are whitespace-trimmed and a leading ``refs/heads/``
    is dropped when present.  If HEAD does not start with that prefix, the
    trimmed contents are returned as-is.
    """
    branch_prefix = "refs/heads/"
    head_path = root / ".muse" / "HEAD"
    ref = head_path.read_text().strip()
    if ref.startswith(branch_prefix):
        ref = ref[len(branch_prefix):]
    # Trim again: whitespace may sit directly after the prefix.
    return ref.strip()
38
39
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` value from ``<root>/.muse/repo.json`` as a string."""
    repo_json_path = root / ".muse" / "repo.json"
    metadata = json.loads(repo_json_path.read_text())
    repo_id = metadata["repo_id"]
    return str(repo_id)
42
43
def _checkout_snapshot(
    root: pathlib.Path,
    target_snapshot_id: str,
    current_snapshot_id: str | None,
) -> None:
    """Incrementally update muse-work/ from current to target snapshot.

    Uses the domain plugin to compute the delta between the two snapshots and
    only touches files that actually changed — removing deleted paths and
    restoring added/modified ones from the object store.  Calls
    ``plugin.apply()`` as the domain-level post-checkout hook.

    Args:
        root: Repository root; the working tree is ``root / "muse-work"``.
        target_snapshot_id: Snapshot to materialise in the working tree.
        current_snapshot_id: Snapshot the working tree is assumed to hold
            now.  ``None`` — or an id whose record cannot be read — is
            treated as an empty snapshot.

    Raises:
        typer.Exit: With ``ExitCode.INTERNAL_ERROR`` when the target snapshot
            record cannot be read.
    """
    plugin = resolve_plugin(root)
    domain = read_domain(root)

    target_snap_rec = read_snapshot(root, target_snapshot_id)
    if target_snap_rec is None:
        typer.echo(f"❌ Snapshot {target_snapshot_id[:8]} not found in object store.")
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR)

    target_snap = SnapshotManifest(files=target_snap_rec.manifest, domain=domain)

    if current_snapshot_id is not None:
        cur_rec = read_snapshot(root, current_snapshot_id)
        current_snap = (
            SnapshotManifest(files=cur_rec.manifest, domain=domain)
            if cur_rec else SnapshotManifest(files={}, domain=domain)
        )
    else:
        current_snap = SnapshotManifest(files={}, domain=domain)

    delta = plugin.diff(current_snap, target_snap)

    workdir = root / "muse-work"
    workdir.mkdir(exist_ok=True)

    # Remove files that no longer exist in the target snapshot.
    # ``unlink(missing_ok=True)`` replaces an ``exists()`` pre-check: it has
    # no check-then-act race, and it also removes a dangling symlink, which
    # ``exists()`` reports as absent because it follows the link.
    removed = [op["address"] for op in delta["ops"] if op["op"] == "delete"]
    for rel_path in removed:
        (workdir / rel_path).unlink(missing_ok=True)

    # Restore added and modified files from the content-addressed store.
    # InsertOp, ReplaceOp, and PatchOp all mean the file's content changed;
    # the authoritative hash for each is in the target snapshot manifest.
    to_restore = [
        op["address"] for op in delta["ops"]
        if op["op"] in ("insert", "replace", "patch")
    ]
    for rel_path in to_restore:
        object_id = target_snap_rec.manifest[rel_path]
        # Best effort: a missing object is reported and skipped, not fatal.
        if not restore_object(root, object_id, workdir / rel_path):
            typer.echo(f"⚠️ Object {object_id[:8]} for '{rel_path}' not in local store — skipped.")

    # Domain-level post-checkout hook: rescan the workdir to confirm state.
    plugin.apply(delta, workdir)
101
102
# NOTE: the docstring below is kept to one line on purpose — Typer uses the
# callback docstring as CLI help text, so extra documentation lives in comments.
#
# Behaviour, in order:
#   1. ``-b``: create ``refs/heads/<target>`` at the current head commit and
#      point HEAD at it; the working tree is not touched.
#   2. ``<target>`` is an existing branch: update the working tree to that
#      branch's head snapshot (if it has one) and point HEAD at the branch.
#   3. Otherwise resolve ``<target>`` as a commit reference and detach HEAD.
# Exits with ``ExitCode.USER_ERROR`` when the branch to create already exists
# or when ``<target>`` is neither a branch nor a resolvable commit.
#
# NOTE(review): ``ctx`` and ``force`` are accepted but never read in this
# function — no uncommitted-change check is performed here.
# NOTE(review): after a detached checkout HEAD holds a bare commit id, so
# ``_read_current_branch`` returns that id as the "branch" name — confirm the
# store helpers cope with that on the next checkout.
@app.callback(invoke_without_command=True)
def checkout(
    ctx: typer.Context,
    target: str = typer.Argument(..., help="Branch name or commit ID to check out."),
    create: bool = typer.Option(False, "-b", "--create", help="Create a new branch."),
    force: bool = typer.Option(False, "--force", "-f", help="Discard uncommitted changes."),
) -> None:
    """Switch branches or restore working tree from a commit."""
    root = require_repo()
    repo_id = _read_repo_id(root)
    current_branch = _read_current_branch(root)
    muse_dir = root / ".muse"
    head_file = muse_dir / "HEAD"
    ref_file = muse_dir / "refs" / "heads" / target

    current_snapshot_id = get_head_snapshot_id(root, repo_id, current_branch)

    if create:
        # ``exists()`` (not ``is_file()``) on purpose: a directory at this
        # path also blocks creating the ref file.
        if ref_file.exists():
            typer.echo(f"❌ Branch '{target}' already exists. Use 'muse checkout {target}' to switch to it.")
            raise typer.Exit(code=ExitCode.USER_ERROR)
        current_commit = get_head_commit_id(root, current_branch) or ""
        ref_file.parent.mkdir(parents=True, exist_ok=True)
        ref_file.write_text(current_commit)
        head_file.write_text(f"refs/heads/{target}\n")
        typer.echo(f"Switched to a new branch '{target}'")
        return

    # Check if target is a known branch.  A branch is a ref *file*: a
    # directory under refs/heads (e.g. ``feature`` when only ``feature/x``
    # exists, or the heads directory itself for an empty target) must not be
    # treated as a branch, otherwise HEAD ends up pointing at a non-ref.
    if ref_file.is_file():
        if target == current_branch:
            typer.echo(f"Already on '{target}'")
            return

        target_snapshot_id = get_head_snapshot_id(root, repo_id, target)
        if target_snapshot_id:
            _checkout_snapshot(root, target_snapshot_id, current_snapshot_id)

        head_file.write_text(f"refs/heads/{target}\n")
        typer.echo(f"Switched to branch '{target}'")
        return

    # Try as a commit ID (detached HEAD).  An empty target is rejected
    # without consulting the resolver, in case it matches by prefix.
    commit = resolve_commit_ref(root, repo_id, current_branch, target) if target else None
    if commit is None:
        typer.echo(f"❌ '{target}' is not a branch or commit ID.")
        raise typer.Exit(code=ExitCode.USER_ERROR)

    _checkout_snapshot(root, commit.snapshot_id, current_snapshot_id)
    head_file.write_text(commit.commit_id + "\n")
    typer.echo(f"HEAD is now at {commit.commit_id[:8]} {commit.message}")