gabriel / muse public
test_plumbing_stress.py python
444 lines 15.5 KB
99746394 feat(tests+docs): supercharge plumbing test suite and update reference doc Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """Stress and scale tests for the Muse plumbing layer.
2
3 These tests exercise plumbing commands at a scale that would reveal
4 O(n²) performance regressions, memory leaks, and missing edge-case
5 handling. Every test in this module is designed to complete in under
6 10 seconds on a modern laptop when running from an in-memory temp
7 directory — if any test consistently takes longer, it signals a
8 performance regression worth investigating.
9
10 Scenarios:
11 - commit-graph BFS on a 500-commit linear history
12 - merge-base on a 300-deep dag (shared ancestor at the root)
13 - name-rev multi-source BFS on a 200-commit diamond graph
14 - snapshot-diff on manifests with 2000 files each
15 - verify-object on 200 objects
16 - ls-files on a 2000-file snapshot
17 - for-each-ref on 100 branches
18 - show-ref on 100 branches
19 - pack-objects → unpack-objects with 100 commits and 100 objects
20 - read-commit on 200 sequential commits
21 """
22
23 from __future__ import annotations
24
25 import datetime
26 import hashlib
27 import json
28 import pathlib
29
30 from typer.testing import CliRunner
31
32 from muse.cli.app import cli
33 from muse.core.object_store import write_object
34 from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
35
36 runner = CliRunner()
37
38
39 # ---------------------------------------------------------------------------
40 # Helpers
41 # ---------------------------------------------------------------------------
42
43
def _sha(tag: str) -> str:
    """Return a deterministic SHA-256 hex digest for a short tag string."""
    digest = hashlib.sha256()
    digest.update(tag.encode())
    return digest.hexdigest()
46
47
def _sha_bytes(data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of *data*."""
    h = hashlib.sha256(data)
    return h.hexdigest()
50
51
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Create a minimal .muse repository layout under *path* and return *path*.

    Lays down the commits/snapshots/objects stores, the refs/heads directory,
    a HEAD pointing at main, and a repo.json identity file.
    """
    muse = path / ".muse"
    for sub in ("commits", "snapshots", "objects"):
        (muse / sub).mkdir(parents=True)
    (muse / "refs" / "heads").mkdir(parents=True)
    (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    identity = {"repo_id": "stress-repo", "domain": "midi"}
    (muse / "repo.json").write_text(json.dumps(identity), encoding="utf-8")
    return path
63
64
def _env(repo: pathlib.Path) -> dict[str, str]:
    """Environment overrides pointing the CLI at *repo* as the repo root."""
    root = str(repo)
    return {"MUSE_REPO_ROOT": root}
67
68
def _snap(repo: pathlib.Path, manifest: dict[str, str] | None = None, tag: str = "s") -> str:
    """Write a snapshot (empty manifest by default) and return its id.

    The snapshot id is derived deterministically from *tag*, and the
    timestamp is fixed so repeated runs produce identical records.
    """
    sid = _sha(f"snap-{tag}")
    record = SnapshotRecord(
        snapshot_id=sid,
        manifest=manifest or {},
        created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
    )
    write_snapshot(repo, record)
    return sid
81
82
def _commit_raw(
    repo: pathlib.Path,
    cid: str,
    sid: str,
    message: str,
    branch: str = "main",
    parent: str | None = None,
    parent2: str | None = None,
) -> None:
    """Persist a single CommitRecord with a fixed author and timestamp.

    Does not touch any branch ref — callers pair this with _set_branch
    when the commit should become a branch tip.
    """
    record = CommitRecord(
        commit_id=cid,
        repo_id="stress-repo",
        branch=branch,
        snapshot_id=sid,
        message=message,
        committed_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
        author="stress-tester",
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(repo, record)
106
107
def _set_branch(repo: pathlib.Path, branch: str, cid: str) -> None:
    """Point refs/heads/<branch> at *cid*, creating parent dirs as needed."""
    ref_path = repo / ".muse" / "refs" / "heads" / branch
    ref_path.parent.mkdir(parents=True, exist_ok=True)
    ref_path.write_text(cid, encoding="utf-8")
112
113
def _linear_chain(repo: pathlib.Path, n: int, sid: str, branch: str = "main") -> list[str]:
    """Build a linear chain of n commits. Returns list root→tip."""
    cids = [_sha(f"linear-{branch}-{i}") for i in range(n)]
    prev: str | None = None
    for i, cid in enumerate(cids):
        _commit_raw(repo, cid, sid, f"commit {i}", branch=branch, parent=prev)
        prev = cid
    # The last commit becomes the branch tip.
    _set_branch(repo, branch, cids[-1])
    return cids
125
126
def _obj(repo: pathlib.Path, tag: str) -> str:
    """Store *tag*'s bytes as a content-addressed object and return its id."""
    payload = tag.encode()
    oid = _sha_bytes(payload)
    write_object(repo, oid, payload)
    return oid
132
133
134 # ---------------------------------------------------------------------------
135 # Stress: commit-graph
136 # ---------------------------------------------------------------------------
137
138
class TestCommitGraphStress:
    """BFS traversal of the commit graph at 500-commit scale."""

    def test_500_commit_linear_chain_full_traversal(self, tmp_path: pathlib.Path) -> None:
        """Full traversal from the branch tip visits all 500 commits."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        # Return value deliberately discarded: the command resolves the tip
        # from the branch ref, not from an explicit commit id.
        _linear_chain(repo, 500, sid)
        result = runner.invoke(cli, ["plumbing", "commit-graph"], env=_env(repo))
        assert result.exit_code == 0, result.output
        data = json.loads(result.stdout)
        assert data["count"] == 500
        assert data["truncated"] is False

    def test_500_commit_chain_stop_at_midpoint(self, tmp_path: pathlib.Path) -> None:
        """--stop-at halts the walk: tip..midpoint inclusive is 250 commits."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cids = _linear_chain(repo, 500, sid)
        result = runner.invoke(
            cli,
            ["plumbing", "commit-graph", "--tip", cids[499], "--stop-at", cids[249]],
            env=_env(repo),
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["count"] == 250

    def test_count_flag_on_500_commits(self, tmp_path: pathlib.Path) -> None:
        """--count reports the total without emitting the node list."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        _linear_chain(repo, 500, sid)
        result = runner.invoke(cli, ["plumbing", "commit-graph", "--count"], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["count"] == 500
        assert "commits" not in data  # --count suppresses node list
172
173
174 # ---------------------------------------------------------------------------
175 # Stress: merge-base
176 # ---------------------------------------------------------------------------
177
178
class TestMergeBaseStress:
    """merge-base across two long divergent chains sharing a single root."""

    def test_merge_base_300_deep_shared_root(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo)

        # Single shared ancestor at the very bottom of the dag.
        root_cid = _sha("shared-root")
        _commit_raw(repo, root_cid, sid, "root")

        # Grow two independent 150-commit chains off the shared root.
        tips = {"main": root_cid, "feat": root_cid}
        for i in range(150):
            for branch in ("main", "feat"):
                cid = _sha(f"{branch}-{i}")
                _commit_raw(repo, cid, sid, f"{branch}-{i}", branch=branch, parent=tips[branch])
                tips[branch] = cid

        _set_branch(repo, "main", tips["main"])
        _set_branch(repo, "feat", tips["feat"])
        (repo / ".muse" / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

        result = runner.invoke(
            cli, ["plumbing", "merge-base", "main", "feat"], env=_env(repo)
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["merge_base"] == root_cid
209
210
211 # ---------------------------------------------------------------------------
212 # Stress: name-rev
213 # ---------------------------------------------------------------------------
214
215
class TestNameRevStress:
    """name-rev multi-source BFS over long chains."""

    def test_name_rev_200_commit_chain_all_named(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cids = _linear_chain(repo, 200, sid)

        result = runner.invoke(cli, ["plumbing", "name-rev", *cids], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert len(data["results"]) == 200
        assert all(not entry["undefined"] for entry in data["results"])

    def test_name_rev_tip_has_no_tilde_suffix(self, tmp_path: pathlib.Path) -> None:
        """distance=0 means the tip is the branch tip itself; name is bare branch name."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        tip = _linear_chain(repo, 10, sid)[-1]

        result = runner.invoke(cli, ["plumbing", "name-rev", tip], env=_env(repo))
        assert result.exit_code == 0
        entry = json.loads(result.stdout)["results"][0]
        # name-rev emits "<branch>" (no ~0) for the exact branch tip.
        assert entry["name"] == "main"
        assert entry["distance"] == 0
242
243
244 # ---------------------------------------------------------------------------
245 # Stress: snapshot-diff
246 # ---------------------------------------------------------------------------
247
248
class TestSnapshotDiffStress:
    """snapshot-diff over a pair of 2000-entry manifests."""

    def test_diff_2000_file_manifests(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_blob = _sha("shared-blob")
        new_blob = _sha("new-blob")

        # A: every file points at old_blob.
        # B: identical, except the first 200 entries carry a new object id.
        names = [f"track_{i:04d}.mid" for i in range(2000)]
        manifest_a = dict.fromkeys(names, old_blob)
        manifest_b = {
            name: new_blob if i < 200 else old_blob for i, name in enumerate(names)
        }

        sid_a = _sha("big-snap-a")
        sid_b = _sha("big-snap-b")
        for sid, manifest, day in ((sid_a, manifest_a, 1), (sid_b, manifest_b, 2)):
            write_snapshot(
                repo,
                SnapshotRecord(
                    snapshot_id=sid,
                    manifest=manifest,
                    created_at=datetime.datetime(2026, 1, day, tzinfo=datetime.timezone.utc),
                ),
            )

        result = runner.invoke(cli, ["plumbing", "snapshot-diff", sid_a, sid_b], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["total_changes"] == 200
        assert len(data["modified"]) == 200
        assert data["added"] == []
        assert data["deleted"] == []
286
287
288 # ---------------------------------------------------------------------------
289 # Stress: verify-object
290 # ---------------------------------------------------------------------------
291
292
class TestVerifyObjectStress:
    """verify-object across many objects and one large object."""

    def test_200_objects_all_verified(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        oids = [_obj(repo, f"stress-obj-{i}") for i in range(200)]
        result = runner.invoke(cli, ["plumbing", "verify-object", *oids], env=_env(repo))
        assert result.exit_code == 0
        report = json.loads(result.stdout)
        assert report["all_ok"] is True
        assert report["checked"] == 200
        assert report["failed"] == 0

    def test_verify_1mib_object_no_crash(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        payload = b"Z" * (1024 * 1024)
        oid = _sha_bytes(payload)
        write_object(repo, oid, payload)
        result = runner.invoke(cli, ["plumbing", "verify-object", oid], env=_env(repo))
        assert result.exit_code == 0
        assert json.loads(result.stdout)["all_ok"] is True
312
313
314 # ---------------------------------------------------------------------------
315 # Stress: ls-files
316 # ---------------------------------------------------------------------------
317
318
class TestLsFilesStress:
    """ls-files against a 2000-entry snapshot manifest."""

    def test_ls_files_2000_file_snapshot(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        blob = _sha("common-oid")
        sid = _snap(repo, {f"track_{i:04d}.mid": blob for i in range(2000)}, "big")
        cid = _sha("big-commit")
        _commit_raw(repo, cid, sid, "big manifest", branch="main")
        _set_branch(repo, "main", cid)

        result = runner.invoke(cli, ["plumbing", "ls-files"], env=_env(repo))
        assert result.exit_code == 0
        assert json.loads(result.stdout)["file_count"] == 2000
333
334
335 # ---------------------------------------------------------------------------
336 # Stress: for-each-ref and show-ref
337 # ---------------------------------------------------------------------------
338
339
class TestRefCommandsStress:
    """for-each-ref / show-ref with 100 branch refs."""

    def _build_100_branches(self, repo: pathlib.Path) -> None:
        """Create feature-000..feature-099, each with a single tip commit."""
        sid = _snap(repo, tag="multi-branch")
        for i in range(100):
            name = f"feature-{i:03d}"
            tip = _sha(f"branch-tip-{i}")
            _commit_raw(repo, tip, sid, f"tip of {name}", branch=name)
            _set_branch(repo, name, tip)

    def test_for_each_ref_100_branches(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        self._build_100_branches(repo)
        result = runner.invoke(cli, ["plumbing", "for-each-ref"], env=_env(repo))
        assert result.exit_code == 0
        assert len(json.loads(result.stdout)["refs"]) == 100

    def test_show_ref_100_branches(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        self._build_100_branches(repo)
        result = runner.invoke(cli, ["plumbing", "show-ref"], env=_env(repo))
        assert result.exit_code == 0
        assert json.loads(result.stdout)["count"] == 100

    def test_for_each_ref_pattern_filter_on_100(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        self._build_100_branches(repo)
        result = runner.invoke(
            cli,
            ["plumbing", "for-each-ref", "--pattern", "refs/heads/feature-00*"],
            env=_env(repo),
        )
        assert result.exit_code == 0
        # feature-000 through feature-009 = 10 branches
        assert len(json.loads(result.stdout)["refs"]) == 10
377
378
379 # ---------------------------------------------------------------------------
380 # Stress: pack-objects → unpack-objects
381 # ---------------------------------------------------------------------------
382
383
class TestPackUnpackStress:
    """pack-objects → unpack-objects round trip at 100-commit/100-object scale."""

    def test_100_commit_100_object_round_trip(self, tmp_path: pathlib.Path) -> None:
        from muse.core.object_store import has_object
        from muse.core.store import read_commit

        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")

        # 100 blobs referenced by one large snapshot.
        oids = [_obj(src, f"blob-{i}") for i in range(100)]
        sid = _snap(src, {f"f{i}.mid": oid for i, oid in enumerate(oids)}, "big-pack")

        # 100-commit linear chain referencing that snapshot.
        cids: list[str] = []
        prev: str | None = None
        for i in range(100):
            cid = _sha(f"pack-commit-{i}")
            _commit_raw(src, cid, sid, f"pack-{i}", parent=prev)
            cids.append(cid)
            prev = cid
        _set_branch(src, "main", cids[-1])

        # Pack from the src tip, then unpack the stream into dst.
        pack_result = runner.invoke(
            cli, ["plumbing", "pack-objects", cids[-1]], env=_env(src)
        )
        assert pack_result.exit_code == 0

        unpack_result = runner.invoke(
            cli,
            ["plumbing", "unpack-objects"],
            input=pack_result.stdout,
            env=_env(dst),
        )
        assert unpack_result.exit_code == 0
        counts = json.loads(unpack_result.stdout)
        assert counts["commits_written"] == 100
        assert counts["objects_written"] == 100

        # Everything packed must be readable from the destination repo.
        assert all(read_commit(dst, cid) is not None for cid in cids)
        assert all(has_object(dst, oid) for oid in oids)
428
429
430 # ---------------------------------------------------------------------------
431 # Stress: read-commit sequential
432 # ---------------------------------------------------------------------------
433
434
class TestReadCommitStress:
    """read-commit invoked once per commit over a 200-commit chain."""

    def test_200_commits_all_readable(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        for cid in _linear_chain(repo, 200, sid):
            result = runner.invoke(cli, ["plumbing", "read-commit", cid], env=_env(repo))
            assert result.exit_code == 0
            assert json.loads(result.stdout)["commit_id"] == cid