cgcardona / muse public
checkout.py python
145 lines 5.2 KB
cc9bbc18 feat: complete MuseDomainPlugin integration — apply(), incremental checkout Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """muse checkout — switch branches or restore working tree from a commit.
2
3 Usage::
4
5 muse checkout <branch> — switch to existing branch
6 muse checkout -b <branch> — create and switch to new branch
7 muse checkout <commit-id> — detach HEAD at a specific commit
8 """
9 from __future__ import annotations
10
11 import json
12 import logging
13 import pathlib
14
15 import typer
16
17 from muse.core.errors import ExitCode
18 from muse.core.object_store import restore_object
19 from muse.core.repo import require_repo
20 from muse.core.store import (
21 get_head_commit_id,
22 get_head_snapshot_id,
23 read_snapshot,
24 resolve_commit_ref,
25 )
26 from muse.domain import SnapshotManifest
27 from muse.plugins.registry import read_domain, resolve_plugin
28
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)

# Typer application object; the ``checkout`` callback in this module is
# registered on it via ``@app.callback``.
app = typer.Typer()
32
33
def _read_current_branch(root: pathlib.Path) -> str:
    """Return the content of ``.muse/HEAD`` without its ``refs/heads/`` prefix.

    The file is read as text and surrounding whitespace is stripped. When the
    content does not start with ``refs/heads/`` it is returned as-is (still
    stripped), so callers receive whatever HEAD holds in that case.
    """
    prefix = "refs/heads/"
    head_path = root / ".muse" / "HEAD"
    ref = head_path.read_text().strip()
    if ref.startswith(prefix):
        ref = ref[len(prefix):]
    # A second strip drops any whitespace that followed the prefix.
    return ref.strip()
37
38
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` entry of ``.muse/repo.json`` as a string.

    The file is parsed as JSON and the value stored under ``"repo_id"`` is
    passed through ``str()``. A missing file, invalid JSON or a missing key
    propagates the underlying exception.
    """
    config_path = root / ".muse" / "repo.json"
    raw = config_path.read_text()
    metadata = json.loads(raw)
    return str(metadata["repo_id"])
41
42
def _checkout_snapshot(
    root: pathlib.Path,
    target_snapshot_id: str,
    current_snapshot_id: str | None,
) -> None:
    """Bring ``muse-work/`` from the current snapshot to the target snapshot.

    The repository's domain plugin diffs the two snapshot manifests. Paths the
    delta lists as removed are unlinked from ``muse-work/``; paths listed as
    added or modified are restored from the object store using the object IDs
    in the target manifest. Finally the delta and the working directory are
    handed to ``plugin.apply()``.

    When ``current_snapshot_id`` is ``None``, or its snapshot record cannot be
    read, the current state is treated as an empty manifest.

    Raises:
        typer.Exit: with ``ExitCode.INTERNAL_ERROR`` when the target snapshot
            record cannot be read.
    """
    plugin = resolve_plugin(root)
    domain = read_domain(root)

    target_record = read_snapshot(root, target_snapshot_id)
    if target_record is None:
        typer.echo(f"❌ Snapshot {target_snapshot_id[:8]} not found in object store.")
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR)
    target_manifest = SnapshotManifest(files=target_record.manifest, domain=domain)

    # Look up the current record only when an ID was supplied; anything
    # falsy falls back to an empty manifest below.
    current_record = (
        read_snapshot(root, current_snapshot_id)
        if current_snapshot_id is not None
        else None
    )
    if current_record:
        current_manifest = SnapshotManifest(files=current_record.manifest, domain=domain)
    else:
        current_manifest = SnapshotManifest(files={}, domain=domain)

    changes = plugin.diff(current_manifest, target_manifest)

    work_tree = root / "muse-work"
    work_tree.mkdir(exist_ok=True)

    # Drop paths that are absent from the target snapshot.
    for stale in changes["removed"]:
        stale_path = work_tree / stale
        if not stale_path.exists():
            continue
        stale_path.unlink()

    # Materialise new and changed paths from the object store; a failed
    # restore is reported and skipped rather than aborting the checkout.
    for rel_path in changes["added"] + changes["modified"]:
        object_id = target_record.manifest[rel_path]
        restored = restore_object(root, object_id, work_tree / rel_path)
        if restored:
            continue
        typer.echo(f"⚠️ Object {object_id[:8]} for '{rel_path}' not in local store — skipped.")

    # Domain-level post-checkout hook.
    plugin.apply(changes, work_tree)
93
94
@app.callback(invoke_without_command=True)
def checkout(
    ctx: typer.Context,
    target: str = typer.Argument(..., help="Branch name or commit ID to check out."),
    create: bool = typer.Option(False, "-b", "--create", help="Create a new branch."),
    force: bool = typer.Option(False, "--force", "-f", help="Discard uncommitted changes."),
) -> None:
    """Switch branches or restore working tree from a commit."""
    # The docstring above is kept to one line on purpose: Typer shows a
    # command's docstring as its help text, so details live in comments.
    #
    # ``target`` is resolved in this order:
    #   1. with ``-b``: create ``refs/heads/<target>`` at the current commit
    #      and point HEAD at it (the working tree is not touched);
    #   2. an existing branch ref file: update muse-work/ to the branch's
    #      snapshot and point HEAD at the branch;
    #   3. otherwise a commit reference: update muse-work/ to that commit's
    #      snapshot and write the commit ID into HEAD (detached HEAD).
    #
    # Exits with ExitCode.USER_ERROR when ``-b`` names an existing branch or
    # when ``target`` is neither a branch nor a resolvable commit.
    #
    # NOTE(review): ``ctx`` and ``force`` are accepted but never consulted
    # here; no uncommitted-changes check runs before muse-work/ is rewritten.
    root = require_repo()
    repo_id = _read_repo_id(root)
    current_branch = _read_current_branch(root)
    muse_dir = root / ".muse"
    head_file = muse_dir / "HEAD"

    # HEAD holds either "refs/heads/<branch>" or, after a detached checkout,
    # a bare commit ID. In the detached case ``current_branch`` is that
    # commit ID rather than a real branch name.
    head_text = head_file.read_text().strip()
    detached = not head_text.startswith("refs/heads/")

    current_snapshot_id = get_head_snapshot_id(root, repo_id, current_branch)

    if create:
        ref_file = muse_dir / "refs" / "heads" / target
        if ref_file.exists():
            typer.echo(f"❌ Branch '{target}' already exists. Use 'muse checkout {target}' to switch to it.")
            raise typer.Exit(code=ExitCode.USER_ERROR)
        # On a detached HEAD there is no branch ref to read the commit from;
        # fall back to the commit ID stored in HEAD so the new branch does
        # not start out with an empty ref.
        current_commit = get_head_commit_id(root, current_branch) or (
            head_text if detached else ""
        )
        ref_file.parent.mkdir(parents=True, exist_ok=True)
        ref_file.write_text(current_commit)
        head_file.write_text(f"refs/heads/{target}\n")
        typer.echo(f"Switched to a new branch '{target}'")
        return

    # On a detached HEAD the branch-based lookup above may find nothing.
    # Resolve the snapshot from the HEAD commit instead, so files that belong
    # only to the detached commit are removed when switching away from it.
    if detached and head_text and not current_snapshot_id:
        head_commit = resolve_commit_ref(root, repo_id, current_branch, head_text)
        if head_commit is not None:
            current_snapshot_id = head_commit.snapshot_id

    # Check if target is a known branch. ``is_file`` (not ``exists``) so that
    # a namespace directory such as refs/heads/feature/ is not mistaken for a
    # branch called "feature".
    ref_file = muse_dir / "refs" / "heads" / target
    if ref_file.is_file():
        if target == current_branch:
            typer.echo(f"Already on '{target}'")
            return

        target_snapshot_id = get_head_snapshot_id(root, repo_id, target)
        # A branch without a snapshot leaves muse-work/ untouched.
        if target_snapshot_id:
            _checkout_snapshot(root, target_snapshot_id, current_snapshot_id)

        head_file.write_text(f"refs/heads/{target}\n")
        typer.echo(f"Switched to branch '{target}'")
        return

    # Try as a commit ID (detached HEAD)
    commit = resolve_commit_ref(root, repo_id, current_branch, target)
    if commit is None:
        typer.echo(f"❌ '{target}' is not a branch or commit ID.")
        raise typer.Exit(code=ExitCode.USER_ERROR)

    _checkout_snapshot(root, commit.snapshot_id, current_snapshot_id)
    head_file.write_text(commit.commit_id + "\n")
    typer.echo(f"HEAD is now at {commit.commit_id[:8]} {commit.message}")