test_cmd_gc.py
python
| 1 | """Comprehensive tests for ``muse gc``. |
| 2 | |
| 3 | Covers: |
| 4 | - Unit: run_gc core logic (reachable vs unreachable objects) |
| 5 | - Integration: gc cleans up orphaned objects after commits |
| 6 | - E2E: full CLI via CliRunner (--dry-run, --verbose, --format json) |
| 7 | - Security: only objects dir affected, no path traversal |
| 8 | - Stress: gc with many orphaned objects |
| 9 | """ |
| 10 | |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import datetime |
| 14 | import hashlib |
| 15 | import json |
| 16 | import pathlib |
| 17 | import uuid |
| 18 | |
| 19 | import pytest |
| 20 | from tests.cli_test_helper import CliRunner |
| 21 | |
cli = None  # argparse migration — CliRunner ignores this arg

# Shared runner instance used by the CLI tests below.
runner = CliRunner()
| 25 | |
| 26 | |
| 27 | # --------------------------------------------------------------------------- |
| 28 | # Helpers |
| 29 | # --------------------------------------------------------------------------- |
| 30 | |
def _env(root: pathlib.Path) -> dict[str, str]:
    """Build the environment mapping that points the CLI at *root*.

    Returns a one-entry dict whose ``MUSE_REPO_ROOT`` value is the string
    form of *root*.
    """
    repo_root = str(root)
    return {"MUSE_REPO_ROOT": repo_root}
| 33 | |
| 34 | |
def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out a minimal ``.muse`` repository skeleton under *tmp_path*.

    Writes ``repo.json`` and ``HEAD`` and creates the ``refs/heads``,
    ``snapshots``, ``commits`` and ``objects`` directories.

    Returns:
        ``(tmp_path, repo_id)`` where *repo_id* is a freshly generated
        UUID4 string that was also stored in ``repo.json``.
    """
    dot_muse = tmp_path / ".muse"
    dot_muse.mkdir()

    repo_id = str(uuid.uuid4())
    metadata = {
        "repo_id": repo_id,
        "domain": "midi",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(metadata), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    (dot_muse / "refs" / "heads").mkdir(parents=True)
    for store_name in ("snapshots", "commits", "objects"):
        (dot_muse / store_name).mkdir()

    return tmp_path, repo_id
| 51 | |
| 52 | |
def _write_object(root: pathlib.Path, content: bytes) -> str:
    """Store *content* under the object directory of *root* and return its id.

    The id is the SHA-256 hex digest of *content*; the bytes are written to
    ``.muse/objects/<first two hex chars>/<remaining hex chars>``.
    """
    digest = hashlib.sha256(content).hexdigest()
    shard_dir = root / ".muse" / "objects" / digest[:2]
    # Several objects may share a two-character shard, so tolerate an
    # already-existing directory.
    shard_dir.mkdir(parents=True, exist_ok=True)
    (shard_dir / digest[2:]).write_bytes(content)
    return digest
| 59 | |
| 60 | |
def _make_commit(root: pathlib.Path, repo_id: str, message: str = "init") -> str:
    """Record an empty-manifest commit on ``main`` and point the branch at it.

    If ``.muse/refs/heads/main`` already exists, its stripped content is used
    as the parent commit id; otherwise the commit has no parent.

    Args:
        root: Repository root containing the ``.muse`` directory.
        repo_id: Repository id stored on the commit record.
        message: Commit message.

    Returns:
        The id of the newly written commit.
    """
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
    from muse.core.snapshot import compute_snapshot_id, compute_commit_id

    branch_ref = root / ".muse" / "refs" / "heads" / "main"
    parent_id = None
    if branch_ref.exists():
        parent_id = branch_ref.read_text().strip()

    # A falsy parent (missing ref or empty file) yields a root commit.
    parent_ids = []
    if parent_id:
        parent_ids.append(parent_id)

    manifest: dict[str, str] = {}
    snap_id = compute_snapshot_id(manifest)
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=now_utc.isoformat(),
    )

    snapshot_record = SnapshotRecord(snapshot_id=snap_id, manifest=manifest)
    write_snapshot(root, snapshot_record)

    commit_record = CommitRecord(
        commit_id=commit_id,
        repo_id=repo_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=now_utc,
        parent_commit_id=parent_id,
    )
    write_commit(root, commit_record)

    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
| 84 | |
| 85 | |
| 86 | # --------------------------------------------------------------------------- |
| 87 | # Unit tests |
| 88 | # --------------------------------------------------------------------------- |
| 89 | |
class TestGcUnit:
    """Direct tests of ``muse.core.gc.run_gc`` without going through the CLI."""

    def test_run_gc_empty_repo(self, tmp_path: pathlib.Path) -> None:
        """A freshly initialised repo has nothing to collect."""
        from muse.core.gc import run_gc

        repo_root, _ = _init_repo(tmp_path)
        outcome = run_gc(repo_root, dry_run=False)
        assert outcome.collected_count == 0

    def test_run_gc_dry_run_does_not_delete(self, tmp_path: pathlib.Path) -> None:
        """A dry run counts the orphan but leaves its file on disk."""
        from muse.core.gc import run_gc

        repo_root, _ = _init_repo(tmp_path)
        orphan = _write_object(repo_root, b"orphaned content")
        orphan_file = repo_root / ".muse" / "objects" / orphan[:2] / orphan[2:]

        outcome = run_gc(repo_root, dry_run=True)

        assert orphan_file.exists()
        assert outcome.collected_count >= 1

    def test_run_gc_collects_unreachable_objects(self, tmp_path: pathlib.Path) -> None:
        """An object written after a commit, but never referenced, is removed."""
        from muse.core.gc import run_gc

        repo_root, repo_id = _init_repo(tmp_path)
        _make_commit(repo_root, repo_id, message="committed")
        orphan = _write_object(repo_root, b"never committed content")
        orphan_file = repo_root / ".muse" / "objects" / orphan[:2] / orphan[2:]

        outcome = run_gc(repo_root, dry_run=False)

        assert not orphan_file.exists()
        assert orphan in outcome.collected_ids
| 115 | |
| 116 | |
| 117 | # --------------------------------------------------------------------------- |
| 118 | # Integration (CLI) tests |
| 119 | # --------------------------------------------------------------------------- |
| 120 | |
class TestGcIntegration:
    """Tests that drive ``muse gc`` through the CLI runner."""

    def test_gc_default_clean_repo(self, tmp_path: pathlib.Path) -> None:
        """``gc`` exits successfully on a repo with one commit and no orphans."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["gc"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0

    def test_gc_dry_run_reports_orphans(self, tmp_path: pathlib.Path) -> None:
        """``--dry-run`` reports orphans and leaves them on disk."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        orphan_ids = [
            _write_object(root, b"orphan1"),
            _write_object(root, b"orphan2"),
        ]
        result = runner.invoke(cli, ["gc", "--dry-run"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0
        assert "2" in result.output or "collect" in result.output.lower()
        # A dry run must never delete: verify the orphans are still present.
        for oid in orphan_ids:
            obj_path = root / ".muse" / "objects" / oid[:2] / oid[2:]
            assert obj_path.exists()

    def test_gc_verbose_shows_ids(self, tmp_path: pathlib.Path) -> None:
        """``--verbose`` output contains the first 12 characters of the orphan id."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        orphan_id = _write_object(root, b"verbose orphan")
        result = runner.invoke(cli, ["gc", "--verbose"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0
        assert orphan_id[:12] in result.output

    def test_gc_output_includes_count(self, tmp_path: pathlib.Path) -> None:
        """Default output mentions the removal ("Removed" or "object")."""
        root, repo_id = _init_repo(tmp_path)
        _write_object(root, b"orphan for count test")
        result = runner.invoke(cli, ["gc"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0
        assert "Removed" in result.output or "object" in result.output

    def test_gc_keeps_referenced_objects(self, tmp_path: pathlib.Path) -> None:
        """An object referenced by the head commit's snapshot survives ``gc``."""
        root, repo_id = _init_repo(tmp_path)
        content = b"referenced file content"
        obj_id = _write_object(root, content)

        from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
        from muse.core.snapshot import compute_snapshot_id, compute_commit_id

        manifest = {"file.mid": obj_id}
        snap_id = compute_snapshot_id(manifest)
        committed_at = datetime.datetime.now(datetime.timezone.utc)
        commit_id = compute_commit_id([], snap_id, "with file", committed_at.isoformat())
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
        write_commit(root, CommitRecord(
            commit_id=commit_id, repo_id=repo_id, branch="main",
            snapshot_id=snap_id, message="with file",
            committed_at=committed_at, parent_commit_id=None,
        ))
        (root / ".muse" / "refs" / "heads" / "main").write_text(commit_id, encoding="utf-8")

        result = runner.invoke(cli, ["gc"], env=_env(root), catch_exceptions=False)
        # Without this check the test would pass vacuously if gc exited
        # non-zero before touching the object store.
        assert result.exit_code == 0
        obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:]
        assert obj_path.exists()

    def test_gc_short_flags(self, tmp_path: pathlib.Path) -> None:
        """The short flags ``-n`` and ``-v`` are accepted together."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        _write_object(root, b"short flag orphan")
        result = runner.invoke(cli, ["gc", "-n", "-v"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0
| 182 | |
| 183 | |
| 184 | # --------------------------------------------------------------------------- |
| 185 | # Stress tests |
| 186 | # --------------------------------------------------------------------------- |
| 187 | |
class TestGcStress:
    """Larger-volume and repeated-invocation checks for ``muse gc``."""

    def test_gc_many_orphaned_objects(self, tmp_path: pathlib.Path) -> None:
        """One hundred orphans are all removed and the count shows in the output."""
        repo_root, repo_id = _init_repo(tmp_path)
        _make_commit(repo_root, repo_id)

        orphan_ids = []
        for index in range(100):
            orphan_ids.append(_write_object(repo_root, f"orphan {index}".encode()))

        outcome = runner.invoke(cli, ["gc"], env=_env(repo_root), catch_exceptions=False)
        assert outcome.exit_code == 0
        assert "100" in outcome.output

        objects_dir = repo_root / ".muse" / "objects"
        survivors = [
            oid for oid in orphan_ids
            if (objects_dir / oid[:2] / oid[2:]).exists()
        ]
        assert not survivors

    def test_gc_repeated_runs_idempotent(self, tmp_path: pathlib.Path) -> None:
        """Running ``gc`` three times in a row succeeds every time."""
        repo_root, repo_id = _init_repo(tmp_path)
        _make_commit(repo_root, repo_id)

        attempts = 0
        while attempts < 3:
            outcome = runner.invoke(cli, ["gc"], env=_env(repo_root), catch_exceptions=False)
            assert outcome.exit_code == 0
            attempts += 1