gabriel / musehub public
wire.py python
219 lines 7.1 KB
cb3d85e8 feat: wire protocol, storage abstraction, unified identities, Qdrant pi… Gabriel Cardona <cgcardona@gmail.com> 4d ago
1 """Wire protocol endpoints — Muse CLI push/fetch transport.
2
3 URL pattern: /wire/repos/{repo_id}/...
4
5 Three endpoints:
6 GET /wire/repos/{repo_id}/refs — branch heads + repo metadata
7 POST /wire/repos/{repo_id}/push — accept a pack bundle from ``muse push``
8 POST /wire/repos/{repo_id}/fetch — send a pack bundle to ``muse pull``
9
10 Also serves the content-addressed object CDN endpoint:
11 GET /o/{object_id} — immutable binary blob, cacheable forever
12
13 Authentication:
14 ``refs`` and ``fetch`` accept optional Bearer tokens (public repos are read-only
15 without auth). ``push`` requires a valid Bearer token.
16
17 The wire URLs that ``muse remote add`` stores::
18
19 muse remote add origin https://musehub.ai/wire/repos/<repo_id>
20
21 Muse CLI then calls:
22 GET {remote_url}/refs
23 POST {remote_url}/push {"bundle": {...}, "branch": "main", "force": false}
24 POST {remote_url}/fetch {"want": [...], "have": [...]}
25 """
26 from __future__ import annotations
27
28 import asyncio
29 import logging
30
31 from fastapi import APIRouter, Depends, HTTPException, Request, status
32 from fastapi.responses import Response
33 from sqlalchemy.ext.asyncio import AsyncSession
34
35 from musehub.auth.dependencies import optional_token, require_valid_token, TokenClaims
36 from musehub.db.database import get_db as get_session
37 from musehub.models.wire import WireFetchRequest, WirePushRequest
38 from musehub.services.musehub_wire import wire_fetch, wire_push, wire_refs
39 from musehub.services import musehub_qdrant as qdrant_svc
40 from musehub.storage import get_backend
41
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that collects the endpoints declared below via ``@router.get`` /
# ``@router.post``; they are tagged "Wire Protocol" unless an endpoint adds
# its own tags.
router = APIRouter(tags=["Wire Protocol"])
45
46
@router.get(
    "/wire/repos/{repo_id}/refs",
    summary="Get branch heads (muse pull / muse push pre-flight)",
    response_description="Repo metadata and current branch heads",
)
async def get_refs(
    repo_id: str,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Report a repo's branch heads and metadata as a JSON document.

    ``muse push`` and ``muse pull`` call this first to learn what the remote
    already holds. The lookup itself is delegated to ``wire_refs``; this
    handler only turns its result into an HTTP response.

    The Bearer token is optional here and the resolved claims are not
    consulted by this handler.

    Response body (JSON), as serialised from the ``wire_refs`` result:
    ```json
    {
      "repo_id": "...",
      "domain": "code",
      "default_branch": "main",
      "branch_heads": {"main": "sha...", "dev": "sha..."}
    }
    ```

    Raises:
        HTTPException: 404 when ``wire_refs`` returns ``None`` (unknown repo).
    """
    refs = await wire_refs(session, repo_id)
    if refs is not None:
        # Serialise via the model and send the JSON text through untouched.
        payload = refs.model_dump_json()
        return Response(content=payload, media_type="application/json")
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="repo not found")
79
80
# Strong references to in-flight fire-and-forget tasks. The event loop keeps
# only weak references to tasks, so a task that nothing else references can be
# garbage-collected before it completes. Each task removes itself when done.
_wire_background_tasks: set[asyncio.Task[None]] = set()


@router.post(
    "/wire/repos/{repo_id}/push",
    summary="Accept a pack bundle from muse push",
    status_code=status.HTTP_200_OK,
)
async def push(
    repo_id: str,
    body: WirePushRequest,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Ingest commits, snapshots, and objects from a ``muse push`` command.

    Requires a valid Bearer token. The pusher's subject (``claims["sub"]``)
    is forwarded to ``wire_push`` so it can be recorded for audit purposes.

    Request body mirrors the Muse CLI ``PackBundle + branch + force``:
    ```json
    {
      "bundle": {
        "commits": [...],
        "snapshots": [...],
        "objects": [...],
        "branch_heads": {}
      },
      "branch": "main",
      "force": false
    }
    ```

    Response:
    ```json
    {"ok": true, "message": "pushed 3 commit(s) to 'main'", "branch_heads": {...}, "remote_head": "sha..."}
    ```

    After a successful push a background task is scheduled to embed the
    pushed commits in Qdrant; the HTTP response does not wait for it.

    Raises:
        HTTPException: 422 with the service's message when ``wire_push``
            reports ``ok`` as false.
    """
    pusher_id: str | None = claims.get("sub")
    result = await wire_push(session, repo_id, body, pusher_id)

    if not result.ok:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=result.message,
        )

    # Background: embed pushed commits in Qdrant (non-blocking). Hold a strong
    # reference until the task finishes so it cannot disappear mid-execution.
    embed_task = asyncio.create_task(_embed_push_async(repo_id, result.remote_head))
    _wire_background_tasks.add(embed_task)
    embed_task.add_done_callback(_wire_background_tasks.discard)

    return Response(
        content=result.model_dump_json(),
        media_type="application/json",
    )
132
133
@router.post(
    "/wire/repos/{repo_id}/fetch",
    summary="Fetch a pack bundle for muse pull / muse clone",
    status_code=status.HTTP_200_OK,
)
async def fetch(
    repo_id: str,
    body: WireFetchRequest,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Serve the pack bundle a ``muse pull`` or ``muse clone`` asked for.

    The request body carries two lists of commit SHAs: ``want`` (what the
    client is asking for) and ``have`` (what it already holds). Bundle
    construction is delegated entirely to ``wire_fetch``, which is documented
    as walking breadth-first from ``want`` minus ``have`` and packing the
    missing commits, their snapshots, and the objects those snapshots
    reference.

    The JSON response mirrors ``PackBundle`` so the Muse CLI can hand it
    straight to ``_parse_bundle()``.

    The Bearer token is optional here and the resolved claims are not
    consulted by this handler.

    Raises:
        HTTPException: 404 when ``wire_fetch`` returns ``None`` (unknown repo).
    """
    bundle = await wire_fetch(session, repo_id, body)
    if bundle is not None:
        # Serialise via the model and send the JSON text through untouched.
        payload = bundle.model_dump_json()
        return Response(content=payload, media_type="application/json")
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="repo not found")
163
164
@router.get(
    "/o/{object_id:path}",
    summary="Content-addressed object CDN endpoint",
    response_description="Raw binary blob",
    tags=["Objects"],
)
async def get_object(
    object_id: str,
    repo_id: str | None = None,
    _claims: TokenClaims | None = Depends(optional_token),
) -> Response:
    """Serve a content-addressed binary object.

    Objects are immutable by definition (the ID is derived from the content
    hash), so the response carries ``Cache-Control: max-age=31536000, immutable``
    to allow CDN and browser caching forever.

    ``object_id`` may be:
    - A bare SHA, e.g. ``/o/abc123``
    - A repo-scoped SHA, e.g. ``/o/repos/<repo_id>/abc123``

    Repo resolution order: the repo embedded in the path, then the ``repo_id``
    query parameter, then the ``"shared"`` fallback.

    Because the route uses the ``:path`` convertor, the identifier is
    untrusted free-form text. Path-traversal segments (``.``/``..``), empty
    segments, backslashes and non-printable characters are rejected before
    anything reaches the storage backend or the ``ETag`` header.

    This endpoint is designed to be placed behind CloudFront / nginx so that
    the origin is only hit once per object per CDN edge node.

    Raises:
        HTTPException: 400 when the repo or object identifier is malformed;
            404 when the backend has no such object.
    """
    # Split a repo-scoped path ("repos/<repo_id>/<sha>") into its parts; any
    # other shape is treated as a bare object id.
    segments = object_id.split("/")
    if segments[0] == "repos" and len(segments) >= 3:
        path_repo_id: str | None = segments[1]
        blob_id = "/".join(segments[2:])
    else:
        path_repo_id = None
        blob_id = object_id
    effective_repo_id = path_repo_id or repo_id or "shared"

    # Refuse identifiers that could escape the object namespace. A repo id is
    # a single segment; an object id may not contain traversal segments.
    checked_segments = [effective_repo_id, *blob_id.split("/")]
    if "/" in effective_repo_id or any(
        seg in ("", ".", "..") or "\\" in seg or not seg.isprintable()
        for seg in checked_segments
    ):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="invalid object id")

    # NOTE(review): ``_claims`` is not consulted and the response is marked
    # ``public`` — confirm that blobs of private repos are never reachable here.
    backend = get_backend()
    raw = await backend.get(effective_repo_id, blob_id)
    if raw is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="object not found")

    return Response(
        content=raw,
        media_type="application/octet-stream",
        headers={
            # Permanent immutable cache — safe because content is addressed by hash
            "Cache-Control": "public, max-age=31536000, immutable",
            # Tag with the object id alone so both URL forms share one ETag.
            "ETag": f'"{blob_id}"',
        },
    )
208
209
async def _embed_push_async(repo_id: str, head_commit_id: str) -> None:
    """Background hook run after a successful push (fire-and-forget).

    Does nothing when ``qdrant_svc.get_qdrant()`` returns ``None``. Otherwise
    it currently only emits a debug log line naming the repo and head commit;
    no embedding work is performed in this function yet.

    Any exception raised inside the guarded section is logged as a warning
    and swallowed, so a failure here never propagates to the caller.
    """
    if qdrant_svc.get_qdrant() is not None:
        try:
            logger.debug("Qdrant embed task started for repo=%s head=%s", repo_id, head_commit_id)
        except Exception as err:
            # Best-effort task: report the failure and carry on.
            logger.warning("Qdrant embed background task failed: %s", err)