gabriel / musehub public
test_musehub_repos.py python
2635 lines 80.0 KB
c0f0b481 release: merge dev → main (#5) Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """Tests for MuseHub repo, branch, and commit endpoints.
2
3 Covers every acceptance criterion:
4 - POST /repos returns 201 with correct fields
5 - POST requires auth — unauthenticated requests return 401
6 - GET /repos/{repo_id} returns 200; 404 for unknown repo
7 - GET /repos/{repo_id}/branches returns empty list on new repo
8 - GET /repos/{repo_id}/commits returns newest first, respects ?limit
9
10 Covers (compare view API endpoint):
11 - test_compare_radar_data — compare endpoint returns 5 dimension scores
12 - test_compare_commit_list — commits unique to head are listed
13 - test_compare_unknown_ref_422 — unknown ref returns 422
14
15 All tests use the shared ``client`` and ``auth_headers`` fixtures from conftest.py.
16 """
17 from __future__ import annotations
18
19 import pytest
20 from httpx import AsyncClient
21 from sqlalchemy.ext.asyncio import AsyncSession
22
23 from musehub.db.musehub_models import MusehubCommit, MusehubRepo
24 from musehub.services import musehub_repository
25
26
27 # ---------------------------------------------------------------------------
28 # POST /repos
29 # ---------------------------------------------------------------------------
30
31
@pytest.mark.anyio
async def test_create_repo_returns_201(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Creating a repo via POST /api/v1/repos yields 201 and the expected fields."""
    payload = {"name": "my-beats", "owner": "testuser", "visibility": "private"}
    resp = await client.post("/api/v1/repos", json=payload, headers=auth_headers)
    assert resp.status_code == 201

    created = resp.json()
    # Values supplied in the request are echoed back unchanged...
    assert created["name"] == "my-beats"
    assert created["visibility"] == "private"
    # ...and the response carries fields that were not part of the request.
    for key in ("repoId", "cloneUrl", "ownerUserId", "createdAt"):
        assert key in created
51
52
@pytest.mark.anyio
async def test_create_repo_requires_auth(client: AsyncClient) -> None:
    """A POST to /api/v1/repos sent without auth headers is rejected with 401."""
    payload = {"name": "my-beats", "owner": "testuser"}
    # Deliberately no ``headers=`` argument: the request is anonymous.
    anonymous = await client.post("/api/v1/repos", json=payload)
    assert anonymous.status_code == 401
61
62
@pytest.mark.anyio
async def test_create_repo_default_visibility_is_private(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """A repo created without a ``visibility`` field comes back as 'private'."""
    # The payload intentionally leaves out "visibility".
    payload = {"name": "silent-sessions", "owner": "testuser"}
    resp = await client.post("/api/v1/repos", json=payload, headers=auth_headers)
    assert resp.status_code == 201
    created = resp.json()
    assert created["visibility"] == "private"
76
77
78 # ---------------------------------------------------------------------------
79 # GET /repos/{repo_id}
80 # ---------------------------------------------------------------------------
81
82
@pytest.mark.anyio
async def test_get_repo_returns_200(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """A freshly created repo can be fetched by id via GET /api/v1/repos/{repo_id}."""
    payload = {"name": "jazz-sessions", "owner": "testuser"}
    created = await client.post("/api/v1/repos", json=payload, headers=auth_headers)
    assert created.status_code == 201
    repo_id = created.json()["repoId"]

    fetched = await client.get(f"/api/v1/repos/{repo_id}", headers=auth_headers)
    assert fetched.status_code == 200
    repo = fetched.json()
    # The fetched record must be the one that was just created.
    assert repo["repoId"] == repo_id
    assert repo["name"] == "jazz-sessions"
101
102
@pytest.mark.anyio
async def test_get_repo_not_found_returns_404(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """An authenticated GET for an id that was never created answers 404."""
    missing_url = "/api/v1/repos/does-not-exist"
    resp = await client.get(missing_url, headers=auth_headers)
    assert resp.status_code == 404
114
115
@pytest.mark.anyio
async def test_get_nonexistent_repo_returns_404_without_auth(client: AsyncClient) -> None:
    """An anonymous GET for a missing repo answers 404 rather than 401.

    The endpoint uses optional_token, so access is decided by repo visibility;
    a missing repo is reported as 404 before any auth check applies.
    """
    missing_url = "/api/v1/repos/non-existent-repo-id"
    anonymous = await client.get(missing_url)
    assert anonymous.status_code == 404
124
125
126 # ---------------------------------------------------------------------------
127 # GET /repos/{repo_id}/branches
128 # ---------------------------------------------------------------------------
129
130
@pytest.mark.anyio
async def test_list_branches_empty_on_new_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """A repo created with ``initialize=False`` reports an empty branch list."""
    create = await client.post(
        "/api/v1/repos",
        json={"name": "drum-patterns", "owner": "testuser", "initialize": False},
        headers=auth_headers,
    )
    # Fail with a clear status mismatch (not a KeyError on "repoId" below)
    # if repo creation itself is broken.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    response = await client.get(
        f"/api/v1/repos/{repo_id}/branches",
        headers=auth_headers,
    )
    assert response.status_code == 200
    assert response.json()["branches"] == []
150
151
@pytest.mark.anyio
async def test_list_branches_not_found_returns_404(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Listing branches of a repo id that does not exist answers 404."""
    missing_url = "/api/v1/repos/ghost-repo/branches"
    resp = await client.get(missing_url, headers=auth_headers)
    assert resp.status_code == 404
163
164
165 # ---------------------------------------------------------------------------
166 # GET /repos/{repo_id}/commits
167 # ---------------------------------------------------------------------------
168
169
@pytest.mark.anyio
async def test_list_commits_empty_on_new_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """A repo created with ``initialize=False`` lists no commits and a total of 0."""
    create = await client.post(
        "/api/v1/repos",
        json={"name": "empty-repo", "owner": "testuser", "initialize": False},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    response = await client.get(
        f"/api/v1/repos/{repo_id}/commits",
        headers=auth_headers,
    )
    assert response.status_code == 200
    body = response.json()
    assert body["commits"] == []
    assert body["total"] == 0
191
192
@pytest.mark.anyio
async def test_list_commits_returns_newest_first(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """The commits endpoint orders results newest-first by timestamp."""
    from datetime import datetime, timezone, timedelta

    # Create repo via API (no init commit so we control the full history)
    create = await client.post(
        "/api/v1/repos",
        json={"name": "ordered-commits", "owner": "testuser", "initialize": False},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    # Insert two commits directly with known timestamps, one hour apart.
    now = datetime.now(tz=timezone.utc)
    older = MusehubCommit(
        commit_id="aaa111",
        repo_id=repo_id,
        branch="main",
        parent_ids=[],
        message="first",
        author="gabriel",
        timestamp=now - timedelta(hours=1),
    )
    newer = MusehubCommit(
        commit_id="bbb222",
        repo_id=repo_id,
        branch="main",
        parent_ids=["aaa111"],
        message="second",
        author="gabriel",
        timestamp=now,
    )
    db_session.add_all([older, newer])
    await db_session.commit()

    response = await client.get(
        f"/api/v1/repos/{repo_id}/commits",
        headers=auth_headers,
    )
    assert response.status_code == 200
    commits = response.json()["commits"]
    assert len(commits) == 2
    # The commit with the later timestamp must come first.
    assert commits[0]["commitId"] == "bbb222"
    assert commits[1]["commitId"] == "aaa111"
242
243
@pytest.mark.anyio
async def test_list_commits_limit_param(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """``?limit=1`` returns one commit while ``total`` still counts all three."""
    from datetime import datetime, timezone, timedelta

    create = await client.post(
        "/api/v1/repos",
        json={"name": "limited-repo", "owner": "testuser", "initialize": False},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    # Seed three commits with timestamps one second apart.
    now = datetime.now(tz=timezone.utc)
    for i in range(3):
        db_session.add(
            MusehubCommit(
                commit_id=f"commit-{i}",
                repo_id=repo_id,
                branch="main",
                parent_ids=[],
                message=f"commit {i}",
                author="gabriel",
                timestamp=now + timedelta(seconds=i),
            )
        )
    await db_session.commit()

    response = await client.get(
        f"/api/v1/repos/{repo_id}/commits?limit=1",
        headers=auth_headers,
    )
    assert response.status_code == 200
    body = response.json()
    # The page is capped by ``limit`` but ``total`` reflects every commit.
    assert len(body["commits"]) == 1
    assert body["total"] == 3
283
284
285 # ---------------------------------------------------------------------------
286 # Service layer — direct DB tests (no HTTP)
287 # ---------------------------------------------------------------------------
288
289
@pytest.mark.anyio
async def test_create_repo_service_persists_to_db(db_session: AsyncSession) -> None:
    """A repo made by ``musehub_repository.create_repo()`` can be read back with ``get_repo()``."""
    repo_fields = {
        "name": "service-test-repo",
        "owner": "testuser",
        "visibility": "public",
        "owner_user_id": "user-abc",
    }
    created = await musehub_repository.create_repo(db_session, **repo_fields)
    await db_session.commit()

    # Look the row up again by the id the service assigned.
    stored = await musehub_repository.get_repo(db_session, created.repo_id)
    assert stored is not None
    assert stored.name == "service-test-repo"
    assert stored.visibility == "public"
306
307
@pytest.mark.anyio
async def test_get_repo_returns_none_when_missing(db_session: AsyncSession) -> None:
    """``get_repo()`` yields ``None`` when no repo has the given id."""
    missing = await musehub_repository.get_repo(db_session, "nonexistent-id")
    assert missing is None
313
314
@pytest.mark.anyio
async def test_list_branches_returns_empty_for_new_repo(db_session: AsyncSession) -> None:
    """``list_branches()`` gives ``[]`` for a repo created directly through the service."""
    repo_fields = {
        "name": "branchless",
        "owner": "testuser",
        "visibility": "private",
        "owner_user_id": "user-x",
    }
    created = await musehub_repository.create_repo(db_session, **repo_fields)
    await db_session.commit()

    found = await musehub_repository.list_branches(db_session, created.repo_id)
    assert found == []
328
329
330 # ---------------------------------------------------------------------------
331 # GET /repos/{repo_id}/divergence
332 # ---------------------------------------------------------------------------
333
334
@pytest.mark.anyio
async def test_divergence_endpoint_returns_five_dimensions(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """The divergence endpoint reports a score and a level for each of five dimensions."""
    from datetime import datetime, timezone, timedelta

    created = await client.post(
        "/api/v1/repos",
        json={"name": "divergence-test-repo", "owner": "testuser"},
        headers=auth_headers,
    )
    assert created.status_code == 201
    repo_id = created.json()["repoId"]

    # One commit on each of the two branches being compared.
    reference_time = datetime.now(tz=timezone.utc)
    main_commit = MusehubCommit(
        commit_id="aaa-melody",
        repo_id=repo_id,
        branch="main",
        parent_ids=[],
        message="add lead melody line",
        author="alice",
        timestamp=reference_time - timedelta(hours=2),
    )
    feature_commit = MusehubCommit(
        commit_id="bbb-chord",
        repo_id=repo_id,
        branch="feature",
        parent_ids=[],
        message="update chord progression",
        author="bob",
        timestamp=reference_time - timedelta(hours=1),
    )
    db_session.add(main_commit)
    db_session.add(feature_commit)
    await db_session.commit()

    url = f"/api/v1/repos/{repo_id}/divergence?branch_a=main&branch_b=feature"
    resp = await client.get(url, headers=auth_headers)
    assert resp.status_code == 200
    payload = resp.json()
    assert "dimensions" in payload
    assert len(payload["dimensions"]) == 5

    reported = {entry["dimension"] for entry in payload["dimensions"]}
    assert reported == {"melodic", "harmonic", "rhythmic", "structural", "dynamic"}

    allowed_levels = {"NONE", "LOW", "MED", "HIGH"}
    for entry in payload["dimensions"]:
        assert "level" in entry
        assert entry["level"] in allowed_levels
        assert "score" in entry
        assert 0.0 <= entry["score"] <= 1.0
394
395
@pytest.mark.anyio
async def test_divergence_overall_score_is_mean_of_dimensions(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """``overallScore`` matches the mean of the per-dimension scores."""
    from datetime import datetime, timezone, timedelta

    create = await client.post(
        "/api/v1/repos",
        json={"name": "divergence-mean-repo", "owner": "testuser"},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    # One commit on each of the two branches being compared.
    now = datetime.now(tz=timezone.utc)
    db_session.add(
        MusehubCommit(
            commit_id="c1-beat",
            repo_id=repo_id,
            branch="alpha",
            parent_ids=[],
            message="rework drum beat groove",
            author="producer-a",
            timestamp=now - timedelta(hours=3),
        )
    )
    db_session.add(
        MusehubCommit(
            commit_id="c2-mix",
            repo_id=repo_id,
            branch="beta",
            parent_ids=[],
            message="fix master volume level",
            author="producer-b",
            timestamp=now - timedelta(hours=2),
        )
    )
    await db_session.commit()

    response = await client.get(
        f"/api/v1/repos/{repo_id}/divergence?branch_a=alpha&branch_b=beta",
        headers=auth_headers,
    )
    assert response.status_code == 200
    body = response.json()

    dims = body["dimensions"]
    # The mean is rounded to 4 places before comparison.
    # NOTE(review): presumably the API rounds overallScore the same way — confirm.
    computed_mean = round(sum(d["score"] for d in dims) / len(dims), 4)
    assert abs(body["overallScore"] - computed_mean) < 1e-6
447
448
@pytest.mark.anyio
async def test_divergence_json_response_structure(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """The divergence response exposes the expected camelCase top-level and per-dimension keys."""
    from datetime import datetime, timezone, timedelta

    create = await client.post(
        "/api/v1/repos",
        json={"name": "divergence-struct-repo", "owner": "testuser"},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    # One commit per branch, with timestamps one second apart.
    now = datetime.now(tz=timezone.utc)
    for i, (branch, msg) in enumerate(
        [("main", "add melody riff"), ("dev", "update chorus section")]
    ):
        db_session.add(
            MusehubCommit(
                commit_id=f"struct-{i}",
                repo_id=repo_id,
                branch=branch,
                parent_ids=[],
                message=msg,
                author="test",
                timestamp=now + timedelta(seconds=i),
            )
        )
    await db_session.commit()

    response = await client.get(
        f"/api/v1/repos/{repo_id}/divergence?branch_a=main&branch_b=dev",
        headers=auth_headers,
    )
    assert response.status_code == 200
    body = response.json()

    # Top-level fields: the request parameters are echoed back in camelCase.
    assert body["repoId"] == repo_id
    assert body["branchA"] == "main"
    assert body["branchB"] == "dev"
    assert "commonAncestor" in body
    assert "overallScore" in body
    assert isinstance(body["overallScore"], float)
    assert isinstance(body["dimensions"], list)
    assert len(body["dimensions"]) == 5

    # Every dimension entry carries the same set of keys.
    for dim in body["dimensions"]:
        assert "dimension" in dim
        assert "level" in dim
        assert "score" in dim
        assert "description" in dim
        assert "branchACommits" in dim
        assert "branchBCommits" in dim
505
506
@pytest.mark.anyio
async def test_divergence_endpoint_returns_404_for_unknown_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Requesting divergence for a repo id that does not exist answers 404."""
    missing_url = "/api/v1/repos/no-such-repo/divergence?branch_a=a&branch_b=b"
    resp = await client.get(missing_url, headers=auth_headers)
    assert resp.status_code == 404
518
519
@pytest.mark.anyio
async def test_divergence_endpoint_returns_422_for_empty_branch(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """Comparing branch names that have no commits in the repo answers 422."""
    create = await client.post(
        "/api/v1/repos",
        json={"name": "empty-branch-repo", "owner": "testuser"},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below,
    # so the 422 assertion is only reached with a real repo id.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    # No commits are seeded on either of the requested branch names.
    response = await client.get(
        f"/api/v1/repos/{repo_id}/divergence?branch_a=ghost&branch_b=also-ghost",
        headers=auth_headers,
    )
    assert response.status_code == 422
539
540
541
542 # ---------------------------------------------------------------------------
543 # GET /repos/{repo_id}/dag
544 # ---------------------------------------------------------------------------
545
546
@pytest.mark.anyio
async def test_graph_dag_endpoint_returns_empty_for_new_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """GET /dag on a repo created with ``initialize=False`` has no nodes, edges or head."""
    create = await client.post(
        "/api/v1/repos",
        json={"name": "dag-empty", "owner": "testuser", "initialize": False},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    response = await client.get(
        f"/api/v1/repos/{repo_id}/dag",
        headers=auth_headers,
    )
    assert response.status_code == 200
    body = response.json()
    assert body["nodes"] == []
    assert body["edges"] == []
    assert body["headCommitId"] is None
569
570
@pytest.mark.anyio
async def test_graph_dag_has_edges(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """The DAG response contains an edge from a child commit to its parent."""
    from datetime import datetime, timezone, timedelta

    create = await client.post(
        "/api/v1/repos",
        json={"name": "dag-edges", "owner": "testuser", "initialize": False},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    # Two commits on main: a root and a child that names it as parent.
    now = datetime.now(tz=timezone.utc)
    root = MusehubCommit(
        commit_id="root111",
        repo_id=repo_id,
        branch="main",
        parent_ids=[],
        message="root commit",
        author="gabriel",
        timestamp=now - timedelta(hours=2),
    )
    child = MusehubCommit(
        commit_id="child222",
        repo_id=repo_id,
        branch="main",
        parent_ids=["root111"],
        message="child commit",
        author="gabriel",
        timestamp=now - timedelta(hours=1),
    )
    db_session.add_all([root, child])
    await db_session.commit()

    response = await client.get(
        f"/api/v1/repos/{repo_id}/dag",
        headers=auth_headers,
    )
    assert response.status_code == 200
    body = response.json()
    nodes = body["nodes"]
    edges = body["edges"]

    assert len(nodes) == 2
    # Verify edge: child → root
    assert any(e["source"] == "child222" and e["target"] == "root111" for e in edges)
621
622
@pytest.mark.anyio
async def test_graph_dag_endpoint_topological_order(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """DAG nodes are listed with each ancestor before its descendants."""
    from datetime import datetime, timedelta, timezone

    create = await client.post(
        "/api/v1/repos",
        json={"name": "dag-topo", "owner": "testuser"},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    # A linear chain topo-a <- topo-b <- topo-c with increasing timestamps.
    now = datetime.now(tz=timezone.utc)
    commits = [
        MusehubCommit(
            commit_id="topo-a",
            repo_id=repo_id,
            branch="main",
            parent_ids=[],
            message="root",
            author="gabriel",
            timestamp=now - timedelta(hours=3),
        ),
        MusehubCommit(
            commit_id="topo-b",
            repo_id=repo_id,
            branch="main",
            parent_ids=["topo-a"],
            message="second",
            author="gabriel",
            timestamp=now - timedelta(hours=2),
        ),
        MusehubCommit(
            commit_id="topo-c",
            repo_id=repo_id,
            branch="main",
            parent_ids=["topo-b"],
            message="third",
            author="gabriel",
            timestamp=now - timedelta(hours=1),
        ),
    ]
    db_session.add_all(commits)
    await db_session.commit()

    response = await client.get(
        f"/api/v1/repos/{repo_id}/dag",
        headers=auth_headers,
    )
    assert response.status_code == 200
    node_ids = [n["commitId"] for n in response.json()["nodes"]]
    # Root must appear before children in topological order. Relative positions
    # are compared, so any other nodes in the response do not affect the check.
    assert node_ids.index("topo-a") < node_ids.index("topo-b")
    assert node_ids.index("topo-b") < node_ids.index("topo-c")
681
682
@pytest.mark.anyio
async def test_graph_dag_nonexistent_repo_returns_404_without_auth(client: AsyncClient) -> None:
    """An anonymous GET /dag for a missing repo answers 404.

    The endpoint uses optional_token, so access is decided by repo visibility;
    a missing repo is reported as 404.
    """
    missing_url = "/api/v1/repos/non-existent-repo/dag"
    anonymous = await client.get(missing_url)
    assert anonymous.status_code == 404
691
692
@pytest.mark.anyio
async def test_graph_dag_404_for_unknown_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """An authenticated GET /dag for a repo id that does not exist answers 404."""
    missing_url = "/api/v1/repos/ghost-repo-dag/dag"
    resp = await client.get(missing_url, headers=auth_headers)
    assert resp.status_code == 404
704
705
@pytest.mark.anyio
async def test_graph_json_response_has_required_fields(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """The DAG response has nodes/edges/headCommitId, and a node carries all required fields."""
    from datetime import datetime, timezone

    create = await client.post(
        "/api/v1/repos",
        json={"name": "dag-fields", "owner": "testuser"},
        headers=auth_headers,
    )
    # Surface a broken create as a status failure instead of a KeyError below.
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    db_session.add(
        MusehubCommit(
            commit_id="fields-aaa",
            repo_id=repo_id,
            branch="main",
            parent_ids=[],
            message="check fields",
            author="tester",
            timestamp=datetime.now(tz=timezone.utc),
        )
    )
    await db_session.commit()

    response = await client.get(
        f"/api/v1/repos/{repo_id}/dag",
        headers=auth_headers,
    )
    assert response.status_code == 200
    body = response.json()
    assert "nodes" in body
    assert "edges" in body
    assert "headCommitId" in body

    # Guard the index below: an empty list would raise an opaque IndexError.
    assert body["nodes"], "DAG response contains no nodes"
    node = body["nodes"][0]
    for field in ("commitId", "message", "author", "timestamp", "branch", "parentIds", "isHead"):
        assert field in node, f"Missing field '{field}' in DAG node"
748
749 # ---------------------------------------------------------------------------
750 # GET /repos/{repo_id}/credits
751 # ---------------------------------------------------------------------------
752
753
async def _seed_credits_repo(db_session: AsyncSession) -> str:
    """Insert a public repo with commits by Alice (two) and Bob (one); return its id as a string."""
    from datetime import datetime, timezone, timedelta

    credits_repo = MusehubRepo(
        name="liner-notes",
        owner="testuser",
        slug="liner-notes",
        visibility="public",
        owner_user_id="producer-1",
    )
    db_session.add(credits_repo)
    # Flush so repo_id is available before the commits reference it.
    await db_session.flush()
    repo_id = str(credits_repo.repo_id)

    reference_time = datetime.now(tz=timezone.utc)
    seeded_commits = [
        # Alice: two commits, the latest one day old.
        MusehubCommit(
            commit_id="alice-001",
            repo_id=repo_id,
            branch="main",
            parent_ids=[],
            message="compose the main melody",
            author="Alice",
            timestamp=reference_time - timedelta(days=3),
        ),
        MusehubCommit(
            commit_id="alice-002",
            repo_id=repo_id,
            branch="main",
            parent_ids=["alice-001"],
            message="mix the final arrangement",
            author="Alice",
            timestamp=reference_time - timedelta(days=1),
        ),
        # Bob: a single commit, five days old.
        MusehubCommit(
            commit_id="bob-001",
            repo_id=repo_id,
            branch="main",
            parent_ids=[],
            message="arrange the bridge section",
            author="Bob",
            timestamp=reference_time - timedelta(days=5),
        ),
    ]
    db_session.add_all(seeded_commits)
    await db_session.commit()
    return repo_id
803
804
@pytest.mark.anyio
async def test_credits_aggregation(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """The credits endpoint reports one contributor entry per distinct commit author."""
    repo_id = await _seed_credits_repo(db_session)
    resp = await client.get(f"/api/v1/repos/{repo_id}/credits", headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    assert payload["totalContributors"] == 2
    names = {entry["author"] for entry in payload["contributors"]}
    # Both seeded authors must be present.
    assert "Alice" in names
    assert "Bob" in names
823
824
@pytest.mark.anyio
async def test_credits_sorted_by_count(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """With ``sort=count`` the author with the most commits is listed first."""
    repo_id = await _seed_credits_repo(db_session)
    url = f"/api/v1/repos/{repo_id}/credits?sort=count"
    resp = await client.get(url, headers=auth_headers)
    assert resp.status_code == 200

    top = resp.json()["contributors"][0]
    # Alice has two seeded commits against Bob's one.
    assert top["author"] == "Alice"
    assert top["sessionCount"] == 2
841
842
@pytest.mark.anyio
async def test_credits_sorted_by_recency(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """With ``sort=recency`` the author with the latest commit is listed first."""
    repo_id = await _seed_credits_repo(db_session)
    url = f"/api/v1/repos/{repo_id}/credits?sort=recency"
    resp = await client.get(url, headers=auth_headers)
    assert resp.status_code == 200

    ranked = resp.json()["contributors"]
    # Alice's latest seeded commit is one day old; Bob's only commit is five days old.
    assert ranked[0]["author"] == "Alice"
859
860
@pytest.mark.anyio
async def test_credits_sorted_by_alpha(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """With ``sort=alpha`` contributors come back in case-insensitive alphabetical order."""
    repo_id = await _seed_credits_repo(db_session)
    url = f"/api/v1/repos/{repo_id}/credits?sort=alpha"
    resp = await client.get(url, headers=auth_headers)
    assert resp.status_code == 200

    names = [entry["author"] for entry in resp.json()["contributors"]]
    expected_order = sorted(names, key=str.lower)
    assert names == expected_order
877
878
@pytest.mark.anyio
async def test_credits_contribution_types_inferred(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Alice's credits entry lists at least one contribution type."""
    repo_id = await _seed_credits_repo(db_session)
    resp = await client.get(f"/api/v1/repos/{repo_id}/credits", headers=auth_headers)
    assert resp.status_code == 200

    entries = resp.json()["contributors"]
    alice_entry = next(entry for entry in entries if entry["author"] == "Alice")
    # Her seeded commit messages mention "compose" and "mix"; only the presence
    # of some type is checked, not which ones.
    distinct_types = set(alice_entry["contributionTypes"])
    assert len(distinct_types) > 0
897
898
@pytest.mark.anyio
async def test_credits_404_for_unknown_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Requesting credits for a repo id that does not exist answers 404."""
    missing_url = "/api/v1/repos/does-not-exist/credits"
    resp = await client.get(missing_url, headers=auth_headers)
    assert resp.status_code == 404
910
911
@pytest.mark.anyio
async def test_credits_requires_auth(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """An anonymous credits request against a private repo is rejected with 401."""
    private_repo = MusehubRepo(
        name="auth-test-repo",
        owner="testuser",
        slug="auth-test-repo",
        visibility="private",
        owner_user_id="u1",
    )
    db_session.add(private_repo)
    await db_session.commit()
    # Reload the row so repo_id can be read after the commit.
    await db_session.refresh(private_repo)

    # No auth headers are sent with this request.
    anonymous = await client.get(f"/api/v1/repos/{private_repo.repo_id}/credits")
    assert anonymous.status_code == 401
926
927
@pytest.mark.anyio
async def test_credits_invalid_sort_param(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """An unsupported ``sort`` value on the credits endpoint answers 422."""
    sort_repo = MusehubRepo(
        name="sort-test",
        owner="testuser",
        slug="sort-test",
        visibility="private",
        owner_user_id="u1",
    )
    db_session.add(sort_repo)
    await db_session.commit()
    # Reload the row so repo_id can be read after the commit.
    await db_session.refresh(sort_repo)

    url = f"/api/v1/repos/{sort_repo.repo_id}/credits?sort=invalid"
    resp = await client.get(url, headers=auth_headers)
    assert resp.status_code == 422
946
947
@pytest.mark.anyio
async def test_credits_aggregation_service_direct(db_session: AsyncSession) -> None:
    """``musehub_credits.aggregate_credits()`` called directly reports the single seeded author."""
    from datetime import datetime, timezone

    from musehub.services import musehub_credits

    direct_repo = MusehubRepo(
        name="direct-test",
        owner="testuser",
        slug="direct-test",
        visibility="private",
        owner_user_id="u1",
    )
    db_session.add(direct_repo)
    # Flush so repo_id is available before the commit row references it.
    await db_session.flush()
    repo_id = str(direct_repo.repo_id)

    only_commit = MusehubCommit(
        commit_id="svc-001",
        repo_id=repo_id,
        branch="main",
        parent_ids=[],
        message="produce and mix the drop",
        author="Charlie",
        timestamp=datetime.now(tz=timezone.utc),
    )
    db_session.add(only_commit)
    await db_session.commit()

    summary = await musehub_credits.aggregate_credits(db_session, repo_id, sort="count")
    assert summary.total_contributors == 1
    first = summary.contributors[0]
    assert first.author == "Charlie"
    assert first.session_count == 1
980
981
982 # ---------------------------------------------------------------------------
983 # Compare endpoint
984 # ---------------------------------------------------------------------------
985
986
async def _make_compare_repo(
    db_session: AsyncSession,
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> str:
    """Create a private repo via the API, add one commit on main and one on feature, return its id."""
    from datetime import datetime, timezone

    payload = {"name": "compare-test", "owner": "testuser", "visibility": "private"}
    created = await client.post("/api/v1/repos", json=payload, headers=auth_headers)
    assert created.status_code == 201
    repo_id: str = str(created.json()["repoId"])

    # Both commits share one timestamp; the feature commit names the main
    # commit as its parent.
    stamp = datetime.now(tz=timezone.utc)
    base_commit = MusehubCommit(
        commit_id="base001",
        repo_id=repo_id,
        branch="main",
        parent_ids=[],
        message="add melody line",
        author="Alice",
        timestamp=stamp,
    )
    head_commit = MusehubCommit(
        commit_id="head001",
        repo_id=repo_id,
        branch="feature",
        parent_ids=["base001"],
        message="add chord progression",
        author="Bob",
        timestamp=stamp,
    )
    db_session.add(base_commit)
    db_session.add(head_commit)
    await db_session.commit()
    return repo_id
1028
1029
@pytest.mark.anyio
async def test_compare_radar_data(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """The compare endpoint returns five scored dimensions plus an overall score."""
    repo_id = await _make_compare_repo(db_session, client, auth_headers)
    url = f"/api/v1/repos/{repo_id}/compare?base=main&head=feature"
    resp = await client.get(url, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    assert "dimensions" in payload
    assert len(payload["dimensions"]) == 5
    reported = {entry["dimension"] for entry in payload["dimensions"]}
    assert reported == {"melodic", "harmonic", "rhythmic", "structural", "dynamic"}

    # Every dimension has a score in [0, 1] and one of the known levels.
    allowed_levels = ("NONE", "LOW", "MED", "HIGH")
    for entry in payload["dimensions"]:
        assert 0.0 <= entry["score"] <= 1.0
        assert entry["level"] in allowed_levels

    assert "overallScore" in payload
    assert 0.0 <= payload["overallScore"] <= 1.0
1054
1055
@pytest.mark.anyio
async def test_compare_commit_list(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """The compare response lists the head-only commit and omits the base commit."""
    repo_id = await _make_compare_repo(db_session, client, auth_headers)
    compare_url = f"/api/v1/repos/{repo_id}/compare?base=main&head=feature"
    response = await client.get(compare_url, headers=auth_headers)
    assert response.status_code == 200

    payload = response.json()
    assert "commits" in payload
    listed_ids = [entry["commitId"] for entry in payload["commits"]]
    # head001 was seeded on feature only, so it must be reported.
    assert "head001" in listed_ids
    # base001 was seeded on main, so it is not unique to head.
    assert "base001" not in listed_ids
1076
1077
@pytest.mark.anyio
async def test_compare_unknown_ref_422(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Comparing two refs that have no commits in a freshly created repo yields 422."""
    new_repo = {"name": "empty-compare", "owner": "testuser", "visibility": "private"}
    created = await client.post("/api/v1/repos", json=new_repo, headers=auth_headers)
    assert created.status_code == 201

    repo_id = created.json()["repoId"]
    compare_url = f"/api/v1/repos/{repo_id}/compare?base=nonexistent&head=alsoabsent"
    response = await client.get(compare_url, headers=auth_headers)
    assert response.status_code == 422
1097
1098
@pytest.mark.anyio
async def test_compare_emotion_diff_fields(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """``emotionDiff`` carries the delta fields in [-1, 1] and base/head fields in [0, 1]."""
    repo_id = await _make_compare_repo(db_session, client, auth_headers)
    compare_url = f"/api/v1/repos/{repo_id}/compare?base=main&head=feature"
    response = await client.get(compare_url, headers=auth_headers)
    assert response.status_code == 200

    payload = response.json()
    assert "emotionDiff" in payload
    emotion = payload["emotionDiff"]

    # Each group pairs the field names with the lowest value they may take;
    # the upper bound is 1.0 for every field.
    field_groups = (
        (("energyDelta", "valenceDelta", "tensionDelta", "darknessDelta"), -1.0),
        (("baseEnergy", "headEnergy", "baseValence", "headValence"), 0.0),
    )
    for names, lower_bound in field_groups:
        for name in names:
            assert name in emotion
            assert lower_bound <= emotion[name] <= 1.0
1121
1122
1123 # ---------------------------------------------------------------------------
# Arrangement matrix endpoint
# ---------------------------------------------------------------------------
1125
1126
@pytest.mark.anyio
async def test_arrange_endpoint_returns_200(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """The arrange endpoint answers 200 and echoes ``repoId`` and ``ref`` for HEAD."""
    seeded = MusehubRepo(
        name="arrange-test",
        owner="testuser",
        slug="arrange-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    arrange_url = f"/api/v1/repos/{repo_id}/arrange/HEAD"
    resp = await client.get(arrange_url, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    assert payload["repoId"] == repo_id
    assert payload["ref"] == "HEAD"
1154
1155
@pytest.mark.anyio
async def test_arrange_endpoint_has_required_fields(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """The arrangement payload exposes its top-level keys, with non-empty instrument and section lists."""
    repo_fields = {
        "name": "arrange-fields-test",
        "owner": "testuser",
        "slug": "arrange-fields-test",
        "visibility": "public",
        "owner_user_id": "u1",
    }
    seeded = MusehubRepo(**repo_fields)
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    arrange_url = f"/api/v1/repos/{repo_id}/arrange/abc1234"
    resp = await client.get(arrange_url, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    required_keys = (
        "instruments",
        "sections",
        "cells",
        "rowSummaries",
        "columnSummaries",
        "totalBeats",
    )
    for key in required_keys:
        assert key in payload
    for list_key in ("instruments", "sections", "cells"):
        assert isinstance(payload[list_key], list)
    assert len(payload["instruments"]) > 0
    assert len(payload["sections"]) > 0
1192
1193
@pytest.mark.anyio
async def test_arrange_endpoint_cells_have_required_fields(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Every cell carries its identifying keys, an integer ``noteCount`` and a density in [0, 1]."""
    seeded = MusehubRepo(
        name="arrange-cells-test",
        owner="testuser",
        slug="arrange-cells-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    arrange_url = f"/api/v1/repos/{repo_id}/arrange/main"
    resp = await client.get(arrange_url, headers=auth_headers)
    assert resp.status_code == 200

    cells = resp.json()["cells"]
    assert len(cells) > 0

    cell_keys = (
        "instrument",
        "section",
        "noteCount",
        "noteDensity",
        "active",
        "beatStart",
        "beatEnd",
    )
    for cell in cells:
        for key in cell_keys:
            assert key in cell
        assert isinstance(cell["noteCount"], int)
        assert 0.0 <= cell["noteDensity"] <= 1.0
1230
1231
@pytest.mark.anyio
async def test_arrange_endpoint_instruments_x_sections_coverage(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """The number of cells equals the number of instruments times the number of sections."""
    seeded = MusehubRepo(
        name="arrange-coverage-test",
        owner="testuser",
        slug="arrange-coverage-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    arrange_url = f"/api/v1/repos/{repo_id}/arrange/ref-abc"
    resp = await client.get(arrange_url, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    instruments, sections, cells = (
        payload["instruments"],
        payload["sections"],
        payload["cells"],
    )
    # A full matrix has one cell per (instrument, section) pair.
    expected = len(instruments) * len(sections)
    assert len(cells) == expected, (
        f"Expected {expected} cells for {len(instruments)} instruments "
        f"× {len(sections)} sections, got {len(cells)}"
    )
1265
1266
@pytest.mark.anyio
async def test_arrange_endpoint_deterministic(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Two back-to-back requests for the same ref return identical JSON bodies."""
    seeded = MusehubRepo(
        name="arrange-det-test",
        owner="testuser",
        slug="arrange-det-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    arrange_url = f"/api/v1/repos/{repo_id}/arrange/stable-ref"
    first = await client.get(arrange_url, headers=auth_headers)
    second = await client.get(arrange_url, headers=auth_headers)

    assert first.status_code == 200
    assert second.status_code == 200
    assert first.json() == second.json()
1297
1298
@pytest.mark.anyio
async def test_arrange_endpoint_404_for_unknown_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Requesting the arrangement of an all-zero repo id that was never created yields 404."""
    missing_repo_url = "/api/v1/repos/00000000-0000-0000-0000-000000000000/arrange/HEAD"
    resp = await client.get(missing_repo_url, headers=auth_headers)
    assert resp.status_code == 404
1310
1311
@pytest.mark.anyio
async def test_arrange_row_summaries_match_cells(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Each row summary's ``totalNotes`` equals the summed ``noteCount`` of that instrument's cells."""
    seeded = MusehubRepo(
        name="arrange-summary-test",
        owner="testuser",
        slug="arrange-summary-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    arrange_url = f"/api/v1/repos/{repo_id}/arrange/HEAD"
    resp = await client.get(arrange_url, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    for row in payload["rowSummaries"]:
        inst = row["instrument"]
        # Recompute the total from the raw cells belonging to this instrument.
        counts = [c["noteCount"] for c in payload["cells"] if c["instrument"] == inst]
        expected_total = sum(counts)
        assert row["totalNotes"] == expected_total, (
            f"Row summary for {inst}: expected {expected_total}, got {row['totalNotes']}"
        )
1345
1346
@pytest.mark.anyio
async def test_arrange_service_direct() -> None:
    """Calling ``compute_arrangement_matrix()`` directly yields a consistent matrix.

    No HTTP client or DB session is involved; the service function is invoked
    with a literal repo id and ref.
    """
    from musehub.services.musehub_analysis import compute_arrangement_matrix

    matrix = compute_arrangement_matrix(repo_id="test-repo-id", ref="abc1234")

    # Inputs are echoed back on the result.
    assert matrix.repo_id == "test-repo-id"
    assert matrix.ref == "abc1234"

    instrument_count = len(matrix.instruments)
    section_count = len(matrix.sections)
    assert instrument_count > 0
    assert section_count > 0
    assert len(matrix.cells) == instrument_count * section_count
    assert matrix.total_beats > 0

    # Densities must be normalised.
    for cell in matrix.cells:
        assert 0.0 <= cell.note_density <= 1.0

    # One row summary per instrument.
    assert len(matrix.row_summaries) == instrument_count
1364
1365
1366 # ---------------------------------------------------------------------------
# Star/Fork endpoints
# ---------------------------------------------------------------------------
1368
1369
@pytest.mark.anyio
async def test_star_repo_increases_star_count(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Starring a freshly seeded repo returns 200 with ``starred`` true and ``starCount`` 1."""
    seeded = MusehubRepo(
        name="star-test",
        owner="testuser",
        slug="star-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    star_url = f"/api/v1/repos/{repo_id}/star"
    resp = await client.post(star_url, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    assert payload["starred"] is True
    assert payload["starCount"] == 1
1394
1395
@pytest.mark.anyio
async def test_unstar_repo_decreases_star_count(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """DELETE /repos/{repo_id}/star removes the star — returns starred=False with count=0.

    The repo is seeded directly through ``db_session``, starred once via the
    API, and then unstarred. The setup star call is asserted as well:
    without that check the final ``starred=False`` / ``starCount=0``
    assertions would also hold if starring had silently failed, and the test
    would prove nothing about the count actually decreasing.
    """
    repo = MusehubRepo(
        name="unstar-test",
        owner="testuser",
        slug="unstar-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(repo)
    await db_session.commit()
    await db_session.refresh(repo)
    repo_id = str(repo.repo_id)
    star_url = f"/api/v1/repos/{repo_id}/star"

    # Precondition: the star must really exist before it is removed.
    star_resp = await client.post(star_url, headers=auth_headers)
    assert star_resp.status_code == 200
    assert star_resp.json()["starCount"] == 1

    resp = await client.delete(star_url, headers=auth_headers)
    assert resp.status_code == 200
    body = resp.json()
    assert body["starred"] is False
    assert body["starCount"] == 0
1421
1422
@pytest.mark.anyio
async def test_star_idempotent_double_call(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """POST /star twice leaves count=1 (idempotent add — not a toggle).

    Both calls are asserted. If only the second response were checked, a
    failed first call followed by a successful second one would also end at
    ``starCount == 1`` and the test would pass without exercising
    idempotency.
    """
    repo = MusehubRepo(
        name="idempotent-star",
        owner="testuser",
        slug="idempotent-star",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(repo)
    await db_session.commit()
    await db_session.refresh(repo)
    repo_id = str(repo.repo_id)
    star_url = f"/api/v1/repos/{repo_id}/star"

    # First call: the star is created.
    first = await client.post(star_url, headers=auth_headers)
    assert first.status_code == 200
    assert first.json()["starCount"] == 1

    # Second call: must neither toggle the star off nor count it twice.
    resp = await client.post(star_url, headers=auth_headers)
    assert resp.status_code == 200
    body = resp.json()
    assert body["starred"] is True
    assert body["starCount"] == 1
1448
1449
@pytest.mark.anyio
async def test_star_requires_auth(client: AsyncClient, db_session: AsyncSession) -> None:
    """Starring without any request headers is rejected with 401."""
    seeded = MusehubRepo(
        name="auth-star-test",
        owner="testuser",
        slug="auth-star-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    star_url = f"/api/v1/repos/{repo_id}/star"
    # No headers are sent on purpose.
    resp = await client.post(star_url)
    assert resp.status_code == 401
1467
1468
@pytest.mark.anyio
async def test_fork_repo_creates_fork_under_user(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Forking returns 201 with the source repo id and a different fork repo id."""
    source = MusehubRepo(
        name="fork-source",
        owner="original-owner",
        slug="fork-source",
        visibility="public",
        owner_user_id="u-original",
        description="The original",
    )
    db_session.add(source)
    await db_session.commit()
    await db_session.refresh(source)
    repo_id = str(source.repo_id)

    fork_url = f"/api/v1/repos/{repo_id}/fork"
    resp = await client.post(fork_url, headers=auth_headers)
    assert resp.status_code == 201

    # The fork response uses snake_case keys (ForkResponse from social.py).
    payload = resp.json()
    assert payload["source_repo_id"] == repo_id
    assert "fork_repo_id" in payload
    assert payload["fork_repo_id"] != repo_id
1496
1497
@pytest.mark.anyio
async def test_fork_preserves_branches(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """A fork of a repo that has a ``main`` branch also lists a ``main`` branch.

    Note: commit_id is a global PK so commits cannot be duplicated across repos.
    The fork shares the lineage link (MusehubFork) and inherits branch pointers.
    """
    from musehub.db.musehub_models import MusehubBranch

    source = MusehubRepo(
        name="fork-with-branches",
        owner="src-owner",
        slug="fork-with-branches",
        visibility="public",
        owner_user_id="u-src",
    )
    db_session.add(source)
    await db_session.commit()
    await db_session.refresh(source)
    repo_id = str(source.repo_id)

    # Seed a branch pointer with no head commit on the source repo.
    db_session.add(MusehubBranch(repo_id=repo_id, name="main", head_commit_id=None))
    await db_session.commit()

    fork_resp = await client.post(f"/api/v1/repos/{repo_id}/fork", headers=auth_headers)
    assert fork_resp.status_code == 201
    fork_repo_id = fork_resp.json()["fork_repo_id"]

    branches_url = f"/api/v1/repos/{fork_repo_id}/branches"
    branches_resp = await client.get(branches_url, headers=auth_headers)
    assert branches_resp.status_code == 200

    fork_branches = branches_resp.json().get("branches", [])
    assert any(entry["name"] == "main" for entry in fork_branches)
1541
1542
@pytest.mark.anyio
async def test_list_stargazers_returns_starrers(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """After one star, the stargazers listing reports a total of 1 with a single entry."""
    seeded = MusehubRepo(
        name="stargazers-test",
        owner="testuser",
        slug="stargazers-test",
        visibility="public",
        owner_user_id="u1",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)
    repo_id = str(seeded.repo_id)

    await client.post(f"/api/v1/repos/{repo_id}/star", headers=auth_headers)

    # The listing is requested without auth headers.
    listing = await client.get(f"/api/v1/repos/{repo_id}/stargazers")
    assert listing.status_code == 200

    payload = listing.json()
    assert payload["total"] == 1
    assert len(payload["stargazers"]) == 1
1569
1570
@pytest.mark.anyio
async def test_list_forks_returns_fork_entry(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """After one fork, the forks listing is a one-element array pointing at the source repo."""
    source = MusehubRepo(
        name="forks-list-test",
        owner="original",
        slug="forks-list-test",
        visibility="public",
        owner_user_id="u-orig",
    )
    db_session.add(source)
    await db_session.commit()
    await db_session.refresh(source)
    repo_id = str(source.repo_id)

    await client.post(f"/api/v1/repos/{repo_id}/fork", headers=auth_headers)

    # The listing is requested without auth headers.
    listing = await client.get(f"/api/v1/repos/{repo_id}/forks")
    assert listing.status_code == 200

    # The endpoint answers with a bare JSON array of fork entries.
    forks = listing.json()
    assert isinstance(forks, list)
    assert len(forks) == 1
    assert forks[0]["source_repo_id"] == repo_id
1599
1600
1601 # ---------------------------------------------------------------------------
1602 # GET /repos/{repo_id}/settings
1603 # ---------------------------------------------------------------------------
1604
# Owner id used when seeding repos directly in the settings, delete and
# transfer tests below. Settings tests that seed a repo with this id expect
# the `auth_headers` caller to be allowed (200), while repos seeded with any
# other owner id expect 403 — presumably this matches the user id carried by
# the conftest test token; TODO confirm against conftest.py.
TEST_OWNER_USER_ID = "550e8400-e29b-41d4-a716-446655440000"
1606
1607
@pytest.mark.anyio
async def test_get_repo_settings_returns_defaults(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Settings of a freshly seeded repo echo its name/visibility and the default flags."""
    seeded = MusehubRepo(
        name="settings-get-test",
        owner="testuser",
        slug="settings-get-test",
        visibility="private",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    settings_url = f"/api/v1/repos/{seeded.repo_id}/settings"
    resp = await client.get(settings_url, headers=auth_headers)
    assert resp.status_code == 200

    settings = resp.json()
    # Values that were set when seeding.
    assert settings["name"] == "settings-get-test"
    assert settings["visibility"] == "private"
    # Values that were never set and therefore come from defaults.
    assert settings["hasIssues"] is True
    assert settings["allowMergeCommit"] is True
    assert settings["allowRebaseMerge"] is False
    assert settings["deleteBranchOnMerge"] is True
    assert settings["defaultBranch"] == "main"
1639
1640
@pytest.mark.anyio
async def test_get_repo_settings_requires_auth(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """Reading settings without any request headers is rejected with 401."""
    seeded = MusehubRepo(
        name="settings-noauth",
        owner="testuser",
        slug="settings-noauth",
        visibility="private",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    settings_url = f"/api/v1/repos/{seeded.repo_id}/settings"
    # No headers are sent on purpose.
    resp = await client.get(settings_url)
    assert resp.status_code == 401
1660
1661
@pytest.mark.anyio
async def test_get_repo_settings_returns_403_for_non_admin(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Reading settings of a repo seeded with a different owner id yields 403."""
    foreign_repo = MusehubRepo(
        name="settings-403-test",
        owner="other-owner",
        slug="settings-403-test",
        visibility="public",
        owner_user_id="other-user-id-not-test",
    )
    db_session.add(foreign_repo)
    await db_session.commit()
    await db_session.refresh(foreign_repo)

    settings_url = f"/api/v1/repos/{foreign_repo.repo_id}/settings"
    resp = await client.get(settings_url, headers=auth_headers)
    assert resp.status_code == 403
1685
1686
@pytest.mark.anyio
async def test_get_repo_settings_returns_404_for_unknown_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Reading settings for a repo id that was never created yields 404."""
    missing_repo_url = "/api/v1/repos/nonexistent-repo-id/settings"
    resp = await client.get(missing_repo_url, headers=auth_headers)
    assert resp.status_code == 404
1698
1699
1700 # ---------------------------------------------------------------------------
1701 # PATCH /repos/{repo_id}/settings
1702 # ---------------------------------------------------------------------------
1703
1704
@pytest.mark.anyio
async def test_patch_repo_settings_updates_fields(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """A multi-field PATCH is reflected in the response while an omitted flag keeps its value."""
    seeded = MusehubRepo(
        name="settings-patch-test",
        owner="testuser",
        slug="settings-patch-test",
        visibility="private",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    changes = {
        "description": "Updated description",
        "visibility": "public",
        "hasIssues": False,
        "allowRebaseMerge": True,
        "homepageUrl": "https://muse.app",
        "topics": ["classical", "baroque"],
    }
    settings_url = f"/api/v1/repos/{seeded.repo_id}/settings"
    resp = await client.patch(settings_url, json=changes, headers=auth_headers)
    assert resp.status_code == 200

    updated = resp.json()
    assert updated["description"] == "Updated description"
    assert updated["visibility"] == "public"
    assert updated["hasIssues"] is False
    assert updated["allowRebaseMerge"] is True
    assert updated["homepageUrl"] == "https://muse.app"
    assert updated["topics"] == ["classical", "baroque"]
    # allowMergeCommit was not part of the PATCH and must still be true.
    assert updated["allowMergeCommit"] is True
1745
1746
@pytest.mark.anyio
async def test_patch_repo_settings_partial_update_preserves_other_fields(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Patching only ``defaultBranch`` changes that field and leaves name, visibility and hasIssues alone."""
    seeded = MusehubRepo(
        name="settings-partial-test",
        owner="testuser",
        slug="settings-partial-test",
        visibility="private",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    settings_url = f"/api/v1/repos/{seeded.repo_id}/settings"
    single_change = {"defaultBranch": "develop"}
    resp = await client.patch(settings_url, json=single_change, headers=auth_headers)
    assert resp.status_code == 200

    updated = resp.json()
    assert updated["defaultBranch"] == "develop"
    # Fields that were not part of the PATCH.
    assert updated["name"] == "settings-partial-test"
    assert updated["visibility"] == "private"
    assert updated["hasIssues"] is True
1777
1778
@pytest.mark.anyio
async def test_patch_repo_settings_requires_auth(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """Patching settings without any request headers is rejected with 401."""
    seeded = MusehubRepo(
        name="settings-patch-noauth",
        owner="testuser",
        slug="settings-patch-noauth",
        visibility="private",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    settings_url = f"/api/v1/repos/{seeded.repo_id}/settings"
    # No headers are sent on purpose.
    resp = await client.patch(settings_url, json={"visibility": "public"})
    assert resp.status_code == 401
1801
1802
@pytest.mark.anyio
async def test_patch_repo_settings_returns_403_for_non_admin(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Patching settings of a repo seeded with a different owner id yields 403."""
    foreign_repo = MusehubRepo(
        name="settings-patch-403",
        owner="other-owner",
        slug="settings-patch-403",
        visibility="public",
        owner_user_id="other-user-not-test",
    )
    db_session.add(foreign_repo)
    await db_session.commit()
    await db_session.refresh(foreign_repo)

    settings_url = f"/api/v1/repos/{foreign_repo.repo_id}/settings"
    resp = await client.patch(settings_url, json={"hasWiki": True}, headers=auth_headers)
    assert resp.status_code == 403
1827
1828
1829 # ---------------------------------------------------------------------------
1830 # DELETE /repos/{repo_id} — soft-delete
1831 # ---------------------------------------------------------------------------
1832
1833
@pytest.mark.anyio
async def test_delete_repo_returns_204(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Deleting a repo that the caller just created through the API returns 204."""
    new_repo = {"name": "to-delete", "owner": "testuser", "visibility": "private"}
    created = await client.post("/api/v1/repos", json=new_repo, headers=auth_headers)
    assert created.status_code == 201

    repo_url = f"/api/v1/repos/{created.json()['repoId']}"
    resp = await client.delete(repo_url, headers=auth_headers)
    assert resp.status_code == 204
1850
1851
@pytest.mark.anyio
async def test_delete_repo_hides_repo_from_get(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """After DELETE, GET /repos/{repo_id} returns 404.

    The create and delete calls are asserted too, so a failure in either
    setup step is reported at the step that broke rather than as a
    ``KeyError`` on ``repoId`` or a misleading failure of the final 404
    check.
    """
    create = await client.post(
        "/api/v1/repos",
        json={"name": "hidden-after-delete", "owner": "testuser", "visibility": "private"},
        headers=auth_headers,
    )
    assert create.status_code == 201
    repo_id = create.json()["repoId"]

    delete_resp = await client.delete(f"/api/v1/repos/{repo_id}", headers=auth_headers)
    assert delete_resp.status_code == 204

    # The deleted repo must no longer be retrievable by its id.
    get_resp = await client.get(f"/api/v1/repos/{repo_id}", headers=auth_headers)
    assert get_resp.status_code == 404
1869
1870
@pytest.mark.anyio
async def test_delete_repo_requires_auth(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """Deleting a repo without any request headers is rejected with 401."""
    seeded = MusehubRepo(
        name="delete-noauth",
        owner="testuser",
        slug="delete-noauth",
        visibility="public",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    repo_url = f"/api/v1/repos/{seeded.repo_id}"
    # No headers are sent on purpose.
    resp = await client.delete(repo_url)
    assert resp.status_code == 401
1890
1891
@pytest.mark.anyio
async def test_delete_repo_returns_403_for_non_owner(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Deleting a repo seeded with a different owner id yields 403."""
    foreign_repo = MusehubRepo(
        name="delete-403",
        owner="other-owner",
        slug="delete-403",
        visibility="public",
        owner_user_id="some-other-user-id",
    )
    db_session.add(foreign_repo)
    await db_session.commit()
    await db_session.refresh(foreign_repo)

    repo_url = f"/api/v1/repos/{foreign_repo.repo_id}"
    resp = await client.delete(repo_url, headers=auth_headers)
    assert resp.status_code == 403
1914
1915
@pytest.mark.anyio
async def test_delete_repo_returns_404_for_unknown_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Deleting a repo id that was never created yields 404."""
    missing_repo_url = "/api/v1/repos/nonexistent-repo-id"
    resp = await client.delete(missing_repo_url, headers=auth_headers)
    assert resp.status_code == 404
1926
1927
@pytest.mark.anyio
async def test_delete_repo_service_sets_deleted_at(
    db_session: AsyncSession,
) -> None:
    """``delete_repo()`` reports success and the repo is no longer returned by ``get_repo()``."""
    created = await musehub_repository.create_repo(
        db_session,
        name="svc-delete-test",
        owner="testuser",
        visibility="private",
        owner_user_id="user-abc",
    )
    await db_session.commit()

    was_deleted = await musehub_repository.delete_repo(db_session, created.repo_id)
    await db_session.commit()
    assert was_deleted is True

    # Once deleted, a lookup by id must come back empty.
    lookup = await musehub_repository.get_repo(db_session, created.repo_id)
    assert lookup is None
1949
1950
@pytest.mark.anyio
async def test_delete_repo_service_returns_false_for_unknown(
    db_session: AsyncSession,
) -> None:
    """``delete_repo()`` returns ``False`` when given an id that was never created."""
    outcome = await musehub_repository.delete_repo(db_session, "does-not-exist")
    assert outcome is False
1958
1959
1960 # ---------------------------------------------------------------------------
1961 # POST /repos/{repo_id}/transfer — transfer ownership
1962 # ---------------------------------------------------------------------------
1963
1964
@pytest.mark.anyio
async def test_transfer_repo_ownership_returns_200(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Transferring a repo the caller created returns 200 with the new ``ownerUserId``."""
    new_repo = {"name": "transfer-me", "owner": "testuser", "visibility": "private"}
    created = await client.post("/api/v1/repos", json=new_repo, headers=auth_headers)
    assert created.status_code == 201
    repo_id = created.json()["repoId"]

    new_owner = "another-user-uuid-1234"
    transfer_url = f"/api/v1/repos/{repo_id}/transfer"
    resp = await client.post(
        transfer_url,
        json={"newOwnerUserId": new_owner},
        headers=auth_headers,
    )
    assert resp.status_code == 200

    transferred = resp.json()
    assert transferred["ownerUserId"] == new_owner
    # The repo keeps its id across the transfer.
    assert transferred["repoId"] == repo_id
1989
1990
@pytest.mark.anyio
async def test_transfer_repo_requires_auth(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """Transferring a repo without any request headers is rejected with 401."""
    seeded = MusehubRepo(
        name="transfer-noauth",
        owner="testuser",
        slug="transfer-noauth",
        visibility="public",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    transfer_url = f"/api/v1/repos/{seeded.repo_id}/transfer"
    # No headers are sent on purpose.
    resp = await client.post(transfer_url, json={"newOwnerUserId": "new-user-id"})
    assert resp.status_code == 401
2013
2014
2015 # ---------------------------------------------------------------------------
# Wizard creation endpoint
# ---------------------------------------------------------------------------
2017
2018
@pytest.mark.anyio
async def test_create_repo_wizard_initialize_creates_branch_and_commit(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """Creating a repo with ``initialize`` true yields a ``main`` branch and a single initial commit."""
    wizard_payload = {
        "name": "wizard-init-repo",
        "owner": "testuser",
        "visibility": "public",
        "initialize": True,
        "defaultBranch": "main",
    }
    created = await client.post("/api/v1/repos", json=wizard_payload, headers=auth_headers)
    assert created.status_code == 201
    repo_root = f"/api/v1/repos/{created.json()['repoId']}"

    # The default branch must be listed.
    branches_resp = await client.get(f"{repo_root}/branches", headers=auth_headers)
    assert branches_resp.status_code == 200
    branch_entries = branches_resp.json()["branches"]
    assert any(entry["name"] == "main" for entry in branch_entries), "Expected 'main' branch to be created"

    # Exactly one commit exists, carrying the initial-commit message.
    commits_resp = await client.get(f"{repo_root}/commits", headers=auth_headers)
    assert commits_resp.status_code == 200
    commit_entries = commits_resp.json()["commits"]
    assert len(commit_entries) == 1
    assert commit_entries[0]["message"] == "Initial commit"
2056
2057
@pytest.mark.anyio
async def test_create_repo_wizard_no_initialize_stays_empty(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Creating a repo with initialize=false produces no branches and no commits."""
    bare_payload = {
        "name": "wizard-noinit-repo",
        "owner": "testuser",
        "initialize": False,
    }
    create_resp = await client.post(
        "/api/v1/repos", json=bare_payload, headers=auth_headers
    )
    assert create_resp.status_code == 201
    new_repo_id = create_resp.json()["repoId"]

    # Both listings must come back as empty lists for an uninitialized repo.
    branch_listing = await client.get(
        f"/api/v1/repos/{new_repo_id}/branches", headers=auth_headers
    )
    assert branch_listing.json()["branches"] == []

    commit_listing = await client.get(
        f"/api/v1/repos/{new_repo_id}/commits", headers=auth_headers
    )
    assert commit_listing.json()["commits"] == []
2087
2088
@pytest.mark.anyio
async def test_create_repo_wizard_topics_merged_into_tags(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Topics sent at creation show up in the returned tags, without duplicates."""
    # "jazz" is present in both lists on purpose: it must appear exactly once.
    creation_payload = {
        "name": "topics-test-repo",
        "owner": "testuser",
        "tags": ["jazz"],
        "topics": ["classical", "jazz"],
        "initialize": False,
    }
    created = await client.post(
        "/api/v1/repos", json=creation_payload, headers=auth_headers
    )
    assert created.status_code == 201

    merged_tags: list[str] = created.json()["tags"]
    assert "jazz" in merged_tags
    assert "classical" in merged_tags
    assert merged_tags.count("jazz") == 1, "Duplicate 'jazz' must be removed"
2112
2113
@pytest.mark.anyio
async def test_create_repo_wizard_clone_url_uses_musehub_scheme(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """The cloneUrl in the creation response starts with musehub:// and names the owner."""
    creation_payload = {"name": "clone-url-test", "owner": "testuser", "initialize": False}
    created = await client.post(
        "/api/v1/repos", json=creation_payload, headers=auth_headers
    )
    assert created.status_code == 201

    returned_url: str = created.json()["cloneUrl"]
    has_scheme = returned_url.startswith("musehub://")
    assert has_scheme, f"Expected musehub:// prefix, got: {returned_url}"
    assert "testuser" in returned_url
2129
2130
@pytest.mark.anyio
async def test_create_repo_wizard_template_copies_description(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """A repo created from a public template inherits that template's description and tags."""
    template_description = "A great neo-baroque composition template"

    # Seed the public template directly in the database.
    source_repo = MusehubRepo(
        name="template-source",
        owner="template-owner",
        slug="template-source",
        visibility="public",
        owner_user_id="template-owner-id",
        description=template_description,
        tags=["baroque", "piano"],
    )
    db_session.add(source_repo)
    await db_session.commit()
    await db_session.refresh(source_repo)

    # The new repo supplies no description or tags of its own.
    creation_payload = {
        "name": "from-template-repo",
        "owner": "testuser",
        "initialize": False,
        "templateRepoId": str(source_repo.repo_id),
    }
    created = await client.post(
        "/api/v1/repos", json=creation_payload, headers=auth_headers
    )
    assert created.status_code == 201

    new_repo = created.json()
    assert new_repo["description"] == template_description
    assert "baroque" in new_repo["tags"]
    assert "piano" in new_repo["tags"]
2167
2168
@pytest.mark.anyio
async def test_create_repo_wizard_private_template_not_copied(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """Metadata of a private template must not leak into a repo created from it."""
    hidden_source = MusehubRepo(
        name="private-template",
        owner="secret-owner",
        slug="private-template",
        visibility="private",
        owner_user_id="secret-id",
        description="Secret description",
        tags=["secret"],
    )
    db_session.add(hidden_source)
    await db_session.commit()
    await db_session.refresh(hidden_source)

    own_description = "My own description"
    creation_payload = {
        "name": "refused-template-repo",
        "owner": "testuser",
        "description": own_description,
        "initialize": False,
        "templateRepoId": str(hidden_source.repo_id),
    }
    created = await client.post(
        "/api/v1/repos", json=creation_payload, headers=auth_headers
    )
    assert created.status_code == 201

    new_repo = created.json()
    # The caller's description survives and the template's tag is absent.
    assert new_repo["description"] == own_description
    assert "secret" not in new_repo["tags"]
2206
2207
@pytest.mark.anyio
async def test_create_repo_wizard_custom_default_branch(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """With initialize=true and defaultBranch='develop', only 'develop' is created, not 'main'."""
    creation_payload = {
        "name": "custom-branch-repo",
        "owner": "testuser",
        "initialize": True,
        "defaultBranch": "develop",
    }
    created = await client.post(
        "/api/v1/repos", json=creation_payload, headers=auth_headers
    )
    assert created.status_code == 201
    new_repo_id = created.json()["repoId"]

    branch_listing = await client.get(
        f"/api/v1/repos/{new_repo_id}/branches", headers=auth_headers
    )
    existing_branches = [
        entry["name"] for entry in branch_listing.json()["branches"]
    ]
    assert "develop" in existing_branches
    assert "main" not in existing_branches
2234
2235
2236 # ---------------------------------------------------------------------------
2237 # GET /repos — list repos for authenticated user
2238 # ---------------------------------------------------------------------------
2239
2240
@pytest.mark.anyio
async def test_list_my_repos_returns_owned_repos(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """GET /repos lists the repos the authenticated caller just created.

    Two repos are created through the API, then the listing is fetched and
    checked for the envelope keys (``repos``, ``total``, ``nextCursor``) and
    for both repo names.

    Args:
        client: Shared HTTP client fixture.
        auth_headers: Headers fixture that authenticates the requests.
    """
    expected_names = ("owned-repo-a", "owned-repo-b")
    for repo_name in expected_names:
        created = await client.post(
            "/api/v1/repos",
            json={"name": repo_name, "owner": "testuser", "initialize": False},
            headers=auth_headers,
        )
        # Fail at the point of setup, not later with a vague "name not in list".
        assert created.status_code == 201, (
            f"setup failed creating {repo_name!r}: {created.text}"
        )

    resp = await client.get("/api/v1/repos", headers=auth_headers)
    assert resp.status_code == 200
    body = resp.json()
    assert "repos" in body
    assert "total" in body
    assert "nextCursor" in body

    names = [r["name"] for r in body["repos"]]
    for repo_name in expected_names:
        assert repo_name in names
2264
2265
@pytest.mark.anyio
async def test_list_my_repos_requires_auth(client: AsyncClient) -> None:
    """Listing repos without auth headers is rejected with 401."""
    # No ``headers=`` argument: the request is sent unauthenticated.
    anonymous_resp = await client.get("/api/v1/repos")
    assert anonymous_resp.status_code == 401
2271
2272
@pytest.mark.anyio
async def test_transfer_repo_returns_403_for_non_owner(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """An authenticated caller who does not own the repo gets 403 on transfer."""
    # The repo is owned by a user id other than the one used for owner-side tests.
    foreign_repo = MusehubRepo(
        name="transfer-403",
        owner="other-owner",
        slug="transfer-403",
        visibility="public",
        owner_user_id="some-other-user-id",
    )
    db_session.add(foreign_repo)
    await db_session.commit()
    await db_session.refresh(foreign_repo)

    transfer_url = f"/api/v1/repos/{foreign_repo.repo_id}/transfer"
    denied = await client.post(
        transfer_url,
        json={"newOwnerUserId": "attacker-user-id"},
        headers=auth_headers,
    )
    assert denied.status_code == 403
2297
2298
@pytest.mark.anyio
async def test_transfer_repo_returns_404_for_unknown_repo(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """Transferring a repo id that does not exist yields 404."""
    missing_repo_url = "/api/v1/repos/nonexistent-repo-id/transfer"
    payload = {"newOwnerUserId": "some-user"}

    not_found = await client.post(missing_repo_url, json=payload, headers=auth_headers)
    assert not_found.status_code == 404
2311
2312
@pytest.mark.anyio
async def test_transfer_repo_service_updates_owner_user_id(
    db_session: AsyncSession,
) -> None:
    """The transfer_repo_ownership() service call rewrites owner_user_id on the repo."""
    new_owner = "new-owner-id"

    created = await musehub_repository.create_repo(
        db_session,
        name="svc-transfer-test",
        owner="testuser",
        visibility="private",
        owner_user_id="original-owner-id",
    )
    await db_session.commit()

    transferred = await musehub_repository.transfer_repo_ownership(
        db_session, created.repo_id, new_owner
    )
    await db_session.commit()

    # The returned object already reflects the new owner ...
    assert transferred is not None
    assert transferred.owner_user_id == new_owner

    # ... and so does a fresh lookup through the service layer.
    reloaded = await musehub_repository.get_repo(db_session, created.repo_id)
    assert reloaded is not None
    assert reloaded.owner_user_id == new_owner
2338
2339
@pytest.mark.anyio
async def test_transfer_repo_service_returns_none_for_unknown(
    db_session: AsyncSession,
) -> None:
    """transfer_repo_ownership() yields None when the repo id does not exist."""
    missing_repo_id = "does-not-exist"
    outcome = await musehub_repository.transfer_repo_ownership(
        db_session, missing_repo_id, "new-owner"
    )
    assert outcome is None
2349
2350
2351 # ---------------------------------------------------------------------------
# GET /repos — list repos for authenticated user (totals, pagination, service layer)
2353 # ---------------------------------------------------------------------------
2354
2355
@pytest.mark.anyio
async def test_list_my_repos_total_matches_count(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """The ``total`` field of GET /repos grows by exactly one after creating a repo.

    The test records the baseline total first, so it does not depend on how
    many repos already exist for the caller.

    Args:
        client: Shared HTTP client fixture.
        auth_headers: Headers fixture that authenticates the requests.
    """
    initial = await client.get("/api/v1/repos", headers=auth_headers)
    # Check the baseline request so a failure here is not reported as a KeyError.
    assert initial.status_code == 200
    initial_total: int = initial.json()["total"]

    created = await client.post(
        "/api/v1/repos",
        json={"name": "total-count-test", "owner": "testuser", "initialize": False},
        headers=auth_headers,
    )
    # If creation fails, the total comparison below would be misleading.
    assert created.status_code == 201, f"setup failed: {created.text}"

    resp = await client.get("/api/v1/repos", headers=auth_headers)
    assert resp.status_code == 200
    assert resp.json()["total"] == initial_total + 1
2374
2375
@pytest.mark.anyio
async def test_list_my_repos_pagination_cursor(
    client: AsyncClient,
    auth_headers: dict[str, str],
    db_session: AsyncSession,
) -> None:
    """GET /repos with limit=1 returns a nextCursor that fetches a different next page.

    Three repos are inserted directly with ``created_at`` values one second
    apart, then two consecutive single-item pages are requested and checked
    for non-overlap.

    Args:
        client: Shared HTTP client fixture.
        auth_headers: Headers fixture that authenticates the requests.
        db_session: Database session fixture used to seed the repos.
    """
    from datetime import datetime, timedelta, timezone

    # NOTE(review): presumably the user id behind ``auth_headers`` — confirm
    # against conftest; the listing only returns these rows if it matches.
    owner_user_id = "550e8400-e29b-41d4-a716-446655440000"
    now = datetime.now(tz=timezone.utc)
    for i in range(3):
        repo = MusehubRepo(
            name=f"paged-repo-{i}",
            owner="testuser",
            slug=f"paged-repo-{i}",
            visibility="public",
            owner_user_id=owner_user_id,
        )
        # Distinct timestamps, one second apart, so no two rows share created_at.
        repo.created_at = now - timedelta(seconds=i)
        db_session.add(repo)
    await db_session.commit()

    first_page = await client.get(
        "/api/v1/repos",
        params={"limit": 1},
        headers=auth_headers,
    )
    assert first_page.status_code == 200
    body = first_page.json()
    assert len(body["repos"]) == 1
    next_cursor = body["nextCursor"]
    assert next_cursor is not None

    # Send the cursor via ``params`` so httpx percent-encodes it. Interpolated
    # raw into the URL, a cursor containing '+', '=', '&' or '/' would be
    # mis-parsed by the server.
    second_page = await client.get(
        "/api/v1/repos",
        params={"limit": 1, "cursor": next_cursor},
        headers=auth_headers,
    )
    assert second_page.status_code == 200
    second_body = second_page.json()
    assert len(second_body["repos"]) == 1

    # Pages must not overlap
    first_id = body["repos"][0]["repoId"]
    second_id = second_body["repos"][0]["repoId"]
    assert first_id != second_id
2420
2421
@pytest.mark.anyio
async def test_list_my_repos_service_direct(db_session: AsyncSession) -> None:
    """list_repos_for_user() returns only repos owned by the given user.

    Two private repos with different ``owner_user_id`` values are inserted;
    the service is queried for the first owner and must return that owner's
    repo but not the other one.

    Args:
        db_session: Database session fixture used for seeding and for the call.
    """
    owner_id = "user-list-direct"
    other_id = "user-other-direct"

    repo_mine = MusehubRepo(
        name="mine-direct",
        owner="testuser",
        slug="mine-direct",
        visibility="private",
        owner_user_id=owner_id,
    )
    repo_other = MusehubRepo(
        name="not-mine-direct",
        owner="otheruser",
        slug="not-mine-direct",
        visibility="private",
        owner_user_id=other_id,
    )
    db_session.add_all([repo_mine, repo_other])
    await db_session.commit()
    # Reload both rows explicitly, as the sibling tests do, so reading
    # ``repo_id`` below never relies on an implicit lazy load after commit.
    await db_session.refresh(repo_mine)
    await db_session.refresh(repo_other)

    # Call through the module-level ``musehub_repository`` import, as the
    # transfer service tests do, rather than re-importing the function here.
    result = await musehub_repository.list_repos_for_user(db_session, owner_id)
    repo_ids = {r.repo_id for r in result.repos}
    assert str(repo_mine.repo_id) in repo_ids
    assert str(repo_other.repo_id) not in repo_ids
2451
2452
2453 # ---------------------------------------------------------------------------
2454 # GET /repos/{repo_id}/collaborators/{username}/permission
2455 # ---------------------------------------------------------------------------
2456
2457
@pytest.mark.anyio
async def test_collab_access_owner_returns_owner_permission(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Querying the owner's id returns permission='owner' with acceptedAt=null.

    No collaborator row is created: the repo's ``owner_user_id`` alone is
    expected to produce the ``owner`` permission.

    Args:
        client: Shared HTTP client fixture.
        db_session: Database session fixture used to seed the repo.
        auth_headers: Headers fixture that authenticates the request.
    """
    owner_id = TEST_OWNER_USER_ID
    repo = MusehubRepo(
        name="access-owner-test",
        owner="testuser",
        slug="access-owner-test",
        visibility="private",
        owner_user_id=owner_id,
    )
    db_session.add(repo)
    await db_session.commit()
    await db_session.refresh(repo)

    # The ``{username}`` path segment is filled with the owner's user id.
    resp = await client.get(
        f"/api/v1/repos/{repo.repo_id}/collaborators/{owner_id}/permission",
        headers=auth_headers,
    )
    assert resp.status_code == 200
    body = resp.json()
    assert body["username"] == owner_id
    assert body["permission"] == "owner"
    assert body["acceptedAt"] is None
2488
2489
@pytest.mark.anyio
async def test_collab_access_collaborator_returns_permission(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """An existing collaborator's lookup reports their permission and a non-null acceptedAt."""
    from datetime import datetime, timezone

    from musehub.db.musehub_collaborator_models import MusehubCollaborator

    collaborator_id = "collab-user-write"

    shared_repo = MusehubRepo(
        name="access-collab-test",
        owner="testuser",
        slug="access-collab-test",
        visibility="private",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(shared_repo)
    await db_session.commit()
    await db_session.refresh(shared_repo)

    # Add a "write" collaborator whose invitation has an acceptance timestamp.
    membership = MusehubCollaborator(
        repo_id=str(shared_repo.repo_id),
        user_id=collaborator_id,
        permission="write",
        accepted_at=datetime(2026, 1, 10, 10, 0, 0, tzinfo=timezone.utc),
    )
    db_session.add(membership)
    await db_session.commit()

    lookup_url = (
        f"/api/v1/repos/{shared_repo.repo_id}/collaborators/{collaborator_id}/permission"
    )
    lookup = await client.get(lookup_url, headers=auth_headers)
    assert lookup.status_code == 200

    payload = lookup.json()
    assert payload["username"] == collaborator_id
    assert payload["permission"] == "write"
    assert payload["acceptedAt"] is not None
2534
2535
@pytest.mark.anyio
async def test_collab_access_non_collaborator_returns_404(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """Looking up a user with no collaborator row gives 404 and names that user in the detail."""
    stranger = "total-stranger-user"

    private_repo = MusehubRepo(
        name="access-404-test",
        owner="testuser",
        slug="access-404-test",
        visibility="private",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(private_repo)
    await db_session.commit()
    await db_session.refresh(private_repo)

    lookup_url = f"/api/v1/repos/{private_repo.repo_id}/collaborators/{stranger}/permission"
    lookup = await client.get(lookup_url, headers=auth_headers)

    assert lookup.status_code == 404
    # The error detail is expected to mention the queried user.
    assert stranger in lookup.json()["detail"]
2561
2562
@pytest.mark.anyio
async def test_collab_access_unknown_repo_returns_404(
    client: AsyncClient,
    auth_headers: dict[str, str],
) -> None:
    """A permission lookup against a repo id that does not exist yields 404."""
    missing_repo_url = "/api/v1/repos/nonexistent-repo/collaborators/anyone/permission"
    lookup = await client.get(missing_repo_url, headers=auth_headers)
    assert lookup.status_code == 404
2574
2575
@pytest.mark.anyio
async def test_collab_access_requires_auth(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """A permission lookup sent without auth headers is rejected with 401."""
    public_repo = MusehubRepo(
        name="access-auth-test",
        owner="testuser",
        slug="access-auth-test",
        visibility="public",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(public_repo)
    await db_session.commit()
    await db_session.refresh(public_repo)

    # No ``headers=`` argument: the request is sent unauthenticated.
    lookup_url = f"/api/v1/repos/{public_repo.repo_id}/collaborators/anyone/permission"
    anonymous_resp = await client.get(lookup_url)
    assert anonymous_resp.status_code == 401
2597
2598
@pytest.mark.anyio
async def test_collab_access_admin_permission(
    client: AsyncClient,
    db_session: AsyncSession,
    auth_headers: dict[str, str],
) -> None:
    """A collaborator stored with permission 'admin' is reported as 'admin'."""
    from musehub.db.musehub_collaborator_models import MusehubCollaborator

    admin_user = "admin-collab-user"

    managed_repo = MusehubRepo(
        name="access-admin-test",
        owner="testuser",
        slug="access-admin-test",
        visibility="private",
        owner_user_id=TEST_OWNER_USER_ID,
    )
    db_session.add(managed_repo)
    await db_session.commit()
    await db_session.refresh(managed_repo)

    # The admin row is stored with no acceptance timestamp.
    membership = MusehubCollaborator(
        repo_id=str(managed_repo.repo_id),
        user_id=admin_user,
        permission="admin",
        accepted_at=None,
    )
    db_session.add(membership)
    await db_session.commit()

    lookup_url = f"/api/v1/repos/{managed_repo.repo_id}/collaborators/{admin_user}/permission"
    lookup = await client.get(lookup_url, headers=auth_headers)
    assert lookup.status_code == 200
    assert lookup.json()["permission"] == "admin"