test_core_coverage_gaps.py
python
| 1 | """Tests targeting coverage gaps in core modules: object_store, repo, store, merge_engine.""" |
| 2 | from __future__ import annotations |
| 3 | |
| 4 | import json |
| 5 | import os |
| 6 | import pathlib |
| 7 | |
| 8 | import pytest |
| 9 | |
| 10 | from muse.core.object_store import ( |
| 11 | has_object, |
| 12 | object_path, |
| 13 | objects_dir, |
| 14 | read_object, |
| 15 | restore_object, |
| 16 | write_object, |
| 17 | write_object_from_path, |
| 18 | ) |
| 19 | from muse.core.repo import find_repo_root, require_repo |
| 20 | from muse.core.store import ( |
| 21 | CommitRecord, |
| 22 | SnapshotRecord, |
| 23 | get_commits_for_branch, |
| 24 | get_head_commit_id, |
| 25 | get_head_snapshot_id, |
| 26 | get_head_snapshot_manifest, |
| 27 | get_tags_for_commit, |
| 28 | read_commit, |
| 29 | read_snapshot, |
| 30 | resolve_commit_ref, |
| 31 | update_commit_metadata, |
| 32 | write_commit, |
| 33 | write_snapshot, |
| 34 | ) |
| 35 | from muse.core.merge_engine import apply_resolution, clear_merge_state, read_merge_state, write_merge_state |
| 36 | |
| 37 | import datetime |
| 38 | |
| 39 | |
| 40 | # --------------------------------------------------------------------------- |
| 41 | # object_store |
| 42 | # --------------------------------------------------------------------------- |
| 43 | |
| 44 | |
| 45 | class TestObjectStore: |
| 46 | def test_objects_dir_path(self, tmp_path: pathlib.Path) -> None: |
| 47 | d = objects_dir(tmp_path) |
| 48 | assert d == tmp_path / ".muse" / "objects" |
| 49 | |
| 50 | def test_object_path_sharding(self, tmp_path: pathlib.Path) -> None: |
| 51 | oid = "ab" + "c" * 62 |
| 52 | p = object_path(tmp_path, oid) |
| 53 | assert p.parent.name == "ab" |
| 54 | assert p.name == "c" * 62 |
| 55 | |
| 56 | def test_has_object_false_when_absent(self, tmp_path: pathlib.Path) -> None: |
| 57 | assert not has_object(tmp_path, "a" * 64) |
| 58 | |
| 59 | def test_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None: |
| 60 | oid = "a" * 64 |
| 61 | write_object(tmp_path, oid, b"hello") |
| 62 | assert has_object(tmp_path, oid) |
| 63 | |
| 64 | def test_write_object_idempotent_returns_false(self, tmp_path: pathlib.Path) -> None: |
| 65 | oid = "b" * 64 |
| 66 | assert write_object(tmp_path, oid, b"first") is True |
| 67 | assert write_object(tmp_path, oid, b"second") is False |
| 68 | # content should not change |
| 69 | assert read_object(tmp_path, oid) == b"first" |
| 70 | |
| 71 | def test_write_object_from_path_idempotent(self, tmp_path: pathlib.Path) -> None: |
| 72 | src = tmp_path / "src.bin" |
| 73 | src.write_bytes(b"content") |
| 74 | oid = "c" * 64 |
| 75 | assert write_object_from_path(tmp_path, oid, src) is True |
| 76 | assert write_object_from_path(tmp_path, oid, src) is False |
| 77 | |
| 78 | def test_write_object_from_path_stores_content(self, tmp_path: pathlib.Path) -> None: |
| 79 | src = tmp_path / "file.bin" |
| 80 | src.write_bytes(b"my bytes") |
| 81 | oid = "d" * 64 |
| 82 | write_object_from_path(tmp_path, oid, src) |
| 83 | assert read_object(tmp_path, oid) == b"my bytes" |
| 84 | |
| 85 | def test_read_object_returns_none_when_absent(self, tmp_path: pathlib.Path) -> None: |
| 86 | assert read_object(tmp_path, "e" * 64) is None |
| 87 | |
| 88 | def test_read_object_returns_bytes(self, tmp_path: pathlib.Path) -> None: |
| 89 | oid = "f" * 64 |
| 90 | write_object(tmp_path, oid, b"data") |
| 91 | assert read_object(tmp_path, oid) == b"data" |
| 92 | |
| 93 | def test_restore_object_returns_false_when_absent(self, tmp_path: pathlib.Path) -> None: |
| 94 | dest = tmp_path / "out.bin" |
| 95 | result = restore_object(tmp_path, "0" * 64, dest) |
| 96 | assert result is False |
| 97 | assert not dest.exists() |
| 98 | |
| 99 | def test_restore_object_creates_dest(self, tmp_path: pathlib.Path) -> None: |
| 100 | oid = "1" * 64 |
| 101 | write_object(tmp_path, oid, b"restored") |
| 102 | dest = tmp_path / "sub" / "out.bin" |
| 103 | result = restore_object(tmp_path, oid, dest) |
| 104 | assert result is True |
| 105 | assert dest.read_bytes() == b"restored" |
| 106 | |
| 107 | def test_restore_object_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None: |
| 108 | oid = "2" * 64 |
| 109 | write_object(tmp_path, oid, b"nested") |
| 110 | dest = tmp_path / "a" / "b" / "c" / "file.bin" |
| 111 | restore_object(tmp_path, oid, dest) |
| 112 | assert dest.exists() |
| 113 | |
| 114 | |
| 115 | # --------------------------------------------------------------------------- |
| 116 | # repo |
| 117 | # --------------------------------------------------------------------------- |
| 118 | |
| 119 | |
| 120 | class TestFindRepoRoot: |
| 121 | def test_finds_muse_dir_in_cwd(self, tmp_path: pathlib.Path) -> None: |
| 122 | (tmp_path / ".muse").mkdir() |
| 123 | result = find_repo_root(tmp_path) |
| 124 | assert result == tmp_path |
| 125 | |
| 126 | def test_finds_muse_dir_in_parent(self, tmp_path: pathlib.Path) -> None: |
| 127 | (tmp_path / ".muse").mkdir() |
| 128 | subdir = tmp_path / "a" / "b" |
| 129 | subdir.mkdir(parents=True) |
| 130 | result = find_repo_root(subdir) |
| 131 | assert result == tmp_path |
| 132 | |
| 133 | def test_returns_none_when_no_repo(self, tmp_path: pathlib.Path) -> None: |
| 134 | result = find_repo_root(tmp_path) |
| 135 | assert result is None |
| 136 | |
| 137 | def test_env_override_returns_path(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: |
| 138 | (tmp_path / ".muse").mkdir() |
| 139 | monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) |
| 140 | result = find_repo_root() |
| 141 | assert result == tmp_path |
| 142 | |
| 143 | def test_env_override_returns_none_when_not_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: |
| 144 | # tmp_path exists but has no .muse/ |
| 145 | monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) |
| 146 | result = find_repo_root() |
| 147 | assert result is None |
| 148 | |
| 149 | def test_require_repo_exits_when_no_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: |
| 150 | import click |
| 151 | monkeypatch.delenv("MUSE_REPO_ROOT", raising=False) |
| 152 | monkeypatch.chdir(tmp_path) |
| 153 | with pytest.raises(click.exceptions.Exit): |
| 154 | require_repo() |
| 155 | |
| 156 | |
| 157 | # --------------------------------------------------------------------------- |
| 158 | # store coverage gaps |
| 159 | # --------------------------------------------------------------------------- |
| 160 | |
| 161 | |
| 162 | class TestStoreGaps: |
| 163 | def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: |
| 164 | muse = tmp_path / ".muse" |
| 165 | for d in ("commits", "snapshots", "objects", "refs/heads"): |
| 166 | (muse / d).mkdir(parents=True) |
| 167 | (muse / "HEAD").write_text("refs/heads/main\n") |
| 168 | (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) |
| 169 | (muse / "refs" / "heads" / "main").write_text("") |
| 170 | return tmp_path |
| 171 | |
| 172 | def test_get_head_commit_id_empty_branch(self, tmp_path: pathlib.Path) -> None: |
| 173 | root = self._make_repo(tmp_path) |
| 174 | assert get_head_commit_id(root, "main") is None |
| 175 | |
| 176 | def test_get_head_snapshot_id_no_commits(self, tmp_path: pathlib.Path) -> None: |
| 177 | root = self._make_repo(tmp_path) |
| 178 | assert get_head_snapshot_id(root, "test-repo", "main") is None |
| 179 | |
| 180 | def test_get_head_snapshot_manifest_no_commits(self, tmp_path: pathlib.Path) -> None: |
| 181 | root = self._make_repo(tmp_path) |
| 182 | assert get_head_snapshot_manifest(root, "test-repo", "main") is None |
| 183 | |
| 184 | def test_get_commits_for_branch_empty(self, tmp_path: pathlib.Path) -> None: |
| 185 | root = self._make_repo(tmp_path) |
| 186 | commits = get_commits_for_branch(root, "test-repo", "main") |
| 187 | assert commits == [] |
| 188 | |
| 189 | def test_resolve_commit_ref_with_none_returns_head(self, tmp_path: pathlib.Path) -> None: |
| 190 | root = self._make_repo(tmp_path) |
| 191 | snap = SnapshotRecord(snapshot_id="s" * 64, manifest={"a.mid": "h" * 64}) |
| 192 | write_snapshot(root, snap) |
| 193 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 194 | commit = CommitRecord( |
| 195 | commit_id="c" * 64, |
| 196 | repo_id="test-repo", |
| 197 | branch="main", |
| 198 | snapshot_id="s" * 64, |
| 199 | message="test", |
| 200 | committed_at=committed_at, |
| 201 | ) |
| 202 | write_commit(root, commit) |
| 203 | (root / ".muse" / "refs" / "heads" / "main").write_text("c" * 64) |
| 204 | |
| 205 | result = resolve_commit_ref(root, "test-repo", "main", None) |
| 206 | assert result is not None |
| 207 | assert result.commit_id == "c" * 64 |
| 208 | |
| 209 | def test_read_commit_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None: |
| 210 | root = self._make_repo(tmp_path) |
| 211 | assert read_commit(root, "unknown") is None |
| 212 | |
| 213 | def test_read_snapshot_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None: |
| 214 | root = self._make_repo(tmp_path) |
| 215 | assert read_snapshot(root, "unknown") is None |
| 216 | |
| 217 | def test_update_commit_metadata_false_for_unknown(self, tmp_path: pathlib.Path) -> None: |
| 218 | root = self._make_repo(tmp_path) |
| 219 | assert update_commit_metadata(root, "unknown", "key", "val") is False |
| 220 | |
| 221 | def test_get_tags_for_commit_empty(self, tmp_path: pathlib.Path) -> None: |
| 222 | root = self._make_repo(tmp_path) |
| 223 | tags = get_tags_for_commit(root, "test-repo", "c" * 64) |
| 224 | assert tags == [] |
| 225 | |
| 226 | |
| 227 | # --------------------------------------------------------------------------- |
| 228 | # merge_engine coverage gaps |
| 229 | # --------------------------------------------------------------------------- |
| 230 | |
| 231 | |
| 232 | class TestMergeEngineCoverageGaps: |
| 233 | def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: |
| 234 | muse = tmp_path / ".muse" |
| 235 | muse.mkdir(parents=True) |
| 236 | return tmp_path |
| 237 | |
| 238 | def test_clear_merge_state_no_file(self, tmp_path: pathlib.Path) -> None: |
| 239 | root = self._make_repo(tmp_path) |
| 240 | # Should not raise even if MERGE_STATE.json is absent |
| 241 | clear_merge_state(root) |
| 242 | |
| 243 | def test_apply_resolution_copies_object(self, tmp_path: pathlib.Path) -> None: |
| 244 | root = self._make_repo(tmp_path) |
| 245 | # Write a real object to the store |
| 246 | oid = "a" * 64 |
| 247 | write_object(root, oid, b"resolved content") |
| 248 | |
| 249 | apply_resolution(root, "track.mid", oid) |
| 250 | dest = root / "muse-work" / "track.mid" |
| 251 | assert dest.exists() |
| 252 | assert dest.read_bytes() == b"resolved content" |
| 253 | |
| 254 | def test_apply_resolution_raises_when_object_absent(self, tmp_path: pathlib.Path) -> None: |
| 255 | root = self._make_repo(tmp_path) |
| 256 | with pytest.raises(FileNotFoundError): |
| 257 | apply_resolution(root, "track.mid", "0" * 64) |
| 258 | |
| 259 | def test_read_merge_state_invalid_json_returns_none(self, tmp_path: pathlib.Path) -> None: |
| 260 | root = self._make_repo(tmp_path) |
| 261 | (root / ".muse" / "MERGE_STATE.json").write_text("not json {{") |
| 262 | result = read_merge_state(root) |
| 263 | assert result is None |
| 264 | |
| 265 | def test_write_then_clear_merge_state(self, tmp_path: pathlib.Path) -> None: |
| 266 | root = self._make_repo(tmp_path) |
| 267 | write_merge_state( |
| 268 | root, |
| 269 | base_commit="b" * 64, |
| 270 | ours_commit="o" * 64, |
| 271 | theirs_commit="t" * 64, |
| 272 | conflict_paths=["a.mid"], |
| 273 | ) |
| 274 | assert (root / ".muse" / "MERGE_STATE.json").exists() |
| 275 | clear_merge_state(root) |
| 276 | assert not (root / ".muse" / "MERGE_STATE.json").exists() |