gabriel / muse public
test_plumbing_stress.py python
444 lines 15.6 KB
dec4604a feat(mwp): replace JSON+base64 wire protocol with MWP binary msgpack Gabriel Cardona <gabriel@tellurstori.com> 13h ago
1 """Stress and scale tests for the Muse plumbing layer.
2
3 These tests exercise plumbing commands at a scale that would reveal
4 O(n²) performance regressions, memory leaks, and missing edge-case
5 handling. Every test in this module is designed to complete in under
6 10 seconds on a modern laptop when running from an in-memory temp
7 directory — if any test consistently takes longer, it signals a
8 performance regression worth investigating.
9
10 Scenarios:
11 - commit-graph BFS on a 500-commit linear history
12 - merge-base on a 300-deep dag (shared ancestor at the root)
13 - name-rev multi-source BFS on a 200-commit diamond graph
14 - snapshot-diff on manifests with 2000 files each
15 - verify-object on 200 objects
16 - ls-files on a 2000-file snapshot
17 - for-each-ref on 100 branches
18 - show-ref on 100 branches
19 - pack-objects → unpack-objects with 100 commits and 100 objects
20 - read-commit on 200 sequential commits
21 """
22
23 from __future__ import annotations
24
25 import datetime
26 import hashlib
27 import json
28 import pathlib
29
from tests.cli_test_helper import CliRunner
from muse.core.object_store import write_object
from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot

# Placeholder first positional argument for runner.invoke(); after the
# argparse migration CliRunner ignores it, so None keeps the historic call
# shape at every call site. (Moved below the imports — module statements
# should not be interleaved inside the import block.)
cli = None

# Shared CLI runner used by every test in this module.
runner = CliRunner()
37
38
39 # ---------------------------------------------------------------------------
40 # Helpers
41 # ---------------------------------------------------------------------------
42
43
44 def _sha(tag: str) -> str:
45 return hashlib.sha256(tag.encode()).hexdigest()
46
47
48 def _sha_bytes(data: bytes) -> str:
49 return hashlib.sha256(data).hexdigest()
50
51
52 def _init_repo(path: pathlib.Path) -> pathlib.Path:
53 muse = path / ".muse"
54 (muse / "commits").mkdir(parents=True)
55 (muse / "snapshots").mkdir(parents=True)
56 (muse / "objects").mkdir(parents=True)
57 (muse / "refs" / "heads").mkdir(parents=True)
58 (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
59 (muse / "repo.json").write_text(
60 json.dumps({"repo_id": "stress-repo", "domain": "midi"}), encoding="utf-8"
61 )
62 return path
63
64
65 def _env(repo: pathlib.Path) -> dict[str, str]:
66 return {"MUSE_REPO_ROOT": str(repo)}
67
68
def _snap(repo: pathlib.Path, manifest: dict[str, str] | None = None, tag: str = "s") -> str:
    """Write a snapshot with *manifest* (empty by default) and return its id.

    The id is derived deterministically from *tag* so tests can recompute it.
    """
    snapshot_id = _sha(f"snap-{tag}")
    record = SnapshotRecord(
        snapshot_id=snapshot_id,
        manifest=manifest or {},
        created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
    )
    write_snapshot(repo, record)
    return snapshot_id
81
82
def _commit_raw(
    repo: pathlib.Path,
    cid: str,
    sid: str,
    message: str,
    branch: str = "main",
    parent: str | None = None,
    parent2: str | None = None,
) -> None:
    """Persist a commit record *cid* on *branch* pointing at snapshot *sid*.

    Fixed author/timestamp keep the record deterministic; *parent2* is only
    set for merge commits.
    """
    record = CommitRecord(
        commit_id=cid,
        repo_id="stress-repo",
        branch=branch,
        snapshot_id=sid,
        message=message,
        committed_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
        author="stress-tester",
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(repo, record)
106
107
108 def _set_branch(repo: pathlib.Path, branch: str, cid: str) -> None:
109 ref = repo / ".muse" / "refs" / "heads" / branch
110 ref.parent.mkdir(parents=True, exist_ok=True)
111 ref.write_text(cid, encoding="utf-8")
112
113
def _linear_chain(repo: pathlib.Path, n: int, sid: str, branch: str = "main") -> list[str]:
    """Build a linear chain of n commits on *branch*. Returns ids root→tip.

    The branch ref is advanced to the final commit; requires n >= 1.
    """
    chain: list[str] = []
    prev: str | None = None
    for idx in range(n):
        commit_id = _sha(f"linear-{branch}-{idx}")
        _commit_raw(repo, commit_id, sid, f"commit {idx}", branch=branch, parent=prev)
        chain.append(commit_id)
        prev = commit_id
    _set_branch(repo, branch, chain[-1])
    return chain
125
126
def _obj(repo: pathlib.Path, tag: str) -> str:
    """Store *tag*'s UTF-8 bytes as a content-addressed object; return its id."""
    payload = tag.encode()
    object_id = _sha_bytes(payload)
    write_object(repo, object_id, payload)
    return object_id
132
133
134 # ---------------------------------------------------------------------------
135 # Stress: commit-graph
136 # ---------------------------------------------------------------------------
137
138
class TestCommitGraphStress:
    """commit-graph BFS exercised on a 500-commit linear history."""

    def test_500_commit_linear_chain_full_traversal(self, tmp_path: pathlib.Path) -> None:
        """A default traversal visits every commit without truncation."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        # Return value intentionally discarded: the traversal starts from the
        # branch ref that _linear_chain sets, not from an explicit tip.
        _linear_chain(repo, 500, sid)
        result = runner.invoke(cli, ["plumbing", "commit-graph"], env=_env(repo))
        assert result.exit_code == 0, result.output
        data = json.loads(result.stdout)
        assert data["count"] == 500
        assert data["truncated"] is False

    def test_500_commit_chain_stop_at_midpoint(self, tmp_path: pathlib.Path) -> None:
        """--stop-at halts the walk: tip..midpoint inclusive is 250 commits."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cids = _linear_chain(repo, 500, sid)
        result = runner.invoke(
            cli,
            ["plumbing", "commit-graph", "--tip", cids[499], "--stop-at", cids[249]],
            env=_env(repo),
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["count"] == 250

    def test_count_flag_on_500_commits(self, tmp_path: pathlib.Path) -> None:
        """--count reports the total while omitting the per-node list."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        _linear_chain(repo, 500, sid)
        result = runner.invoke(cli, ["plumbing", "commit-graph", "--count"], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["count"] == 500
        assert "commits" not in data  # --count suppresses node list
172
173
174 # ---------------------------------------------------------------------------
175 # Stress: merge-base
176 # ---------------------------------------------------------------------------
177
178
class TestMergeBaseStress:
    """merge-base on two 150-deep branches sharing a single root ancestor."""

    def test_merge_base_300_deep_shared_root(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo)

        # One shared ancestor at the very bottom of both branches.
        root_cid = _sha("shared-root")
        _commit_raw(repo, root_cid, sid, "root")

        # Grow two independent 150-commit chains on top of the root.
        tips = {"main": root_cid, "feat": root_cid}
        for i in range(150):
            for branch in ("main", "feat"):
                cid = _sha(f"{branch}-{i}")
                _commit_raw(repo, cid, sid, f"{branch}-{i}", branch=branch, parent=tips[branch])
                tips[branch] = cid

        _set_branch(repo, "main", tips["main"])
        _set_branch(repo, "feat", tips["feat"])
        (repo / ".muse" / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

        result = runner.invoke(
            cli, ["plumbing", "merge-base", "main", "feat"], env=_env(repo)
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["merge_base"] == root_cid
209
210
211 # ---------------------------------------------------------------------------
212 # Stress: name-rev
213 # ---------------------------------------------------------------------------
214
215
class TestNameRevStress:
    """name-rev multi-source BFS on long linear histories."""

    def test_name_rev_200_commit_chain_all_named(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        cids = _linear_chain(repo, 200, sid)

        result = runner.invoke(cli, ["plumbing", "name-rev", *cids], env=_env(repo))
        assert result.exit_code == 0
        entries = json.loads(result.stdout)["results"]
        assert len(entries) == 200
        assert all(not entry["undefined"] for entry in entries)

    def test_name_rev_tip_has_no_tilde_suffix(self, tmp_path: pathlib.Path) -> None:
        """distance=0 means the tip is the branch tip itself; name is bare branch name."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        tip = _linear_chain(repo, 10, sid)[-1]

        result = runner.invoke(cli, ["plumbing", "name-rev", tip], env=_env(repo))
        assert result.exit_code == 0
        entry = json.loads(result.stdout)["results"][0]
        # name-rev emits "<branch>" (no ~0) for the exact branch tip.
        assert entry["name"] == "main"
        assert entry["distance"] == 0
242
243
244 # ---------------------------------------------------------------------------
245 # Stress: snapshot-diff
246 # ---------------------------------------------------------------------------
247
248
class TestSnapshotDiffStress:
    """snapshot-diff over a pair of 2000-entry manifests."""

    def test_diff_2000_file_manifests(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_oid = _sha("shared-blob")
        new_oid = _sha("new-blob")

        # Manifest A: 2000 identical entries.
        manifest_a = {f"track_{i:04d}.mid": old_oid for i in range(2000)}
        # Manifest B: same paths, but the first 200 point at a new blob (modified).
        manifest_b = dict(manifest_a)
        for i in range(200):
            manifest_b[f"track_{i:04d}.mid"] = new_oid

        def write_big_snapshot(sid: str, manifest: dict[str, str], day: int) -> None:
            """Persist one snapshot with a distinct creation day."""
            write_snapshot(
                repo,
                SnapshotRecord(
                    snapshot_id=sid,
                    manifest=manifest,
                    created_at=datetime.datetime(2026, 1, day, tzinfo=datetime.timezone.utc),
                ),
            )

        sid_a = _sha("big-snap-a")
        sid_b = _sha("big-snap-b")
        write_big_snapshot(sid_a, manifest_a, 1)
        write_big_snapshot(sid_b, manifest_b, 2)

        result = runner.invoke(cli, ["plumbing", "snapshot-diff", sid_a, sid_b], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["total_changes"] == 200
        assert len(data["modified"]) == 200
        assert data["added"] == []
        assert data["deleted"] == []
286
287
288 # ---------------------------------------------------------------------------
289 # Stress: verify-object
290 # ---------------------------------------------------------------------------
291
292
class TestVerifyObjectStress:
    """verify-object over many small objects and one 1 MiB blob."""

    def test_200_objects_all_verified(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        oids = [_obj(repo, f"stress-obj-{i}") for i in range(200)]
        result = runner.invoke(cli, ["plumbing", "verify-object", *oids], env=_env(repo))
        assert result.exit_code == 0
        report = json.loads(result.stdout)
        assert report["all_ok"] is True
        assert report["checked"] == 200
        assert report["failed"] == 0

    def test_verify_1mib_object_no_crash(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        payload = b"Z" * (1024 * 1024)
        oid = _sha_bytes(payload)
        write_object(repo, oid, payload)
        result = runner.invoke(cli, ["plumbing", "verify-object", oid], env=_env(repo))
        assert result.exit_code == 0
        assert json.loads(result.stdout)["all_ok"] is True
312
313
314 # ---------------------------------------------------------------------------
315 # Stress: ls-files
316 # ---------------------------------------------------------------------------
317
318
class TestLsFilesStress:
    """ls-files on a snapshot carrying 2000 manifest entries."""

    def test_ls_files_2000_file_snapshot(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        blob = _sha("common-oid")
        sid = _snap(repo, {f"track_{i:04d}.mid": blob for i in range(2000)}, "big")
        cid = _sha("big-commit")
        _commit_raw(repo, cid, sid, "big manifest", branch="main")
        _set_branch(repo, "main", cid)

        result = runner.invoke(cli, ["plumbing", "ls-files"], env=_env(repo))
        assert result.exit_code == 0
        assert json.loads(result.stdout)["file_count"] == 2000
333
334
335 # ---------------------------------------------------------------------------
336 # Stress: for-each-ref and show-ref
337 # ---------------------------------------------------------------------------
338
339
class TestRefCommandsStress:
    """for-each-ref and show-ref against 100 branch refs."""

    def _build_100_branches(self, repo: pathlib.Path) -> None:
        """Create feature-000..feature-099, each with its own tip commit."""
        sid = _snap(repo, tag="multi-branch")
        for i in range(100):
            name = f"feature-{i:03d}"
            tip = _sha(f"branch-tip-{i}")
            _commit_raw(repo, tip, sid, f"tip of {name}", branch=name)
            _set_branch(repo, name, tip)

    def test_for_each_ref_100_branches(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        self._build_100_branches(repo)
        result = runner.invoke(cli, ["plumbing", "for-each-ref"], env=_env(repo))
        assert result.exit_code == 0
        assert len(json.loads(result.stdout)["refs"]) == 100

    def test_show_ref_100_branches(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        self._build_100_branches(repo)
        result = runner.invoke(cli, ["plumbing", "show-ref"], env=_env(repo))
        assert result.exit_code == 0
        assert json.loads(result.stdout)["count"] == 100

    def test_for_each_ref_pattern_filter_on_100(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        self._build_100_branches(repo)
        result = runner.invoke(
            cli,
            ["plumbing", "for-each-ref", "--pattern", "refs/heads/feature-00*"],
            env=_env(repo),
        )
        assert result.exit_code == 0
        # feature-000 through feature-009 = 10 branches
        assert len(json.loads(result.stdout)["refs"]) == 10
377
378
379 # ---------------------------------------------------------------------------
380 # Stress: pack-objects → unpack-objects
381 # ---------------------------------------------------------------------------
382
383
class TestPackUnpackStress:
    """pack-objects → unpack-objects round trip at 100-commit/100-object scale."""

    def test_100_commit_100_object_round_trip(self, tmp_path: pathlib.Path) -> None:
        from muse.core.object_store import has_object
        from muse.core.store import read_commit

        src = _init_repo(tmp_path / "src")
        dst = _init_repo(tmp_path / "dst")

        # 100 blobs, all referenced by one large snapshot.
        oids = [_obj(src, f"blob-{i}") for i in range(100)]
        sid = _snap(src, {f"f{i}.mid": oid for i, oid in enumerate(oids)}, "big-pack")

        # A 100-commit linear chain referencing that snapshot.
        cids: list[str] = []
        prev: str | None = None
        for i in range(100):
            cid = _sha(f"pack-commit-{i}")
            _commit_raw(src, cid, sid, f"pack-{i}", parent=prev)
            cids.append(cid)
            prev = cid
        _set_branch(src, "main", cids[-1])

        # Pack everything reachable from the tip, then unpack into dst.
        pack_result = runner.invoke(
            cli, ["plumbing", "pack-objects", cids[-1]], env=_env(src)
        )
        assert pack_result.exit_code == 0

        unpack_result = runner.invoke(
            cli,
            ["plumbing", "unpack-objects"],
            input=pack_result.stdout_bytes,
            env=_env(dst),
        )
        assert unpack_result.exit_code == 0
        counts = json.loads(unpack_result.stdout)
        assert counts["commits_written"] == 100
        assert counts["objects_written"] == 100

        # Every commit and blob from src must now exist in dst.
        assert all(read_commit(dst, cid) is not None for cid in cids)
        assert all(has_object(dst, oid) for oid in oids)
428
429
430 # ---------------------------------------------------------------------------
431 # Stress: read-commit sequential
432 # ---------------------------------------------------------------------------
433
434
class TestReadCommitStress:
    """read-commit invoked once per commit across a 200-commit chain."""

    def test_200_commits_all_readable(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo)
        for cid in _linear_chain(repo, 200, sid):
            result = runner.invoke(cli, ["plumbing", "read-commit", cid], env=_env(repo))
            assert result.exit_code == 0
            assert json.loads(result.stdout)["commit_id"] == cid