cgcardona / muse public
test_indices.py python
274 lines 10.5 KB
dfa7b7aa Add comprehensive docs and supercharged tests for Code Domain V2 (#70) Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for muse/core/indices.py — optional local index layer.
2
3 Coverage
4 --------
5 SymbolHistoryEntry
6 - to_dict / from_dict round-trip.
7 - All six fields preserved.
8
9 symbol_history index
10 - save_symbol_history writes a valid JSON file.
11 - load_symbol_history reads it back correctly.
12 - load returns empty dict when file absent.
13 - load returns empty dict on corrupt JSON.
14 - Sorting: entries dict is sorted by address.
15 - Multiple addresses, multiple events per address.
16
17 hash_occurrence index
18 - save_hash_occurrence writes a valid JSON file.
19 - load_hash_occurrence reads it back correctly.
20 - load returns empty dict when file absent.
21 - load returns empty dict on corrupt JSON.
22 - Addresses within each hash entry are sorted.
23
24 index_info
25 - Reports "absent" for missing indexes.
26 - Reports "present" + correct entry count for existing indexes.
27 - Reports "corrupt" for malformed JSON.
28 - Reports both indexes.
29
30 Schema compliance
31 - schema_version == 1.
32 - updated_at is present and is a non-empty string.
33 - index field matches the index name.
34 """
35 from __future__ import annotations
36
37 import json
38 import pathlib
39
40 import pytest
41
42 from muse.core.indices import (
43 HashOccurrenceIndex,
44 SymbolHistoryEntry,
45 SymbolHistoryIndex,
46 index_info,
47 load_hash_occurrence,
48 load_symbol_history,
49 save_hash_occurrence,
50 save_symbol_history,
51 )
52
53
54 # ---------------------------------------------------------------------------
55 # SymbolHistoryEntry
56 # ---------------------------------------------------------------------------
57
58
class TestSymbolHistoryEntry:
    """Serialisation round-trips for ``SymbolHistoryEntry``."""

    def test_to_dict_from_dict_round_trip(self) -> None:
        """Each of the six fields survives ``to_dict`` followed by ``from_dict``."""
        expected = {
            "commit_id": "abc123",
            "committed_at": "2026-01-01T00:00:00+00:00",
            "op": "insert",
            "content_id": "content_abc",
            "body_hash": "body_hash_xyz",
            "signature_id": "sig_id_pqr",
        }
        original = SymbolHistoryEntry(**expected)
        restored = SymbolHistoryEntry.from_dict(original.to_dict())
        # Compare field by field so a failure points at the offending attribute.
        for field_name, value in expected.items():
            assert getattr(restored, field_name) == value

    def test_all_ops_preserved(self) -> None:
        """The ``op`` value is unchanged by a round-trip for each operation kind."""
        operations = ("insert", "delete", "replace", "patch")
        for op in operations:
            serialised = SymbolHistoryEntry("c", "t", op, "cid", "bh", "sig").to_dict()
            restored = SymbolHistoryEntry.from_dict(serialised)
            assert restored.op == op
82
83
84 # ---------------------------------------------------------------------------
85 # symbol_history index — save / load
86 # ---------------------------------------------------------------------------
87
88
class TestSymbolHistoryIndex:
    """Save / load behaviour of the ``symbol_history`` index under a repo root."""

    def _make_entry(self, op: str = "insert") -> SymbolHistoryEntry:
        """Build a fixed sample entry; only *op* varies between calls."""
        return SymbolHistoryEntry(
            commit_id="commit1",
            committed_at="2026-01-01T00:00:00+00:00",
            op=op,
            content_id="cid1",
            body_hash="bh1",
            signature_id="sig1",
        )

    @staticmethod
    def _index_path(root: pathlib.Path) -> pathlib.Path:
        """Return the file these tests expect the index to live in under *root*."""
        return root / ".muse" / "indices" / "symbol_history.json"

    def test_save_creates_file(self, tmp_path: pathlib.Path) -> None:
        """Saving an index creates the JSON file at the expected location."""
        index: SymbolHistoryIndex = {
            "src/a.py::f": [self._make_entry()],
        }
        save_symbol_history(tmp_path, index)
        assert self._index_path(tmp_path).exists()

    def test_round_trip(self, tmp_path: pathlib.Path) -> None:
        """A saved entry is returned by load with its fields intact."""
        address = "src/billing.py::compute_total"
        index: SymbolHistoryIndex = {address: [self._make_entry("replace")]}
        save_symbol_history(tmp_path, index)
        loaded = load_symbol_history(tmp_path)
        assert address in loaded
        entries = loaded[address]
        assert len(entries) == 1
        assert entries[0].op == "replace"
        assert entries[0].commit_id == "commit1"

    def test_multiple_addresses(self, tmp_path: pathlib.Path) -> None:
        """Several addresses, each with its own number of events, are kept apart."""
        index: SymbolHistoryIndex = {
            "src/a.py::alpha": [self._make_entry("insert")],
            "src/b.py::beta": [self._make_entry("insert"), self._make_entry("replace")],
        }
        save_symbol_history(tmp_path, index)
        loaded = load_symbol_history(tmp_path)
        assert len(loaded["src/a.py::alpha"]) == 1
        assert len(loaded["src/b.py::beta"]) == 2

    def test_load_absent_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """Loading when no index file exists yields an empty dict."""
        assert load_symbol_history(tmp_path) == {}

    def test_load_corrupt_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """Loading a file that is not valid JSON yields an empty dict."""
        path = self._index_path(tmp_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding keeps the fixture independent of the host locale.
        path.write_text("{not valid json", encoding="utf-8")
        assert load_symbol_history(tmp_path) == {}

    def test_schema_compliance(self, tmp_path: pathlib.Path) -> None:
        """The written file carries the expected envelope fields."""
        index: SymbolHistoryIndex = {"x.py::f": [self._make_entry()]}
        save_symbol_history(tmp_path, index)
        raw = json.loads(self._index_path(tmp_path).read_text(encoding="utf-8"))
        assert raw["schema_version"] == 1
        assert raw["index"] == "symbol_history"
        # A bare truthiness check would also accept True, 1 or a non-empty
        # list; require an actual non-empty string.
        updated_at = raw["updated_at"]
        assert isinstance(updated_at, str) and updated_at
        assert "x.py::f" in raw["entries"]

    def test_empty_index_saved(self, tmp_path: pathlib.Path) -> None:
        """An empty index can be saved and loads back as an empty dict."""
        save_symbol_history(tmp_path, {})
        assert load_symbol_history(tmp_path) == {}

    def test_entries_sorted_by_address(self, tmp_path: pathlib.Path) -> None:
        """Addresses appear in sorted order in the written JSON."""
        index: SymbolHistoryIndex = {
            "z.py::z": [self._make_entry()],
            "a.py::a": [self._make_entry()],
            "m.py::m": [self._make_entry()],
        }
        save_symbol_history(tmp_path, index)
        raw = json.loads(self._index_path(tmp_path).read_text(encoding="utf-8"))
        keys = list(raw["entries"])
        assert keys == sorted(keys)
166
167
168 # ---------------------------------------------------------------------------
169 # hash_occurrence index — save / load
170 # ---------------------------------------------------------------------------
171
172
class TestHashOccurrenceIndex:
    """Save / load behaviour of the ``hash_occurrence`` index under a repo root."""

    @staticmethod
    def _index_path(root: pathlib.Path) -> pathlib.Path:
        """Return the file these tests expect the index to live in under *root*."""
        return root / ".muse" / "indices" / "hash_occurrence.json"

    def test_save_creates_file(self, tmp_path: pathlib.Path) -> None:
        """Saving an index creates the JSON file at the expected location."""
        index: HashOccurrenceIndex = {
            "deadbeef": ["src/a.py::f", "src/b.py::g"],
        }
        save_hash_occurrence(tmp_path, index)
        assert self._index_path(tmp_path).exists()

    def test_round_trip(self, tmp_path: pathlib.Path) -> None:
        """Saved hashes and their addresses are returned by load."""
        index: HashOccurrenceIndex = {
            "abc123": ["src/a.py::f", "src/b.py::g"],
            "def456": ["src/c.py::h"],
        }
        save_hash_occurrence(tmp_path, index)
        loaded = load_hash_occurrence(tmp_path)
        assert "abc123" in loaded
        # Compared as a set: this test does not depend on address order.
        assert set(loaded["abc123"]) == {"src/a.py::f", "src/b.py::g"}
        assert loaded["def456"] == ["src/c.py::h"]

    def test_addresses_sorted_within_hash(self, tmp_path: pathlib.Path) -> None:
        """Addresses under a single hash are written in sorted order."""
        index: HashOccurrenceIndex = {
            "hash1": ["z.py::z", "a.py::a", "m.py::m"],
        }
        save_hash_occurrence(tmp_path, index)
        raw = json.loads(self._index_path(tmp_path).read_text(encoding="utf-8"))
        addrs = raw["entries"]["hash1"]
        assert addrs == sorted(addrs)

    def test_hashes_sorted(self, tmp_path: pathlib.Path) -> None:
        """Hash keys appear in sorted order in the written JSON."""
        index: HashOccurrenceIndex = {
            "zzz": ["a.py::f"],
            "aaa": ["b.py::g"],
        }
        save_hash_occurrence(tmp_path, index)
        raw = json.loads(self._index_path(tmp_path).read_text(encoding="utf-8"))
        keys = list(raw["entries"])
        assert keys == sorted(keys)

    def test_load_absent_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """Loading when no index file exists yields an empty dict."""
        assert load_hash_occurrence(tmp_path) == {}

    def test_load_corrupt_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """Loading a file that is not valid JSON yields an empty dict."""
        path = self._index_path(tmp_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding keeps the fixture independent of the host locale.
        path.write_text("not json at all", encoding="utf-8")
        assert load_hash_occurrence(tmp_path) == {}

    def test_schema_compliance(self, tmp_path: pathlib.Path) -> None:
        """The written file carries the expected envelope fields."""
        save_hash_occurrence(tmp_path, {"h": ["a.py::f"]})
        raw = json.loads(self._index_path(tmp_path).read_text(encoding="utf-8"))
        assert raw["schema_version"] == 1
        assert raw["index"] == "hash_occurrence"
        # A bare truthiness check would also accept True, 1 or a non-empty
        # list; require an actual non-empty string.
        updated_at = raw["updated_at"]
        assert isinstance(updated_at, str) and updated_at
        # Mirrors the symbol_history schema test: the saved key must be listed.
        assert "h" in raw["entries"]

    def test_empty_index(self, tmp_path: pathlib.Path) -> None:
        """An empty index can be saved and loads back as an empty dict."""
        save_hash_occurrence(tmp_path, {})
        assert load_hash_occurrence(tmp_path) == {}
231
232
233 # ---------------------------------------------------------------------------
234 # index_info
235 # ---------------------------------------------------------------------------
236
237
class TestIndexInfo:
    """Status reporting by ``index_info`` for both known indexes."""

    def test_both_absent(self, tmp_path: pathlib.Path) -> None:
        """With nothing on disk, both indexes are listed and marked absent."""
        report = index_info(tmp_path)
        assert len(report) == 2
        assert {row["name"] for row in report} == {"symbol_history", "hash_occurrence"}
        assert all(row["status"] == "absent" for row in report)

    def test_symbol_history_present(self, tmp_path: pathlib.Path) -> None:
        """A saved symbol_history index is reported present with its entry count."""
        sample = SymbolHistoryEntry("c", "t", "insert", "cid", "bh", "sig")
        index = {"a.py::f": [sample], "b.py::g": [sample]}
        save_symbol_history(tmp_path, index)
        report = index_info(tmp_path)
        row = next(r for r in report if r["name"] == "symbol_history")
        assert row["status"] == "present"
        # The count is compared against the string "2", not the integer.
        assert row["entries"] == "2"

    def test_hash_occurrence_present(self, tmp_path: pathlib.Path) -> None:
        """A saved hash_occurrence index is reported present with its entry count."""
        index = {"h1": ["a.py::f"], "h2": ["b.py::g"]}
        save_hash_occurrence(tmp_path, index)
        report = index_info(tmp_path)
        row = next(r for r in report if r["name"] == "hash_occurrence")
        assert row["status"] == "present"
        assert row["entries"] == "2"

    def test_corrupt_index_reported(self, tmp_path: pathlib.Path) -> None:
        """A malformed symbol_history file is reported with status ``corrupt``."""
        target_dir = tmp_path / ".muse" / "indices"
        target_dir.mkdir(parents=True, exist_ok=True)
        broken_file = target_dir / "symbol_history.json"
        broken_file.write_text("{bad")
        report = index_info(tmp_path)
        row = next(r for r in report if r["name"] == "symbol_history")
        assert row["status"] == "corrupt"

    def test_updated_at_present_when_index_exists(self, tmp_path: pathlib.Path) -> None:
        """An existing index reports a truthy ``updated_at`` value."""
        save_hash_occurrence(tmp_path, {"h": ["f.py::x"]})
        report = index_info(tmp_path)
        row = next(r for r in report if r["name"] == "hash_occurrence")
        assert row["updated_at"]