gabriel / muse public
test_core_coverage_gaps.py python
349 lines 13.3 KB
d78c6f12 fix: remove last typer references from muse/core/ Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Tests targeting coverage gaps in core modules: object_store, repo, store, merge_engine."""
2
3 import hashlib
4 import json
5 import os
6 import pathlib
7
8 import pytest
9
10
11 def _sha256(data: bytes) -> str:
12 return hashlib.sha256(data).hexdigest()
13
14 from muse.core.object_store import (
15 has_object,
16 object_path,
17 objects_dir,
18 read_object,
19 restore_object,
20 write_object,
21 write_object_from_path,
22 )
23 from muse.core.repo import find_repo_root, require_repo
24 from muse.core.store import (
25 CommitRecord,
26 SnapshotRecord,
27 get_commits_for_branch,
28 get_head_commit_id,
29 get_head_snapshot_id,
30 get_head_snapshot_manifest,
31 get_tags_for_commit,
32 read_commit,
33 read_snapshot,
34 resolve_commit_ref,
35 update_commit_metadata,
36 write_commit,
37 write_snapshot,
38 )
39 from muse.core.merge_engine import apply_resolution, clear_merge_state, read_merge_state, write_merge_state
40
41 import datetime
42
43
44 # ---------------------------------------------------------------------------
45 # object_store
46 # ---------------------------------------------------------------------------
47
48
class TestObjectStore:
    """Coverage tests for the object-store helpers, run against a scratch root."""

    def test_objects_dir_path(self, tmp_path: pathlib.Path) -> None:
        """``objects_dir`` resolves to ``<root>/.muse/objects``."""
        expected = tmp_path / ".muse" / "objects"
        assert objects_dir(tmp_path) == expected

    def test_object_path_sharding(self, tmp_path: pathlib.Path) -> None:
        """The first two hex characters name the parent directory, the rest the file."""
        prefix = "ab"
        remainder = "c" * 62
        sharded = object_path(tmp_path, prefix + remainder)
        assert sharded.parent.name == prefix
        assert sharded.name == remainder

    def test_has_object_false_when_absent(self, tmp_path: pathlib.Path) -> None:
        """An ID that was never written is reported as missing."""
        missing_id = "a" * 64
        assert not has_object(tmp_path, missing_id)

    def test_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None:
        """An ID is reported as present once its content has been written."""
        payload = b"hello"
        object_id = _sha256(payload)
        write_object(tmp_path, object_id, payload)
        assert has_object(tmp_path, object_id)

    def test_write_object_idempotent_returns_false(self, tmp_path: pathlib.Path) -> None:
        """The first write returns True; repeating it returns False and keeps the bytes."""
        payload = b"first"
        object_id = _sha256(payload)
        first_write = write_object(tmp_path, object_id, payload)
        assert first_write is True
        # Writing the same ID and content again must be a no-op.
        second_write = write_object(tmp_path, object_id, payload)
        assert second_write is False
        # The stored bytes are unchanged by the repeated write.
        assert read_object(tmp_path, object_id) == payload

    def test_write_object_from_path_idempotent(self, tmp_path: pathlib.Path) -> None:
        """Importing the same source file twice returns True, then False."""
        payload = b"content"
        source_file = tmp_path / "src.bin"
        source_file.write_bytes(payload)
        object_id = _sha256(payload)
        first_write = write_object_from_path(tmp_path, object_id, source_file)
        assert first_write is True
        second_write = write_object_from_path(tmp_path, object_id, source_file)
        assert second_write is False

    def test_write_object_from_path_stores_content(self, tmp_path: pathlib.Path) -> None:
        """Content imported from a file can be read back byte-for-byte."""
        payload = b"my bytes"
        source_file = tmp_path / "file.bin"
        source_file.write_bytes(payload)
        object_id = _sha256(payload)
        write_object_from_path(tmp_path, object_id, source_file)
        stored = read_object(tmp_path, object_id)
        assert stored == payload

    def test_read_object_returns_none_when_absent(self, tmp_path: pathlib.Path) -> None:
        """Reading an ID that was never written yields ``None``."""
        missing_id = "e" * 64
        assert read_object(tmp_path, missing_id) is None

    def test_read_object_returns_bytes(self, tmp_path: pathlib.Path) -> None:
        """Reading a written object returns exactly the bytes that were stored."""
        payload = b"data"
        object_id = _sha256(payload)
        write_object(tmp_path, object_id, payload)
        stored = read_object(tmp_path, object_id)
        assert stored == payload

    def test_restore_object_returns_false_when_absent(self, tmp_path: pathlib.Path) -> None:
        """Restoring a missing object returns False and creates no destination file."""
        target = tmp_path / "out.bin"
        restored = restore_object(tmp_path, "0" * 64, target)
        assert restored is False
        assert not target.exists()

    def test_restore_object_creates_dest(self, tmp_path: pathlib.Path) -> None:
        """Restoring a stored object returns True and writes its bytes to the destination."""
        payload = b"restored"
        object_id = _sha256(payload)
        write_object(tmp_path, object_id, payload)
        target = tmp_path / "sub" / "out.bin"
        restored = restore_object(tmp_path, object_id, target)
        assert restored is True
        assert target.read_bytes() == payload

    def test_restore_object_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None:
        """Restoring into a deeply nested path that does not exist yet still produces the file."""
        payload = b"nested"
        object_id = _sha256(payload)
        write_object(tmp_path, object_id, payload)
        target = tmp_path / "a" / "b" / "c" / "file.bin"
        restore_object(tmp_path, object_id, target)
        assert target.exists()
125
126
127 # ---------------------------------------------------------------------------
128 # repo
129 # ---------------------------------------------------------------------------
130
131
class TestFindRepoRoot:
    """Coverage tests for repository-root discovery and the ``MUSE_REPO_ROOT`` override."""

    def test_finds_muse_dir_in_cwd(self, tmp_path: pathlib.Path) -> None:
        """A directory that directly contains ``.muse`` is returned as the root."""
        muse_dir = tmp_path / ".muse"
        muse_dir.mkdir()
        assert find_repo_root(tmp_path) == tmp_path

    def test_finds_muse_dir_in_parent(self, tmp_path: pathlib.Path) -> None:
        """Starting from a nested subdirectory, the ancestor holding ``.muse`` is returned."""
        muse_dir = tmp_path / ".muse"
        muse_dir.mkdir()
        nested = tmp_path / "a" / "b"
        nested.mkdir(parents=True)
        assert find_repo_root(nested) == tmp_path

    def test_returns_none_when_no_repo(self, tmp_path: pathlib.Path) -> None:
        """Without any ``.muse`` directory the lookup yields ``None``."""
        located = find_repo_root(tmp_path)
        assert located is None

    def test_env_override_returns_path(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """``MUSE_REPO_ROOT`` pointing at a directory with ``.muse`` is honoured."""
        muse_dir = tmp_path / ".muse"
        muse_dir.mkdir()
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        located = find_repo_root()
        assert located == tmp_path

    def test_env_override_returns_none_when_not_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """``MUSE_REPO_ROOT`` pointing at a directory without ``.muse`` yields ``None``."""
        # tmp_path exists on disk but deliberately contains no .muse/ directory.
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        located = find_repo_root()
        assert located is None

    def test_require_repo_exits_when_no_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """``require_repo`` raises ``SystemExit`` when run outside any repository."""
        # Drop the override and move into the empty scratch directory first.
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        monkeypatch.chdir(tmp_path)
        with pytest.raises(SystemExit):
            require_repo()
166
167
168 # ---------------------------------------------------------------------------
169 # store coverage gaps
170 # ---------------------------------------------------------------------------
171
172
class TestStoreGaps:
    """Coverage tests for store read/write helpers on empty and small repositories."""

    def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a minimal ``.muse`` layout under *tmp_path* and return *tmp_path*.

        The layout has the ``commits``, ``snapshots``, ``objects`` and
        ``refs/heads`` directories, a ``HEAD`` pointing at ``main``, a
        ``repo.json`` with repo id ``test-repo``, and an empty ``main`` ref.
        """
        muse = tmp_path / ".muse"
        for d in ("commits", "snapshots", "objects", "refs/heads"):
            (muse / d).mkdir(parents=True)
        (muse / "HEAD").write_text("ref: refs/heads/main\n")
        (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"}))
        # An empty ref file models a branch that has no commits yet.
        (muse / "refs" / "heads" / "main").write_text("")
        return tmp_path

    def test_get_head_commit_id_empty_branch(self, tmp_path: pathlib.Path) -> None:
        """A branch whose ref file is empty has no head commit."""
        root = self._make_repo(tmp_path)
        assert get_head_commit_id(root, "main") is None

    def test_get_head_snapshot_id_no_commits(self, tmp_path: pathlib.Path) -> None:
        """With no commits there is no head snapshot ID."""
        root = self._make_repo(tmp_path)
        assert get_head_snapshot_id(root, "test-repo", "main") is None

    def test_get_head_snapshot_manifest_no_commits(self, tmp_path: pathlib.Path) -> None:
        """With no commits there is no head snapshot manifest."""
        root = self._make_repo(tmp_path)
        assert get_head_snapshot_manifest(root, "test-repo", "main") is None

    def test_get_commits_for_branch_empty(self, tmp_path: pathlib.Path) -> None:
        """An empty branch yields an empty commit list."""
        root = self._make_repo(tmp_path)
        commits = get_commits_for_branch(root, "test-repo", "main")
        assert commits == []

    def _seed_chain(self, root: pathlib.Path, n: int) -> list[str]:
        """Write a linear chain of *n* commits on ``main`` and return their IDs (newest first).

        Each commit gets its own snapshot with an empty manifest and points at
        the previously written commit as its parent. IDs are the SHA-256 of
        ``snap-<i>`` and ``commit-<i>``, so they are stable across runs.

        When *n* is zero or negative nothing is written, the ``main`` ref is
        left untouched, and an empty list is returned.
        """
        now = datetime.datetime.now(datetime.timezone.utc)
        ids: list[str] = []
        parent_id: str | None = None
        for i in range(n):
            # Reuse the module-level helper instead of re-importing hashlib here.
            snap_id = _sha256(f"snap-{i}".encode())
            write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={}))
            commit_id = _sha256(f"commit-{i}".encode())
            commit = CommitRecord(
                commit_id=commit_id,
                repo_id="test-repo",
                branch="main",
                snapshot_id=snap_id,
                message=f"commit {i}",
                committed_at=now,
                parent_commit_id=parent_id,
            )
            write_commit(root, commit)
            ids.append(commit_id)
            parent_id = commit_id
        if ids:
            # The branch ref points at the last (newest) commit; skipped for an
            # empty chain so ids[-1] cannot raise IndexError.
            (root / ".muse" / "refs" / "heads" / "main").write_text(ids[-1])
        ids.reverse()  # newest first, matching get_commits_for_branch order
        return ids

    def test_get_commits_for_branch_max_count_stops_early(
        self, tmp_path: pathlib.Path
    ) -> None:
        """max_count caps the walk — only that many commits are returned."""
        root = self._make_repo(tmp_path)
        all_ids = self._seed_chain(root, 5)

        result = get_commits_for_branch(root, "test-repo", "main", max_count=2)
        assert len(result) == 2
        assert result[0].commit_id == all_ids[0]
        assert result[1].commit_id == all_ids[1]

    def test_get_commits_for_branch_max_count_zero_returns_all(
        self, tmp_path: pathlib.Path
    ) -> None:
        """max_count=0 (the default) returns the full chain."""
        root = self._make_repo(tmp_path)
        all_ids = self._seed_chain(root, 5)

        result = get_commits_for_branch(root, "test-repo", "main", max_count=0)
        assert len(result) == 5
        assert [c.commit_id for c in result] == all_ids

    def test_get_commits_for_branch_max_count_larger_than_chain(
        self, tmp_path: pathlib.Path
    ) -> None:
        """max_count larger than the chain length returns every commit without error."""
        root = self._make_repo(tmp_path)
        all_ids = self._seed_chain(root, 3)

        result = get_commits_for_branch(root, "test-repo", "main", max_count=100)
        assert len(result) == 3
        assert [c.commit_id for c in result] == all_ids

    def test_resolve_commit_ref_with_none_returns_head(self, tmp_path: pathlib.Path) -> None:
        """Resolving a ``None`` ref returns the commit the branch ref points at."""
        root = self._make_repo(tmp_path)
        snap = SnapshotRecord(snapshot_id="s" * 64, manifest={"a.mid": "h" * 64})
        write_snapshot(root, snap)
        committed_at = datetime.datetime.now(datetime.timezone.utc)
        commit = CommitRecord(
            commit_id="c" * 64,
            repo_id="test-repo",
            branch="main",
            snapshot_id="s" * 64,
            message="test",
            committed_at=committed_at,
        )
        write_commit(root, commit)
        # Point the branch at the commit that was just written.
        (root / ".muse" / "refs" / "heads" / "main").write_text("c" * 64)

        result = resolve_commit_ref(root, "test-repo", "main", None)
        assert result is not None
        assert result.commit_id == "c" * 64

    def test_read_commit_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Reading a commit ID that was never written yields ``None``."""
        root = self._make_repo(tmp_path)
        assert read_commit(root, "unknown") is None

    def test_read_snapshot_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Reading a snapshot ID that was never written yields ``None``."""
        root = self._make_repo(tmp_path)
        assert read_snapshot(root, "unknown") is None

    def test_update_commit_metadata_false_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Updating metadata on an unknown commit reports failure with ``False``."""
        root = self._make_repo(tmp_path)
        assert update_commit_metadata(root, "unknown", "key", "val") is False

    def test_get_tags_for_commit_empty(self, tmp_path: pathlib.Path) -> None:
        """A commit with no tags yields an empty list."""
        root = self._make_repo(tmp_path)
        tags = get_tags_for_commit(root, "test-repo", "c" * 64)
        assert tags == []
297
298
299 # ---------------------------------------------------------------------------
300 # merge_engine coverage gaps
301 # ---------------------------------------------------------------------------
302
303
class TestMergeEngineCoverageGaps:
    """Coverage tests for merge-state persistence and conflict resolution helpers."""

    def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a bare ``.muse`` directory under *tmp_path* and return *tmp_path*."""
        (tmp_path / ".muse").mkdir(parents=True)
        return tmp_path

    def test_clear_merge_state_no_file(self, tmp_path: pathlib.Path) -> None:
        """Clearing merge state is harmless when MERGE_STATE.json does not exist."""
        repo_root = self._make_repo(tmp_path)
        # Passing means no exception was raised; there is nothing else to assert.
        clear_merge_state(repo_root)

    def test_apply_resolution_copies_object(self, tmp_path: pathlib.Path) -> None:
        """Applying a resolution writes the stored object's bytes to the given path."""
        repo_root = self._make_repo(tmp_path)
        # The object ID has to be the SHA-256 of the content being stored.
        content = b"resolved content"
        object_id = _sha256(content)
        write_object(repo_root, object_id, content)

        apply_resolution(repo_root, "track.mid", object_id)
        resolved_file = repo_root / "track.mid"
        assert resolved_file.exists()
        assert resolved_file.read_bytes() == content

    def test_apply_resolution_raises_when_object_absent(self, tmp_path: pathlib.Path) -> None:
        """Applying a resolution with an unknown object ID raises ``FileNotFoundError``."""
        repo_root = self._make_repo(tmp_path)
        unknown_id = "0" * 64
        with pytest.raises(FileNotFoundError):
            apply_resolution(repo_root, "track.mid", unknown_id)

    def test_read_merge_state_invalid_json_returns_none(self, tmp_path: pathlib.Path) -> None:
        """A MERGE_STATE.json that is not valid JSON is read back as ``None``."""
        repo_root = self._make_repo(tmp_path)
        state_file = repo_root / ".muse" / "MERGE_STATE.json"
        state_file.write_text("not json {{")
        assert read_merge_state(repo_root) is None

    def test_write_then_clear_merge_state(self, tmp_path: pathlib.Path) -> None:
        """Writing merge state creates MERGE_STATE.json and clearing removes it again."""
        repo_root = self._make_repo(tmp_path)
        state_file = repo_root / ".muse" / "MERGE_STATE.json"
        write_merge_state(
            repo_root,
            base_commit="b" * 64,
            ours_commit="o" * 64,
            theirs_commit="t" * 64,
            conflict_paths=["a.mid"],
        )
        assert state_file.exists()
        clear_merge_state(repo_root)
        assert not state_file.exists()