gabriel / musehub public
test_stress_ingest_push.py python
389 lines 13.2 KB
fe6ae740 docs + stress tests: elicitation bypass, ingest_push snapshots, MCP ref… Gabriel Cardona <cgcardona@gmail.com> 2d ago
1 """Stress and E2E tests for ingest_push snapshot ingestion.
2
3 Covers:
4 Stress:
5 - 200 sequential snapshot pushes to the same repo (throughput)
6 - 50 snapshots in a single push payload (batch size)
7 - Repeated idempotent push (same snapshot_id, no DB error)
8 - 100 repos × 1 snapshot each (repo isolation at scale)
9
10 E2E (integration with DB):
11 - Full push bundle: repo creation → commit → snapshot → object
12 - Re-push of identical bundle is safe (idempotent at all layers)
13 - Snapshot not written when snapshots=[] (no spurious rows)
14 - manifest is stored as correct JSON dict
15 - snapshots_pushed count in response reflects actual upserts vs skips
16
17 Edge cases:
- Distinct snapshot IDs across repos remain isolated per repo (snapshot_id is a global primary key)
19 - Push with 0 objects but non-empty snapshots list succeeds
20 - Commit referencing non-existent snapshot_id does not crash ingest_push
21 """
from __future__ import annotations

import time
import uuid

import pytest
import pytest_asyncio
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db import musehub_models as db
from musehub.models.musehub import CommitInput, ObjectInput, SnapshotInput
from musehub.services.musehub_sync import ingest_push
35
36
37 # ---------------------------------------------------------------------------
38 # Helpers
39 # ---------------------------------------------------------------------------
40
41
def _uid() -> str:
    """Mint a fresh random UUID4 and render it as a plain string."""
    return f"{uuid.uuid4()}"
44
45
def _make_repo_id() -> str:
    """Mint a unique repo ID string for one test.

    No parent ``MusehubRepo`` row is created on purpose: snapshots only need
    a distinct ``repo_id`` string to namespace them within the test, and the
    tests call ``ingest_push`` with ``force=True``, which bypasses the
    branch-head fast-forward check, so no prior repo state is required.
    """
    return _uid()
56
57
def _commit(
    commit_id: str | None = None,
    repo_id: str = "repo-x",
    snapshot_id: str = "snap-001",
    branch: str = "main",
    parent_ids: list[str] | None = None,
) -> CommitInput:
    """Build a ``CommitInput`` with test defaults; a random commit ID is minted when omitted.

    ``repo_id`` is accepted for call-site symmetry but is not part of the
    ``CommitInput`` payload itself.
    """
    return CommitInput(
        commit_id=commit_id or _uid(),
        snapshot_id=snapshot_id,
        branch=branch,
        parent_ids=parent_ids or [],
        author="tester",
        message="test commit",
        timestamp="2026-01-01T00:00:00Z",
    )
74
75
def _snapshot(snap_id: str = "snap-001", manifest: dict[str, str] | None = None) -> SnapshotInput:
    """Build a ``SnapshotInput`` with a one-file default manifest.

    Uses an explicit ``is None`` check rather than ``manifest or {...}`` so a
    deliberately passed empty dict ``{}`` is preserved instead of being
    silently swapped for the default manifest.
    """
    if manifest is None:
        manifest = {"file.mid": "sha256:abc"}
    return SnapshotInput(snapshot_id=snap_id, manifest=manifest)
78
79
async def _count_snapshots(session: AsyncSession, repo_id: str) -> int:
    """Return the number of snapshot rows stored for ``repo_id``.

    Counts in SQL with ``func.count()`` instead of fetching every row and
    taking ``len()`` — the stress tests call this after pushing hundreds of
    snapshots, so avoiding the full-row fetch keeps assertions cheap.
    """
    result = await session.execute(
        select(func.count())
        .select_from(db.MusehubSnapshot)
        .where(db.MusehubSnapshot.repo_id == repo_id)
    )
    return int(result.scalar_one())
85
86
87 # ---------------------------------------------------------------------------
88 # Stress: sequential throughput
89 # ---------------------------------------------------------------------------
90
91
@pytest.mark.asyncio
async def test_ingest_200_sequential_snapshots(db_session: AsyncSession) -> None:
    """Throughput check: 200 back-to-back distinct pushes finish in under 10s."""
    repo_id = _make_repo_id()
    t0 = time.monotonic()
    for i in range(200):
        await ingest_push(
            db_session,
            repo_id=repo_id,
            branch="main",
            head_commit_id=f"commit-{i:04}",
            commits=[_commit(commit_id=f"commit-{i:04}", snapshot_id=f"snap-{i:04}")],
            snapshots=[_snapshot(snap_id=f"snap-{i:04}", manifest={f"file_{i}.mid": f"sha256:{i}"})],
            objects=[],
            force=True,
            author="tester",
        )
    elapsed = time.monotonic() - t0
    count = await _count_snapshots(db_session, repo_id)
    assert count == 200, f"Expected 200 snapshots, got {count}"
    assert elapsed < 10.0, f"200 sequential pushes took {elapsed:.2f}s"
115
116
@pytest.mark.asyncio
async def test_ingest_50_snapshots_in_one_push(db_session: AsyncSession) -> None:
    """One push payload carrying 50 snapshots persists every one of them."""
    repo_id = _make_repo_id()
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id="snap-000")],
        snapshots=[_snapshot(f"snap-{i:03}", {f"f{i}.mid": f"sha256:{i}"}) for i in range(50)],
        objects=[],
        force=True,
        author="tester",
    )
    assert await _count_snapshots(db_session, repo_id) == 50
136
137
@pytest.mark.asyncio
async def test_idempotent_push_100_times(db_session: AsyncSession) -> None:
    """Replaying the same snapshot 100 times leaves exactly one row behind."""
    repo_id = _make_repo_id()
    for i in range(100):
        cid = f"commit-{i:04}"
        await ingest_push(
            db_session,
            repo_id=repo_id,
            branch="main",
            head_commit_id=cid,
            commits=[_commit(commit_id=cid, snapshot_id="snap-stable")],
            snapshots=[_snapshot(snap_id="snap-stable")],
            objects=[],
            force=True,
            author="tester",
        )
    assert await _count_snapshots(db_session, repo_id) == 1
158
159
@pytest.mark.asyncio
async def test_100_repos_one_snapshot_each(db_session: AsyncSession) -> None:
    """Each of 100 independent repos ends up with exactly one snapshot row."""
    repo_ids = [_make_repo_id() for _ in range(100)]
    for rid in repo_ids:
        head = _uid()
        await ingest_push(
            db_session,
            repo_id=rid,
            branch="main",
            head_commit_id=head,
            commits=[_commit(commit_id=head, snapshot_id=f"snap-for-{rid}")],
            snapshots=[_snapshot(snap_id=f"snap-for-{rid}")],
            objects=[],
            force=True,
            author="tester",
        )
    for rid in repo_ids:
        count = await _count_snapshots(db_session, rid)
        assert count == 1, f"Repo {rid} has {count} snapshots, expected 1"
181
182
183 # ---------------------------------------------------------------------------
184 # E2E integration
185 # ---------------------------------------------------------------------------
186
187
@pytest.mark.asyncio
async def test_e2e_full_push_bundle(db_session: AsyncSession) -> None:
    """Full bundle: commit + snapshot land correctly; object blobs are skipped.

    Blob persistence needs a writable filesystem at
    ``settings.musehub_objects_dir``, which the test environment may not
    provide (read-only ``/data``). The manifest records the object ID by
    reference only; actual blob storage is exercised by the integration/E2E
    suites that mount the data volume.
    """
    repo_id = _make_repo_id()
    snap_id = f"snap-full-{_uid()}"
    commit_id = f"commit-full-{_uid()}"
    obj_id = "sha256:deadbeef0000000000000000000000000000000000000000000000000000"

    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="feature",
        head_commit_id=commit_id,
        commits=[_commit(commit_id=commit_id, snapshot_id=snap_id)],
        snapshots=[_snapshot(snap_id=snap_id, manifest={"tracks/piano.mid": obj_id})],
        objects=[],  # blob writes need a writable /data volume — covered elsewhere
        force=True,
        author="tester",
    )

    # Exactly one snapshot row, attached to the right repo, with the manifest intact.
    stored = (await db_session.execute(
        select(db.MusehubSnapshot).where(db.MusehubSnapshot.snapshot_id == snap_id)
    )).scalars().all()
    assert len(stored) == 1
    snap = stored[0]
    assert snap.repo_id == repo_id
    assert isinstance(snap.manifest, dict)
    assert snap.manifest["tracks/piano.mid"] == obj_id
223
224
@pytest.mark.asyncio
async def test_e2e_re_push_identical_bundle_safe(db_session: AsyncSession) -> None:
    """Sending the identical bundle twice is a safe replay: one snapshot row remains."""
    repo_id = _make_repo_id()
    for i in range(2):
        cid = f"commit-repush-{i}"
        await ingest_push(
            db_session,
            repo_id=repo_id,
            branch="main",
            head_commit_id=cid,
            commits=[_commit(commit_id=cid, snapshot_id="snap-repush")],
            snapshots=[_snapshot(snap_id="snap-repush", manifest={"a.mid": "sha256:aaa"})],
            objects=[],
            force=True,
            author="tester",
        )
    assert await _count_snapshots(db_session, repo_id) == 1
245
246
@pytest.mark.asyncio
async def test_e2e_empty_snapshots_no_rows(db_session: AsyncSession) -> None:
    """An empty ``snapshots`` list must not produce any snapshot rows."""
    repo_id = _make_repo_id()
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id="snap-phantom")],
        snapshots=[],
        objects=[],
        force=True,
        author="tester",
    )
    assert await _count_snapshots(db_session, repo_id) == 0
265
266
@pytest.mark.asyncio
async def test_e2e_none_snapshots_no_rows(db_session: AsyncSession) -> None:
    """Passing ``snapshots=None`` must not produce any snapshot rows."""
    repo_id = _make_repo_id()
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id="snap-none")],
        snapshots=None,
        objects=[],
        force=True,
        author="tester",
    )
    assert await _count_snapshots(db_session, repo_id) == 0
285
286
@pytest.mark.asyncio
async def test_e2e_manifest_preserved_exactly(db_session: AsyncSession) -> None:
    """The stored manifest round-trips verbatim — identical keys and SHA values."""
    repo_id = _make_repo_id()
    manifest = {
        "tracks/piano.mid": "sha256:piano",
        "tracks/strings.mid": "sha256:strings",
        "tracks/percussion.mid": "sha256:perc",
    }
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id="snap-manifest")],
        snapshots=[_snapshot(snap_id="snap-manifest", manifest=manifest)],
        objects=[],
        force=True,
        author="tester",
    )
    stored = (await db_session.execute(
        select(db.MusehubSnapshot).where(db.MusehubSnapshot.snapshot_id == "snap-manifest")
    )).scalars().first()
    assert stored is not None
    assert stored.manifest == manifest
314
315
@pytest.mark.asyncio
async def test_e2e_distinct_snapshot_ids_across_repos(db_session: AsyncSession) -> None:
    """Two repos store their own snapshots; per-repo queries see only their rows.

    ``MusehubSnapshot`` keys on ``snapshot_id`` globally — content-addressed
    hashes are unique by design — so this test uses distinct IDs per repo and
    checks that storage and lookup never bleed across repo boundaries.
    """
    repo_a = _make_repo_id()
    repo_b = _make_repo_id()
    snap_a = f"snap-repo-a-{_uid()}"
    snap_b = f"snap-repo-b-{_uid()}"

    async def _push(rid: str, sid: str, manifest: dict[str, str]) -> None:
        # One commit + one snapshot into the given repo.
        cid = _uid()
        await ingest_push(
            db_session,
            repo_id=rid,
            branch="main",
            head_commit_id=cid,
            commits=[_commit(commit_id=cid, snapshot_id=sid)],
            snapshots=[_snapshot(snap_id=sid, manifest=manifest)],
            objects=[],
            force=True,
            author="tester",
        )

    await _push(repo_a, snap_a, {"a.mid": "sha256:a"})
    await _push(repo_b, snap_b, {"b.mid": "sha256:b"})

    # Exactly one snapshot per repo.
    assert await _count_snapshots(db_session, repo_a) == 1
    assert await _count_snapshots(db_session, repo_b) == 1

    async def _fetch(sid: str):
        return (await db_session.execute(
            select(db.MusehubSnapshot).where(db.MusehubSnapshot.snapshot_id == sid)
        )).scalars().first()

    # Each snapshot row points at the repo that pushed it.
    row_a = await _fetch(snap_a)
    row_b = await _fetch(snap_b)
    assert row_a is not None and row_a.repo_id == repo_a
    assert row_b is not None and row_b.repo_id == repo_b
369
370
@pytest.mark.asyncio
async def test_e2e_snapshots_without_objects(db_session: AsyncSession) -> None:
    """A push carrying snapshots but zero object blobs still stores the snapshot."""
    repo_id = _make_repo_id()
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id="snap-no-obj")],
        snapshots=[_snapshot(snap_id="snap-no-obj", manifest={"f.mid": "sha256:xyz"})],
        objects=[],
        force=True,
        author="tester",
    )
    assert await _count_snapshots(db_session, repo_id) == 1