wire.py
python
| 1 | """Wire protocol endpoints — Muse CLI push/fetch transport. |
| 2 | |
| 3 | URL pattern: /wire/repos/{repo_id}/... |
| 4 | |
| 5 | Three endpoints: |
| 6 | GET /wire/repos/{repo_id}/refs — branch heads + repo metadata |
| 7 | POST /wire/repos/{repo_id}/push — accept a pack bundle from ``muse push`` |
| 8 | POST /wire/repos/{repo_id}/fetch — send a pack bundle to ``muse pull`` |
| 9 | |
| 10 | Also serves the content-addressed object CDN endpoint: |
| 11 | GET /o/{object_id} — immutable binary blob, cacheable forever |
| 12 | |
| 13 | Authentication: |
| 14 | ``refs`` and ``fetch`` accept optional Bearer tokens (public repos are read-only |
| 15 | without auth). ``push`` requires a valid Bearer token. |
| 16 | |
| 17 | The wire URLs that ``muse remote add`` stores:: |
| 18 | |
| 19 | muse remote add origin https://musehub.ai/wire/repos/<repo_id> |
| 20 | |
| 21 | Muse CLI then calls: |
| 22 | GET {remote_url}/refs |
| 23 | POST {remote_url}/push {"bundle": {...}, "branch": "main", "force": false} |
| 24 | POST {remote_url}/fetch {"want": [...], "have": [...]} |
| 25 | """ |
| 26 | from __future__ import annotations |
| 27 | |
| 28 | import asyncio |
| 29 | import logging |
| 30 | |
| 31 | from fastapi import APIRouter, Depends, HTTPException, Request, status |
| 32 | from fastapi.responses import Response |
| 33 | from sqlalchemy.ext.asyncio import AsyncSession |
| 34 | |
| 35 | from musehub.auth.dependencies import optional_token, require_valid_token, TokenClaims |
| 36 | from musehub.db.database import get_db as get_session |
| 37 | from musehub.models.wire import WireFetchRequest, WirePushRequest |
| 38 | from musehub.services.musehub_wire import wire_fetch, wire_push, wire_refs |
| 39 | from musehub.services import musehub_qdrant as qdrant_svc |
| 40 | from musehub.storage import get_backend |
| 41 | |
| 42 | logger = logging.getLogger(__name__) |
| 43 | |
| 44 | router = APIRouter(tags=["Wire Protocol"]) |
| 45 | |
| 46 | |
@router.get(
    "/wire/repos/{repo_id}/refs",
    summary="Get branch heads (muse pull / muse push pre-flight)",
    response_description="Repo metadata and current branch heads",
)
async def get_refs(
    repo_id: str,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Report a repo's branch heads and metadata as JSON.

    ``muse push`` and ``muse pull`` hit this endpoint first to learn what
    the remote already has.

    Args:
        repo_id: Repo identifier taken from the URL path.
        _claims: Optional bearer-token claims; resolved by the dependency
            but not inspected by this handler.
        session: Database session handed to ``wire_refs``.

    Returns:
        A JSON response carrying the serialized ``wire_refs`` result, e.g.:

        ```json
        {
            "repo_id": "...",
            "domain": "code",
            "default_branch": "main",
            "branch_heads": {"main": "sha...", "dev": "sha..."}
        }
        ```

    Raises:
        HTTPException: 404 when ``wire_refs`` returns ``None``.
    """
    refs = await wire_refs(session, repo_id)
    if refs is not None:
        # The model is already serialized to JSON text, so return it through a
        # plain Response instead of letting FastAPI encode it a second time.
        payload = refs.model_dump_json()
        return Response(content=payload, media_type="application/json")

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="repo not found")
| 79 | |
| 80 | |
# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes running.
_background_tasks: set[asyncio.Task[None]] = set()


@router.post(
    "/wire/repos/{repo_id}/push",
    summary="Accept a pack bundle from muse push",
    status_code=status.HTTP_200_OK,
)
async def push(
    repo_id: str,
    body: WirePushRequest,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Ingest commits, snapshots, and objects from a ``muse push`` command.

    Requires a valid Bearer token. The pusher's subject (``claims["sub"]``)
    is forwarded to ``wire_push`` so the push can be attributed to a user.

    Request body mirrors the Muse CLI ``PackBundle + branch + force``:
    ```json
    {
        "bundle": {
            "commits": [...],
            "snapshots": [...],
            "objects": [...],
            "branch_heads": {}
        },
        "branch": "main",
        "force": false
    }
    ```

    Response:
    ```json
    {"ok": true, "message": "pushed 3 commit(s) to 'main'", "branch_heads": {...}, "remote_head": "sha..."}
    ```

    Args:
        repo_id: Repo identifier taken from the URL path.
        body: Parsed push request (bundle, branch, force flag).
        claims: Claims of the validated bearer token.
        session: Database session handed to ``wire_push``.

    Returns:
        A JSON response carrying the serialized ``wire_push`` result.

    Raises:
        HTTPException: 422 with the service's message when the push result
            is not ``ok``.
    """
    pusher_id: str | None = claims.get("sub")
    result = await wire_push(session, repo_id, body, pusher_id)

    if not result.ok:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=result.message,
        )

    # Background: embed pushed commits in Qdrant (non-blocking). Keep a strong
    # reference until the task completes, then drop it so the set cannot grow
    # without bound.
    embed_task = asyncio.create_task(_embed_push_async(repo_id, result.remote_head))
    _background_tasks.add(embed_task)
    embed_task.add_done_callback(_background_tasks.discard)

    return Response(
        content=result.model_dump_json(),
        media_type="application/json",
    )
| 132 | |
| 133 | |
@router.post(
    "/wire/repos/{repo_id}/fetch",
    summary="Fetch a pack bundle for muse pull / muse clone",
    status_code=status.HTTP_200_OK,
)
async def fetch(
    repo_id: str,
    body: WireFetchRequest,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Build and return the pack bundle for a ``muse pull`` or ``muse clone``.

    The request lists the commit SHAs the client wants (``want``) and the
    ones it already holds (``have``). Bundle construction is delegated to
    ``wire_fetch``, which is documented as walking breadth-first from
    ``want`` minus ``have`` and packing the missing commits, their
    snapshots, and the objects those snapshots reference.

    The response mirrors ``PackBundle`` so the Muse CLI can pass it straight
    to ``_parse_bundle()``.

    Args:
        repo_id: Repo identifier taken from the URL path.
        body: Parsed fetch request.
        _claims: Optional bearer-token claims; resolved by the dependency
            but not inspected by this handler.
        session: Database session handed to ``wire_fetch``.

    Returns:
        A JSON response carrying the serialized ``wire_fetch`` result.

    Raises:
        HTTPException: 404 when ``wire_fetch`` returns ``None``.
    """
    bundle = await wire_fetch(session, repo_id, body)
    if bundle is not None:
        # Already-serialized JSON text goes out through a plain Response.
        return Response(content=bundle.model_dump_json(), media_type="application/json")

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="repo not found")
| 163 | |
| 164 | |
def _is_safe_key_segment(segment: str) -> bool:
    """Return True when *segment* is one non-empty, non-traversing path component."""
    return (
        bool(segment)
        and segment not in {".", ".."}
        and "/" not in segment
        and "\\" not in segment
    )


def _split_object_path(object_path: str) -> tuple[str | None, str]:
    """Split the ``/o/`` path remainder into ``(repo_id, object_id)``.

    ``repos/<repo_id>/<object_id>`` yields the embedded repo ID and the rest
    of the path; any other input is returned unchanged with ``None`` as the
    repo ID.
    """
    prefix, first_sep, remainder = object_path.partition("/")
    if prefix == "repos" and first_sep:
        scoped_repo_id, second_sep, bare_object_id = remainder.partition("/")
        if second_sep:
            return scoped_repo_id, bare_object_id
    return None, object_path


@router.get(
    "/o/{object_id:path}",
    summary="Content-addressed object CDN endpoint",
    response_description="Raw binary blob",
    tags=["Objects"],
)
async def get_object(
    object_id: str,
    repo_id: str | None = None,
    _claims: TokenClaims | None = Depends(optional_token),
) -> Response:
    """Serve a content-addressed binary object.

    Objects are immutable by definition (the ID is derived from the content
    hash), so the response carries ``Cache-Control: max-age=31536000, immutable``
    to allow CDN and browser caching forever.

    ``object_id`` may be:
    - A bare SHA, e.g. ``/o/abc123``
    - A repo-scoped SHA, e.g. ``/o/repos/<repo_id>/abc123``

    The ``repo_id`` query parameter is used for storage backend resolution
    when the repo is not embedded in the path; when neither is given the
    ``"shared"`` namespace is used.

    This endpoint is designed to be placed behind CloudFront / nginx so that
    the origin is only hit once per object per CDN edge node.

    Args:
        object_id: Path remainder after ``/o/`` (bare or repo-scoped form).
        repo_id: Optional repo identifier supplied as a query parameter.
        _claims: Optional bearer-token claims; resolved by the dependency
            but not inspected by this handler.

    Returns:
        The raw object bytes as ``application/octet-stream`` with permanent
        cache headers and an ETag derived from the bare object ID.

    Raises:
        HTTPException: 404 when the path is malformed or the backend has no
            such object.
    """
    # NOTE(review): _claims is never consulted, so objects are served without
    # any per-repo visibility check — confirm this is intended.
    path_repo_id, bare_object_id = _split_object_path(object_id)
    # A repo embedded in the path takes precedence over the query parameter.
    effective_repo_id = path_repo_id or repo_id or "shared"

    # Both values come straight from the URL and become storage keys, so
    # refuse anything that is not a single plain segment (no "..", no extra
    # slashes). Reported as a plain 404 to avoid leaking validation details.
    if not (_is_safe_key_segment(effective_repo_id) and _is_safe_key_segment(bare_object_id)):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="object not found")

    backend = get_backend()
    raw = await backend.get(effective_repo_id, bare_object_id)
    if raw is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="object not found")

    return Response(
        content=raw,
        media_type="application/octet-stream",
        headers={
            # Permanent immutable cache — safe because content is addressed by hash
            "Cache-Control": "public, max-age=31536000, immutable",
            "ETag": f'"{bare_object_id}"',
        },
    )
| 208 | |
| 209 | |
async def _embed_push_async(repo_id: str, head_commit_id: str) -> None:
    """Background hook scheduled after a successful push.

    Does nothing when ``qdrant_svc.get_qdrant()`` returns ``None``. When a
    client is available, the body currently only emits a debug log line; no
    embedding work is performed in this function yet.

    Args:
        repo_id: Repo that received the push.
        head_commit_id: Head commit reported by the push result.
    """
    if qdrant_svc.get_qdrant() is not None:
        try:
            logger.debug("Qdrant embed task started for repo=%s head=%s", repo_id, head_commit_id)
        except Exception as exc:
            # Best-effort task: report failures, never propagate them.
            logger.warning("Qdrant embed background task failed: %s", exc)