gabriel / musehub public
test_musehub_activity.py python
254 lines 8.3 KB
c0f0b481 release: merge dev → main (#5) Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """Tests for the MuseHub activity feed — .
2
3 Covers:
4 - GET /repos/{repo_id}/activity returns empty feed on new repo
5 - record_event appends events visible via list_events
6 - event_type filter works correctly
7 - pagination works correctly (page, page_size)
8 - GET /{owner}/{repo_slug}/activity returns 200 HTML
9 - 404 for unknown repo on UI route
10 """
11 from __future__ import annotations
12
13 import pytest
14 from httpx import AsyncClient
15 from sqlalchemy.ext.asyncio import AsyncSession
16
17 from musehub.db.musehub_models import MusehubRepo
18 from musehub.services import musehub_events
19
20
21 # ---------------------------------------------------------------------------
22 # Helpers
23 # ---------------------------------------------------------------------------
24
25
26 async def _create_repo(db: AsyncSession, owner: str = "alice", slug: str = "beats") -> MusehubRepo:
27 repo = MusehubRepo(
28 name="beats",
29 owner=owner,
30 slug=slug,
31 visibility="public",
32 owner_user_id="uid-001",
33 )
34 db.add(repo)
35 await db.flush()
36 return repo
37
38
39 # ---------------------------------------------------------------------------
40 # Service-layer tests
41 # ---------------------------------------------------------------------------
42
43
44 @pytest.mark.anyio
45 async def test_list_events_empty_on_new_repo(db_session: AsyncSession) -> None:
46 """list_events returns an empty feed for a fresh repo."""
47 repo = await _create_repo(db_session)
48 result = await musehub_events.list_events(db_session, repo.repo_id)
49 assert result.events == []
50 assert result.total == 0
51 assert result.page == 1
52
53
54 @pytest.mark.anyio
55 async def test_record_event_appears_in_feed(db_session: AsyncSession) -> None:
56 """record_event persists an event that is returned by list_events."""
57 repo = await _create_repo(db_session)
58 await musehub_events.record_event(
59 db_session,
60 repo_id=repo.repo_id,
61 event_type="commit_pushed",
62 actor="alice",
63 description="Add groove baseline",
64 metadata={"sha": "abc123", "message": "Add groove baseline"},
65 )
66 await db_session.commit()
67
68 result = await musehub_events.list_events(db_session, repo.repo_id)
69 assert result.total == 1
70 assert len(result.events) == 1
71 ev = result.events[0]
72 assert ev.event_type == "commit_pushed"
73 assert ev.actor == "alice"
74 assert ev.description == "Add groove baseline"
75 assert ev.metadata["sha"] == "abc123"
76
77
78 @pytest.mark.anyio
79 async def test_event_type_filter_isolates_events(db_session: AsyncSession) -> None:
80 """event_type filter returns only events matching that type."""
81 repo = await _create_repo(db_session)
82 await musehub_events.record_event(
83 db_session, repo_id=repo.repo_id, event_type="commit_pushed",
84 actor="alice", description="push",
85 )
86 await musehub_events.record_event(
87 db_session, repo_id=repo.repo_id, event_type="issue_opened",
88 actor="bob", description="open issue",
89 )
90 await db_session.commit()
91
92 commits_result = await musehub_events.list_events(
93 db_session, repo.repo_id, event_type="commit_pushed"
94 )
95 assert commits_result.total == 1
96 assert commits_result.events[0].event_type == "commit_pushed"
97
98 issues_result = await musehub_events.list_events(
99 db_session, repo.repo_id, event_type="issue_opened"
100 )
101 assert issues_result.total == 1
102 assert issues_result.events[0].event_type == "issue_opened"
103
104
105 @pytest.mark.anyio
106 async def test_list_events_pagination(db_session: AsyncSession) -> None:
107 """list_events paginates correctly — page 1 and page 2 return disjoint events."""
108 repo = await _create_repo(db_session)
109 for i in range(5):
110 await musehub_events.record_event(
111 db_session,
112 repo_id=repo.repo_id,
113 event_type="commit_pushed",
114 actor="alice",
115 description=f"commit {i}",
116 )
117 await db_session.commit()
118
119 page1 = await musehub_events.list_events(db_session, repo.repo_id, page=1, page_size=3)
120 page2 = await musehub_events.list_events(db_session, repo.repo_id, page=2, page_size=3)
121
122 assert page1.total == 5
123 assert len(page1.events) == 3
124 assert len(page2.events) == 2
125
126 # Pages must be disjoint
127 ids1 = {e.event_id for e in page1.events}
128 ids2 = {e.event_id for e in page2.events}
129 assert ids1.isdisjoint(ids2)
130
131
132 @pytest.mark.anyio
133 async def test_events_ordered_newest_first(db_session: AsyncSession) -> None:
134 """Events are returned newest-first."""
135 repo = await _create_repo(db_session)
136 for i in range(3):
137 await musehub_events.record_event(
138 db_session,
139 repo_id=repo.repo_id,
140 event_type="commit_pushed",
141 actor="alice",
142 description=f"commit {i}",
143 )
144 await db_session.commit()
145
146 result = await musehub_events.list_events(db_session, repo.repo_id)
147 timestamps = [e.created_at for e in result.events]
148 assert timestamps == sorted(timestamps, reverse=True)
149
150
151 # ---------------------------------------------------------------------------
152 # HTTP API tests
153 # ---------------------------------------------------------------------------
154
155
156 @pytest.mark.anyio
157 async def test_get_activity_empty_public_repo(
158 client: AsyncClient,
159 db_session: AsyncSession,
160 auth_headers: dict[str, str],
161 ) -> None:
162 """GET /repos/{repo_id}/activity returns empty feed for new public repo."""
163 response = await client.post(
164 "/api/v1/repos",
165 json={"name": "activity-test", "owner": "testuser", "visibility": "public"},
166 headers=auth_headers,
167 )
168 assert response.status_code == 201
169 repo_id = response.json()["repoId"]
170
171 activity_response = await client.get(
172 f"/api/v1/repos/{repo_id}/activity",
173 )
174 assert activity_response.status_code == 200
175 body = activity_response.json()
176 assert body["events"] == []
177 assert body["total"] == 0
178 assert body["page"] == 1
179 assert body["pageSize"] == 30
180
181
182 @pytest.mark.anyio
183 async def test_get_activity_unknown_repo_404(
184 client: AsyncClient,
185 auth_headers: dict[str, str],
186 ) -> None:
187 """GET /repos/{repo_id}/activity returns 404 for unknown repo."""
188 response = await client.get("/api/v1/repos/nonexistent-id/activity")
189 assert response.status_code == 404
190
191
192 @pytest.mark.anyio
193 async def test_get_activity_event_type_filter_via_api(
194 client: AsyncClient,
195 db_session: AsyncSession,
196 auth_headers: dict[str, str],
197 ) -> None:
198 """event_type query param filters events correctly via the HTTP API."""
199 response = await client.post(
200 "/api/v1/repos",
201 json={"name": "filter-test", "owner": "testuser2", "visibility": "public"},
202 headers=auth_headers,
203 )
204 assert response.status_code == 201
205 repo_id = response.json()["repoId"]
206
207 # Seed events directly via service
208 await musehub_events.record_event(
209 db_session, repo_id=repo_id, event_type="commit_pushed",
210 actor="testuser2", description="first commit",
211 )
212 await musehub_events.record_event(
213 db_session, repo_id=repo_id, event_type="issue_opened",
214 actor="testuser2", description="open issue",
215 )
216 await db_session.commit()
217
218 r = await client.get(
219 f"/api/v1/repos/{repo_id}/activity?event_type=commit_pushed",
220 )
221 assert r.status_code == 200
222 body = r.json()
223 assert body["total"] == 1
224 assert body["events"][0]["eventType"] == "commit_pushed"
225 assert body["eventTypeFilter"] == "commit_pushed"
226
227
228 # ---------------------------------------------------------------------------
229 # UI page tests
230 # ---------------------------------------------------------------------------
231
232
233 @pytest.mark.anyio
234 async def test_activity_ui_page_returns_html(
235 client: AsyncClient,
236 auth_headers: dict[str, str],
237 ) -> None:
238 """GET /{owner}/{slug}/activity returns 200 HTML."""
239 await client.post(
240 "/api/v1/repos",
241 json={"name": "ui-activity", "owner": "uiuser", "visibility": "public"},
242 headers=auth_headers,
243 )
244 response = await client.get("/uiuser/ui-activity/activity")
245 assert response.status_code == 200
246 assert "text/html" in response.headers["content-type"]
247 assert b"Activity" in response.content
248
249
250 @pytest.mark.anyio
251 async def test_activity_ui_page_unknown_repo_404(client: AsyncClient) -> None:
252 """GET /{owner}/{slug}/activity returns 404 for unknown repo."""
253 response = await client.get("/nobody/no-repo/activity")
254 assert response.status_code == 404