gabriel / musehub public
test_musehub_repos.py python
2393 lines 72.5 KB
4c818820 Delete dead music analysis layer — keep only piano roll demo (#56) Gabriel Cardona <cgcardona@gmail.com> 17h ago
1 """Tests for MuseHub repo, branch, and commit endpoints.
2
3 Covers every acceptance criterion:
4 - POST /musehub/repos returns 201 with correct fields
5 - POST requires auth — unauthenticated requests return 401
6 - GET /repos/{repo_id} returns 200; 404 for unknown repo
7 - GET /repos/{repo_id}/branches returns empty list on new repo
8 - GET /repos/{repo_id}/commits returns newest first, respects ?limit
9
10 Covers (compare view API endpoint):
11 - test_compare_radar_data — compare endpoint returns 5 dimension scores
12 - test_compare_commit_list — commits unique to head are listed
13 - test_compare_unknown_ref_404 — unknown ref returns 422
14
15 All tests use the shared ``client`` and ``auth_headers`` fixtures from conftest.py.
16 """
17 from __future__ import annotations
18
19 import pytest
20 from httpx import AsyncClient
21 from sqlalchemy.ext.asyncio import AsyncSession
22
23 from musehub.db.musehub_models import MusehubCommit, MusehubRepo
24 from musehub.services import musehub_repository
25
26
27 # ---------------------------------------------------------------------------
28 # POST /musehub/repos
29 # ---------------------------------------------------------------------------
30
31
32 @pytest.mark.anyio
33 async def test_create_repo_returns_201(
34 client: AsyncClient,
35 auth_headers: dict[str, str],
36 ) -> None:
37 """POST /musehub/repos creates a repo and returns all required fields."""
38 response = await client.post(
39 "/api/v1/repos",
40 json={"name": "my-beats", "owner": "testuser", "visibility": "private"},
41 headers=auth_headers,
42 )
43 assert response.status_code == 201
44 body = response.json()
45 assert body["name"] == "my-beats"
46 assert body["visibility"] == "private"
47 assert "repoId" in body
48 assert "cloneUrl" in body
49 assert "ownerUserId" in body
50 assert "createdAt" in body
51
52
53 @pytest.mark.anyio
54 async def test_create_repo_requires_auth(client: AsyncClient) -> None:
55 """POST /musehub/repos returns 401 without a Bearer token."""
56 response = await client.post(
57 "/api/v1/repos",
58 json={"name": "my-beats", "owner": "testuser"},
59 )
60 assert response.status_code == 401
61
62
63 @pytest.mark.anyio
64 async def test_create_repo_default_visibility_is_private(
65 client: AsyncClient,
66 auth_headers: dict[str, str],
67 ) -> None:
68 """Omitting visibility defaults to 'private'."""
69 response = await client.post(
70 "/api/v1/repos",
71 json={"name": "silent-sessions", "owner": "testuser"},
72 headers=auth_headers,
73 )
74 assert response.status_code == 201
75 assert response.json()["visibility"] == "private"
76
77
78 # ---------------------------------------------------------------------------
79 # GET /repos/{repo_id}
80 # ---------------------------------------------------------------------------
81
82
83 @pytest.mark.anyio
84 async def test_get_repo_returns_200(
85 client: AsyncClient,
86 auth_headers: dict[str, str],
87 ) -> None:
88 """GET /repos/{repo_id} returns the repo after creation."""
89 create = await client.post(
90 "/api/v1/repos",
91 json={"name": "jazz-sessions", "owner": "testuser"},
92 headers=auth_headers,
93 )
94 assert create.status_code == 201
95 repo_id = create.json()["repoId"]
96
97 response = await client.get(f"/api/v1/repos/{repo_id}", headers=auth_headers)
98 assert response.status_code == 200
99 assert response.json()["repoId"] == repo_id
100 assert response.json()["name"] == "jazz-sessions"
101
102
103 @pytest.mark.anyio
104 async def test_get_repo_not_found_returns_404(
105 client: AsyncClient,
106 auth_headers: dict[str, str],
107 ) -> None:
108 """GET /repos/{repo_id} returns 404 for unknown repo."""
109 response = await client.get(
110 "/api/v1/repos/does-not-exist",
111 headers=auth_headers,
112 )
113 assert response.status_code == 404
114
115
116 @pytest.mark.anyio
117 async def test_get_nonexistent_repo_returns_404_without_auth(client: AsyncClient) -> None:
118 """GET /repos/{repo_id} returns 404 for a non-existent repo without auth.
119
120 Uses optional_token — auth is visibility-based; missing repo → 404 before auth check.
121 """
122 response = await client.get("/api/v1/repos/non-existent-repo-id")
123 assert response.status_code == 404
124
125
126 # ---------------------------------------------------------------------------
127 # GET /repos/{repo_id}/branches
128 # ---------------------------------------------------------------------------
129
130
131 @pytest.mark.anyio
132 async def test_list_branches_empty_on_new_repo(
133 client: AsyncClient,
134 auth_headers: dict[str, str],
135 ) -> None:
136 """A newly created repo has an empty branches list when not initialized."""
137 create = await client.post(
138 "/api/v1/repos",
139 json={"name": "drum-patterns", "owner": "testuser", "initialize": False},
140 headers=auth_headers,
141 )
142 repo_id = create.json()["repoId"]
143
144 response = await client.get(
145 f"/api/v1/repos/{repo_id}/branches",
146 headers=auth_headers,
147 )
148 assert response.status_code == 200
149 assert response.json()["branches"] == []
150
151
152 @pytest.mark.anyio
153 async def test_list_branches_not_found_returns_404(
154 client: AsyncClient,
155 auth_headers: dict[str, str],
156 ) -> None:
157 """GET /branches returns 404 when the repo doesn't exist."""
158 response = await client.get(
159 "/api/v1/repos/ghost-repo/branches",
160 headers=auth_headers,
161 )
162 assert response.status_code == 404
163
164
165 # ---------------------------------------------------------------------------
166 # GET /repos/{repo_id}/commits
167 # ---------------------------------------------------------------------------
168
169
170 @pytest.mark.anyio
171 async def test_list_commits_empty_on_new_repo(
172 client: AsyncClient,
173 auth_headers: dict[str, str],
174 ) -> None:
175 """A new repo has no commits when initialize=false."""
176 create = await client.post(
177 "/api/v1/repos",
178 json={"name": "empty-repo", "owner": "testuser", "initialize": False},
179 headers=auth_headers,
180 )
181 repo_id = create.json()["repoId"]
182
183 response = await client.get(
184 f"/api/v1/repos/{repo_id}/commits",
185 headers=auth_headers,
186 )
187 assert response.status_code == 200
188 body = response.json()
189 assert body["commits"] == []
190 assert body["total"] == 0
191
192
193 @pytest.mark.anyio
194 async def test_list_commits_returns_newest_first(
195 client: AsyncClient,
196 auth_headers: dict[str, str],
197 db_session: AsyncSession,
198 ) -> None:
199 """Commits are returned newest-first after being pushed."""
200 from datetime import datetime, timezone, timedelta
201
202 # Create repo via API (no init commit so we control the full history)
203 create = await client.post(
204 "/api/v1/repos",
205 json={"name": "ordered-commits", "owner": "testuser", "initialize": False},
206 headers=auth_headers,
207 )
208 repo_id = create.json()["repoId"]
209
210 # Insert two commits directly with known timestamps
211 now = datetime.now(tz=timezone.utc)
212 older = MusehubCommit(
213 commit_id="aaa111",
214 repo_id=repo_id,
215 branch="main",
216 parent_ids=[],
217 message="first",
218 author="gabriel",
219 timestamp=now - timedelta(hours=1),
220 )
221 newer = MusehubCommit(
222 commit_id="bbb222",
223 repo_id=repo_id,
224 branch="main",
225 parent_ids=["aaa111"],
226 message="second",
227 author="gabriel",
228 timestamp=now,
229 )
230 db_session.add_all([older, newer])
231 await db_session.commit()
232
233 response = await client.get(
234 f"/api/v1/repos/{repo_id}/commits",
235 headers=auth_headers,
236 )
237 assert response.status_code == 200
238 commits = response.json()["commits"]
239 assert len(commits) == 2
240 assert commits[0]["commitId"] == "bbb222"
241 assert commits[1]["commitId"] == "aaa111"
242
243
244 @pytest.mark.anyio
245 async def test_list_commits_limit_param(
246 client: AsyncClient,
247 auth_headers: dict[str, str],
248 db_session: AsyncSession,
249 ) -> None:
250 """?limit=1 returns exactly 1 commit."""
251 from datetime import datetime, timezone, timedelta
252
253 create = await client.post(
254 "/api/v1/repos",
255 json={"name": "limited-repo", "owner": "testuser", "initialize": False},
256 headers=auth_headers,
257 )
258 repo_id = create.json()["repoId"]
259
260 now = datetime.now(tz=timezone.utc)
261 for i in range(3):
262 db_session.add(
263 MusehubCommit(
264 commit_id=f"commit-{i}",
265 repo_id=repo_id,
266 branch="main",
267 parent_ids=[],
268 message=f"commit {i}",
269 author="gabriel",
270 timestamp=now + timedelta(seconds=i),
271 )
272 )
273 await db_session.commit()
274
275 response = await client.get(
276 f"/api/v1/repos/{repo_id}/commits?limit=1",
277 headers=auth_headers,
278 )
279 assert response.status_code == 200
280 body = response.json()
281 assert len(body["commits"]) == 1
282 assert body["total"] == 3
283
284
285 # ---------------------------------------------------------------------------
286 # Service layer — direct DB tests (no HTTP)
287 # ---------------------------------------------------------------------------
288
289
290 @pytest.mark.anyio
291 async def test_create_repo_service_persists_to_db(db_session: AsyncSession) -> None:
292 """musehub_repository.create_repo() persists the row."""
293 repo = await musehub_repository.create_repo(
294 db_session,
295 name="service-test-repo",
296 owner="testuser",
297 visibility="public",
298 owner_user_id="user-abc",
299 )
300 await db_session.commit()
301
302 fetched = await musehub_repository.get_repo(db_session, repo.repo_id)
303 assert fetched is not None
304 assert fetched.name == "service-test-repo"
305 assert fetched.visibility == "public"
306
307
308 @pytest.mark.anyio
309 async def test_get_repo_returns_none_when_missing(db_session: AsyncSession) -> None:
310 """get_repo() returns None for an unknown repo_id."""
311 result = await musehub_repository.get_repo(db_session, "nonexistent-id")
312 assert result is None
313
314
315 @pytest.mark.anyio
316 async def test_list_branches_returns_empty_for_new_repo(db_session: AsyncSession) -> None:
317 """list_branches() returns [] for a repo with no branches."""
318 repo = await musehub_repository.create_repo(
319 db_session,
320 name="branchless",
321 owner="testuser",
322 visibility="private",
323 owner_user_id="user-x",
324 )
325 await db_session.commit()
326 branches = await musehub_repository.list_branches(db_session, repo.repo_id)
327 assert branches == []
328
329
330 # ---------------------------------------------------------------------------
331 # GET /repos/{repo_id}/divergence
332 # ---------------------------------------------------------------------------
333
334
335 @pytest.mark.anyio
336 async def test_divergence_endpoint_returns_five_dimensions(
337 client: AsyncClient,
338 auth_headers: dict[str, str],
339 db_session: AsyncSession,
340 ) -> None:
341 """GET /divergence returns five dimension scores with level labels."""
342 from datetime import datetime, timezone, timedelta
343
344 create = await client.post(
345 "/api/v1/repos",
346 json={"name": "divergence-test-repo", "owner": "testuser"},
347 headers=auth_headers,
348 )
349 assert create.status_code == 201
350 repo_id = create.json()["repoId"]
351
352 now = datetime.now(tz=timezone.utc)
353 db_session.add(
354 MusehubCommit(
355 commit_id="aaa-melody",
356 repo_id=repo_id,
357 branch="main",
358 parent_ids=[],
359 message="add lead melody line",
360 author="alice",
361 timestamp=now - timedelta(hours=2),
362 )
363 )
364 db_session.add(
365 MusehubCommit(
366 commit_id="bbb-chord",
367 repo_id=repo_id,
368 branch="feature",
369 parent_ids=[],
370 message="update chord progression",
371 author="bob",
372 timestamp=now - timedelta(hours=1),
373 )
374 )
375 await db_session.commit()
376
377 response = await client.get(
378 f"/api/v1/repos/{repo_id}/divergence?branch_a=main&branch_b=feature",
379 headers=auth_headers,
380 )
381 assert response.status_code == 200
382 body = response.json()
383 assert "dimensions" in body
384 assert len(body["dimensions"]) == 5
385
386 dim_names = {d["dimension"] for d in body["dimensions"]}
387 assert dim_names == {"melodic", "harmonic", "rhythmic", "structural", "dynamic"}
388
389 for dim in body["dimensions"]:
390 assert "level" in dim
391 assert dim["level"] in {"NONE", "LOW", "MED", "HIGH"}
392 assert "score" in dim
393 assert 0.0 <= dim["score"] <= 1.0
394
395
396 @pytest.mark.anyio
397 async def test_divergence_overall_score_is_mean_of_dimensions(
398 client: AsyncClient,
399 auth_headers: dict[str, str],
400 db_session: AsyncSession,
401 ) -> None:
402 """Overall divergence score equals the mean of all five dimension scores."""
403 from datetime import datetime, timezone, timedelta
404
405 create = await client.post(
406 "/api/v1/repos",
407 json={"name": "divergence-mean-repo", "owner": "testuser"},
408 headers=auth_headers,
409 )
410 repo_id = create.json()["repoId"]
411
412 now = datetime.now(tz=timezone.utc)
413 db_session.add(
414 MusehubCommit(
415 commit_id="c1-beat",
416 repo_id=repo_id,
417 branch="alpha",
418 parent_ids=[],
419 message="rework drum beat groove",
420 author="producer-a",
421 timestamp=now - timedelta(hours=3),
422 )
423 )
424 db_session.add(
425 MusehubCommit(
426 commit_id="c2-mix",
427 repo_id=repo_id,
428 branch="beta",
429 parent_ids=[],
430 message="fix master volume level",
431 author="producer-b",
432 timestamp=now - timedelta(hours=2),
433 )
434 )
435 await db_session.commit()
436
437 response = await client.get(
438 f"/api/v1/repos/{repo_id}/divergence?branch_a=alpha&branch_b=beta",
439 headers=auth_headers,
440 )
441 assert response.status_code == 200
442 body = response.json()
443
444 dims = body["dimensions"]
445 computed_mean = round(sum(d["score"] for d in dims) / len(dims), 4)
446 assert abs(body["overallScore"] - computed_mean) < 1e-6
447
448
449 @pytest.mark.anyio
450 async def test_divergence_json_response_structure(
451 client: AsyncClient,
452 auth_headers: dict[str, str],
453 db_session: AsyncSession,
454 ) -> None:
455 """JSON response has all required top-level fields and camelCase keys."""
456 from datetime import datetime, timezone, timedelta
457
458 create = await client.post(
459 "/api/v1/repos",
460 json={"name": "divergence-struct-repo", "owner": "testuser"},
461 headers=auth_headers,
462 )
463 repo_id = create.json()["repoId"]
464
465 now = datetime.now(tz=timezone.utc)
466 for i, (branch, msg) in enumerate(
467 [("main", "add melody riff"), ("dev", "update chorus section")]
468 ):
469 db_session.add(
470 MusehubCommit(
471 commit_id=f"struct-{i}",
472 repo_id=repo_id,
473 branch=branch,
474 parent_ids=[],
475 message=msg,
476 author="test",
477 timestamp=now + timedelta(seconds=i),
478 )
479 )
480 await db_session.commit()
481
482 response = await client.get(
483 f"/api/v1/repos/{repo_id}/divergence?branch_a=main&branch_b=dev",
484 headers=auth_headers,
485 )
486 assert response.status_code == 200
487 body = response.json()
488
489 assert body["repoId"] == repo_id
490 assert body["branchA"] == "main"
491 assert body["branchB"] == "dev"
492 assert "commonAncestor" in body
493 assert "overallScore" in body
494 assert isinstance(body["overallScore"], float)
495 assert isinstance(body["dimensions"], list)
496 assert len(body["dimensions"]) == 5
497
498 for dim in body["dimensions"]:
499 assert "dimension" in dim
500 assert "level" in dim
501 assert "score" in dim
502 assert "description" in dim
503 assert "branchACommits" in dim
504 assert "branchBCommits" in dim
505
506
507 @pytest.mark.anyio
508 async def test_divergence_endpoint_returns_404_for_unknown_repo(
509 client: AsyncClient,
510 auth_headers: dict[str, str],
511 ) -> None:
512 """GET /divergence returns 404 for an unknown repo."""
513 response = await client.get(
514 "/api/v1/repos/no-such-repo/divergence?branch_a=a&branch_b=b",
515 headers=auth_headers,
516 )
517 assert response.status_code == 404
518
519
520 @pytest.mark.anyio
521 async def test_divergence_endpoint_returns_422_for_empty_branch(
522 client: AsyncClient,
523 auth_headers: dict[str, str],
524 db_session: AsyncSession,
525 ) -> None:
526 """GET /divergence returns 422 when a branch has no commits."""
527 create = await client.post(
528 "/api/v1/repos",
529 json={"name": "empty-branch-repo", "owner": "testuser"},
530 headers=auth_headers,
531 )
532 repo_id = create.json()["repoId"]
533
534 response = await client.get(
535 f"/api/v1/repos/{repo_id}/divergence?branch_a=ghost&branch_b=also-ghost",
536 headers=auth_headers,
537 )
538 assert response.status_code == 422
539
540
541
542 # ---------------------------------------------------------------------------
543 # GET /repos/{repo_id}/dag
544 # ---------------------------------------------------------------------------
545
546
547 @pytest.mark.anyio
548 async def test_graph_dag_endpoint_returns_empty_for_new_repo(
549 client: AsyncClient,
550 auth_headers: dict[str, str],
551 ) -> None:
552 """GET /dag returns empty nodes/edges for a repo with no commits (initialize=false)."""
553 create = await client.post(
554 "/api/v1/repos",
555 json={"name": "dag-empty", "owner": "testuser", "initialize": False},
556 headers=auth_headers,
557 )
558 repo_id = create.json()["repoId"]
559
560 response = await client.get(
561 f"/api/v1/repos/{repo_id}/dag",
562 headers=auth_headers,
563 )
564 assert response.status_code == 200
565 body = response.json()
566 assert body["nodes"] == []
567 assert body["edges"] == []
568 assert body["headCommitId"] is None
569
570
571 @pytest.mark.anyio
572 async def test_graph_dag_has_edges(
573 client: AsyncClient,
574 auth_headers: dict[str, str],
575 db_session: AsyncSession,
576 ) -> None:
577 """DAG endpoint returns correct edges representing parent relationships."""
578 from datetime import datetime, timezone, timedelta
579
580 create = await client.post(
581 "/api/v1/repos",
582 json={"name": "dag-edges", "owner": "testuser", "initialize": False},
583 headers=auth_headers,
584 )
585 repo_id = create.json()["repoId"]
586
587 now = datetime.now(tz=timezone.utc)
588 root = MusehubCommit(
589 commit_id="root111",
590 repo_id=repo_id,
591 branch="main",
592 parent_ids=[],
593 message="root commit",
594 author="gabriel",
595 timestamp=now - timedelta(hours=2),
596 )
597 child = MusehubCommit(
598 commit_id="child222",
599 repo_id=repo_id,
600 branch="main",
601 parent_ids=["root111"],
602 message="child commit",
603 author="gabriel",
604 timestamp=now - timedelta(hours=1),
605 )
606 db_session.add_all([root, child])
607 await db_session.commit()
608
609 response = await client.get(
610 f"/api/v1/repos/{repo_id}/dag",
611 headers=auth_headers,
612 )
613 assert response.status_code == 200
614 body = response.json()
615 nodes = body["nodes"]
616 edges = body["edges"]
617
618 assert len(nodes) == 2
619 # Verify edge: child → root
620 assert any(e["source"] == "child222" and e["target"] == "root111" for e in edges)
621
622
623 @pytest.mark.anyio
624 async def test_graph_dag_endpoint_topological_order(
625 client: AsyncClient,
626 auth_headers: dict[str, str],
627 db_session: AsyncSession,
628 ) -> None:
629 """DAG endpoint returns nodes in topological order (oldest ancestor first)."""
630 from datetime import datetime, timedelta, timezone
631
632 create = await client.post(
633 "/api/v1/repos",
634 json={"name": "dag-topo", "owner": "testuser"},
635 headers=auth_headers,
636 )
637 repo_id = create.json()["repoId"]
638
639 now = datetime.now(tz=timezone.utc)
640 commits = [
641 MusehubCommit(
642 commit_id="topo-a",
643 repo_id=repo_id,
644 branch="main",
645 parent_ids=[],
646 message="root",
647 author="gabriel",
648 timestamp=now - timedelta(hours=3),
649 ),
650 MusehubCommit(
651 commit_id="topo-b",
652 repo_id=repo_id,
653 branch="main",
654 parent_ids=["topo-a"],
655 message="second",
656 author="gabriel",
657 timestamp=now - timedelta(hours=2),
658 ),
659 MusehubCommit(
660 commit_id="topo-c",
661 repo_id=repo_id,
662 branch="main",
663 parent_ids=["topo-b"],
664 message="third",
665 author="gabriel",
666 timestamp=now - timedelta(hours=1),
667 ),
668 ]
669 db_session.add_all(commits)
670 await db_session.commit()
671
672 response = await client.get(
673 f"/api/v1/repos/{repo_id}/dag",
674 headers=auth_headers,
675 )
676 assert response.status_code == 200
677 node_ids = [n["commitId"] for n in response.json()["nodes"]]
678 # Root must appear before children in topological order
679 assert node_ids.index("topo-a") < node_ids.index("topo-b")
680 assert node_ids.index("topo-b") < node_ids.index("topo-c")
681
682
683 @pytest.mark.anyio
684 async def test_graph_dag_nonexistent_repo_returns_404_without_auth(client: AsyncClient) -> None:
685 """GET /dag returns 404 for a non-existent repo without a token.
686
687 Uses optional_token — auth is visibility-based; missing repo → 404.
688 """
689 response = await client.get("/api/v1/repos/non-existent-repo/dag")
690 assert response.status_code == 404
691
692
693 @pytest.mark.anyio
694 async def test_graph_dag_404_for_unknown_repo(
695 client: AsyncClient,
696 auth_headers: dict[str, str],
697 ) -> None:
698 """GET /dag returns 404 for a non-existent repo."""
699 response = await client.get(
700 "/api/v1/repos/ghost-repo-dag/dag",
701 headers=auth_headers,
702 )
703 assert response.status_code == 404
704
705
706 @pytest.mark.anyio
707 async def test_graph_json_response_has_required_fields(
708 client: AsyncClient,
709 auth_headers: dict[str, str],
710 db_session: AsyncSession,
711 ) -> None:
712 """DAG JSON response includes nodes (with required fields) and edges arrays."""
713 from datetime import datetime, timezone
714
715 create = await client.post(
716 "/api/v1/repos",
717 json={"name": "dag-fields", "owner": "testuser"},
718 headers=auth_headers,
719 )
720 repo_id = create.json()["repoId"]
721
722 db_session.add(
723 MusehubCommit(
724 commit_id="fields-aaa",
725 repo_id=repo_id,
726 branch="main",
727 parent_ids=[],
728 message="check fields",
729 author="tester",
730 timestamp=datetime.now(tz=timezone.utc),
731 )
732 )
733 await db_session.commit()
734
735 response = await client.get(
736 f"/api/v1/repos/{repo_id}/dag",
737 headers=auth_headers,
738 )
739 assert response.status_code == 200
740 body = response.json()
741 assert "nodes" in body
742 assert "edges" in body
743 assert "headCommitId" in body
744
745 node = body["nodes"][0]
746 for field in ("commitId", "message", "author", "timestamp", "branch", "parentIds", "isHead"):
747 assert field in node, f"Missing field '{field}' in DAG node"
748
749 # ---------------------------------------------------------------------------
750 # GET /repos/{repo_id}/credits
751 # ---------------------------------------------------------------------------
752
753
754 async def _seed_credits_repo(db_session: AsyncSession) -> str:
755 """Create a repo with commits from two distinct authors and return repo_id."""
756 from datetime import datetime, timezone, timedelta
757
758 repo = MusehubRepo(name="liner-notes",
759 owner="testuser",
760 slug="liner-notes", visibility="public", owner_user_id="producer-1")
761 db_session.add(repo)
762 await db_session.flush()
763 repo_id = str(repo.repo_id)
764
765 now = datetime.now(tz=timezone.utc)
766 # Alice: 2 commits (most prolific), most recent 1 day ago
767 db_session.add(
768 MusehubCommit(
769 commit_id="alice-001",
770 repo_id=repo_id,
771 branch="main",
772 parent_ids=[],
773 message="compose the main melody",
774 author="Alice",
775 timestamp=now - timedelta(days=3),
776 )
777 )
778 db_session.add(
779 MusehubCommit(
780 commit_id="alice-002",
781 repo_id=repo_id,
782 branch="main",
783 parent_ids=["alice-001"],
784 message="mix the final arrangement",
785 author="Alice",
786 timestamp=now - timedelta(days=1),
787 )
788 )
789 # Bob: 1 commit, last active 5 days ago
790 db_session.add(
791 MusehubCommit(
792 commit_id="bob-001",
793 repo_id=repo_id,
794 branch="main",
795 parent_ids=[],
796 message="arrange the bridge section",
797 author="Bob",
798 timestamp=now - timedelta(days=5),
799 )
800 )
801 await db_session.commit()
802 return repo_id
803
804
805 @pytest.mark.anyio
806 async def test_credits_aggregation(
807 client: AsyncClient,
808 db_session: AsyncSession,
809 auth_headers: dict[str, str],
810 ) -> None:
811 """GET /api/v1/repos/{repo_id}/credits aggregates contributors from commits."""
812 repo_id = await _seed_credits_repo(db_session)
813 response = await client.get(
814 f"/api/v1/repos/{repo_id}/credits",
815 headers=auth_headers,
816 )
817 assert response.status_code == 200
818 body = response.json()
819 assert body["totalContributors"] == 2
820 authors = {c["author"] for c in body["contributors"]}
821 assert "Alice" in authors
822 assert "Bob" in authors
823
824
825 @pytest.mark.anyio
826 async def test_credits_sorted_by_count(
827 client: AsyncClient,
828 db_session: AsyncSession,
829 auth_headers: dict[str, str],
830 ) -> None:
831 """Default sort (count) puts the most prolific contributor first."""
832 repo_id = await _seed_credits_repo(db_session)
833 response = await client.get(
834 f"/api/v1/repos/{repo_id}/credits?sort=count",
835 headers=auth_headers,
836 )
837 assert response.status_code == 200
838 contributors = response.json()["contributors"]
839 assert contributors[0]["author"] == "Alice"
840 assert contributors[0]["sessionCount"] == 2
841
842
843 @pytest.mark.anyio
844 async def test_credits_sorted_by_recency(
845 client: AsyncClient,
846 db_session: AsyncSession,
847 auth_headers: dict[str, str],
848 ) -> None:
849 """sort=recency puts the most recently active contributor first."""
850 repo_id = await _seed_credits_repo(db_session)
851 response = await client.get(
852 f"/api/v1/repos/{repo_id}/credits?sort=recency",
853 headers=auth_headers,
854 )
855 assert response.status_code == 200
856 contributors = response.json()["contributors"]
857 # Alice has a commit 1 day ago; Bob's last was 5 days ago
858 assert contributors[0]["author"] == "Alice"
859
860
861 @pytest.mark.anyio
862 async def test_credits_sorted_by_alpha(
863 client: AsyncClient,
864 db_session: AsyncSession,
865 auth_headers: dict[str, str],
866 ) -> None:
867 """sort=alpha returns contributors in alphabetical order."""
868 repo_id = await _seed_credits_repo(db_session)
869 response = await client.get(
870 f"/api/v1/repos/{repo_id}/credits?sort=alpha",
871 headers=auth_headers,
872 )
873 assert response.status_code == 200
874 contributors = response.json()["contributors"]
875 authors = [c["author"] for c in contributors]
876 assert authors == sorted(authors, key=str.lower)
877
878
879 @pytest.mark.anyio
880 async def test_credits_contribution_types_inferred(
881 client: AsyncClient,
882 db_session: AsyncSession,
883 auth_headers: dict[str, str],
884 ) -> None:
885 """Contribution types are inferred from commit messages."""
886 repo_id = await _seed_credits_repo(db_session)
887 response = await client.get(
888 f"/api/v1/repos/{repo_id}/credits",
889 headers=auth_headers,
890 )
891 assert response.status_code == 200
892 contributors = response.json()["contributors"]
893 alice = next(c for c in contributors if c["author"] == "Alice")
894 # Alice's commits mention "compose" and "mix"
895 types = set(alice["contributionTypes"])
896 assert len(types) > 0
897
898
899 @pytest.mark.anyio
900 async def test_credits_404_for_unknown_repo(
901 client: AsyncClient,
902 auth_headers: dict[str, str],
903 ) -> None:
904 """GET /api/v1/repos/{unknown}/credits returns 404."""
905 response = await client.get(
906 "/api/v1/repos/does-not-exist/credits",
907 headers=auth_headers,
908 )
909 assert response.status_code == 404
910
911
912 @pytest.mark.anyio
913 async def test_credits_requires_auth(
914 client: AsyncClient,
915 db_session: AsyncSession,
916 ) -> None:
917 """GET /api/v1/repos/{repo_id}/credits returns 401 without JWT."""
918 repo = MusehubRepo(name="auth-test-repo",
919 owner="testuser",
920 slug="auth-test-repo", visibility="private", owner_user_id="u1")
921 db_session.add(repo)
922 await db_session.commit()
923 await db_session.refresh(repo)
924 response = await client.get(f"/api/v1/repos/{repo.repo_id}/credits")
925 assert response.status_code == 401
926
927
928 @pytest.mark.anyio
929 async def test_credits_invalid_sort_param(
930 client: AsyncClient,
931 db_session: AsyncSession,
932 auth_headers: dict[str, str],
933 ) -> None:
934 """GET /api/v1/repos/{repo_id}/credits with invalid sort returns 422."""
935 repo = MusehubRepo(name="sort-test",
936 owner="testuser",
937 slug="sort-test", visibility="private", owner_user_id="u1")
938 db_session.add(repo)
939 await db_session.commit()
940 await db_session.refresh(repo)
941 response = await client.get(
942 f"/api/v1/repos/{repo.repo_id}/credits?sort=invalid",
943 headers=auth_headers,
944 )
945 assert response.status_code == 422
946
947
948 @pytest.mark.anyio
949 async def test_credits_aggregation_service_direct(db_session: AsyncSession) -> None:
950 """musehub_credits.aggregate_credits() returns correct data without HTTP layer."""
951 from datetime import datetime, timezone
952
953 from musehub.services import musehub_credits
954
955 repo = MusehubRepo(name="direct-test",
956 owner="testuser",
957 slug="direct-test", visibility="private", owner_user_id="u1")
958 db_session.add(repo)
959 await db_session.flush()
960 repo_id = str(repo.repo_id)
961
962 now = datetime.now(tz=timezone.utc)
963 db_session.add(
964 MusehubCommit(
965 commit_id="svc-001",
966 repo_id=repo_id,
967 branch="main",
968 parent_ids=[],
969 message="produce and mix the drop",
970 author="Charlie",
971 timestamp=now,
972 )
973 )
974 await db_session.commit()
975
976 result = await musehub_credits.aggregate_credits(db_session, repo_id, sort="count")
977 assert result.total_contributors == 1
978 assert result.contributors[0].author == "Charlie"
979 assert result.contributors[0].session_count == 1
980
981
982 # ---------------------------------------------------------------------------
983 # Compare endpoint
984 # ---------------------------------------------------------------------------
985
986
987 async def _make_compare_repo(
988 db_session: AsyncSession,
989 client: AsyncClient,
990 auth_headers: dict[str, str],
991 ) -> str:
992 """Seed a repo with commits on two branches and return repo_id."""
993 from datetime import datetime, timezone
994
995 create = await client.post(
996 "/api/v1/repos",
997 json={"name": "compare-test", "owner": "testuser", "visibility": "private"},
998 headers=auth_headers,
999 )
1000 assert create.status_code == 201
1001 repo_id: str = str(create.json()["repoId"])
1002
1003 now = datetime.now(tz=timezone.utc)
1004 db_session.add(
1005 MusehubCommit(
1006 commit_id="base001",
1007 repo_id=repo_id,
1008 branch="main",
1009 parent_ids=[],
1010 message="add melody line",
1011 author="Alice",
1012 timestamp=now,
1013 )
1014 )
1015 db_session.add(
1016 MusehubCommit(
1017 commit_id="head001",
1018 repo_id=repo_id,
1019 branch="feature",
1020 parent_ids=["base001"],
1021 message="add chord progression",
1022 author="Bob",
1023 timestamp=now,
1024 )
1025 )
1026 await db_session.commit()
1027 return repo_id
1028
1029
1030 @pytest.mark.anyio
1031 async def test_compare_radar_data(
1032 client: AsyncClient,
1033 db_session: AsyncSession,
1034 auth_headers: dict[str, str],
1035 ) -> None:
1036 """GET /api/v1/repos/{id}/compare returns 5 dimension scores."""
1037 repo_id = await _make_compare_repo(db_session, client, auth_headers)
1038 response = await client.get(
1039 f"/api/v1/repos/{repo_id}/compare?base=main&head=feature",
1040 headers=auth_headers,
1041 )
1042 assert response.status_code == 200
1043 body = response.json()
1044 assert "dimensions" in body
1045 assert len(body["dimensions"]) == 5
1046 expected_dims = {"melodic", "harmonic", "rhythmic", "structural", "dynamic"}
1047 found_dims = {d["dimension"] for d in body["dimensions"]}
1048 assert found_dims == expected_dims
1049 for dim in body["dimensions"]:
1050 assert 0.0 <= dim["score"] <= 1.0
1051 assert dim["level"] in ("NONE", "LOW", "MED", "HIGH")
1052 assert "overallScore" in body
1053 assert 0.0 <= body["overallScore"] <= 1.0
1054
1055
1056 @pytest.mark.anyio
1057 async def test_compare_commit_list(
1058 client: AsyncClient,
1059 db_session: AsyncSession,
1060 auth_headers: dict[str, str],
1061 ) -> None:
1062 """Commits unique to head are listed in the compare response."""
1063 repo_id = await _make_compare_repo(db_session, client, auth_headers)
1064 response = await client.get(
1065 f"/api/v1/repos/{repo_id}/compare?base=main&head=feature",
1066 headers=auth_headers,
1067 )
1068 assert response.status_code == 200
1069 body = response.json()
1070 assert "commits" in body
1071 # head001 is on feature but not on main
1072 commit_ids = [c["commitId"] for c in body["commits"]]
1073 assert "head001" in commit_ids
1074 # base001 is on main so should NOT appear as unique to head
1075 assert "base001" not in commit_ids
1076
1077
1078 @pytest.mark.anyio
1079 async def test_compare_unknown_ref_422(
1080 client: AsyncClient,
1081 db_session: AsyncSession,
1082 auth_headers: dict[str, str],
1083 ) -> None:
1084 """Unknown ref (branch with no commits) returns 422."""
1085 create = await client.post(
1086 "/api/v1/repos",
1087 json={"name": "empty-compare", "owner": "testuser", "visibility": "private"},
1088 headers=auth_headers,
1089 )
1090 assert create.status_code == 201
1091 repo_id = create.json()["repoId"]
1092 response = await client.get(
1093 f"/api/v1/repos/{repo_id}/compare?base=nonexistent&head=alsoabsent",
1094 headers=auth_headers,
1095 )
1096 assert response.status_code == 422
1097
1098
1099 @pytest.mark.anyio
1100 async def test_compare_emotion_diff_fields(
1101 client: AsyncClient,
1102 db_session: AsyncSession,
1103 auth_headers: dict[str, str],
1104 ) -> None:
1105 """Compare response includes emotion diff with required delta fields."""
1106 repo_id = await _make_compare_repo(db_session, client, auth_headers)
1107 response = await client.get(
1108 f"/api/v1/repos/{repo_id}/compare?base=main&head=feature",
1109 headers=auth_headers,
1110 )
1111 assert response.status_code == 200
1112 body = response.json()
1113 assert "emotionDiff" in body
1114 ed = body["emotionDiff"]
1115 for field in ("energyDelta", "valenceDelta", "tensionDelta", "darknessDelta"):
1116 assert field in ed
1117 assert -1.0 <= ed[field] <= 1.0
1118 for field in ("baseEnergy", "headEnergy", "baseValence", "headValence"):
1119 assert field in ed
1120 assert 0.0 <= ed[field] <= 1.0
1121
1122
1123
1124 # ---------------------------------------------------------------------------
1125 # Star/Fork endpoints — # ---------------------------------------------------------------------------
1126
1127
1128 @pytest.mark.anyio
1129 async def test_star_repo_increases_star_count(
1130 client: AsyncClient,
1131 db_session: AsyncSession,
1132 auth_headers: dict[str, str],
1133 ) -> None:
1134 """POST /repos/{repo_id}/star stars the repo and returns starred=True with count=1."""
1135 repo = MusehubRepo(
1136 name="star-test",
1137 owner="testuser",
1138 slug="star-test",
1139 visibility="public",
1140 owner_user_id="u1",
1141 )
1142 db_session.add(repo)
1143 await db_session.commit()
1144 await db_session.refresh(repo)
1145 repo_id = str(repo.repo_id)
1146
1147 resp = await client.post(f"/api/v1/repos/{repo_id}/star", headers=auth_headers)
1148 assert resp.status_code == 200
1149 body = resp.json()
1150 assert body["starred"] is True
1151 assert body["starCount"] == 1
1152
1153
1154 @pytest.mark.anyio
1155 async def test_unstar_repo_decreases_star_count(
1156 client: AsyncClient,
1157 db_session: AsyncSession,
1158 auth_headers: dict[str, str],
1159 ) -> None:
1160 """DELETE /repos/{repo_id}/star removes the star — returns starred=False with count=0."""
1161 repo = MusehubRepo(
1162 name="unstar-test",
1163 owner="testuser",
1164 slug="unstar-test",
1165 visibility="public",
1166 owner_user_id="u1",
1167 )
1168 db_session.add(repo)
1169 await db_session.commit()
1170 await db_session.refresh(repo)
1171 repo_id = str(repo.repo_id)
1172
1173 await client.post(f"/api/v1/repos/{repo_id}/star", headers=auth_headers)
1174 resp = await client.delete(f"/api/v1/repos/{repo_id}/star", headers=auth_headers)
1175 assert resp.status_code == 200
1176 body = resp.json()
1177 assert body["starred"] is False
1178 assert body["starCount"] == 0
1179
1180
1181 @pytest.mark.anyio
1182 async def test_star_idempotent_double_call(
1183 client: AsyncClient,
1184 db_session: AsyncSession,
1185 auth_headers: dict[str, str],
1186 ) -> None:
1187 """POST /star twice leaves count=1 (idempotent add — not a toggle)."""
1188 repo = MusehubRepo(
1189 name="idempotent-star",
1190 owner="testuser",
1191 slug="idempotent-star",
1192 visibility="public",
1193 owner_user_id="u1",
1194 )
1195 db_session.add(repo)
1196 await db_session.commit()
1197 await db_session.refresh(repo)
1198 repo_id = str(repo.repo_id)
1199
1200 await client.post(f"/api/v1/repos/{repo_id}/star", headers=auth_headers)
1201 resp = await client.post(f"/api/v1/repos/{repo_id}/star", headers=auth_headers)
1202 assert resp.status_code == 200
1203 body = resp.json()
1204 assert body["starred"] is True
1205 assert body["starCount"] == 1
1206
1207
1208 @pytest.mark.anyio
1209 async def test_star_requires_auth(client: AsyncClient, db_session: AsyncSession) -> None:
1210 """POST /repos/{repo_id}/star returns 401 without a Bearer token."""
1211 repo = MusehubRepo(
1212 name="auth-star-test",
1213 owner="testuser",
1214 slug="auth-star-test",
1215 visibility="public",
1216 owner_user_id="u1",
1217 )
1218 db_session.add(repo)
1219 await db_session.commit()
1220 await db_session.refresh(repo)
1221 repo_id = str(repo.repo_id)
1222
1223 resp = await client.post(f"/api/v1/repos/{repo_id}/star")
1224 assert resp.status_code == 401
1225
1226
1227 @pytest.mark.anyio
1228 async def test_fork_repo_creates_fork_under_user(
1229 client: AsyncClient,
1230 db_session: AsyncSession,
1231 auth_headers: dict[str, str],
1232 ) -> None:
1233 """POST /repos/{repo_id}/fork creates a fork and returns lineage metadata."""
1234 repo = MusehubRepo(
1235 name="fork-source",
1236 owner="original-owner",
1237 slug="fork-source",
1238 visibility="public",
1239 owner_user_id="u-original",
1240 description="The original",
1241 )
1242 db_session.add(repo)
1243 await db_session.commit()
1244 await db_session.refresh(repo)
1245 repo_id = str(repo.repo_id)
1246
1247 resp = await client.post(f"/api/v1/repos/{repo_id}/fork", headers=auth_headers)
1248 assert resp.status_code == 201
1249 body = resp.json()
1250 # social.py returns ForkResponse (snake_case from_attributes model)
1251 assert body["source_repo_id"] == repo_id
1252 assert "fork_repo_id" in body
1253 assert body["fork_repo_id"] != repo_id
1254
1255
1256 @pytest.mark.anyio
1257 async def test_fork_preserves_branches(
1258 client: AsyncClient,
1259 db_session: AsyncSession,
1260 auth_headers: dict[str, str],
1261 ) -> None:
1262 """Forking a repo with branches copies branch pointers into the new fork.
1263
1264 Note: commit_id is a global PK so commits cannot be duplicated across repos.
1265 The fork shares the lineage link (MusehubFork) and inherits branch pointers.
1266 """
1267 from musehub.db.musehub_models import MusehubBranch
1268
1269 repo = MusehubRepo(
1270 name="fork-with-branches",
1271 owner="src-owner",
1272 slug="fork-with-branches",
1273 visibility="public",
1274 owner_user_id="u-src",
1275 )
1276 db_session.add(repo)
1277 await db_session.commit()
1278 await db_session.refresh(repo)
1279 repo_id = str(repo.repo_id)
1280
1281 branch = MusehubBranch(
1282 repo_id=repo_id,
1283 name="main",
1284 head_commit_id=None,
1285 )
1286 db_session.add(branch)
1287 await db_session.commit()
1288
1289 resp = await client.post(f"/api/v1/repos/{repo_id}/fork", headers=auth_headers)
1290 assert resp.status_code == 201
1291 fork_repo_id = resp.json()["fork_repo_id"]
1292
1293 branches_resp = await client.get(
1294 f"/api/v1/repos/{fork_repo_id}/branches", headers=auth_headers
1295 )
1296 assert branches_resp.status_code == 200
1297 branches = branches_resp.json().get("branches", [])
1298 assert any(b["name"] == "main" for b in branches)
1299
1300
1301 @pytest.mark.anyio
1302 async def test_list_stargazers_returns_starrers(
1303 client: AsyncClient,
1304 db_session: AsyncSession,
1305 auth_headers: dict[str, str],
1306 ) -> None:
1307 """GET /repos/{repo_id}/stargazers returns user_id of the starring user."""
1308 repo = MusehubRepo(
1309 name="stargazers-test",
1310 owner="testuser",
1311 slug="stargazers-test",
1312 visibility="public",
1313 owner_user_id="u1",
1314 )
1315 db_session.add(repo)
1316 await db_session.commit()
1317 await db_session.refresh(repo)
1318 repo_id = str(repo.repo_id)
1319
1320 await client.post(f"/api/v1/repos/{repo_id}/star", headers=auth_headers)
1321
1322 resp = await client.get(f"/api/v1/repos/{repo_id}/stargazers")
1323 assert resp.status_code == 200
1324 body = resp.json()
1325 assert body["total"] == 1
1326 assert len(body["stargazers"]) == 1
1327
1328
1329 @pytest.mark.anyio
1330 async def test_list_forks_returns_fork_entry(
1331 client: AsyncClient,
1332 db_session: AsyncSession,
1333 auth_headers: dict[str, str],
1334 ) -> None:
1335 """GET /repos/{repo_id}/forks returns the fork entry after forking."""
1336 repo = MusehubRepo(
1337 name="forks-list-test",
1338 owner="original",
1339 slug="forks-list-test",
1340 visibility="public",
1341 owner_user_id="u-orig",
1342 )
1343 db_session.add(repo)
1344 await db_session.commit()
1345 await db_session.refresh(repo)
1346 repo_id = str(repo.repo_id)
1347
1348 await client.post(f"/api/v1/repos/{repo_id}/fork", headers=auth_headers)
1349
1350 resp = await client.get(f"/api/v1/repos/{repo_id}/forks")
1351 assert resp.status_code == 200
1352 # social.py returns list[ForkResponse] (a JSON array)
1353 body = resp.json()
1354 assert isinstance(body, list)
1355 assert len(body) == 1
1356 assert body[0]["source_repo_id"] == repo_id
1357
1358
1359 # ---------------------------------------------------------------------------
1360 # GET /repos/{repo_id}/settings
1361 # ---------------------------------------------------------------------------
1362
1363 TEST_OWNER_USER_ID = "550e8400-e29b-41d4-a716-446655440000"
1364
1365
1366 @pytest.mark.anyio
1367 async def test_get_repo_settings_returns_defaults(
1368 client: AsyncClient,
1369 db_session: AsyncSession,
1370 auth_headers: dict[str, str],
1371 ) -> None:
1372 """GET /repos/{repo_id}/settings returns full settings with canonical defaults."""
1373 repo = MusehubRepo(
1374 name="settings-get-test",
1375 owner="testuser",
1376 slug="settings-get-test",
1377 visibility="private",
1378 owner_user_id=TEST_OWNER_USER_ID,
1379 )
1380 db_session.add(repo)
1381 await db_session.commit()
1382 await db_session.refresh(repo)
1383
1384 resp = await client.get(
1385 f"/api/v1/repos/{repo.repo_id}/settings",
1386 headers=auth_headers,
1387 )
1388 assert resp.status_code == 200
1389 body = resp.json()
1390 assert body["name"] == "settings-get-test"
1391 assert body["visibility"] == "private"
1392 assert body["hasIssues"] is True
1393 assert body["allowMergeCommit"] is True
1394 assert body["allowRebaseMerge"] is False
1395 assert body["deleteBranchOnMerge"] is True
1396 assert body["defaultBranch"] == "main"
1397
1398
1399 @pytest.mark.anyio
1400 async def test_get_repo_settings_requires_auth(
1401 client: AsyncClient,
1402 db_session: AsyncSession,
1403 ) -> None:
1404 """GET /repos/{repo_id}/settings returns 401 without a Bearer token."""
1405 repo = MusehubRepo(
1406 name="settings-noauth",
1407 owner="testuser",
1408 slug="settings-noauth",
1409 visibility="private",
1410 owner_user_id=TEST_OWNER_USER_ID,
1411 )
1412 db_session.add(repo)
1413 await db_session.commit()
1414 await db_session.refresh(repo)
1415
1416 resp = await client.get(f"/api/v1/repos/{repo.repo_id}/settings")
1417 assert resp.status_code == 401
1418
1419
1420 @pytest.mark.anyio
1421 async def test_get_repo_settings_returns_403_for_non_admin(
1422 client: AsyncClient,
1423 db_session: AsyncSession,
1424 auth_headers: dict[str, str],
1425 ) -> None:
1426 """GET /repos/{repo_id}/settings returns 403 when caller is not owner or admin."""
1427 repo = MusehubRepo(
1428 name="settings-403-test",
1429 owner="other-owner",
1430 slug="settings-403-test",
1431 visibility="public",
1432 owner_user_id="other-user-id-not-test",
1433 )
1434 db_session.add(repo)
1435 await db_session.commit()
1436 await db_session.refresh(repo)
1437
1438 resp = await client.get(
1439 f"/api/v1/repos/{repo.repo_id}/settings",
1440 headers=auth_headers,
1441 )
1442 assert resp.status_code == 403
1443
1444
1445 @pytest.mark.anyio
1446 async def test_get_repo_settings_returns_404_for_unknown_repo(
1447 client: AsyncClient,
1448 auth_headers: dict[str, str],
1449 ) -> None:
1450 """GET /repos/{repo_id}/settings returns 404 for a non-existent repo."""
1451 resp = await client.get(
1452 "/api/v1/repos/nonexistent-repo-id/settings",
1453 headers=auth_headers,
1454 )
1455 assert resp.status_code == 404
1456
1457
1458 # ---------------------------------------------------------------------------
1459 # PATCH /repos/{repo_id}/settings
1460 # ---------------------------------------------------------------------------
1461
1462
1463 @pytest.mark.anyio
1464 async def test_patch_repo_settings_updates_fields(
1465 client: AsyncClient,
1466 db_session: AsyncSession,
1467 auth_headers: dict[str, str],
1468 ) -> None:
1469 """PATCH /repos/{repo_id}/settings owner can update dedicated and flag fields."""
1470 repo = MusehubRepo(
1471 name="settings-patch-test",
1472 owner="testuser",
1473 slug="settings-patch-test",
1474 visibility="private",
1475 owner_user_id=TEST_OWNER_USER_ID,
1476 )
1477 db_session.add(repo)
1478 await db_session.commit()
1479 await db_session.refresh(repo)
1480
1481 resp = await client.patch(
1482 f"/api/v1/repos/{repo.repo_id}/settings",
1483 json={
1484 "description": "Updated description",
1485 "visibility": "public",
1486 "hasIssues": False,
1487 "allowRebaseMerge": True,
1488 "homepageUrl": "https://muse.app",
1489 "topics": ["classical", "baroque"],
1490 },
1491 headers=auth_headers,
1492 )
1493 assert resp.status_code == 200
1494 body = resp.json()
1495 assert body["description"] == "Updated description"
1496 assert body["visibility"] == "public"
1497 assert body["hasIssues"] is False
1498 assert body["allowRebaseMerge"] is True
1499 assert body["homepageUrl"] == "https://muse.app"
1500 assert body["topics"] == ["classical", "baroque"]
1501 # Untouched field should retain its default
1502 assert body["allowMergeCommit"] is True
1503
1504
1505 @pytest.mark.anyio
1506 async def test_patch_repo_settings_partial_update_preserves_other_fields(
1507 client: AsyncClient,
1508 db_session: AsyncSession,
1509 auth_headers: dict[str, str],
1510 ) -> None:
1511 """PATCH with a single field leaves all other settings unchanged."""
1512 repo = MusehubRepo(
1513 name="settings-partial-test",
1514 owner="testuser",
1515 slug="settings-partial-test",
1516 visibility="private",
1517 owner_user_id=TEST_OWNER_USER_ID,
1518 )
1519 db_session.add(repo)
1520 await db_session.commit()
1521 await db_session.refresh(repo)
1522
1523 resp = await client.patch(
1524 f"/api/v1/repos/{repo.repo_id}/settings",
1525 json={"defaultBranch": "develop"},
1526 headers=auth_headers,
1527 )
1528 assert resp.status_code == 200
1529 body = resp.json()
1530 assert body["defaultBranch"] == "develop"
1531 # Other fields kept
1532 assert body["name"] == "settings-partial-test"
1533 assert body["visibility"] == "private"
1534 assert body["hasIssues"] is True
1535
1536
1537 @pytest.mark.anyio
1538 async def test_patch_repo_settings_requires_auth(
1539 client: AsyncClient,
1540 db_session: AsyncSession,
1541 ) -> None:
1542 """PATCH /repos/{repo_id}/settings returns 401 without a Bearer token."""
1543 repo = MusehubRepo(
1544 name="settings-patch-noauth",
1545 owner="testuser",
1546 slug="settings-patch-noauth",
1547 visibility="private",
1548 owner_user_id=TEST_OWNER_USER_ID,
1549 )
1550 db_session.add(repo)
1551 await db_session.commit()
1552 await db_session.refresh(repo)
1553
1554 resp = await client.patch(
1555 f"/api/v1/repos/{repo.repo_id}/settings",
1556 json={"visibility": "public"},
1557 )
1558 assert resp.status_code == 401
1559
1560
1561 @pytest.mark.anyio
1562 async def test_patch_repo_settings_returns_403_for_non_admin(
1563 client: AsyncClient,
1564 db_session: AsyncSession,
1565 auth_headers: dict[str, str],
1566 ) -> None:
1567 """PATCH /repos/{repo_id}/settings returns 403 when caller is not owner or admin."""
1568 repo = MusehubRepo(
1569 name="settings-patch-403",
1570 owner="other-owner",
1571 slug="settings-patch-403",
1572 visibility="public",
1573 owner_user_id="other-user-not-test",
1574 )
1575 db_session.add(repo)
1576 await db_session.commit()
1577 await db_session.refresh(repo)
1578
1579 resp = await client.patch(
1580 f"/api/v1/repos/{repo.repo_id}/settings",
1581 json={"hasWiki": True},
1582 headers=auth_headers,
1583 )
1584 assert resp.status_code == 403
1585
1586
1587 # ---------------------------------------------------------------------------
1588 # DELETE /repos/{repo_id} — soft-delete
1589 # ---------------------------------------------------------------------------
1590
1591
1592 @pytest.mark.anyio
1593 async def test_delete_repo_returns_204(
1594 client: AsyncClient,
1595 auth_headers: dict[str, str],
1596 ) -> None:
1597 """DELETE /repos/{repo_id} soft-deletes a repo owned by the caller and returns 204."""
1598 create = await client.post(
1599 "/api/v1/repos",
1600 json={"name": "to-delete", "owner": "testuser", "visibility": "private"},
1601 headers=auth_headers,
1602 )
1603 assert create.status_code == 201
1604 repo_id = create.json()["repoId"]
1605
1606 resp = await client.delete(f"/api/v1/repos/{repo_id}", headers=auth_headers)
1607 assert resp.status_code == 204
1608
1609
1610 @pytest.mark.anyio
1611 async def test_delete_repo_hides_repo_from_get(
1612 client: AsyncClient,
1613 auth_headers: dict[str, str],
1614 ) -> None:
1615 """After DELETE, GET /repos/{repo_id} returns 404."""
1616 create = await client.post(
1617 "/api/v1/repos",
1618 json={"name": "hidden-after-delete", "owner": "testuser", "visibility": "private"},
1619 headers=auth_headers,
1620 )
1621 repo_id = create.json()["repoId"]
1622
1623 await client.delete(f"/api/v1/repos/{repo_id}", headers=auth_headers)
1624
1625 get_resp = await client.get(f"/api/v1/repos/{repo_id}", headers=auth_headers)
1626 assert get_resp.status_code == 404
1627
1628
1629 @pytest.mark.anyio
1630 async def test_delete_repo_requires_auth(
1631 client: AsyncClient,
1632 db_session: AsyncSession,
1633 ) -> None:
1634 """DELETE /repos/{repo_id} returns 401 without a Bearer token."""
1635 repo = MusehubRepo(
1636 name="delete-noauth",
1637 owner="testuser",
1638 slug="delete-noauth",
1639 visibility="public",
1640 owner_user_id=TEST_OWNER_USER_ID,
1641 )
1642 db_session.add(repo)
1643 await db_session.commit()
1644 await db_session.refresh(repo)
1645
1646 resp = await client.delete(f"/api/v1/repos/{repo.repo_id}")
1647 assert resp.status_code == 401
1648
1649
1650 @pytest.mark.anyio
1651 async def test_delete_repo_returns_403_for_non_owner(
1652 client: AsyncClient,
1653 db_session: AsyncSession,
1654 auth_headers: dict[str, str],
1655 ) -> None:
1656 """DELETE /repos/{repo_id} returns 403 when caller is not the owner."""
1657 repo = MusehubRepo(
1658 name="delete-403",
1659 owner="other-owner",
1660 slug="delete-403",
1661 visibility="public",
1662 owner_user_id="some-other-user-id",
1663 )
1664 db_session.add(repo)
1665 await db_session.commit()
1666 await db_session.refresh(repo)
1667
1668 resp = await client.delete(
1669 f"/api/v1/repos/{repo.repo_id}", headers=auth_headers
1670 )
1671 assert resp.status_code == 403
1672
1673
1674 @pytest.mark.anyio
1675 async def test_delete_repo_returns_404_for_unknown_repo(
1676 client: AsyncClient,
1677 auth_headers: dict[str, str],
1678 ) -> None:
1679 """DELETE /repos/{repo_id} returns 404 for a non-existent repo."""
1680 resp = await client.delete(
1681 "/api/v1/repos/nonexistent-repo-id", headers=auth_headers
1682 )
1683 assert resp.status_code == 404
1684
1685
1686 @pytest.mark.anyio
1687 async def test_delete_repo_service_sets_deleted_at(
1688 db_session: AsyncSession,
1689 ) -> None:
1690 """delete_repo() service sets deleted_at on the row."""
1691 repo = await musehub_repository.create_repo(
1692 db_session,
1693 name="svc-delete-test",
1694 owner="testuser",
1695 visibility="private",
1696 owner_user_id="user-abc",
1697 )
1698 await db_session.commit()
1699
1700 deleted = await musehub_repository.delete_repo(db_session, repo.repo_id)
1701 await db_session.commit()
1702
1703 assert deleted is True
1704 # get_repo should return None for soft-deleted repos
1705 fetched = await musehub_repository.get_repo(db_session, repo.repo_id)
1706 assert fetched is None
1707
1708
1709 @pytest.mark.anyio
1710 async def test_delete_repo_service_returns_false_for_unknown(
1711 db_session: AsyncSession,
1712 ) -> None:
1713 """delete_repo() returns False for a non-existent repo."""
1714 result = await musehub_repository.delete_repo(db_session, "does-not-exist")
1715 assert result is False
1716
1717
1718 # ---------------------------------------------------------------------------
1719 # POST /repos/{repo_id}/transfer — transfer ownership
1720 # ---------------------------------------------------------------------------
1721
1722
1723 @pytest.mark.anyio
1724 async def test_transfer_repo_ownership_returns_200(
1725 client: AsyncClient,
1726 auth_headers: dict[str, str],
1727 ) -> None:
1728 """POST /repos/{repo_id}/transfer returns 200 with updated ownerUserId."""
1729 create = await client.post(
1730 "/api/v1/repos",
1731 json={"name": "transfer-me", "owner": "testuser", "visibility": "private"},
1732 headers=auth_headers,
1733 )
1734 assert create.status_code == 201
1735 repo_id = create.json()["repoId"]
1736 new_owner = "another-user-uuid-1234"
1737
1738 resp = await client.post(
1739 f"/api/v1/repos/{repo_id}/transfer",
1740 json={"newOwnerUserId": new_owner},
1741 headers=auth_headers,
1742 )
1743 assert resp.status_code == 200
1744 body = resp.json()
1745 assert body["ownerUserId"] == new_owner
1746 assert body["repoId"] == repo_id
1747
1748
1749 @pytest.mark.anyio
1750 async def test_transfer_repo_requires_auth(
1751 client: AsyncClient,
1752 db_session: AsyncSession,
1753 ) -> None:
1754 """POST /repos/{repo_id}/transfer returns 401 without a Bearer token."""
1755 repo = MusehubRepo(
1756 name="transfer-noauth",
1757 owner="testuser",
1758 slug="transfer-noauth",
1759 visibility="public",
1760 owner_user_id=TEST_OWNER_USER_ID,
1761 )
1762 db_session.add(repo)
1763 await db_session.commit()
1764 await db_session.refresh(repo)
1765
1766 resp = await client.post(
1767 f"/api/v1/repos/{repo.repo_id}/transfer",
1768 json={"newOwnerUserId": "new-user-id"},
1769 )
1770 assert resp.status_code == 401
1771
1772
1773 # ---------------------------------------------------------------------------
1774 # Wizard creation endpoint — # ---------------------------------------------------------------------------
1775
1776
1777 @pytest.mark.anyio
1778 async def test_create_repo_wizard_initialize_creates_branch_and_commit(
1779 client: AsyncClient,
1780 auth_headers: dict[str, str],
1781 db_session: AsyncSession,
1782 ) -> None:
1783 """POST /repos with initialize=true creates a default branch + initial commit."""
1784 resp = await client.post(
1785 "/api/v1/repos",
1786 json={
1787 "name": "wizard-init-repo",
1788 "owner": "testuser",
1789 "visibility": "public",
1790 "initialize": True,
1791 "defaultBranch": "main",
1792 },
1793 headers=auth_headers,
1794 )
1795 assert resp.status_code == 201
1796 repo_id = resp.json()["repoId"]
1797
1798 branches_resp = await client.get(
1799 f"/api/v1/repos/{repo_id}/branches",
1800 headers=auth_headers,
1801 )
1802 assert branches_resp.status_code == 200
1803 branches = branches_resp.json()["branches"]
1804 assert any(b["name"] == "main" for b in branches), "Expected 'main' branch to be created"
1805
1806 commits_resp = await client.get(
1807 f"/api/v1/repos/{repo_id}/commits",
1808 headers=auth_headers,
1809 )
1810 assert commits_resp.status_code == 200
1811 commits = commits_resp.json()["commits"]
1812 assert len(commits) == 1
1813 assert commits[0]["message"] == "Initial commit"
1814
1815
1816 @pytest.mark.anyio
1817 async def test_create_repo_wizard_no_initialize_stays_empty(
1818 client: AsyncClient,
1819 auth_headers: dict[str, str],
1820 ) -> None:
1821 """POST /repos with initialize=false leaves branches and commits empty."""
1822 resp = await client.post(
1823 "/api/v1/repos",
1824 json={
1825 "name": "wizard-noinit-repo",
1826 "owner": "testuser",
1827 "initialize": False,
1828 },
1829 headers=auth_headers,
1830 )
1831 assert resp.status_code == 201
1832 repo_id = resp.json()["repoId"]
1833
1834 branches_resp = await client.get(
1835 f"/api/v1/repos/{repo_id}/branches",
1836 headers=auth_headers,
1837 )
1838 assert branches_resp.json()["branches"] == []
1839
1840 commits_resp = await client.get(
1841 f"/api/v1/repos/{repo_id}/commits",
1842 headers=auth_headers,
1843 )
1844 assert commits_resp.json()["commits"] == []
1845
1846
1847 @pytest.mark.anyio
1848 async def test_create_repo_wizard_topics_merged_into_tags(
1849 client: AsyncClient,
1850 auth_headers: dict[str, str],
1851 ) -> None:
1852 """POST /repos with topics merges them into the tag list (deduplicated)."""
1853 resp = await client.post(
1854 "/api/v1/repos",
1855 json={
1856 "name": "topics-test-repo",
1857 "owner": "testuser",
1858 "tags": ["jazz"],
1859 "topics": ["classical", "jazz"], # 'jazz' deduped
1860 "initialize": False,
1861 },
1862 headers=auth_headers,
1863 )
1864 assert resp.status_code == 201
1865 body = resp.json()
1866 tags: list[str] = body["tags"]
1867 assert "jazz" in tags
1868 assert "classical" in tags
1869 assert tags.count("jazz") == 1, "Duplicate 'jazz' must be removed"
1870
1871
1872 @pytest.mark.anyio
1873 async def test_create_repo_wizard_clone_url_uses_musehub_scheme(
1874 client: AsyncClient,
1875 auth_headers: dict[str, str],
1876 ) -> None:
1877 """Clone URL returned by POST /repos uses the musehub:// protocol scheme."""
1878 resp = await client.post(
1879 "/api/v1/repos",
1880 json={"name": "clone-url-test", "owner": "testuser", "initialize": False},
1881 headers=auth_headers,
1882 )
1883 assert resp.status_code == 201
1884 clone_url: str = resp.json()["cloneUrl"]
1885 assert clone_url.startswith("musehub://"), f"Expected musehub:// prefix, got: {clone_url}"
1886 assert "testuser" in clone_url
1887
1888
1889 @pytest.mark.anyio
1890 async def test_create_repo_wizard_template_copies_description(
1891 client: AsyncClient,
1892 auth_headers: dict[str, str],
1893 db_session: AsyncSession,
1894 ) -> None:
1895 """POST /repos with template_repo_id copies description from a public template."""
1896 template = MusehubRepo(
1897 name="template-source",
1898 owner="template-owner",
1899 slug="template-source",
1900 visibility="public",
1901 owner_user_id="template-owner-id",
1902 description="A great neo-baroque composition template",
1903 tags=["baroque", "piano"],
1904 )
1905 db_session.add(template)
1906 await db_session.commit()
1907 await db_session.refresh(template)
1908 template_id = str(template.repo_id)
1909
1910 resp = await client.post(
1911 "/api/v1/repos",
1912 json={
1913 "name": "from-template-repo",
1914 "owner": "testuser",
1915 "initialize": False,
1916 "templateRepoId": template_id,
1917 },
1918 headers=auth_headers,
1919 )
1920 assert resp.status_code == 201
1921 body = resp.json()
1922 assert body["description"] == "A great neo-baroque composition template"
1923 assert "baroque" in body["tags"]
1924 assert "piano" in body["tags"]
1925
1926
1927 @pytest.mark.anyio
1928 async def test_create_repo_wizard_private_template_not_copied(
1929 client: AsyncClient,
1930 auth_headers: dict[str, str],
1931 db_session: AsyncSession,
1932 ) -> None:
1933 """Private template repo metadata is NOT copied (must be public)."""
1934 private_template = MusehubRepo(
1935 name="private-template",
1936 owner="secret-owner",
1937 slug="private-template",
1938 visibility="private",
1939 owner_user_id="secret-id",
1940 description="Secret description",
1941 tags=["secret"],
1942 )
1943 db_session.add(private_template)
1944 await db_session.commit()
1945 await db_session.refresh(private_template)
1946 template_id = str(private_template.repo_id)
1947
1948 resp = await client.post(
1949 "/api/v1/repos",
1950 json={
1951 "name": "refused-template-repo",
1952 "owner": "testuser",
1953 "description": "My own description",
1954 "initialize": False,
1955 "templateRepoId": template_id,
1956 },
1957 headers=auth_headers,
1958 )
1959 assert resp.status_code == 201
1960 body = resp.json()
1961 # Private template must not override user's own description
1962 assert body["description"] == "My own description"
1963 assert "secret" not in body["tags"]
1964
1965
1966 @pytest.mark.anyio
1967 async def test_create_repo_wizard_custom_default_branch(
1968 client: AsyncClient,
1969 auth_headers: dict[str, str],
1970 ) -> None:
1971 """POST /repos with initialize=true and custom defaultBranch creates the right branch."""
1972 resp = await client.post(
1973 "/api/v1/repos",
1974 json={
1975 "name": "custom-branch-repo",
1976 "owner": "testuser",
1977 "initialize": True,
1978 "defaultBranch": "develop",
1979 },
1980 headers=auth_headers,
1981 )
1982 assert resp.status_code == 201
1983 repo_id = resp.json()["repoId"]
1984
1985 branches_resp = await client.get(
1986 f"/api/v1/repos/{repo_id}/branches",
1987 headers=auth_headers,
1988 )
1989 branch_names = [b["name"] for b in branches_resp.json()["branches"]]
1990 assert "develop" in branch_names
1991 assert "main" not in branch_names
1992
1993
1994 # ---------------------------------------------------------------------------
1995 # GET /repos — list repos for authenticated user
1996 # ---------------------------------------------------------------------------
1997
1998
1999 @pytest.mark.anyio
2000 async def test_list_my_repos_returns_owned_repos(
2001 client: AsyncClient,
2002 auth_headers: dict[str, str],
2003 ) -> None:
2004 """GET /repos returns repos created by the authenticated user."""
2005 # Create two repos
2006 for name in ("owned-repo-a", "owned-repo-b"):
2007 await client.post(
2008 "/api/v1/repos",
2009 json={"name": name, "owner": "testuser", "initialize": False},
2010 headers=auth_headers,
2011 )
2012
2013 resp = await client.get("/api/v1/repos", headers=auth_headers)
2014 assert resp.status_code == 200
2015 body = resp.json()
2016 assert "repos" in body
2017 assert "total" in body
2018 assert "nextCursor" in body
2019 names = [r["name"] for r in body["repos"]]
2020 assert "owned-repo-a" in names
2021 assert "owned-repo-b" in names
2022
2023
2024 @pytest.mark.anyio
2025 async def test_list_my_repos_requires_auth(client: AsyncClient) -> None:
2026 """GET /repos returns 401 without a Bearer token."""
2027 resp = await client.get("/api/v1/repos")
2028 assert resp.status_code == 401
2029
2030
2031 @pytest.mark.anyio
2032 async def test_transfer_repo_returns_403_for_non_owner(
2033 client: AsyncClient,
2034 db_session: AsyncSession,
2035 auth_headers: dict[str, str],
2036 ) -> None:
2037 """POST /repos/{repo_id}/transfer returns 403 when caller is not the owner."""
2038 repo = MusehubRepo(
2039 name="transfer-403",
2040 owner="other-owner",
2041 slug="transfer-403",
2042 visibility="public",
2043 owner_user_id="some-other-user-id",
2044 )
2045 db_session.add(repo)
2046 await db_session.commit()
2047 await db_session.refresh(repo)
2048
2049 resp = await client.post(
2050 f"/api/v1/repos/{repo.repo_id}/transfer",
2051 json={"newOwnerUserId": "attacker-user-id"},
2052 headers=auth_headers,
2053 )
2054 assert resp.status_code == 403
2055
2056
2057 @pytest.mark.anyio
2058 async def test_transfer_repo_returns_404_for_unknown_repo(
2059 client: AsyncClient,
2060 auth_headers: dict[str, str],
2061 ) -> None:
2062 """POST /repos/{repo_id}/transfer returns 404 for a non-existent repo."""
2063 resp = await client.post(
2064 "/api/v1/repos/nonexistent-repo-id/transfer",
2065 json={"newOwnerUserId": "some-user"},
2066 headers=auth_headers,
2067 )
2068 assert resp.status_code == 404
2069
2070
2071 @pytest.mark.anyio
2072 async def test_transfer_repo_service_updates_owner_user_id(
2073 db_session: AsyncSession,
2074 ) -> None:
2075 """transfer_repo_ownership() service updates owner_user_id on the row."""
2076 repo = await musehub_repository.create_repo(
2077 db_session,
2078 name="svc-transfer-test",
2079 owner="testuser",
2080 visibility="private",
2081 owner_user_id="original-owner-id",
2082 )
2083 await db_session.commit()
2084
2085 updated = await musehub_repository.transfer_repo_ownership(
2086 db_session, repo.repo_id, "new-owner-id"
2087 )
2088 await db_session.commit()
2089
2090 assert updated is not None
2091 assert updated.owner_user_id == "new-owner-id"
2092 # Verify persisted
2093 fetched = await musehub_repository.get_repo(db_session, repo.repo_id)
2094 assert fetched is not None
2095 assert fetched.owner_user_id == "new-owner-id"
2096
2097
2098 @pytest.mark.anyio
2099 async def test_transfer_repo_service_returns_none_for_unknown(
2100 db_session: AsyncSession,
2101 ) -> None:
2102 """transfer_repo_ownership() returns None for a non-existent repo."""
2103 result = await musehub_repository.transfer_repo_ownership(
2104 db_session, "does-not-exist", "new-owner"
2105 )
2106 assert result is None
2107
2108
2109 # ---------------------------------------------------------------------------
2110 # GET /repos — list repos for authenticated user
2111 # ---------------------------------------------------------------------------
2112
2113
2114 @pytest.mark.anyio
2115 async def test_list_my_repos_total_matches_count(
2116 client: AsyncClient,
2117 auth_headers: dict[str, str],
2118 ) -> None:
2119 """total field in GET /repos matches the number of repos created."""
2120 initial = await client.get("/api/v1/repos", headers=auth_headers)
2121 initial_total: int = initial.json()["total"]
2122
2123 await client.post(
2124 "/api/v1/repos",
2125 json={"name": "total-count-test", "owner": "testuser", "initialize": False},
2126 headers=auth_headers,
2127 )
2128
2129 resp = await client.get("/api/v1/repos", headers=auth_headers)
2130 assert resp.status_code == 200
2131 assert resp.json()["total"] == initial_total + 1
2132
2133
2134 @pytest.mark.anyio
2135 async def test_list_my_repos_pagination_cursor(
2136 client: AsyncClient,
2137 auth_headers: dict[str, str],
2138 db_session: AsyncSession,
2139 ) -> None:
2140 """GET /repos with limit=1 returns a nextCursor that fetches the next page."""
2141 from datetime import datetime, timedelta, timezone
2142
2143 owner_user_id = "550e8400-e29b-41d4-a716-446655440000"
2144 now = datetime.now(tz=timezone.utc)
2145 for i in range(3):
2146 repo = MusehubRepo(
2147 name=f"paged-repo-{i}",
2148 owner="testuser",
2149 slug=f"paged-repo-{i}",
2150 visibility="public",
2151 owner_user_id=owner_user_id,
2152 )
2153 repo.created_at = now - timedelta(seconds=i)
2154 db_session.add(repo)
2155 await db_session.commit()
2156
2157 first_page = await client.get(
2158 "/api/v1/repos?limit=1",
2159 headers=auth_headers,
2160 )
2161 assert first_page.status_code == 200
2162 body = first_page.json()
2163 assert len(body["repos"]) == 1
2164 next_cursor = body["nextCursor"]
2165 assert next_cursor is not None
2166
2167 second_page = await client.get(
2168 f"/api/v1/repos?limit=1&cursor={next_cursor}",
2169 headers=auth_headers,
2170 )
2171 assert second_page.status_code == 200
2172 second_body = second_page.json()
2173 assert len(second_body["repos"]) == 1
2174 # Pages must not overlap
2175 first_id = body["repos"][0]["repoId"]
2176 second_id = second_body["repos"][0]["repoId"]
2177 assert first_id != second_id
2178
2179
2180 @pytest.mark.anyio
2181 async def test_list_my_repos_service_direct(db_session: AsyncSession) -> None:
2182 """list_repos_for_user() returns only repos owned by the given user."""
2183 from musehub.services.musehub_repository import list_repos_for_user
2184
2185 owner_id = "user-list-direct"
2186 other_id = "user-other-direct"
2187
2188 repo_mine = MusehubRepo(
2189 name="mine-direct",
2190 owner="testuser",
2191 slug="mine-direct",
2192 visibility="private",
2193 owner_user_id=owner_id,
2194 )
2195 repo_other = MusehubRepo(
2196 name="not-mine-direct",
2197 owner="otheruser",
2198 slug="not-mine-direct",
2199 visibility="private",
2200 owner_user_id=other_id,
2201 )
2202 db_session.add_all([repo_mine, repo_other])
2203 await db_session.commit()
2204
2205 result = await list_repos_for_user(db_session, owner_id)
2206 repo_ids = {r.repo_id for r in result.repos}
2207 assert str(repo_mine.repo_id) in repo_ids
2208 assert str(repo_other.repo_id) not in repo_ids
2209
2210
2211 # ---------------------------------------------------------------------------
2212 # GET /repos/{repo_id}/collaborators/{username}/permission
2213 # ---------------------------------------------------------------------------
2214
2215
2216 @pytest.mark.anyio
2217 async def test_collab_access_owner_returns_owner_permission(
2218 client: AsyncClient,
2219 db_session: AsyncSession,
2220 auth_headers: dict[str, str],
2221 ) -> None:
2222 """Owner's username returns permission='owner' with accepted_at=null."""
2223 from musehub.db.musehub_collaborator_models import MusehubCollaborator
2224
2225 owner_id = TEST_OWNER_USER_ID
2226 repo = MusehubRepo(
2227 name="access-owner-test",
2228 owner="testuser",
2229 slug="access-owner-test",
2230 visibility="private",
2231 owner_user_id=owner_id,
2232 )
2233 db_session.add(repo)
2234 await db_session.commit()
2235 await db_session.refresh(repo)
2236
2237 resp = await client.get(
2238 f"/api/v1/repos/{repo.repo_id}/collaborators/{owner_id}/permission",
2239 headers=auth_headers,
2240 )
2241 assert resp.status_code == 200
2242 body = resp.json()
2243 assert body["username"] == owner_id
2244 assert body["permission"] == "owner"
2245 assert body["acceptedAt"] is None
2246
2247
2248 @pytest.mark.anyio
2249 async def test_collab_access_collaborator_returns_permission(
2250 client: AsyncClient,
2251 db_session: AsyncSession,
2252 auth_headers: dict[str, str],
2253 ) -> None:
2254 """A known collaborator returns their permission level and accepted_at."""
2255 from datetime import datetime, timezone
2256
2257 from musehub.db.musehub_collaborator_models import MusehubCollaborator
2258
2259 owner_id = TEST_OWNER_USER_ID
2260 collab_user_id = "collab-user-write"
2261
2262 repo = MusehubRepo(
2263 name="access-collab-test",
2264 owner="testuser",
2265 slug="access-collab-test",
2266 visibility="private",
2267 owner_user_id=owner_id,
2268 )
2269 db_session.add(repo)
2270 await db_session.commit()
2271 await db_session.refresh(repo)
2272
2273 accepted = datetime(2026, 1, 10, 10, 0, 0, tzinfo=timezone.utc)
2274 collab = MusehubCollaborator(
2275 repo_id=str(repo.repo_id),
2276 user_id=collab_user_id,
2277 permission="write",
2278 accepted_at=accepted,
2279 )
2280 db_session.add(collab)
2281 await db_session.commit()
2282
2283 resp = await client.get(
2284 f"/api/v1/repos/{repo.repo_id}/collaborators/{collab_user_id}/permission",
2285 headers=auth_headers,
2286 )
2287 assert resp.status_code == 200
2288 body = resp.json()
2289 assert body["username"] == collab_user_id
2290 assert body["permission"] == "write"
2291 assert body["acceptedAt"] is not None
2292
2293
2294 @pytest.mark.anyio
2295 async def test_collab_access_non_collaborator_returns_404(
2296 client: AsyncClient,
2297 db_session: AsyncSession,
2298 auth_headers: dict[str, str],
2299 ) -> None:
2300 """A user who is not a collaborator returns 404 with an informative message."""
2301 repo = MusehubRepo(
2302 name="access-404-test",
2303 owner="testuser",
2304 slug="access-404-test",
2305 visibility="private",
2306 owner_user_id=TEST_OWNER_USER_ID,
2307 )
2308 db_session.add(repo)
2309 await db_session.commit()
2310 await db_session.refresh(repo)
2311
2312 stranger = "total-stranger-user"
2313 resp = await client.get(
2314 f"/api/v1/repos/{repo.repo_id}/collaborators/{stranger}/permission",
2315 headers=auth_headers,
2316 )
2317 assert resp.status_code == 404
2318 assert stranger in resp.json()["detail"]
2319
2320
2321 @pytest.mark.anyio
2322 async def test_collab_access_unknown_repo_returns_404(
2323 client: AsyncClient,
2324 auth_headers: dict[str, str],
2325 ) -> None:
2326 """Querying an unknown repo_id returns 404."""
2327 resp = await client.get(
2328 "/api/v1/repos/nonexistent-repo/collaborators/anyone/permission",
2329 headers=auth_headers,
2330 )
2331 assert resp.status_code == 404
2332
2333
2334 @pytest.mark.anyio
2335 async def test_collab_access_requires_auth(
2336 client: AsyncClient,
2337 db_session: AsyncSession,
2338 ) -> None:
2339 """GET /collaborators/{username}/permission returns 401 without a Bearer token."""
2340 repo = MusehubRepo(
2341 name="access-auth-test",
2342 owner="testuser",
2343 slug="access-auth-test",
2344 visibility="public",
2345 owner_user_id=TEST_OWNER_USER_ID,
2346 )
2347 db_session.add(repo)
2348 await db_session.commit()
2349 await db_session.refresh(repo)
2350
2351 resp = await client.get(
2352 f"/api/v1/repos/{repo.repo_id}/collaborators/anyone/permission"
2353 )
2354 assert resp.status_code == 401
2355
2356
2357 @pytest.mark.anyio
2358 async def test_collab_access_admin_permission(
2359 client: AsyncClient,
2360 db_session: AsyncSession,
2361 auth_headers: dict[str, str],
2362 ) -> None:
2363 """A collaborator with admin permission returns permission='admin'."""
2364 from musehub.db.musehub_collaborator_models import MusehubCollaborator
2365
2366 repo = MusehubRepo(
2367 name="access-admin-test",
2368 owner="testuser",
2369 slug="access-admin-test",
2370 visibility="private",
2371 owner_user_id=TEST_OWNER_USER_ID,
2372 )
2373 db_session.add(repo)
2374 await db_session.commit()
2375 await db_session.refresh(repo)
2376
2377 admin_user = "admin-collab-user"
2378 collab = MusehubCollaborator(
2379 repo_id=str(repo.repo_id),
2380 user_id=admin_user,
2381 permission="admin",
2382 accepted_at=None,
2383 )
2384 db_session.add(collab)
2385 await db_session.commit()
2386
2387 resp = await client.get(
2388 f"/api/v1/repos/{repo.repo_id}/collaborators/{admin_user}/permission",
2389 headers=auth_headers,
2390 )
2391 assert resp.status_code == 200
2392 body = resp.json()
2393 assert body["permission"] == "admin"