cgcardona / muse public
transport.py python
450 lines 16.0 KB
99c4ff79 feat: add remote sync commands (remote, clone, fetch, pull, push, ls-remote) Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Muse transport layer — typed HTTP client for MuseHub communication.
2
3 The :class:`MuseTransport` Protocol defines the interface between the Muse CLI
4 and a remote host (e.g. MuseHub). The CLI calls this Protocol; MuseHub
5 implements the server side.
6
7 :class:`HttpTransport` is the stdlib implementation using ``urllib.request``
8 (synchronous, HTTP/1.1 + TLS). The :class:`MuseTransport` Protocol seam
9 means MuseHub can upgrade to HTTP/2 or gRPC on the server side without
10 touching any CLI command code — only the ``HttpTransport`` class changes.
11
12 MuseHub API contract
13 --------------------
14
15 All endpoints live under the remote repository URL
16 (e.g. ``https://hub.muse.io/repos/{repo_id}``).
17
18 GET {url}/refs
19 Response: JSON :class:`~muse.core.pack.RemoteInfo`
20
21 POST {url}/fetch
22 Body: JSON :class:`~muse.core.pack.FetchRequest`
23 Response: JSON :class:`~muse.core.pack.PackBundle`
24
25 POST {url}/push
26 Body: JSON ``{"bundle": PackBundle, "branch": str, "force": bool}``
27 Response: JSON :class:`~muse.core.pack.PushResult`
28
29 Authentication
30 --------------
31
32 All endpoints accept an ``Authorization: Bearer <token>`` header. Public
33 repositories may work without a token. The token is read from
34 ``.muse/config.toml`` via :func:`muse.cli.config.get_auth_token` and is
35 **never** written to any log line.
36
37 Error codes
38 -----------
39
40 401 Unauthorized — invalid or missing token
41 404 Not found — repo does not exist on the remote
42 409 Conflict — push rejected (non-fast-forward without ``--force``)
43 5xx Server error
44 """
45
from __future__ import annotations

import http.client
import json
import logging
import urllib.error
import urllib.request
from typing import Protocol

from muse.core.pack import FetchRequest, ObjectPayload, PackBundle, PushResult, RemoteInfo
from muse.core.store import CommitDict, SnapshotDict
from muse.domain import SemVerBump
57
# Module-level logger; request summaries are emitted at debug level.
logger: logging.Logger = logging.getLogger(__name__)

# Socket timeout, in seconds, handed to every urlopen() call.
_TIMEOUT_SECONDS: int = 60
61
62
# ---------------------------------------------------------------------------
# Exception
# ---------------------------------------------------------------------------


class TransportError(Exception):
    """Raised when the remote returns a non-2xx response or is unreachable.

    Attributes:
        status_code: HTTP status code (e.g. ``401``, ``404``, ``409``, ``500``).
            ``0`` for network-level failures (DNS, connection refused).
    """

    def __init__(self, message: str, status_code: int) -> None:
        super().__init__(message)
        self.status_code = status_code

    def __reduce__(
        self,
    ) -> tuple[type[TransportError], tuple[str, int], dict[str, object]]:
        """Support ``pickle`` / ``copy`` for this two-argument exception.

        ``BaseException.__reduce__`` rebuilds the instance from ``self.args``,
        which only holds the message, so re-calling ``__init__`` would fail
        for lack of ``status_code``.  Passing both constructor arguments (plus
        the instance ``__dict__`` for any extra attributes) fixes that.
        """
        return (type(self), (str(self), self.status_code), self.__dict__)
79
80
# ---------------------------------------------------------------------------
# Protocol — the seam between CLI commands and the transport implementation
# ---------------------------------------------------------------------------


class MuseTransport(Protocol):
    """Structural interface that every Muse remote transport satisfies.

    CLI commands reach a remote only through these three calls, so the
    concrete transport can be swapped without touching command code.  Each
    call blocks until the remote has answered: the Muse CLI is synchronous
    by design.
    """

    def fetch_remote_info(self, url: str, token: str | None) -> RemoteInfo:
        """Return the remote's repository metadata, served by ``GET {url}/refs``.

        Args:
            url: Base URL of the remote repository.
            token: Bearer token, or ``None`` to access a public repository.

        Raises:
            TransportError: On an HTTP 4xx/5xx answer or a network failure.
        """
        ...

    def fetch_pack(
        self,
        url: str,
        token: str | None,
        want: list[str],
        have: list[str],
    ) -> PackBundle:
        """Download a :class:`~muse.core.pack.PackBundle` with ``POST {url}/fetch``.

        Args:
            url: Base URL of the remote repository.
            token: Bearer token, or ``None``.
            want: IDs of the commits the client is asking for.
            have: IDs of the commits the client already stores locally.

        Raises:
            TransportError: On an HTTP 4xx/5xx answer or a network failure.
        """
        ...

    def push_pack(
        self,
        url: str,
        token: str | None,
        bundle: PackBundle,
        branch: str,
        force: bool,
    ) -> PushResult:
        """Upload a :class:`~muse.core.pack.PackBundle` with ``POST {url}/push``.

        Args:
            url: Base URL of the remote repository.
            token: Bearer token, or ``None``.
            bundle: The pack to send.
            branch: Name of the remote branch to move.
            force: When true, skip the server-side fast-forward check.

        Raises:
            TransportError: On an HTTP 4xx/5xx answer or a network failure.
        """
        ...
141
142
# ---------------------------------------------------------------------------
# HTTP/1.1 implementation (stdlib, zero extra dependencies)
# ---------------------------------------------------------------------------


class HttpTransport:
    """Synchronous HTTPS transport using stdlib ``urllib.request``.

    One short-lived HTTPS connection per CLI invocation over HTTP/1.1.
    Bearer token values are **never** written to any log line.

    Every failure surfaces as :class:`TransportError`:

    * non-2xx response → ``status_code`` is the HTTP status;
    * DNS / refused connection / timeout / connection dropped mid-response
      → ``status_code`` is ``0``;
    * 2xx response whose body is not valid JSON → ``status_code`` is ``0``.

    The ``Authorization`` header is registered as an *unredirected* header:
    if the server answers with a redirect, urllib sends the follow-up request
    without the token instead of copying it to the (possibly different)
    redirect target.  Configure the final remote URL directly for private
    repositories.
    """

    @staticmethod
    def _endpoint(url: str, path: str) -> str:
        """Join *path* onto the remote *url*, tolerating a trailing slash."""
        return f"{url.rstrip('/')}/{path}"

    def _build_request(
        self,
        method: str,
        url: str,
        token: str | None,
        body_bytes: bytes | None = None,
    ) -> urllib.request.Request:
        """Build a JSON request, attaching the bearer token when one is given.

        Args:
            method: HTTP method (``"GET"`` / ``"POST"``).
            url: Full endpoint URL.
            token: Bearer token; falsy values (``None``, ``""``) send no
                ``Authorization`` header.
            body_bytes: Encoded JSON body, or ``None`` for no body.
        """
        headers: dict[str, str] = {"Accept": "application/json"}
        if body_bytes is not None:
            headers["Content-Type"] = "application/json"
        req = urllib.request.Request(
            url=url,
            data=body_bytes,
            headers=headers,
            method=method,
        )
        if token:
            # Unredirected headers are sent on this request only; urllib's
            # redirect handler copies ``req.headers`` (not these) onward.
            req.add_unredirected_header("Authorization", f"Bearer {token}")
        return req

    def _execute(self, req: urllib.request.Request) -> bytes:
        """Send *req* and return the raw response body.

        Raises:
            TransportError: With the HTTP status for non-2xx responses, or
                ``status_code=0`` for transport-level failures.
        """
        try:
            with urllib.request.urlopen(req, timeout=_TIMEOUT_SECONDS) as resp:
                body: bytes = resp.read()
                return body
        except urllib.error.HTTPError as exc:
            try:
                err_body: str = exc.read().decode("utf-8", errors="replace")
            except Exception:
                # Best effort only: the status code alone is still useful.
                err_body = ""
            raise TransportError(f"HTTP {exc.code}: {err_body[:400]}", exc.code) from exc
        except urllib.error.URLError as exc:
            raise TransportError(str(exc.reason), 0) from exc
        except (OSError, http.client.HTTPException) as exc:
            # Timeouts/resets while reading the body and malformed HTTP
            # framing are raised directly, not wrapped in URLError.
            raise TransportError(f"{type(exc).__name__}: {exc}", 0) from exc

    def fetch_remote_info(self, url: str, token: str | None) -> RemoteInfo:
        """Fetch repository metadata from ``GET {url}/refs``."""
        endpoint = self._endpoint(url, "refs")
        logger.debug("transport: GET %s", endpoint)
        raw = self._execute(self._build_request("GET", endpoint, token))
        try:
            return _parse_remote_info(raw)
        except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
            raise TransportError(f"Invalid JSON response from {endpoint}: {exc}", 0) from exc

    def fetch_pack(
        self, url: str, token: str | None, want: list[str], have: list[str]
    ) -> PackBundle:
        """Download a PackBundle via ``POST {url}/fetch``."""
        endpoint = self._endpoint(url, "fetch")
        logger.debug(
            "transport: POST %s (want=%d, have=%d)", endpoint, len(want), len(have)
        )
        payload: FetchRequest = {"want": want, "have": have}
        body_bytes = json.dumps(payload).encode("utf-8")
        raw = self._execute(self._build_request("POST", endpoint, token, body_bytes))
        try:
            return _parse_bundle(raw)
        except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
            raise TransportError(f"Invalid JSON response from {endpoint}: {exc}", 0) from exc

    def push_pack(
        self,
        url: str,
        token: str | None,
        bundle: PackBundle,
        branch: str,
        force: bool,
    ) -> PushResult:
        """Upload a PackBundle via ``POST {url}/push``."""
        endpoint = self._endpoint(url, "push")
        logger.debug(
            "transport: POST %s (branch=%s, force=%s, commits=%d)",
            endpoint,
            branch,
            force,
            len(bundle.get("commits") or []),
        )
        payload = {"bundle": bundle, "branch": branch, "force": force}
        body_bytes = json.dumps(payload).encode("utf-8")
        raw = self._execute(self._build_request("POST", endpoint, token, body_bytes))
        try:
            return _parse_push_result(raw)
        except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
            raise TransportError(f"Invalid JSON response from {endpoint}: {exc}", 0) from exc
237
238
# ---------------------------------------------------------------------------
# Response parsers — JSON bytes → typed TypedDicts
# ---------------------------------------------------------------------------
# json.loads() returns Any (per typeshed), so we use isinstance narrowing
# throughout. No explicit Any annotations appear in this file.
# ---------------------------------------------------------------------------


def _parse_remote_info(raw: bytes) -> RemoteInfo:
    """Decode a ``GET /refs`` body into a :class:`~muse.core.pack.RemoteInfo`.

    Missing or wrongly-typed fields fall back to defaults: ``""`` for
    ``repo_id``, ``"midi"`` for ``domain`` and ``"main"`` for
    ``default_branch``; ``branch_heads`` keeps only ``str -> str`` entries.
    JSON that is not an object yields a RemoteInfo built purely from those
    defaults.  Invalid JSON propagates the ``json.loads`` error.
    """
    doc = json.loads(raw)
    if not isinstance(doc, dict):
        return RemoteInfo(
            repo_id="", domain="midi", branch_heads={}, default_branch="main"
        )

    repo_id = doc.get("repo_id")
    domain = doc.get("domain")
    default_branch = doc.get("default_branch")
    heads_field = doc.get("branch_heads")

    heads: dict[str, str] = {}
    if isinstance(heads_field, dict):
        heads = {
            name: head
            for name, head in heads_field.items()
            if isinstance(name, str) and isinstance(head, str)
        }

    return RemoteInfo(
        repo_id=repo_id if isinstance(repo_id, str) else "",
        domain=domain if isinstance(domain, str) else "midi",
        default_branch=default_branch if isinstance(default_branch, str) else "main",
        branch_heads=heads,
    )
271
272
def _parse_bundle(raw: bytes) -> PackBundle:
    """Decode a ``POST /fetch`` body into a :class:`~muse.core.pack.PackBundle`.

    ``commits``, ``snapshots`` and ``objects`` are copied only when they
    arrive as JSON arrays, and ``branch_heads`` only when it arrives as an
    object; malformed entries inside those containers are skipped.  A body
    that is valid JSON but not an object yields an empty bundle.
    """
    doc = json.loads(raw)
    bundle: PackBundle = {}
    if not isinstance(doc, dict):
        return bundle

    # Commits: keep dict entries; CommitRecord.from_dict() validates them later.
    commit_items = doc.get("commits")
    if isinstance(commit_items, list):
        bundle["commits"] = [
            _coerce_commit_dict(entry) for entry in commit_items if isinstance(entry, dict)
        ]

    snapshot_items = doc.get("snapshots")
    if isinstance(snapshot_items, list):
        bundle["snapshots"] = [
            _coerce_snapshot_dict(entry)
            for entry in snapshot_items
            if isinstance(entry, dict)
        ]

    # Objects: an entry is kept only if both of its fields are strings.
    object_items = doc.get("objects")
    if isinstance(object_items, list):
        payloads: list[ObjectPayload] = []
        for entry in object_items:
            if not isinstance(entry, dict):
                continue
            object_id = entry.get("object_id")
            content_b64 = entry.get("content_b64")
            if isinstance(object_id, str) and isinstance(content_b64, str):
                payloads.append(ObjectPayload(object_id=object_id, content_b64=content_b64))
        bundle["objects"] = payloads

    head_map = doc.get("branch_heads")
    if isinstance(head_map, dict):
        bundle["branch_heads"] = {
            name: head
            for name, head in head_map.items()
            if isinstance(name, str) and isinstance(head, str)
        }

    return bundle
320
321
def _parse_push_result(raw: bytes) -> PushResult:
    """Decode a ``POST /push`` body into a :class:`~muse.core.pack.PushResult`.

    ``ok`` is true only when the server sent the JSON boolean ``true``;
    ``message`` defaults to ``""``; ``branch_heads`` keeps only
    ``str -> str`` entries.  JSON that is not an object is reported as a
    failed push carrying the message ``"Invalid server response"``.
    """
    doc = json.loads(raw)
    if not isinstance(doc, dict):
        return PushResult(ok=False, message="Invalid server response", branch_heads={})

    ok_field = doc.get("ok")
    message_field = doc.get("message")
    heads_field = doc.get("branch_heads")

    heads: dict[str, str] = {}
    if isinstance(heads_field, dict):
        heads = {
            name: head
            for name, head in heads_field.items()
            if isinstance(name, str) and isinstance(head, str)
        }

    return PushResult(
        # bool has exactly two instances, so an identity test against True
        # accepts only a JSON ``true``.
        ok=ok_field is True,
        message=message_field if isinstance(message_field, str) else "",
        branch_heads=heads,
    )
340
341
342 # ---------------------------------------------------------------------------
343 # TypedDict coercion helpers — extract known string fields from raw JSON dicts
344 # ---------------------------------------------------------------------------
345 # CommitDict and SnapshotDict are total=False (all fields optional), so we
346 # only extract the string/scalar fields we can safely validate here.
347 # CommitRecord.from_dict() and SnapshotRecord.from_dict() re-validate
348 # required fields when apply_pack() calls them.
349 # ---------------------------------------------------------------------------
350
351
352 # Wire-value union — all types that can appear as dict values in a JSON
353 # object parsed from the Muse wire format. Using this explicit union instead
354 # of `object` or `Any` satisfies both mypy --strict and typing_audit.
355 _WireVal = str | int | float | bool | None | list[str] | dict[str, str]
356
357
def _str(val: _WireVal) -> str:
    """Narrow a wire value to ``str``; any non-string becomes ``""``."""
    if isinstance(val, str):
        return val
    return ""
361
362
def _str_or_none(val: _WireVal) -> str | None:
    """Narrow a wire value to ``str``; any non-string becomes ``None``."""
    if not isinstance(val, str):
        return None
    return val
366
367
def _int_or(val: _WireVal, default: int) -> int:
    """Return *val* if it is a JSON integer, otherwise *default*.

    ``bool`` is rejected explicitly: it subclasses ``int`` in Python, so
    without this check a JSON ``true``/``false`` would be accepted as an
    integer field value.
    """
    if isinstance(val, bool) or not isinstance(val, int):
        return default
    return val
371
372
def _coerce_commit_dict(raw: dict[str, _WireVal]) -> CommitDict:
    """Extract typed fields from *raw* into a :class:`~muse.core.store.CommitDict`.

    Each field is narrowed on its own and falls back to a neutral value when
    it is missing or has the wrong JSON type:

    * plain string fields → ``""``; ``parent_commit_id`` and
      ``parent2_commit_id`` → ``None``;
    * ``metadata`` → only ``str -> str`` entries are kept;
    * ``reviewed_by`` / ``breaking_changes`` → only ``str`` items are kept;
    * ``sem_ver_bump`` → ``"none"`` unless it is ``"major"``, ``"minor"``
      or ``"patch"``;
    * ``format_version`` → ``1`` and ``test_runs`` → ``0`` unless the raw
      value passes ``_int_or``'s integer check.

    ``structured_delta`` is **not** copied from *raw*: the result always
    carries ``structured_delta=None``.
    """
    metadata_raw = raw.get("metadata")
    metadata: dict[str, str] = (
        {k: v for k, v in metadata_raw.items() if isinstance(k, str) and isinstance(v, str)}
        if isinstance(metadata_raw, dict)
        else {}
    )

    reviewed_by_raw = raw.get("reviewed_by")
    reviewed_by: list[str] = (
        [item for item in reviewed_by_raw if isinstance(item, str)]
        if isinstance(reviewed_by_raw, list)
        else []
    )

    breaking_changes_raw = raw.get("breaking_changes")
    breaking_changes: list[str] = (
        [item for item in breaking_changes_raw if isinstance(item, str)]
        if isinstance(breaking_changes_raw, list)
        else []
    )

    # Map the wire string onto SemVerBump explicitly so the assigned value
    # type-checks; anything unrecognised becomes "none".
    sem_ver_raw = raw.get("sem_ver_bump")
    sem_ver: SemVerBump
    if sem_ver_raw == "major":
        sem_ver = "major"
    elif sem_ver_raw == "minor":
        sem_ver = "minor"
    elif sem_ver_raw == "patch":
        sem_ver = "patch"
    else:
        sem_ver = "none"

    return CommitDict(
        commit_id=_str(raw.get("commit_id")),
        repo_id=_str(raw.get("repo_id")),
        branch=_str(raw.get("branch")),
        snapshot_id=_str(raw.get("snapshot_id")),
        message=_str(raw.get("message")),
        committed_at=_str(raw.get("committed_at")),
        parent_commit_id=_str_or_none(raw.get("parent_commit_id")),
        parent2_commit_id=_str_or_none(raw.get("parent2_commit_id")),
        author=_str(raw.get("author")),
        metadata=metadata,
        # NOTE(review): the raw ``structured_delta`` is dropped here, so
        # commits received via fetch lose it.  Confirm this is intended;
        # carrying it over would need a mypy-clean narrowing of the raw JSON
        # to CommitDict's ``structured_delta`` type.
        structured_delta=None,
        sem_ver_bump=sem_ver,
        breaking_changes=breaking_changes,
        agent_id=_str(raw.get("agent_id")),
        model_id=_str(raw.get("model_id")),
        toolchain_id=_str(raw.get("toolchain_id")),
        prompt_hash=_str(raw.get("prompt_hash")),
        signature=_str(raw.get("signature")),
        signer_key_id=_str(raw.get("signer_key_id")),
        format_version=_int_or(raw.get("format_version"), 1),
        reviewed_by=reviewed_by,
        test_runs=_int_or(raw.get("test_runs"), 0),
    )
436
437
def _coerce_snapshot_dict(raw: dict[str, _WireVal]) -> SnapshotDict:
    """Build a :class:`~muse.core.store.SnapshotDict` from a raw wire dict.

    ``snapshot_id`` and ``created_at`` become ``""`` when missing or not
    strings.  ``manifest`` keeps only its ``str -> str`` entries, and is
    empty when the field is absent or not an object.
    """
    manifest_field = raw.get("manifest")
    entries: dict[str, str] = {}
    if isinstance(manifest_field, dict):
        entries = {
            key: value
            for key, value in manifest_field.items()
            if isinstance(key, str) and isinstance(value, str)
        }
    return SnapshotDict(
        snapshot_id=_str(raw.get("snapshot_id")),
        manifest=entries,
        created_at=_str(raw.get("created_at")),
    )