gabriel / muse public
test_core_coverage_gaps.py python
350 lines 13.3 KB
a6a637a2 test(log): cover max_count early-stop at both unit and CLI level Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """Tests targeting coverage gaps in core modules: object_store, repo, store, merge_engine."""
2
3 import hashlib
4 import json
5 import os
6 import pathlib
7
8 import pytest
9
10
11 def _sha256(data: bytes) -> str:
12 return hashlib.sha256(data).hexdigest()
13
14 from muse.core.object_store import (
15 has_object,
16 object_path,
17 objects_dir,
18 read_object,
19 restore_object,
20 write_object,
21 write_object_from_path,
22 )
23 from muse.core.repo import find_repo_root, require_repo
24 from muse.core.store import (
25 CommitRecord,
26 SnapshotRecord,
27 get_commits_for_branch,
28 get_head_commit_id,
29 get_head_snapshot_id,
30 get_head_snapshot_manifest,
31 get_tags_for_commit,
32 read_commit,
33 read_snapshot,
34 resolve_commit_ref,
35 update_commit_metadata,
36 write_commit,
37 write_snapshot,
38 )
39 from muse.core.merge_engine import apply_resolution, clear_merge_state, read_merge_state, write_merge_state
40
41 import datetime
42
43
44 # ---------------------------------------------------------------------------
45 # object_store
46 # ---------------------------------------------------------------------------
47
48
class TestObjectStore:
    """Exercise the object-store helpers against a throwaway repository root."""

    def test_objects_dir_path(self, tmp_path: pathlib.Path) -> None:
        """``objects_dir`` is expected to resolve to ``<root>/.muse/objects``."""
        expected = tmp_path / ".muse" / "objects"
        assert objects_dir(tmp_path) == expected

    def test_object_path_sharding(self, tmp_path: pathlib.Path) -> None:
        """The first two ID characters name the directory, the rest the file."""
        prefix, remainder = "ab", "c" * 62
        sharded = object_path(tmp_path, prefix + remainder)
        assert sharded.parent.name == prefix
        assert sharded.name == remainder

    def test_has_object_false_when_absent(self, tmp_path: pathlib.Path) -> None:
        """An ID that was never written is reported as missing."""
        never_written = "a" * 64
        assert not has_object(tmp_path, never_written)

    def test_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None:
        """Once written, an object is reported as present."""
        payload = b"hello"
        digest = _sha256(payload)
        write_object(tmp_path, digest, payload)
        assert has_object(tmp_path, digest)

    def test_write_object_idempotent_returns_false(self, tmp_path: pathlib.Path) -> None:
        """A repeated write of the same ID returns ``False`` and keeps the content."""
        payload = b"first"
        digest = _sha256(payload)
        first_outcome = write_object(tmp_path, digest, payload)
        assert first_outcome is True
        # Same ID, same bytes: the store should treat this as a no-op.
        second_outcome = write_object(tmp_path, digest, payload)
        assert second_outcome is False
        # The stored bytes must be unchanged by the second call.
        assert read_object(tmp_path, digest) == payload

    def test_write_object_from_path_idempotent(self, tmp_path: pathlib.Path) -> None:
        """Importing the same file twice returns ``True`` and then ``False``."""
        payload = b"content"
        source_file = tmp_path / "src.bin"
        source_file.write_bytes(payload)
        digest = _sha256(payload)
        first_outcome = write_object_from_path(tmp_path, digest, source_file)
        assert first_outcome is True
        second_outcome = write_object_from_path(tmp_path, digest, source_file)
        assert second_outcome is False

    def test_write_object_from_path_stores_content(self, tmp_path: pathlib.Path) -> None:
        """Bytes imported from a file can be read back verbatim."""
        payload = b"my bytes"
        source_file = tmp_path / "file.bin"
        source_file.write_bytes(payload)
        digest = _sha256(payload)
        write_object_from_path(tmp_path, digest, source_file)
        stored = read_object(tmp_path, digest)
        assert stored == payload

    def test_read_object_returns_none_when_absent(self, tmp_path: pathlib.Path) -> None:
        """Reading an unknown ID yields ``None``."""
        never_written = "e" * 64
        assert read_object(tmp_path, never_written) is None

    def test_read_object_returns_bytes(self, tmp_path: pathlib.Path) -> None:
        """Reading a written object yields exactly the bytes that were written."""
        payload = b"data"
        digest = _sha256(payload)
        write_object(tmp_path, digest, payload)
        stored = read_object(tmp_path, digest)
        assert stored == payload

    def test_restore_object_returns_false_when_absent(self, tmp_path: pathlib.Path) -> None:
        """Restoring an unknown ID returns ``False`` and creates no file."""
        target = tmp_path / "out.bin"
        outcome = restore_object(tmp_path, "0" * 64, target)
        assert outcome is False
        assert not target.exists()

    def test_restore_object_creates_dest(self, tmp_path: pathlib.Path) -> None:
        """Restoring a stored object writes its bytes to the destination."""
        payload = b"restored"
        digest = _sha256(payload)
        write_object(tmp_path, digest, payload)
        target = tmp_path / "sub" / "out.bin"
        outcome = restore_object(tmp_path, digest, target)
        assert outcome is True
        assert target.read_bytes() == payload

    def test_restore_object_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None:
        """Restoring into a deeply nested, not-yet-existing directory succeeds."""
        payload = b"nested"
        digest = _sha256(payload)
        write_object(tmp_path, digest, payload)
        target = tmp_path / "a" / "b" / "c" / "file.bin"
        restore_object(tmp_path, digest, target)
        assert target.exists()
125
126
127 # ---------------------------------------------------------------------------
128 # repo
129 # ---------------------------------------------------------------------------
130
131
class TestFindRepoRoot:
    """Tests for ``find_repo_root`` and ``require_repo``."""

    @pytest.fixture(autouse=True)
    def _clear_repo_root_env(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Start every test in this class with ``MUSE_REPO_ROOT`` unset.

        The env-override tests below show that this variable steers
        ``find_repo_root``.  A value leaking in from a developer shell or CI
        job must therefore not be able to influence the directory-walk tests.
        Whether the override is consulted when an explicit start path is
        passed is not visible from this file, so it is cleared unconditionally.
        ``monkeypatch`` restores the previous value after each test.
        """
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)

    def test_finds_muse_dir_in_cwd(self, tmp_path: pathlib.Path) -> None:
        """A directory that directly contains ``.muse`` is its own repo root."""
        (tmp_path / ".muse").mkdir()
        result = find_repo_root(tmp_path)
        assert result == tmp_path

    def test_finds_muse_dir_in_parent(self, tmp_path: pathlib.Path) -> None:
        """Starting from a nested directory, the ancestor holding ``.muse`` is found."""
        (tmp_path / ".muse").mkdir()
        subdir = tmp_path / "a" / "b"
        subdir.mkdir(parents=True)
        result = find_repo_root(subdir)
        assert result == tmp_path

    def test_returns_none_when_no_repo(self, tmp_path: pathlib.Path) -> None:
        """Without any ``.muse`` directory the lookup yields ``None``."""
        result = find_repo_root(tmp_path)
        assert result is None

    def test_env_override_returns_path(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """``MUSE_REPO_ROOT`` pointing at a real repo is returned as the root."""
        (tmp_path / ".muse").mkdir()
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        result = find_repo_root()
        assert result == tmp_path

    def test_env_override_returns_none_when_not_repo(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """``MUSE_REPO_ROOT`` pointing at a directory without ``.muse`` yields ``None``."""
        # tmp_path exists but has no .muse/
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        result = find_repo_root()
        assert result is None

    def test_require_repo_exits_when_no_repo(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """``require_repo`` raises ``click.exceptions.Exit`` outside any repository."""
        import click

        # MUSE_REPO_ROOT is already unset by the autouse fixture; only the
        # working directory needs to move to a location with no repo.
        monkeypatch.chdir(tmp_path)
        with pytest.raises(click.exceptions.Exit):
            require_repo()
167
168
169 # ---------------------------------------------------------------------------
170 # store coverage gaps
171 # ---------------------------------------------------------------------------
172
173
class TestStoreGaps:
    """Cover store paths for empty branches, bounded history walks and unknown IDs."""

    def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Lay out a minimal ``.muse`` tree under *tmp_path* and return *tmp_path*.

        The tree holds the ``commits``, ``snapshots``, ``objects`` and
        ``refs/heads`` directories, a ``HEAD`` pointing at ``main``, a
        ``repo.json`` naming the repo ``test-repo``, and an empty ``main`` ref.
        """
        muse = tmp_path / ".muse"
        for d in ("commits", "snapshots", "objects", "refs/heads"):
            (muse / d).mkdir(parents=True)
        (muse / "HEAD").write_text("ref: refs/heads/main\n")
        (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"}))
        (muse / "refs" / "heads" / "main").write_text("")
        return tmp_path

    def test_get_head_commit_id_empty_branch(self, tmp_path: pathlib.Path) -> None:
        """A branch whose ref file is empty has no head commit."""
        root = self._make_repo(tmp_path)
        assert get_head_commit_id(root, "main") is None

    def test_get_head_snapshot_id_no_commits(self, tmp_path: pathlib.Path) -> None:
        """With no commits there is no head snapshot ID."""
        root = self._make_repo(tmp_path)
        assert get_head_snapshot_id(root, "test-repo", "main") is None

    def test_get_head_snapshot_manifest_no_commits(self, tmp_path: pathlib.Path) -> None:
        """With no commits there is no head snapshot manifest."""
        root = self._make_repo(tmp_path)
        assert get_head_snapshot_manifest(root, "test-repo", "main") is None

    def test_get_commits_for_branch_empty(self, tmp_path: pathlib.Path) -> None:
        """An empty branch yields an empty commit list."""
        root = self._make_repo(tmp_path)
        commits = get_commits_for_branch(root, "test-repo", "main")
        assert commits == []

    def _seed_chain(self, root: pathlib.Path, n: int) -> list[str]:
        """Write a linear chain of *n* commits on ``main`` and return their IDs (newest first).

        Each commit gets its own empty-manifest snapshot and points at the
        previously written commit as its parent.  The ``main`` ref is updated
        to the last commit written.

        Args:
            root: Repository root as produced by ``_make_repo``.
            n: Number of commits to write.  For ``n <= 0`` nothing is written,
                the ``main`` ref is left untouched and ``[]`` is returned.

        Returns:
            The commit IDs, newest first.
        """
        if n <= 0:
            # Nothing to seed; avoid indexing into an empty list below.
            return []

        newest_at = datetime.datetime.now(datetime.timezone.utc)
        ids: list[str] = []
        parent_id: str | None = None
        for i in range(n):
            # Reuse the module-level helper instead of re-importing hashlib.
            snap_id = _sha256(f"snap-{i}".encode())
            write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={}))
            commit_id = _sha256(f"commit-{i}".encode())
            commit = CommitRecord(
                commit_id=commit_id,
                repo_id="test-repo",
                branch="main",
                snapshot_id=snap_id,
                message=f"commit {i}",
                # Strictly increasing timestamps, one second apart, with the
                # newest commit stamped "now".  This keeps the chain order
                # unambiguous should the store ever consult committed_at
                # (not visible from this file).
                committed_at=newest_at - datetime.timedelta(seconds=n - 1 - i),
                parent_commit_id=parent_id,
            )
            write_commit(root, commit)
            ids.append(commit_id)
            parent_id = commit_id
        # HEAD points at the last (newest) commit
        (root / ".muse" / "refs" / "heads" / "main").write_text(ids[-1])
        ids.reverse()  # newest first, matching get_commits_for_branch order
        return ids

    def test_get_commits_for_branch_max_count_stops_early(
        self, tmp_path: pathlib.Path
    ) -> None:
        """max_count caps the walk — only that many commits are returned."""
        root = self._make_repo(tmp_path)
        all_ids = self._seed_chain(root, 5)

        result = get_commits_for_branch(root, "test-repo", "main", max_count=2)
        assert len(result) == 2
        assert [c.commit_id for c in result] == all_ids[:2]

    def test_get_commits_for_branch_max_count_zero_returns_all(
        self, tmp_path: pathlib.Path
    ) -> None:
        """max_count=0 (the default) returns the full chain."""
        root = self._make_repo(tmp_path)
        all_ids = self._seed_chain(root, 5)

        result = get_commits_for_branch(root, "test-repo", "main", max_count=0)
        assert len(result) == 5
        assert [c.commit_id for c in result] == all_ids

    def test_get_commits_for_branch_max_count_larger_than_chain(
        self, tmp_path: pathlib.Path
    ) -> None:
        """max_count larger than the chain length returns every commit without error."""
        root = self._make_repo(tmp_path)
        all_ids = self._seed_chain(root, 3)

        result = get_commits_for_branch(root, "test-repo", "main", max_count=100)
        assert len(result) == 3
        assert [c.commit_id for c in result] == all_ids

    def test_resolve_commit_ref_with_none_returns_head(self, tmp_path: pathlib.Path) -> None:
        """Resolving a ``None`` ref returns the commit the branch ref points at."""
        root = self._make_repo(tmp_path)
        snap = SnapshotRecord(snapshot_id="s" * 64, manifest={"a.mid": "h" * 64})
        write_snapshot(root, snap)
        committed_at = datetime.datetime.now(datetime.timezone.utc)
        commit = CommitRecord(
            commit_id="c" * 64,
            repo_id="test-repo",
            branch="main",
            snapshot_id="s" * 64,
            message="test",
            committed_at=committed_at,
        )
        write_commit(root, commit)
        (root / ".muse" / "refs" / "heads" / "main").write_text("c" * 64)

        result = resolve_commit_ref(root, "test-repo", "main", None)
        assert result is not None
        assert result.commit_id == "c" * 64

    def test_read_commit_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Reading a commit ID that was never written yields ``None``."""
        root = self._make_repo(tmp_path)
        assert read_commit(root, "unknown") is None

    def test_read_snapshot_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Reading a snapshot ID that was never written yields ``None``."""
        root = self._make_repo(tmp_path)
        assert read_snapshot(root, "unknown") is None

    def test_update_commit_metadata_false_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Updating metadata on an unknown commit reports ``False``."""
        root = self._make_repo(tmp_path)
        assert update_commit_metadata(root, "unknown", "key", "val") is False

    def test_get_tags_for_commit_empty(self, tmp_path: pathlib.Path) -> None:
        """A commit with no tags yields an empty list."""
        root = self._make_repo(tmp_path)
        tags = get_tags_for_commit(root, "test-repo", "c" * 64)
        assert tags == []
298
299
300 # ---------------------------------------------------------------------------
301 # merge_engine coverage gaps
302 # ---------------------------------------------------------------------------
303
304
class TestMergeEngineCoverageGaps:
    """Exercise merge-state persistence and conflict resolution helpers."""

    def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a bare ``.muse`` directory under *tmp_path* and return *tmp_path*."""
        (tmp_path / ".muse").mkdir(parents=True)
        return tmp_path

    def test_clear_merge_state_no_file(self, tmp_path: pathlib.Path) -> None:
        """Clearing when no ``MERGE_STATE.json`` exists must complete without raising."""
        repo_root = self._make_repo(tmp_path)
        clear_merge_state(repo_root)

    def test_apply_resolution_copies_object(self, tmp_path: pathlib.Path) -> None:
        """Resolving a path materialises the chosen object's bytes at that path."""
        repo_root = self._make_repo(tmp_path)
        # The object ID has to be the SHA-256 of the stored bytes.
        payload = b"resolved content"
        digest = _sha256(payload)
        write_object(repo_root, digest, payload)

        apply_resolution(repo_root, "track.mid", digest)

        resolved_file = repo_root / "track.mid"
        assert resolved_file.exists()
        assert resolved_file.read_bytes() == payload

    def test_apply_resolution_raises_when_object_absent(self, tmp_path: pathlib.Path) -> None:
        """Resolving with an object ID that is not in the store raises ``FileNotFoundError``."""
        repo_root = self._make_repo(tmp_path)
        missing_oid = "0" * 64
        with pytest.raises(FileNotFoundError):
            apply_resolution(repo_root, "track.mid", missing_oid)

    def test_read_merge_state_invalid_json_returns_none(self, tmp_path: pathlib.Path) -> None:
        """A merge-state file that is not valid JSON is read as ``None``."""
        repo_root = self._make_repo(tmp_path)
        state_file = repo_root / ".muse" / "MERGE_STATE.json"
        state_file.write_text("not json {{")
        assert read_merge_state(repo_root) is None

    def test_write_then_clear_merge_state(self, tmp_path: pathlib.Path) -> None:
        """Writing merge state creates the file; clearing removes it again."""
        repo_root = self._make_repo(tmp_path)
        state_file = repo_root / ".muse" / "MERGE_STATE.json"

        write_merge_state(
            repo_root,
            base_commit="b" * 64,
            ours_commit="o" * 64,
            theirs_commit="t" * 64,
            conflict_paths=["a.mid"],
        )
        assert state_file.exists()

        clear_merge_state(repo_root)
        assert not state_file.exists()