test_cmd_status.py
python
| 1 | """Comprehensive tests for ``muse status``. |
| 2 | |
| 3 | Covers: |
| 4 | - Unit: status correctly identifies clean, added, modified, deleted files |
| 5 | - Integration: status after commit vs after changes |
| 6 | - E2E: CLI flags (--short / -s, --branch / -b, --format json / text) |
| 7 | - Stress: many tracked files, large workspace |
| 8 | """ |
| 9 | |
| 10 | from __future__ import annotations |
| 11 | |
| 12 | import datetime |
| 13 | import hashlib |
| 14 | import json |
| 15 | import pathlib |
| 16 | import uuid |
| 17 | |
| 18 | import pytest |
| 19 | from tests.cli_test_helper import CliRunner |
| 20 | |
# Placeholder handed to ``runner.invoke`` as its first positional argument by
# every test below; per the original note it is not used by the runner.
cli = None  # argparse migration — CliRunner ignores this arg

# Single module-level runner instance shared by all tests in this file.
runner = CliRunner()
| 24 | |
| 25 | |
| 26 | # --------------------------------------------------------------------------- |
| 27 | # Shared helpers |
| 28 | # --------------------------------------------------------------------------- |
| 29 | |
def _env(root: pathlib.Path) -> dict[str, str]:
    """Return the environment mapping the tests pass to ``runner.invoke``.

    The mapping has a single key, ``MUSE_REPO_ROOT``, whose value is the
    string form of *root*.
    """
    repo_root = str(root)
    return dict(MUSE_REPO_ROOT=repo_root)
| 32 | |
| 33 | |
def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out a bare ``.muse`` directory under *tmp_path*.

    Writes ``repo.json`` (fresh UUID4 ``repo_id``, ``midi`` domain, ``main``
    default branch, fixed ``created_at``), a ``HEAD`` file containing
    ``ref: refs/heads/main``, and the empty ``refs/heads``, ``snapshots``,
    ``commits`` and ``objects`` directories.

    Returns:
        ``(tmp_path, repo_id)`` — the path that was passed in and the
        generated repository id.

    Raises:
        FileExistsError: if ``.muse`` (or one of its sub-directories)
            already exists, because the directories are created without
            ``exist_ok``.
    """
    dot_muse = tmp_path / ".muse"
    dot_muse.mkdir()

    repo_id = str(uuid.uuid4())
    metadata = {
        "repo_id": repo_id,
        "domain": "midi",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(metadata), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    # ``parents=True`` is only needed for the nested ``refs/heads`` entry; it
    # is harmless for the single-level directories.
    for relative in ("refs/heads", "snapshots", "commits", "objects"):
        (dot_muse / relative).mkdir(parents=True)

    return tmp_path, repo_id
| 50 | |
| 51 | |
def _make_commit(root: pathlib.Path, repo_id: str, message: str = "test") -> str:
    """Record an empty-manifest commit on ``main`` and advance the branch ref.

    The current contents of ``.muse/refs/heads/main`` (if the file exists and
    is non-empty) become the parent of the new commit.  A snapshot with an
    empty manifest and a commit record are written through ``muse.core.store``
    and the ref file is then overwritten with the new commit id.

    Args:
        root: Repository root containing the ``.muse`` directory.
        repo_id: Repository id stored on the commit record.
        message: Commit message; also an input to the commit id.

    Returns:
        The id of the commit that was written.
    """
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
    from muse.core.snapshot import compute_snapshot_id, compute_commit_id

    ref_file = root / ".muse" / "refs" / "heads" / "main"

    # Read with the same explicit encoding used for every write in this module,
    # and treat an empty ref file as "no parent" so that ``parent_ids`` and
    # ``parent_commit_id`` never disagree ("" vs. []).
    parent_id: str | None = None
    if ref_file.exists():
        parent_id = ref_file.read_text(encoding="utf-8").strip() or None

    manifest: dict[str, str] = {}
    snap_id = compute_snapshot_id(manifest)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        repo_id=repo_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))

    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
| 75 | |
| 76 | |
| 77 | # --------------------------------------------------------------------------- |
| 78 | # E2E CLI tests |
| 79 | # --------------------------------------------------------------------------- |
| 80 | |
class TestStatusCLI:
    """End-to-end checks that drive ``status`` through the shared runner."""

    @staticmethod
    def _committed_repo(tmp_path: pathlib.Path) -> pathlib.Path:
        """Initialise a repo under *tmp_path*, record one commit, return its root."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        return root

    @staticmethod
    def _status(root: pathlib.Path, *flags: str):
        """Invoke ``status`` with *flags* against *root*; exceptions propagate."""
        return runner.invoke(cli, ["status", *flags], env=_env(root), catch_exceptions=False)

    def test_status_empty_repo(self, tmp_path: pathlib.Path) -> None:
        """``status`` exits 0 in a freshly initialised repo with no commits."""
        root, _ = _init_repo(tmp_path)
        outcome = self._status(root)
        assert outcome.exit_code == 0

    def test_status_after_commit_shows_clean(self, tmp_path: pathlib.Path) -> None:
        """``status`` exits 0 after one commit (only the exit code is asserted)."""
        outcome = self._status(self._committed_repo(tmp_path))
        assert outcome.exit_code == 0

    def test_status_shows_branch_name(self, tmp_path: pathlib.Path) -> None:
        """The branch name ``main`` appears somewhere in the output."""
        outcome = self._status(self._committed_repo(tmp_path))
        assert "main" in outcome.output

    def test_status_short_flag(self, tmp_path: pathlib.Path) -> None:
        """``--short`` is accepted and exits 0."""
        outcome = self._status(self._committed_repo(tmp_path), "--short")
        assert outcome.exit_code == 0

    def test_status_short_s_flag(self, tmp_path: pathlib.Path) -> None:
        """``-s`` is accepted and exits 0."""
        outcome = self._status(self._committed_repo(tmp_path), "-s")
        assert outcome.exit_code == 0

    def test_status_branch_flag(self, tmp_path: pathlib.Path) -> None:
        """``--branch`` exits 0 and the output mentions ``main``."""
        outcome = self._status(self._committed_repo(tmp_path), "--branch")
        assert outcome.exit_code == 0
        assert "main" in outcome.output

    def test_status_porcelain_flag(self, tmp_path: pathlib.Path) -> None:
        """``--porcelain`` is accepted and exits 0."""
        outcome = self._status(self._committed_repo(tmp_path), "--porcelain")
        assert outcome.exit_code == 0

    def test_status_format_json(self, tmp_path: pathlib.Path) -> None:
        """``--format json`` emits JSON containing ``branch`` and ``added`` keys."""
        root = self._committed_repo(tmp_path)
        (root / "new.mid").write_bytes(b"new file")
        outcome = self._status(root, "--format", "json")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert "branch" in payload
        assert "added" in payload

    def test_status_porcelain_output_machine_readable(self, tmp_path: pathlib.Path) -> None:
        """``--porcelain`` exits 0 with an uncommitted file present.

        Only the exit code is asserted; the output format is not inspected.
        """
        root = self._committed_repo(tmp_path)
        (root / "new.mid").write_bytes(b"new file")
        outcome = self._status(root, "--porcelain")
        assert outcome.exit_code == 0

    def test_status_detects_new_file(self, tmp_path: pathlib.Path) -> None:
        """A file created after the commit is named in the output."""
        root = self._committed_repo(tmp_path)
        (root / "new_song.mid").write_bytes(b"MIDI data")
        outcome = self._status(root)
        assert outcome.exit_code == 0
        assert "new_song.mid" in outcome.output

    def test_status_output_sanitized(self, tmp_path: pathlib.Path) -> None:
        """Status output must not echo ANSI escape codes from filenames.

        NOTE(review): no file with escape codes in its name is created here,
        so this currently verifies only that ``status`` exits 0.
        """
        outcome = self._status(self._committed_repo(tmp_path))
        # No raw ANSI from repo state (filenames can't be controlled here easily)
        assert outcome.exit_code == 0
| 156 | |
| 157 | |
class TestStatusStress:
    """Checks ``status`` with many untracked files and under repeated calls."""

    def test_status_with_many_files(self, tmp_path: pathlib.Path) -> None:
        """``status`` exits 0 with 100 files written after the commit."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)

        for index in range(100):
            track = root / f"track_{index:03d}.mid"
            track.write_bytes(f"data {index}".encode())

        outcome = runner.invoke(cli, ["status"], env=_env(root), catch_exceptions=False)
        assert outcome.exit_code == 0

    def test_repeated_status_calls_idempotent(self, tmp_path: pathlib.Path) -> None:
        """Five consecutive ``status`` runs all exit 0 with identical output."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)

        def run_status():
            # A fresh env mapping is built for every invocation.
            return runner.invoke(cli, ["status"], env=_env(root), catch_exceptions=False)

        distinct_outputs = set()
        for _ in range(5):
            outcome = run_status()
            assert outcome.exit_code == 0
            distinct_outputs.add(outcome.output)

        assert len(distinct_outputs) == 1