test_stress_ingest_push.py
python
| 1 | """Stress and E2E tests for ingest_push snapshot ingestion. |
| 2 | |
| 3 | Covers: |
| 4 | Stress: |
| 5 | - 200 sequential snapshot pushes to the same repo (throughput) |
| 6 | - 50 snapshots in a single push payload (batch size) |
| 7 | - Repeated idempotent push (same snapshot_id, no DB error) |
| 8 | - 100 repos × 1 snapshot each (repo isolation at scale) |
| 9 | |
| 10 | E2E (integration with DB): |
| 11 | - Full push bundle: repo creation → commit → snapshot → object |
| 12 | - Re-push of identical bundle is safe (idempotent at all layers) |
| 13 | - Snapshot not written when snapshots=[] (no spurious rows) |
| 14 | - manifest is stored as correct JSON dict |
| 15 | - snapshots_pushed count in response reflects actual upserts vs skips |
| 16 | |
| 17 | Edge cases: |
| 18 | - snapshot_id collision across repos creates two distinct rows |
| 19 | - Push with 0 objects but non-empty snapshots list succeeds |
| 20 | - Commit referencing non-existent snapshot_id does not crash ingest_push |
| 21 | """ |
from __future__ import annotations

import time
import uuid

import pytest
import pytest_asyncio
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db import musehub_models as db
from musehub.models.musehub import CommitInput, ObjectInput, SnapshotInput
from musehub.services.musehub_sync import ingest_push
| 35 | |
| 36 | |
| 37 | # --------------------------------------------------------------------------- |
| 38 | # Helpers |
| 39 | # --------------------------------------------------------------------------- |
| 40 | |
| 41 | |
| 42 | def _uid() -> str: |
| 43 | return str(uuid.uuid4()) |
| 44 | |
| 45 | |
| 46 | def _make_repo_id() -> str: |
| 47 | """Return a fresh unique repo ID string. |
| 48 | |
| 49 | The stress tests pass this as ``repo_id`` to ``ingest_push`` without |
| 50 | creating an actual ``MusehubRepo`` row — snapshots only need the FK string |
| 51 | to exist in the test's logical namespace, not a parent row in the DB. |
| 52 | Using ``force=True`` on ``ingest_push`` bypasses the branch-head |
| 53 | fast-forward check so no prior state is required. |
| 54 | """ |
| 55 | return _uid() |
| 56 | |
| 57 | |
def _commit(
    commit_id: str | None = None,
    repo_id: str = "repo-x",
    snapshot_id: str = "snap-001",
    branch: str = "main",
    parent_ids: list[str] | None = None,
) -> CommitInput:
    """Build a ``CommitInput`` with sensible test defaults.

    ``commit_id`` defaults to a fresh UUID when omitted; ``parent_ids``
    defaults to an empty list (``None`` avoids a mutable default argument).
    NOTE(review): ``repo_id`` is accepted for call-site symmetry but is never
    forwarded to ``CommitInput`` — it is intentionally unused here.
    """
    effective_id = commit_id if commit_id is not None else _uid()
    parents = parent_ids if parent_ids else []
    return CommitInput(
        commit_id=effective_id,
        branch=branch,
        parent_ids=parents,
        message="test commit",
        author="tester",
        timestamp="2026-01-01T00:00:00Z",
        snapshot_id=snapshot_id,
    )
| 74 | |
| 75 | |
def _snapshot(snap_id: str = "snap-001", manifest: dict[str, str] | None = None) -> SnapshotInput:
    """Build a ``SnapshotInput``; a falsy manifest falls back to a one-file default."""
    effective = manifest or {"file.mid": "sha256:abc"}
    return SnapshotInput(snapshot_id=snap_id, manifest=effective)
| 78 | |
| 79 | |
async def _count_snapshots(session: AsyncSession, repo_id: str) -> int:
    """Return the number of ``MusehubSnapshot`` rows belonging to *repo_id*.

    Counts in SQL with ``COUNT(*)`` instead of materialising and hydrating
    every ORM row — the stress tests call this against repos holding
    hundreds of snapshots, where per-row hydration cost adds up.
    """
    stmt = (
        select(func.count())
        .select_from(db.MusehubSnapshot)
        .where(db.MusehubSnapshot.repo_id == repo_id)
    )
    result = await session.execute(stmt)
    return int(result.scalar_one())
| 85 | |
| 86 | |
| 87 | # --------------------------------------------------------------------------- |
| 88 | # Stress: sequential throughput |
| 89 | # --------------------------------------------------------------------------- |
| 90 | |
| 91 | |
@pytest.mark.asyncio
async def test_ingest_200_sequential_snapshots(db_session: AsyncSession) -> None:
    """200 sequential distinct snapshot pushes complete in under 10 seconds."""
    repo_id = _make_repo_id()
    start = time.monotonic()
    for n in range(200):
        snap_id = f"snap-{n:04}"
        commit_id = f"commit-{n:04}"
        payload_manifest = {f"file_{n}.mid": f"sha256:{n}"}
        await ingest_push(
            db_session,
            repo_id=repo_id,
            branch="main",
            head_commit_id=commit_id,
            commits=[_commit(commit_id=commit_id, snapshot_id=snap_id)],
            snapshots=[_snapshot(snap_id=snap_id, manifest=payload_manifest)],
            objects=[],
            force=True,
            author="tester",
        )
    elapsed = time.monotonic() - start
    count = await _count_snapshots(db_session, repo_id)
    # Throughput and completeness checked together: all rows landed, quickly.
    assert count == 200, f"Expected 200 snapshots, got {count}"
    assert elapsed < 10.0, f"200 sequential pushes took {elapsed:.2f}s"
| 115 | |
| 116 | |
@pytest.mark.asyncio
async def test_ingest_50_snapshots_in_one_push(db_session: AsyncSession) -> None:
    """A single push payload with 50 snapshots stores all 50 rows."""
    repo_id = _make_repo_id()
    batch = [
        _snapshot(f"snap-{idx:03}", {f"f{idx}.mid": f"sha256:{idx}"})
        for idx in range(50)
    ]
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id="snap-000")],
        snapshots=batch,
        objects=[],
        force=True,
        author="tester",
    )
    assert await _count_snapshots(db_session, repo_id) == 50
| 136 | |
| 137 | |
@pytest.mark.asyncio
async def test_idempotent_push_100_times(db_session: AsyncSession) -> None:
    """Pushing the same snapshot 100 times creates exactly 1 DB row."""
    repo_id = _make_repo_id()
    snap_id = "snap-stable"
    for attempt in range(100):
        head = f"commit-{attempt:04}"
        await ingest_push(
            db_session,
            repo_id=repo_id,
            branch="main",
            head_commit_id=head,
            commits=[_commit(commit_id=head, snapshot_id=snap_id)],
            snapshots=[_snapshot(snap_id=snap_id)],
            objects=[],
            force=True,
            author="tester",
        )
    # 100 distinct commits, but only one snapshot identity → one row.
    assert await _count_snapshots(db_session, repo_id) == 1
| 158 | |
| 159 | |
@pytest.mark.asyncio
async def test_100_repos_one_snapshot_each(db_session: AsyncSession) -> None:
    """100 separate repos each get exactly 1 snapshot row (no cross-repo pollution)."""
    repos = [_make_repo_id() for _ in range(100)]
    for rid in repos:
        head = _uid()
        await ingest_push(
            db_session,
            repo_id=rid,
            branch="main",
            head_commit_id=head,
            commits=[_commit(commit_id=head, snapshot_id=f"snap-for-{rid}")],
            snapshots=[_snapshot(snap_id=f"snap-for-{rid}")],
            objects=[],
            force=True,
            author="tester",
        )
    # Verify isolation: every repo sees exactly its own single row.
    for rid in repos:
        count = await _count_snapshots(db_session, rid)
        assert count == 1, f"Repo {rid} has {count} snapshots, expected 1"
| 181 | |
| 182 | |
| 183 | # --------------------------------------------------------------------------- |
| 184 | # E2E integration |
| 185 | # --------------------------------------------------------------------------- |
| 186 | |
| 187 | |
@pytest.mark.asyncio
async def test_e2e_full_push_bundle(db_session: AsyncSession) -> None:
    """Full bundle: commit + snapshot stored correctly (object storage skipped — needs /data).

    Object blobs require a writable filesystem at ``settings.musehub_objects_dir``
    which may not be available in the test environment (read-only ``/data``).
    The snapshot manifest records the object ID by reference; actual blob
    persistence is tested in integration/E2E suites that mount the data volume.
    """
    repo_id = _make_repo_id()
    snap_id = f"snap-full-{_uid()}"
    head = f"commit-full-{_uid()}"
    obj_id = "sha256:deadbeef0000000000000000000000000000000000000000000000000000"

    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="feature",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id=snap_id)],
        snapshots=[_snapshot(snap_id=snap_id, manifest={"tracks/piano.mid": obj_id})],
        objects=[],  # object blobs require writable /data — tested separately
        force=True,
        author="tester",
    )

    # Snapshot row exists with correct manifest
    result = await db_session.execute(
        select(db.MusehubSnapshot).where(db.MusehubSnapshot.snapshot_id == snap_id)
    )
    stored = result.scalars().all()
    assert len(stored) == 1
    snapshot_row = stored[0]
    assert snapshot_row.repo_id == repo_id
    assert isinstance(snapshot_row.manifest, dict)
    assert snapshot_row.manifest["tracks/piano.mid"] == obj_id
| 223 | |
| 224 | |
@pytest.mark.asyncio
async def test_e2e_re_push_identical_bundle_safe(db_session: AsyncSession) -> None:
    """Re-pushing the exact same bundle twice is a safe no-op at the snapshot layer."""
    repo_id = _make_repo_id()
    snap_id = "snap-repush"
    for attempt in (0, 1):
        head = f"commit-repush-{attempt}"
        await ingest_push(
            db_session,
            repo_id=repo_id,
            branch="main",
            head_commit_id=head,
            commits=[_commit(commit_id=head, snapshot_id=snap_id)],
            snapshots=[_snapshot(snap_id=snap_id, manifest={"a.mid": "sha256:aaa"})],
            objects=[],
            force=True,
            author="tester",
        )
    # Second push must not duplicate the snapshot row.
    assert await _count_snapshots(db_session, repo_id) == 1
| 245 | |
| 246 | |
@pytest.mark.asyncio
async def test_e2e_empty_snapshots_no_rows(db_session: AsyncSession) -> None:
    """snapshots=[] results in zero snapshot rows stored."""
    repo_id = _make_repo_id()
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id="snap-phantom")],
        snapshots=[],
        objects=[],
        force=True,
        author="tester",
    )
    # The commit references "snap-phantom", but no snapshot payload was sent.
    assert await _count_snapshots(db_session, repo_id) == 0
| 265 | |
| 266 | |
@pytest.mark.asyncio
async def test_e2e_none_snapshots_no_rows(db_session: AsyncSession) -> None:
    """snapshots=None results in zero snapshot rows stored."""
    repo_id = _make_repo_id()
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id="snap-none")],
        snapshots=None,
        objects=[],
        force=True,
        author="tester",
    )
    # None must behave the same as an empty list: no rows written.
    assert await _count_snapshots(db_session, repo_id) == 0
| 285 | |
| 286 | |
@pytest.mark.asyncio
async def test_e2e_manifest_preserved_exactly(db_session: AsyncSession) -> None:
    """Manifest dict is stored verbatim — keys and SHA values match exactly."""
    repo_id = _make_repo_id()
    snap_id = "snap-manifest"
    expected_manifest = {
        "tracks/piano.mid": "sha256:piano",
        "tracks/strings.mid": "sha256:strings",
        "tracks/percussion.mid": "sha256:perc",
    }
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id=snap_id)],
        snapshots=[_snapshot(snap_id=snap_id, manifest=expected_manifest)],
        objects=[],
        force=True,
        author="tester",
    )
    result = await db_session.execute(
        select(db.MusehubSnapshot).where(db.MusehubSnapshot.snapshot_id == snap_id)
    )
    stored = result.scalars().first()
    assert stored is not None
    assert stored.manifest == expected_manifest
| 314 | |
| 315 | |
@pytest.mark.asyncio
async def test_e2e_distinct_snapshot_ids_across_repos(db_session: AsyncSession) -> None:
    """Two repos each store distinct snapshots; querying by repo_id returns only that repo's rows.

    MusehubSnapshot uses ``snapshot_id`` as the global primary key — the
    content hash is globally unique by design (content-addressed storage).
    This test verifies that each repo correctly stores and queries its own
    snapshots without cross-repo interference.
    """
    async def _push_one(repo: str, snap: str, manifest: dict[str, str]) -> None:
        # Push a single-commit, single-snapshot bundle into *repo*.
        head = _uid()
        await ingest_push(
            db_session,
            repo_id=repo,
            branch="main",
            head_commit_id=head,
            commits=[_commit(commit_id=head, snapshot_id=snap)],
            snapshots=[_snapshot(snap_id=snap, manifest=manifest)],
            objects=[],
            force=True,
            author="tester",
        )

    async def _fetch(snap: str):
        # Look up a snapshot row by its globally-unique snapshot_id.
        result = await db_session.execute(
            select(db.MusehubSnapshot).where(db.MusehubSnapshot.snapshot_id == snap)
        )
        return result.scalars().first()

    repo_a = _make_repo_id()
    repo_b = _make_repo_id()
    snap_a = f"snap-repo-a-{_uid()}"
    snap_b = f"snap-repo-b-{_uid()}"

    await _push_one(repo_a, snap_a, {"a.mid": "sha256:a"})
    await _push_one(repo_b, snap_b, {"b.mid": "sha256:b"})

    # Each repo has exactly 1 snapshot
    assert await _count_snapshots(db_session, repo_a) == 1
    assert await _count_snapshots(db_session, repo_b) == 1

    # The snapshots belong to the correct repos
    row_a = await _fetch(snap_a)
    row_b = await _fetch(snap_b)
    assert row_a is not None and row_a.repo_id == repo_a
    assert row_b is not None and row_b.repo_id == repo_b
| 369 | |
| 370 | |
@pytest.mark.asyncio
async def test_e2e_snapshots_without_objects(db_session: AsyncSession) -> None:
    """Push with non-empty snapshots but empty objects list succeeds."""
    repo_id = _make_repo_id()
    snap_id = "snap-no-obj"
    head = _uid()
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[_commit(commit_id=head, snapshot_id=snap_id)],
        snapshots=[_snapshot(snap_id=snap_id, manifest={"f.mid": "sha256:xyz"})],
        objects=[],
        force=True,
        author="tester",
    )
    # Snapshot metadata persists even when no blobs accompany the push.
    assert await _count_snapshots(db_session, repo_id) == 1