stash.py
python
| 1 | """muse stash — temporarily shelve uncommitted changes. |
| 2 | |
| 3 | Saves the current working tree to ``.muse/stash.json`` and restores |
| 4 | the HEAD snapshot to the working tree. |
| 5 | |
| 6 | Usage:: |
| 7 | |
| 8 | muse stash — save current changes and restore HEAD |
| 9 | muse stash pop — restore the most recent stash |
| 10 | muse stash list — list all stash entries |
| 11 | muse stash drop — discard the most recent stash |
| 12 | """ |
| 13 | |
| 14 | from __future__ import annotations |
| 15 | |
| 16 | import datetime |
| 17 | import json |
| 18 | import logging |
| 19 | import os |
| 20 | import pathlib |
| 21 | import tempfile |
| 22 | from typing import TypedDict |
| 23 | |
| 24 | import typer |
| 25 | |
| 26 | from muse.core.errors import ExitCode |
| 27 | from muse.core.object_store import write_object_from_path |
| 28 | from muse.core.repo import require_repo |
| 29 | from muse.core.snapshot import compute_snapshot_id |
| 30 | from muse.core.store import get_head_snapshot_manifest, read_current_branch, read_snapshot |
| 31 | from muse.core.validation import sanitize_display |
| 32 | from muse.core.workdir import apply_manifest |
| 33 | from muse.plugins.registry import resolve_plugin |
| 34 | |
# Largest stash.json that will be loaded (64 MiB); anything bigger is
# ignored so a runaway file cannot exhaust memory.
_STASH_MAX_BYTES = 64 << 20

# Location of the stash stack, relative to the repository root.
_STASH_FILE = ".muse/stash.json"

logger = logging.getLogger(__name__)

# Typer application that carries the ``stash`` callback and its subcommands.
app = typer.Typer()
| 42 | |
| 43 | |
| 44 | |
class StashEntry(TypedDict):
    """One shelved working-tree state kept in the stash stack."""

    # Identifier computed from the manifest when the entry is created.
    snapshot_id: str
    # Maps each relative file path to the ID of its stored object.
    manifest: dict[str, str]
    # Branch that was current when the stash was taken.
    branch: str
    # ISO-8601 UTC timestamp of when the stash was taken.
    stashed_at: str
| 52 | |
| 53 | |
def _read_branch(root: pathlib.Path) -> str:
    """Return the current branch for *root*, as reported by ``read_current_branch``."""
    current_branch = read_current_branch(root)
    return current_branch
| 56 | |
| 57 | |
| 58 | def _read_repo_id(root: pathlib.Path) -> str: |
| 59 | return str(json.loads((root / ".muse" / "repo.json").read_text())["repo_id"]) |
| 60 | |
| 61 | |
def _load_stash(root: pathlib.Path) -> list[StashEntry]:
    """Load and validate the stash stack stored under *root*.

    An unusable stash file never raises: a missing, oversized, undecodable,
    or structurally unexpected ``stash.json`` yields an empty stack (with a
    warning logged for every case except a missing file).

    Args:
        root: Repository root directory.

    Returns:
        The stash entries in file order. Items that are not objects, or whose
        ``manifest`` is not an object, are skipped. Manifest pairs whose key
        or value is not a string are dropped. Missing scalar fields default
        to the empty string.
    """
    stash_file = root / _STASH_FILE
    if not stash_file.exists():
        return []
    # Guard against unreasonably large stash files to prevent memory exhaustion.
    size = stash_file.stat().st_size
    if size > _STASH_MAX_BYTES:
        logger.warning("⚠️ stash.json exceeds size limit (%d bytes) — ignoring", size)
        return []
    try:
        raw = json.loads(stash_file.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A corrupt file is treated like the other malformed cases below
        # rather than crashing every stash command with a traceback.
        logger.warning("⚠️ stash.json is not valid UTF-8 JSON (%s) — ignoring", exc)
        return []
    if not isinstance(raw, list):
        logger.warning("⚠️ stash.json has unexpected structure — ignoring")
        return []
    entries: list[StashEntry] = []
    for item in raw:
        if not isinstance(item, dict):
            continue
        manifest = item.get("manifest")
        if not isinstance(manifest, dict):
            continue
        # Validate that manifest values are strings (object IDs).
        safe_manifest: dict[str, str] = {
            k: v for k, v in manifest.items()
            if isinstance(k, str) and isinstance(v, str)
        }
        entries.append(StashEntry(
            snapshot_id=str(item.get("snapshot_id", "")),
            manifest=safe_manifest,
            branch=str(item.get("branch", "")),
            stashed_at=str(item.get("stashed_at", "")),
        ))
    return entries
| 94 | |
| 95 | |
def _save_stash(root: pathlib.Path, stash: list[StashEntry]) -> None:
    """Write stash atomically via a temp file + rename to survive crashes.

    The payload is written to a temporary file in the same directory as the
    target, flushed and fsynced, and only then renamed over the stash file.

    Args:
        root: Repository root directory.
        stash: Complete stash stack to persist, serialized as a JSON array.

    Raises:
        OSError: If the temporary file cannot be created, written, synced,
            or renamed. The temporary file is removed before re-raising.
    """
    target = root / _STASH_FILE
    payload = json.dumps(stash, indent=2, ensure_ascii=False)
    fd, tmp_path = tempfile.mkstemp(dir=target.parent, prefix=".stash_tmp_", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(payload)
            # Push the data to disk before the rename so a crash cannot
            # leave the renamed file empty or truncated.
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_path, target)
    except BaseException:
        # BaseException so an interrupt (e.g. Ctrl-C) also cleans up the
        # temporary file; the original error is always re-raised.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
| 111 | |
| 112 | |
@app.callback(invoke_without_command=True)
def stash(
    ctx: typer.Context,
    fmt: str = typer.Option("text", "--format", "-f", help="Output format: text or json."),
) -> None:
    """Save current state/ changes and restore HEAD.

    Agents should pass ``--format json`` to receive ``{snapshot_id, branch,
    stashed_at, stash_size}`` rather than human-readable text.
    """
    # When a subcommand (pop / list / drop) was requested, this callback
    # must not perform a stash of its own.
    if ctx.invoked_subcommand is not None:
        return
    if fmt not in ("text", "json"):
        typer.echo(f"❌ Unknown --format '{sanitize_display(fmt)}'. Choose text or json.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)
    want_json = fmt == "json"

    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = _read_branch(root)
    working_manifest = resolve_plugin(root).snapshot(root)["files"]
    if not working_manifest:
        if want_json:
            typer.echo(json.dumps({"status": "nothing_to_stash"}))
        else:
            typer.echo("Nothing to stash.")
        return

    # Store every file of the snapshot as an object before recording the
    # entry, so the stash never references content that was not written.
    snapshot_id = compute_snapshot_id(working_manifest)
    for rel_path, object_id in working_manifest.items():
        write_object_from_path(root, object_id, root / rel_path)

    stashed_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    # Newest entry goes first: index 0 is always the most recent stash.
    stack = [
        StashEntry(
            snapshot_id=snapshot_id,
            manifest=working_manifest,
            branch=branch,
            stashed_at=stashed_at,
        ),
        *_load_stash(root),
    ]
    _save_stash(root, stack)

    # With the stash persisted, apply HEAD's manifest (or an empty one when
    # none is returned) to the working tree.
    head_manifest = get_head_snapshot_manifest(root, repo_id, branch) or {}
    apply_manifest(root, head_manifest)

    if not want_json:
        typer.echo("Saved working directory (stash@{0})")
        return
    summary = {
        "snapshot_id": snapshot_id,
        "branch": branch,
        "stashed_at": stashed_at,
        "stash_size": len(stack),
    }
    typer.echo(json.dumps(summary))
| 167 | |
| 168 | |
@app.command("pop")
def stash_pop(
    fmt: str = typer.Option("text", "--format", "-f", help="Output format: text or json."),
) -> None:
    """Restore the most recent stash.

    Agents should pass ``--format json`` to receive ``{snapshot_id, branch,
    stashed_at}`` rather than human-readable text.
    """
    # Exits with ExitCode.USER_ERROR on an unknown --format value or when
    # the stash stack is empty.
    if fmt not in ("text", "json"):
        typer.echo(f"❌ Unknown --format '{sanitize_display(fmt)}'. Choose text or json.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)
    root = require_repo()
    entries = _load_stash(root)
    if not entries:
        if fmt == "json":
            typer.echo(json.dumps({"error": "no_stash_entries"}))
        else:
            typer.echo("No stash entries.")
        raise typer.Exit(code=ExitCode.USER_ERROR)

    entry = entries[0]
    # Restore the working tree BEFORE removing the entry from disk: if
    # applying the manifest fails, the stash is still there and the shelved
    # work is not lost.
    apply_manifest(root, entry["manifest"])
    _save_stash(root, entries[1:])

    if fmt == "json":
        typer.echo(json.dumps({
            "snapshot_id": entry["snapshot_id"],
            "branch": entry["branch"],
            "stashed_at": entry["stashed_at"],
        }))
    else:
        typer.echo(f"Restored stash@{{0}} (branch: {sanitize_display(entry['branch'])})")
| 202 | |
| 203 | |
@app.command("list")
def stash_list(
    fmt: str = typer.Option("text", "--format", "-f", help="Output format: text or json."),
) -> None:
    """List all stash entries.

    Agents should pass ``--format json`` to receive a JSON array of
    ``{index, snapshot_id, branch, stashed_at}`` objects.
    """
    if fmt not in ("text", "json"):
        typer.echo(f"❌ Unknown --format '{sanitize_display(fmt)}'. Choose text or json.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)
    root = require_repo()
    stack = _load_stash(root)

    if fmt == "json":
        # JSON mode always emits an array, even when the stack is empty.
        rows = []
        for position, item in enumerate(stack):
            rows.append({
                "index": position,
                "snapshot_id": item["snapshot_id"],
                "branch": item["branch"],
                "stashed_at": item["stashed_at"],
            })
        typer.echo(json.dumps(rows))
    elif stack:
        for position, item in enumerate(stack):
            # Stored values are passed through sanitize_display before printing.
            branch = sanitize_display(item["branch"])
            when = sanitize_display(item["stashed_at"])
            typer.echo(f"stash@{{{position}}}: WIP on {branch} — {when}")
    else:
        typer.echo("No stash entries.")
| 231 | |
| 232 | |
@app.command("drop")
def stash_drop(
    fmt: str = typer.Option("text", "--format", "-f", help="Output format: text or json."),
) -> None:
    """Discard the most recent stash entry.

    Agents should pass ``--format json`` to receive ``{status, stash_size}``
    rather than human-readable text.
    """
    if fmt not in ("text", "json"):
        typer.echo(f"❌ Unknown --format '{sanitize_display(fmt)}'. Choose text or json.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)
    want_json = fmt == "json"
    root = require_repo()
    stack = _load_stash(root)

    if stack:
        # Index 0 is the most recent entry; remove it and persist the rest.
        del stack[0]
        _save_stash(root, stack)
        if want_json:
            typer.echo(json.dumps({"status": "dropped", "stash_size": len(stack)}))
        else:
            typer.echo("Dropped stash@{0}")
        return

    # Nothing to drop: report it and exit with a user error.
    if want_json:
        typer.echo(json.dumps({"error": "no_stash_entries"}))
    else:
        typer.echo("No stash entries.")
    raise typer.Exit(code=ExitCode.USER_ERROR)