gabriel / musehub public
test_ingest_push_snapshots.py python
262 lines 8.1 KB
b7205dda feat: complete 100% coverage — elicitation bypass paths + ingest_push s… Gabriel Cardona <cgcardona@gmail.com> 2d ago
1 """Tests for ingest_push snapshot ingestion path (step 4 of wire push).
2
3 Verifies that snapshot manifests passed in a push payload are:
4 - stored to the DB on first push (idempotent upsert)
5 - skipped on re-push (no duplicate rows)
6 - correctly linked to the repository via repo_id
7 - handled gracefully when snapshots=None or snapshots=[]
8 """
9 from __future__ import annotations
10
11 import pytest
12 from sqlalchemy import select
13 from sqlalchemy.ext.asyncio import AsyncSession
14
15 from musehub.db import musehub_models as db
16 from musehub.models.musehub import CommitInput, ObjectInput, SnapshotInput
17 from musehub.services.musehub_sync import ingest_push
18
19
20 # ---------------------------------------------------------------------------
21 # Helpers
22 # ---------------------------------------------------------------------------
23
24
def _commit(
    commit_id: str = "commit-aaa",
    parent_ids: list[str] | None = None,
    snapshot_id: str = "snap-001",
) -> CommitInput:
    """Build a ``CommitInput`` for branch ``main`` with fixed test metadata.

    Only the commit id, the parent list and the referenced snapshot id vary
    between callers; message, author and timestamp are constant.  A falsy
    ``parent_ids`` (``None`` or an empty list) is replaced by a new empty list.
    """
    parents = parent_ids if parent_ids else []
    return CommitInput(
        commit_id=commit_id,
        snapshot_id=snapshot_id,
        parent_ids=parents,
        branch="main",
        author="tester",
        message="test commit",
        timestamp="2025-01-01T00:00:00Z",
    )
39
40
def _snapshot(
    snapshot_id: str = "snap-001",
    manifest: dict[str, str] | None = None,
) -> SnapshotInput:
    """Build a ``SnapshotInput``; a falsy ``manifest`` becomes a new empty dict."""
    entries = manifest if manifest else {}
    return SnapshotInput(manifest=entries, snapshot_id=snapshot_id)
46
47
async def _count_snapshots(session: AsyncSession, repo_id: str) -> int:
    """Return how many ``MusehubSnapshot`` rows have the given ``repo_id``.

    The matching rows are loaded and counted in Python.
    """
    stmt = select(db.MusehubSnapshot).where(db.MusehubSnapshot.repo_id == repo_id)
    matching_rows = (await session.execute(stmt)).scalars().all()
    return len(matching_rows)
53
54
async def _get_snapshot(
    session: AsyncSession, snapshot_id: str
) -> db.MusehubSnapshot | None:
    """Look up a ``MusehubSnapshot`` row by ``snapshot_id``.

    Returns the row, or ``None`` when nothing matches.  The lookup is not
    filtered by repository.
    """
    stmt = select(db.MusehubSnapshot).where(
        db.MusehubSnapshot.snapshot_id == snapshot_id
    )
    outcome = await session.execute(stmt)
    return outcome.scalar_one_or_none()
62
63
64 # ---------------------------------------------------------------------------
65 # Core snapshot ingestion tests
66 # ---------------------------------------------------------------------------
67
68
@pytest.mark.anyio
async def test_ingest_push_stores_snapshots(db_session: AsyncSession) -> None:
    """A snapshot included in the push payload is persisted as a DB row."""
    repo_id = "repo-snap-001"
    snapshot_id = "snap-abc"
    head = "commit-abc"
    pushed_snapshot = _snapshot(snapshot_id, {"tracks/track.mid": "sha256:abc123"})
    pushed_commit = _commit(head, snapshot_id=snapshot_id)

    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[pushed_commit],
        snapshots=[pushed_snapshot],
        objects=[],
        force=True,
        author="tester",
    )

    stored = await _get_snapshot(db_session, snapshot_id)
    assert stored is not None
    # The row must be attributed to the pushing repo and keep its manifest.
    assert stored.repo_id == repo_id
    assert stored.manifest == {"tracks/track.mid": "sha256:abc123"}
91
92
@pytest.mark.anyio
async def test_ingest_push_multiple_snapshots(db_session: AsyncSession) -> None:
    """Five snapshots sent in a single push all end up stored for the repo."""
    repo_id = "repo-snap-002"

    # One commit per snapshot, paired by index.
    snaps: list[SnapshotInput] = []
    commits: list[CommitInput] = []
    for i in range(5):
        snap_id = f"snap-{i:03}"
        snaps.append(_snapshot(snap_id, {f"file-{i}.mid": f"sha256:{i:040}"}))
        commits.append(_commit(f"commit-{i:03}", snapshot_id=snap_id))

    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id="commit-004",
        commits=commits,
        snapshots=snaps,
        objects=[],
        force=True,
        author="tester",
    )

    assert await _count_snapshots(db_session, repo_id) == 5
114
115
@pytest.mark.anyio
async def test_ingest_push_snapshot_idempotent(db_session: AsyncSession) -> None:
    """Sending an identical push twice leaves exactly one snapshot row."""
    repo_id = "repo-snap-003"
    snap = _snapshot("snap-idem", {"v1.mid": "sha256:idem1234"})

    push_args = {
        "repo_id": repo_id,
        "branch": "main",
        "head_commit_id": "commit-001",
        "commits": [_commit("commit-001", snapshot_id="snap-idem")],
        "snapshots": [snap],
        "objects": [],
        "force": True,
        "author": "tester",
    }

    # Same payload objects are reused for both pushes.
    for _ in range(2):
        await ingest_push(db_session, **push_args)  # type: ignore[arg-type]

    assert await _count_snapshots(db_session, repo_id) == 1
138
139
@pytest.mark.anyio
async def test_ingest_push_no_snapshots(db_session: AsyncSession) -> None:
    """A push with ``snapshots=None`` succeeds and stores no snapshot rows."""
    repo_id = "repo-snap-004"
    head = "commit-nosnap"
    # The commit references a snapshot id that is never supplied in the payload.
    only_commit = _commit(head, snapshot_id="snap-ignored")

    resp = await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[only_commit],
        snapshots=None,
        objects=[],
        force=True,
        author="tester",
    )

    assert resp.ok is True
    assert await _count_snapshots(db_session, repo_id) == 0
160
161
@pytest.mark.anyio
async def test_ingest_push_empty_snapshots_list(db_session: AsyncSession) -> None:
    """A push with an empty ``snapshots`` list succeeds and stores nothing."""
    repo_id = "repo-snap-005"
    head = "commit-empty"
    only_commit = _commit(head, snapshot_id="snap-x")

    resp = await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[only_commit],
        snapshots=[],
        objects=[],
        force=True,
        author="tester",
    )

    assert resp.ok is True
    stored_count = await _count_snapshots(db_session, repo_id)
    assert stored_count == 0
181
182
@pytest.mark.anyio
async def test_ingest_push_snapshot_repo_isolation(db_session: AsyncSession) -> None:
    """Pushing one snapshot_id to two repos yields only rows owned by those repos.

    The same ``SnapshotInput`` is pushed to ``repo-A`` and then ``repo-B``.
    Whether the second push adds a second row depends on the table's key
    (snapshot_id alone vs. scoped by repo_id), which this test does not assume.
    It therefore checks the invariants that hold either way: between one and
    two rows exist, each belongs to one of the two repos, and no repo owns
    more than one row for the snapshot.
    """
    repo_ids = ["repo-A", "repo-B"]
    snap = _snapshot("shared-snap", {})

    for repo_id in repo_ids:
        commit = _commit(f"commit-shared-{repo_id}", snapshot_id="shared-snap")
        await ingest_push(
            db_session,
            repo_id=repo_id,
            branch="main",
            head_commit_id=f"commit-shared-{repo_id}",
            commits=[commit],
            snapshots=[snap],
            objects=[],
            force=True,
            author="tester",
        )

    result = await db_session.execute(
        select(db.MusehubSnapshot).where(db.MusehubSnapshot.snapshot_id == "shared-snap")
    )
    rows = result.scalars().all()
    owners = [row.repo_id for row in rows]

    # One row (snapshot_id-only key) or two rows (repo-scoped key) — never more.
    assert 1 <= len(rows) <= len(repo_ids)
    # Every stored row must be attributed to a repo that actually pushed it.
    assert set(owners) <= set(repo_ids)
    # A repo must never hold duplicate rows for the same snapshot_id.
    assert len(owners) == len(set(owners))
208
209
@pytest.mark.anyio
async def test_ingest_push_snapshot_manifest_json_preserved(db_session: AsyncSession) -> None:
    """The manifest read back from the DB equals the dict that was pushed."""
    repo_id = "repo-snap-006"
    snapshot_id = "snap-manifest"
    head = "commit-manifest"
    manifest: dict[str, str] = {
        "piano.mid": "sha256:piano123",
        "strings.mid": "sha256:strings456",
    }
    pushed_snapshot = _snapshot(snapshot_id, manifest)
    pushed_commit = _commit(head, snapshot_id=snapshot_id)

    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=head,
        commits=[pushed_commit],
        snapshots=[pushed_snapshot],
        objects=[],
        force=True,
        author="tester",
    )

    stored = await _get_snapshot(db_session, snapshot_id)
    assert stored is not None
    # Multi-entry manifest must round-trip key-for-key, value-for-value.
    assert stored.manifest == manifest
235
236
@pytest.mark.anyio
async def test_ingest_push_snapshot_and_commits_together(db_session: AsyncSession) -> None:
    """A three-commit chain with matching snapshots (and no objects) is ingested.

    Checks the response flags success, reports the pushed head as the remote
    head, and that all three snapshots are stored for the repo.
    """
    repo_id = "repo-snap-007"

    snapshots: list[SnapshotInput] = []
    for i in range(3):
        snapshots.append(
            _snapshot(f"snap-full-{i}", {f"track-{i}.mid": f"sha256:{i:040}"})
        )

    # Linear history: each commit's only parent is the one created before it.
    commits: list[CommitInput] = []
    previous: str | None = None
    for i in range(3):
        commit_id = f"commit-full-{i}"
        parents = [previous] if previous else None
        commits.append(_commit(commit_id, parents, snapshot_id=f"snap-full-{i}"))
        previous = commit_id

    resp = await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id="commit-full-2",
        commits=commits,
        snapshots=snapshots,
        objects=[],
        force=True,
        author="tester",
    )

    assert resp.ok is True
    assert resp.remote_head == "commit-full-2"
    stored_count = await _count_snapshots(db_session, repo_id)
    assert stored_count == 3