test_core_gc.py
python
| 1 | """Tests for muse/core/gc.py — garbage collection.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import json |
| 6 | import pathlib |
| 7 | |
| 8 | import pytest |
| 9 | |
| 10 | from muse.core.gc import GcResult, count_unreachable, run_gc |
| 11 | |
| 12 | |
| 13 | # --------------------------------------------------------------------------- |
| 14 | # Helpers |
| 15 | # --------------------------------------------------------------------------- |
| 16 | |
| 17 | |
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a bare ``.muse`` directory tree under *tmp_path*.

    Creates the ``objects``, ``commits``, ``snapshots`` and ``refs/heads``
    directories, a ``repo.json`` holding a fixed repo id, and a ``HEAD``
    file naming ``refs/heads/main``.  Returns *tmp_path* unchanged so the
    caller can use it as the repository root.
    """
    dot_muse = tmp_path / ".muse"
    subdirs = ("objects", "commits", "snapshots", "refs/heads")
    for name in subdirs:
        dot_muse.joinpath(name).mkdir(parents=True, exist_ok=True)

    repo_meta = json.dumps({"repo_id": "test-repo"})
    dot_muse.joinpath("repo.json").write_text(repo_meta)
    dot_muse.joinpath("HEAD").write_text("refs/heads/main\n")
    return tmp_path
| 26 | |
| 27 | |
def _write_object(repo: pathlib.Path, content: bytes) -> str:
    """Store *content* as a loose object and return its SHA-256 hex digest.

    The bytes are written to ``.muse/objects/<first two hex chars>/<remaining
    hex chars>`` under *repo*, creating the two-character directory if it
    does not exist yet.
    """
    import hashlib

    digest = hashlib.sha256(content).hexdigest()
    prefix, remainder = digest[:2], digest[2:]
    target = repo / ".muse" / "objects" / prefix / remainder
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(content)
    return digest
| 36 | |
| 37 | |
def _write_snapshot(repo: pathlib.Path, snapshot_id: str, manifest: dict[str, str]) -> None:
    """Serialise a snapshot record to ``.muse/snapshots/<snapshot_id>.json``.

    The JSON document holds the ``snapshot_id`` and the given ``manifest``
    mapping; the snapshots directory is created when missing.
    """
    snapshots = repo.joinpath(".muse", "snapshots")
    snapshots.mkdir(parents=True, exist_ok=True)

    record = {"snapshot_id": snapshot_id, "manifest": manifest}
    payload = json.dumps(record)
    snapshots.joinpath(f"{snapshot_id}.json").write_text(payload)
| 44 | |
| 45 | |
def _write_commit(repo: pathlib.Path, commit_id: str, snapshot_id: str) -> None:
    """Write a parentless commit record on ``main`` and point the branch at it.

    The record is stored as ``.muse/commits/<commit_id>.json`` with a UTC
    ``committed_at`` timestamp, after which ``.muse/refs/heads/main`` is
    overwritten with *commit_id*.
    """
    import datetime

    muse_dir = repo / ".muse"

    commits = muse_dir / "commits"
    commits.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    record = {
        "commit_id": commit_id,
        "repo_id": "test-repo",
        "branch": "main",
        "snapshot_id": snapshot_id,
        "message": "test",
        "committed_at": timestamp,
        "parent_commit_id": None,
        "parent2_commit_id": None,
        "author": "",
        "metadata": {},
    }
    (commits / f"{commit_id}.json").write_text(json.dumps(record))

    # Move the tip of ``main`` to the commit just written.
    heads = muse_dir / "refs" / "heads"
    heads.mkdir(parents=True, exist_ok=True)
    (heads / "main").write_text(commit_id)
| 67 | |
| 68 | |
| 69 | # --------------------------------------------------------------------------- |
| 70 | # Tests |
| 71 | # --------------------------------------------------------------------------- |
| 72 | |
| 73 | |
def test_gc_empty_repo(tmp_path: pathlib.Path) -> None:
    """A repo with no objects yields a ``GcResult`` with nothing collected."""
    outcome = run_gc(_make_repo(tmp_path))

    assert isinstance(outcome, GcResult)
    assert outcome.collected_count == 0
| 80 | |
| 81 | |
def test_gc_removes_unreachable_object(tmp_path: pathlib.Path) -> None:
    """An object no commit refers to is reported and removed from disk."""
    root = _make_repo(tmp_path)
    # No snapshot or commit is written, so nothing references this object.
    dangling = _write_object(root, b"orphan data")
    dangling_file = root.joinpath(".muse", "objects", dangling[:2], dangling[2:])
    assert dangling_file.exists()

    outcome = run_gc(root)

    assert outcome.collected_count == 1
    assert dangling in outcome.collected_ids
    assert not dangling_file.exists()
| 93 | |
| 94 | |
def test_gc_preserves_reachable_object(tmp_path: pathlib.Path) -> None:
    """An object listed in a committed snapshot's manifest survives GC."""
    root = _make_repo(tmp_path)
    blob_id = _write_object(root, b"reachable file content")
    snapshot, commit = "s" * 64, "c" * 64
    _write_snapshot(root, snapshot, {"file.txt": blob_id})
    _write_commit(root, commit, snapshot)

    outcome = run_gc(root)

    assert outcome.collected_count == 0
    blob_file = root.joinpath(".muse", "objects", blob_id[:2], blob_id[2:])
    assert blob_file.exists()
| 108 | |
| 109 | |
def test_gc_dry_run_does_not_delete(tmp_path: pathlib.Path) -> None:
    """With ``dry_run=True`` the orphan is counted but its file is kept."""
    root = _make_repo(tmp_path)
    dangling = _write_object(root, b"orphan")
    dangling_file = root.joinpath(".muse", "objects", dangling[:2], dangling[2:])

    outcome = run_gc(root, dry_run=True)

    assert outcome.dry_run is True
    assert outcome.collected_count == 1
    # A dry run only reports; the object must still be on disk.
    assert dangling_file.exists()
| 120 | |
| 121 | |
def test_gc_collected_bytes(tmp_path: pathlib.Path) -> None:
    """Collecting a 1000-byte orphan reports at least 1000 collected bytes."""
    root = _make_repo(tmp_path)
    payload = b"x" * 1000
    _write_object(root, payload)

    outcome = run_gc(root)

    assert outcome.collected_bytes >= 1000
| 128 | |
| 129 | |
def test_gc_multiple_orphans(tmp_path: pathlib.Path) -> None:
    """Five distinct orphaned objects are all counted as collected."""
    root = _make_repo(tmp_path)
    payloads = (f"orphan {index}".encode() for index in range(5))
    for payload in payloads:
        _write_object(root, payload)

    outcome = run_gc(root)

    assert outcome.collected_count == 5
| 136 | |
| 137 | |
def test_gc_mixed_reachable_and_orphans(tmp_path: pathlib.Path) -> None:
    """GC collects exactly the orphans and leaves the committed object alone.

    Besides the counts, this checks object identities and on-disk state, so
    a GC that removed the wrong objects cannot pass merely because the
    totals happen to match.
    """
    repo = _make_repo(tmp_path)
    objects_dir = repo / ".muse" / "objects"

    # One reachable object, referenced through a snapshot and a commit.
    reachable_id = _write_object(repo, b"reachable")
    snap_id = "s" * 64
    commit_id = "c" * 64
    _write_snapshot(repo, snap_id, {"file.txt": reachable_id})
    _write_commit(repo, commit_id, snap_id)

    # Two orphans that nothing references.
    orphan_ids = [
        _write_object(repo, b"orphan A"),
        _write_object(repo, b"orphan B"),
    ]

    result = run_gc(repo)

    assert result.collected_count == 2
    assert result.reachable_count == 1

    # The reachable object must be neither reported nor deleted.
    assert reachable_id not in result.collected_ids
    assert (objects_dir / reachable_id[:2] / reachable_id[2:]).exists()

    # Each orphan must be both reported and gone from disk.
    for orphan_id in orphan_ids:
        assert orphan_id in result.collected_ids
        assert not (objects_dir / orphan_id[:2] / orphan_id[2:]).exists()
| 153 | |
| 154 | |
def test_count_unreachable(tmp_path: pathlib.Path) -> None:
    """``count_unreachable`` reports both unreferenced objects."""
    root = _make_repo(tmp_path)
    for payload in (b"orphan 1", b"orphan 2"):
        _write_object(root, payload)

    unreachable = count_unreachable(root)

    assert unreachable == 2
| 160 | |
| 161 | |
def test_count_unreachable_empty(tmp_path: pathlib.Path) -> None:
    """``count_unreachable`` is zero for a repo holding no objects."""
    root = _make_repo(tmp_path)
    unreachable = count_unreachable(root)
    assert unreachable == 0
| 165 | |
| 166 | |
def test_gc_elapsed_time_positive(tmp_path: pathlib.Path) -> None:
    """The reported ``elapsed_seconds`` is never negative."""
    outcome = run_gc(_make_repo(tmp_path))
    assert outcome.elapsed_seconds >= 0.0
| 171 | |
| 172 | |
| 173 | # --------------------------------------------------------------------------- |
| 174 | # Stress test |
| 175 | # --------------------------------------------------------------------------- |
| 176 | |
| 177 | |
def test_gc_stress_many_orphans(tmp_path: pathlib.Path) -> None:
    """GC collects all 200 orphans and leaves no files under ``objects``."""
    root = _make_repo(tmp_path)
    for index in range(200):
        payload = f"orphan-{index:04d}".encode()
        _write_object(root, payload)

    outcome = run_gc(root)
    assert outcome.collected_count == 200

    # Directories may remain; only leftover regular files count as a failure.
    objects_root = root / ".muse" / "objects"
    leftover_files = [entry for entry in objects_root.rglob("*") if entry.is_file()]
    assert leftover_files == []
| 188 | remaining_files = [p for p in remaining if p.is_file()] |
| 189 | assert remaining_files == [] |