gabriel / muse public
transport.py python
936 lines 36.2 KB
7855ccd0 feat: harden, test, and document all quality-dial changes Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """Muse transport layer — HTTP and local-filesystem remote communication.
2
3 The :class:`MuseTransport` Protocol defines the interface between the Muse CLI
4 and a remote host. The CLI calls this Protocol; the implementation is chosen
5 at runtime by :func:`make_transport` based on the URL scheme.
6
7 Transport implementations
8 --------------------------
9
10 :class:`HttpTransport`
11 Synchronous HTTPS transport using stdlib ``urllib.request``. Used for
12 ``https://`` remote URLs (MuseHub server required).
13
14 :class:`LocalFileTransport`
15 Zero-network transport for ``file://`` URLs. Reads and writes directly
16 from the remote's ``.muse/`` directory on the local filesystem (or a
17 shared network mount). No server is required — ideal for local testing,
18 monorepo setups, and offline workflows.
19
20 Use :func:`make_transport` instead of constructing either class directly —
21 it inspects the URL scheme and returns the appropriate implementation.
22
23 MuseHub API contract (HttpTransport)
24 -------------------------------------
25
26 All endpoints live under the remote repository URL
27 (e.g. ``https://hub.muse.io/repos/{repo_id}``).
28
29 GET {url}/refs
30 Response: JSON :class:`~muse.core.pack.RemoteInfo`
31
32 POST {url}/fetch
33 Body: JSON :class:`~muse.core.pack.FetchRequest`
34 Response: JSON :class:`~muse.core.pack.PackBundle`
35
36 POST {url}/push
37 Body: JSON ``{"bundle": PackBundle, "branch": str, "force": bool}``
38 Response: JSON :class:`~muse.core.pack.PushResult`
39
40 Authentication
41 --------------
42
43 All endpoints accept an ``Authorization: Bearer <token>`` header. Public
44 repositories may work without a token. The token is read from
45 ``.muse/config.toml`` via :func:`muse.cli.config.get_auth_token` and is
46 **never** written to any log line.
47
48 :class:`LocalFileTransport` ignores the ``token`` argument — local repos
49 do not require authentication (access is controlled by filesystem permissions).
50
51 Error codes (HttpTransport)
52 ----------------------------
53
54 401 Unauthorized — invalid or missing token
55 404 Not found — repo does not exist on the remote
56 409 Conflict — push rejected (non-fast-forward without ``--force``)
57 5xx Server error
58
59 Security model
60 --------------
61
62 HttpTransport:
63 - Refuses all HTTP redirects (prevents credential leakage to other hosts).
64 - Rejects non-HTTPS URLs when a token is present (prevents cleartext exposure).
65 - Caps response bodies at ``MAX_RESPONSE_BYTES`` (64 MiB) to prevent OOM.
- Sanity-checks that response bodies begin like JSON (``{`` or ``[``) before parsing.
67
68 LocalFileTransport:
69 - Calls ``.resolve()`` on all filesystem paths (canonicalises symlinks and
70 ``..`` components before any I/O).
71 - Validates branch names with ``validate_branch_name`` (rejects null bytes,
72 backslashes, consecutive dots, and other path-traversal primitives).
73 - Guards ref-file writes with ``contain_path`` (defence-in-depth: asserts the
74 computed path stays inside ``.muse/refs/heads/`` even after symlink resolution).
75 - Never follows redirects, makes no network calls, and ignores the token arg.
76 """
77
78 from __future__ import annotations
79
80 import http.client
81 import json
82 import logging
83 import pathlib
84 import types
85 import urllib.error
86 import urllib.parse
87 import urllib.request
88 import urllib.response
89 from typing import IO, Protocol, runtime_checkable
90
91 from muse.core.pack import FetchRequest, ObjectPayload, PackBundle, PushResult, RemoteInfo
92 from muse.core.store import CommitDict, SnapshotDict
93 from muse.core.validation import MAX_RESPONSE_BYTES, contain_path, validate_branch_name
94 from muse.domain import SemVerBump
95
96 logger = logging.getLogger(__name__)
97
98 _TIMEOUT_SECONDS = 60
99
100
101 # ---------------------------------------------------------------------------
102 # Response protocol — typed adapter for urllib response objects
103 # ---------------------------------------------------------------------------
104
105
class _HttpHeaders(Protocol):
    """Minimal interface for HTTP response headers.

    Only ``get`` is declared because it is the sole header accessor this
    module uses (the Content-Length check in ``HttpTransport._execute``).
    """

    def get(self, name: str, default: str = "") -> str: ...
111
@runtime_checkable
class _HttpResponse(Protocol):
    """Structural interface for urllib HTTP response objects.

    Defined as a Protocol so that ``_open_url`` can have a concrete, non-Any
    return type without importing implementation-specific urllib internals.
    ``@runtime_checkable`` additionally permits ``isinstance`` checks against
    this Protocol.
    """

    # Response headers; consulted for the Content-Length pre-read cap.
    headers: _HttpHeaders

    def read(self, amt: int | None = None) -> bytes: ...

    # Context-manager protocol — responses are used via ``with`` so the
    # underlying connection is always closed.
    def __enter__(self) -> "_HttpResponse": ...

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: "types.TracebackType | None",
    ) -> bool | None: ...
133
134 # ---------------------------------------------------------------------------
135 # Security — redirect and scheme enforcement
136 # ---------------------------------------------------------------------------
137
138
139 class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
140 """Refuse all HTTP redirects.
141
142 ``urllib.request`` follows redirects by default, including across schemes
143 (HTTPS → HTTP) and across hosts. If a server we contact redirects us:
144
145 - To HTTP: the ``Authorization: Bearer`` header would be sent in cleartext.
146 - To a different host: the token would be sent to an unintended recipient.
147
148 We refuse both. The server must use the correct, stable URL. If a
149 redirect is required during operations, it is always better to surface
150 it as a hard error so the operator can update the configured URL than to
151 silently follow it and leak credentials.
152 """
153
154 def redirect_request(
155 self,
156 req: urllib.request.Request,
157 fp: IO[bytes],
158 code: int,
159 msg: str,
160 headers: http.client.HTTPMessage,
161 newurl: str,
162 ) -> urllib.request.Request | None:
163 raise urllib.error.HTTPError(
164 req.full_url,
165 code,
166 (
167 f"Redirect refused ({code}): server tried to redirect to {newurl!r}. "
168 "Update the configured remote URL to the final destination."
169 ),
170 headers,
171 fp,
172 )
173
174
# One module-wide opener with redirect-following disabled, shared by every
# request, so Authorization headers can never leak through a redirect.
_STRICT_OPENER = urllib.request.build_opener(_NoRedirectHandler())


def _open_url(req: urllib.request.Request, timeout: int) -> _HttpResponse:
    """Open *req* through the strict (no-redirect) opener.

    Exists so tests have one stable, importable patch target instead of
    deep-patching the opener object, and so callers receive an
    ``_HttpResponse`` Protocol value rather than urllib's concrete
    ``addinfourl`` class.
    """
    response: _HttpResponse = _STRICT_OPENER.open(req, timeout=timeout)
    return response
189
190
191 # ---------------------------------------------------------------------------
192 # Exception
193 # ---------------------------------------------------------------------------
194
195
class TransportError(Exception):
    """Signals an unreachable remote or a non-2xx response.

    Attributes:
        status_code: HTTP status (e.g. ``401``, ``404``, ``409``, ``500``);
            ``0`` when the failure happened below HTTP (DNS lookup,
            connection refused).
    """

    def __init__(self, message: str, status_code: int) -> None:
        super().__init__(message)
        self.status_code = status_code
207
208
209 # ---------------------------------------------------------------------------
210 # Protocol — the seam between CLI commands and the transport implementation
211 # ---------------------------------------------------------------------------
212
213
class MuseTransport(Protocol):
    """Protocol for Muse remote transport implementations.

    All methods are synchronous — the Muse CLI is synchronous by design.
    Implementations report every failure mode by raising
    :class:`TransportError`; callers obtain an implementation via
    :func:`make_transport` rather than constructing one directly.
    """

    def fetch_remote_info(self, url: str, token: str | None) -> RemoteInfo:
        """Return repository metadata from ``GET {url}/refs``.

        Args:
            url: Remote repository URL.
            token: Bearer token, or ``None`` for public repos.

        Raises:
            :class:`TransportError` on HTTP 4xx/5xx or network failure.
        """
        ...

    def fetch_pack(
        self, url: str, token: str | None, want: list[str], have: list[str]
    ) -> PackBundle:
        """Download a :class:`~muse.core.pack.PackBundle` via ``POST {url}/fetch``.

        Args:
            url: Remote repository URL.
            token: Bearer token, or ``None``.
            want: Commit IDs the client wants to receive.
            have: Commit IDs already present locally.

        Raises:
            :class:`TransportError` on HTTP 4xx/5xx or network failure.
        """
        ...

    def push_pack(
        self,
        url: str,
        token: str | None,
        bundle: PackBundle,
        branch: str,
        force: bool,
    ) -> PushResult:
        """Upload a :class:`~muse.core.pack.PackBundle` via ``POST {url}/push``.

        Args:
            url: Remote repository URL.
            token: Bearer token, or ``None``.
            bundle: Bundle to upload.
            branch: Remote branch to update.
            force: Bypass the server-side fast-forward check.

        Raises:
            :class:`TransportError` on HTTP 4xx/5xx or network failure.
        """
        ...
269
270
271 # ---------------------------------------------------------------------------
272 # HTTP/1.1 implementation (stdlib, zero extra dependencies)
273 # ---------------------------------------------------------------------------
274
275
276 class HttpTransport:
277 """Synchronous HTTPS transport using stdlib ``urllib.request``.
278
279 One short-lived HTTPS connection per CLI invocation over HTTP/1.1.
280 Bearer token values are **never** written to any log line.
281 """
282
283 def _build_request(
284 self,
285 method: str,
286 url: str,
287 token: str | None,
288 body_bytes: bytes | None = None,
289 ) -> urllib.request.Request:
290 # Never send a bearer token over cleartext HTTP — the token would be
291 # visible to any network observer on the path.
292 # Use urlparse for a proper scheme check rather than a fragile prefix test.
293 if token and urllib.parse.urlparse(url).scheme != "https":
294 raise TransportError(
295 f"Refusing to send credentials to a non-HTTPS URL: {url!r}. "
296 "Ensure the remote URL uses https://.",
297 0,
298 )
299 headers: dict[str, str] = {"Accept": "application/json"}
300 if body_bytes is not None:
301 headers["Content-Type"] = "application/json"
302 if token:
303 headers["Authorization"] = f"Bearer {token}"
304 return urllib.request.Request(
305 url=url,
306 data=body_bytes,
307 headers=headers,
308 method=method,
309 )
310
311 def _execute(self, req: urllib.request.Request) -> bytes:
312 """Send *req* and return raw response bytes.
313
314 Uses :data:`_STRICT_OPENER` which refuses all HTTP redirects, ensuring
315 ``Authorization`` headers are never forwarded to an unintended host or
316 sent over a downgraded cleartext connection.
317
318 Raises:
319 :class:`TransportError` on non-2xx HTTP or any network error.
320 """
321 try:
322 with _open_url(req, _TIMEOUT_SECONDS) as resp:
323 # Enforce a hard cap before reading the body to defend against
324 # a malicious or compromised server sending an unbounded response.
325 content_length_str = resp.headers.get("Content-Length", "")
326 if content_length_str:
327 try:
328 declared = int(content_length_str)
329 if declared > MAX_RESPONSE_BYTES:
330 raise TransportError(
331 f"Server Content-Length {declared} exceeds the "
332 f"{MAX_RESPONSE_BYTES // (1024 * 1024)} MiB response cap.",
333 0,
334 )
335 except ValueError:
336 pass # Unparseable Content-Length — fall through to streaming cap.
337 # Read one byte more than the cap so we can detect over-limit responses.
338 body: bytes = resp.read(MAX_RESPONSE_BYTES + 1)
339 if len(body) > MAX_RESPONSE_BYTES:
340 raise TransportError(
341 f"Response body exceeds the {MAX_RESPONSE_BYTES // (1024 * 1024)} MiB "
342 "cap. The server may be sending unexpected data.",
343 0,
344 )
345 return body
346 except urllib.error.HTTPError as exc:
347 try:
348 err_body: str = exc.read().decode("utf-8", errors="replace")
349 except Exception: # noqa: BLE001
350 err_body = ""
351 raise TransportError(f"HTTP {exc.code}: {err_body[:400]}", exc.code) from exc
352 except urllib.error.URLError as exc:
353 raise TransportError(str(exc.reason), 0) from exc
354
355 def fetch_remote_info(self, url: str, token: str | None) -> RemoteInfo:
356 """Fetch repository metadata from ``GET {url}/refs``."""
357 endpoint = f"{url.rstrip('/')}/refs"
358 logger.debug("transport: GET %s", endpoint)
359 req = self._build_request("GET", endpoint, token)
360 raw = self._execute(req)
361 return _parse_remote_info(raw)
362
363 def fetch_pack(
364 self, url: str, token: str | None, want: list[str], have: list[str]
365 ) -> PackBundle:
366 """Download a PackBundle via ``POST {url}/fetch``."""
367 endpoint = f"{url.rstrip('/')}/fetch"
368 logger.debug(
369 "transport: POST %s (want=%d, have=%d)", endpoint, len(want), len(have)
370 )
371 payload: FetchRequest = {"want": want, "have": have}
372 body_bytes = json.dumps(payload).encode("utf-8")
373 req = self._build_request("POST", endpoint, token, body_bytes)
374 raw = self._execute(req)
375 return _parse_bundle(raw)
376
377 def push_pack(
378 self,
379 url: str,
380 token: str | None,
381 bundle: PackBundle,
382 branch: str,
383 force: bool,
384 ) -> PushResult:
385 """Upload a PackBundle via ``POST {url}/push``."""
386 endpoint = f"{url.rstrip('/')}/push"
387 logger.debug(
388 "transport: POST %s (branch=%s, force=%s, commits=%d)",
389 endpoint,
390 branch,
391 force,
392 len(bundle.get("commits") or []),
393 )
394 payload = {"bundle": bundle, "branch": branch, "force": force}
395 body_bytes = json.dumps(payload).encode("utf-8")
396 req = self._build_request("POST", endpoint, token, body_bytes)
397 raw = self._execute(req)
398 return _parse_push_result(raw)
399
400
401 # ---------------------------------------------------------------------------
402 # Response parsers — JSON bytes → typed TypedDicts
403 # ---------------------------------------------------------------------------
404 # json.loads() returns Any (per typeshed), so we use isinstance narrowing
405 # throughout. No explicit Any annotations appear in this file.
406 # ---------------------------------------------------------------------------
407
408
409 def _assert_json_content(raw: bytes, endpoint: str) -> None:
410 """Raise TransportError if *raw* does not look like JSON.
411
412 A best-effort guard: checks that the first non-whitespace byte is ``{``
413 or ``[``, which is always true for valid JSON objects/arrays. This
414 catches HTML error pages (e.g., proxy intercept pages) before json.loads
415 produces a misleading error.
416 """
417 stripped = raw.lstrip()
418 if stripped and stripped[0:1] not in (b"{", b"["):
419 raise TransportError(
420 f"Unexpected response from {endpoint!r}: expected JSON, "
421 f"got content starting with {stripped[:40]!r}.",
422 0,
423 )
424
425
def _parse_remote_info(raw: bytes) -> RemoteInfo:
    """Decode ``GET /refs`` response bytes into a :class:`~muse.core.pack.RemoteInfo`."""
    _assert_json_content(raw, "/refs")
    data = json.loads(raw)
    if not isinstance(data, dict):
        # Not even a JSON object — fall back to a neutral, empty RemoteInfo.
        return RemoteInfo(
            repo_id="", domain="midi", branch_heads={}, default_branch="main"
        )

    raw_heads = data.get("branch_heads")
    heads: dict[str, str] = {}
    if isinstance(raw_heads, dict):
        heads = {
            k: v for k, v in raw_heads.items()
            if isinstance(k, str) and isinstance(v, str)
        }

    repo_id = data.get("repo_id")
    domain = data.get("domain")
    default_branch = data.get("default_branch")
    return RemoteInfo(
        repo_id=repo_id if isinstance(repo_id, str) else "",
        domain=domain if isinstance(domain, str) else "midi",
        default_branch=default_branch if isinstance(default_branch, str) else "main",
        branch_heads=heads,
    )
451
452
def _parse_bundle(raw: bytes) -> PackBundle:
    """Decode ``POST /fetch`` response bytes into a :class:`~muse.core.pack.PackBundle`."""
    _assert_json_content(raw, "/fetch")
    data = json.loads(raw)
    result: PackBundle = {}
    if not isinstance(data, dict):
        return result

    # Commits — raw dicts; CommitRecord.from_dict() re-validates each one
    # when apply_pack() eventually consumes the bundle.
    raw_commits = data.get("commits")
    if isinstance(raw_commits, list):
        result["commits"] = [
            _coerce_commit_dict(entry) for entry in raw_commits if isinstance(entry, dict)
        ]

    # Snapshots
    raw_snapshots = data.get("snapshots")
    if isinstance(raw_snapshots, list):
        result["snapshots"] = [
            _coerce_snapshot_dict(entry)
            for entry in raw_snapshots
            if isinstance(entry, dict)
        ]

    # Objects — keep only entries with both string fields present.
    raw_objects = data.get("objects")
    if isinstance(raw_objects, list):
        payloads: list[ObjectPayload] = []
        for entry in raw_objects:
            if not isinstance(entry, dict):
                continue
            oid = entry.get("object_id")
            content = entry.get("content_b64")
            if isinstance(oid, str) and isinstance(content, str):
                payloads.append(ObjectPayload(object_id=oid, content_b64=content))
        result["objects"] = payloads

    # Branch heads — string-to-string pairs only.
    raw_heads = data.get("branch_heads")
    if isinstance(raw_heads, dict):
        result["branch_heads"] = {
            k: v for k, v in raw_heads.items()
            if isinstance(k, str) and isinstance(v, str)
        }

    return result
501
502
def _parse_push_result(raw: bytes) -> PushResult:
    """Decode ``POST /push`` response bytes into a :class:`~muse.core.pack.PushResult`."""
    _assert_json_content(raw, "/push")
    data = json.loads(raw)
    if not isinstance(data, dict):
        return PushResult(ok=False, message="Invalid server response", branch_heads={})

    raw_heads = data.get("branch_heads")
    heads: dict[str, str] = {}
    if isinstance(raw_heads, dict):
        heads = {
            k: v for k, v in raw_heads.items()
            if isinstance(k, str) and isinstance(v, str)
        }

    ok = data.get("ok")
    message = data.get("message")
    return PushResult(
        ok=ok if isinstance(ok, bool) else False,
        message=message if isinstance(message, str) else "",
        branch_heads=heads,
    )
522
523
524 # ---------------------------------------------------------------------------
525 # TypedDict coercion helpers — extract known string fields from raw JSON dicts
526 # ---------------------------------------------------------------------------
527 # CommitDict and SnapshotDict are total=False (all fields optional), so we
528 # only extract the string/scalar fields we can safely validate here.
529 # CommitRecord.from_dict() and SnapshotRecord.from_dict() re-validate
530 # required fields when apply_pack() calls them.
531 # ---------------------------------------------------------------------------
532
533
# Wire-value union — all types that can appear as dict values in a JSON
# object parsed from the Muse wire format. Using this explicit union instead
# of `object` or `Any` satisfies both mypy --strict and typing_audit.
# NOTE: ``bool`` is listed even though it subclasses ``int`` in Python —
# JSON ``true``/``false`` decode to Python bools.
_WireVal = str | int | float | bool | None | list[str] | dict[str, str]
538
539
540 def _str(val: _WireVal) -> str:
541 """Return *val* as str, or empty string if not a str."""
542 return val if isinstance(val, str) else ""
543
544
545 def _str_or_none(val: _WireVal) -> str | None:
546 """Return *val* as str, or None if not a str."""
547 return val if isinstance(val, str) else None
548
549
550 def _int_or(val: _WireVal, default: int) -> int:
551 """Return *val* as int, or *default* if not an int."""
552 return val if isinstance(val, int) else default
553
554
555 def _coerce_commit_dict(raw: dict[str, _WireVal]) -> CommitDict:
556 """Extract typed scalar fields from *raw* into a :class:`~muse.core.store.CommitDict`.
557
558 Only primitive fields are validated here; ``structured_delta`` is
559 preserved as-is because :class:`~muse.core.store.CommitRecord.from_dict`
560 already handles it gracefully.
561 """
562 metadata_raw = raw.get("metadata")
563 metadata: dict[str, str] = {}
564 if isinstance(metadata_raw, dict):
565 for k, v in metadata_raw.items():
566 if isinstance(k, str) and isinstance(v, str):
567 metadata[k] = v
568
569 reviewed_by_raw = raw.get("reviewed_by")
570 reviewed_by: list[str] = []
571 if isinstance(reviewed_by_raw, list):
572 for item in reviewed_by_raw:
573 if isinstance(item, str):
574 reviewed_by.append(item)
575
576 breaking_changes_raw = raw.get("breaking_changes")
577 breaking_changes: list[str] = []
578 if isinstance(breaking_changes_raw, list):
579 for item in breaking_changes_raw:
580 if isinstance(item, str):
581 breaking_changes.append(item)
582
583 sem_ver_raw = raw.get("sem_ver_bump")
584 sem_ver: SemVerBump
585 if sem_ver_raw == "major":
586 sem_ver = "major"
587 elif sem_ver_raw == "minor":
588 sem_ver = "minor"
589 elif sem_ver_raw == "patch":
590 sem_ver = "patch"
591 else:
592 sem_ver = "none"
593
594 return CommitDict(
595 commit_id=_str(raw.get("commit_id")),
596 repo_id=_str(raw.get("repo_id")),
597 branch=_str(raw.get("branch")),
598 snapshot_id=_str(raw.get("snapshot_id")),
599 message=_str(raw.get("message")),
600 committed_at=_str(raw.get("committed_at")),
601 parent_commit_id=_str_or_none(raw.get("parent_commit_id")),
602 parent2_commit_id=_str_or_none(raw.get("parent2_commit_id")),
603 author=_str(raw.get("author")),
604 metadata=metadata,
605 structured_delta=None,
606 sem_ver_bump=sem_ver,
607 breaking_changes=breaking_changes,
608 agent_id=_str(raw.get("agent_id")),
609 model_id=_str(raw.get("model_id")),
610 toolchain_id=_str(raw.get("toolchain_id")),
611 prompt_hash=_str(raw.get("prompt_hash")),
612 signature=_str(raw.get("signature")),
613 signer_key_id=_str(raw.get("signer_key_id")),
614 format_version=_int_or(raw.get("format_version"), 1),
615 reviewed_by=reviewed_by,
616 test_runs=_int_or(raw.get("test_runs"), 0),
617 )
618
619
def _coerce_snapshot_dict(raw: dict[str, _WireVal]) -> SnapshotDict:
    """Build a :class:`~muse.core.store.SnapshotDict` from *raw* wire data."""
    manifest_src = raw.get("manifest")
    manifest: dict[str, str] = {}
    if isinstance(manifest_src, dict):
        manifest = {
            k: v for k, v in manifest_src.items()
            if isinstance(k, str) and isinstance(v, str)
        }
    return SnapshotDict(
        snapshot_id=_str(raw.get("snapshot_id")),
        manifest=manifest,
        created_at=_str(raw.get("created_at")),
    )
633
634
635 # ---------------------------------------------------------------------------
636 # LocalFileTransport helpers
637 # ---------------------------------------------------------------------------
638
639
def _is_ancestor(
    candidate: str,
    from_commit: str,
    bundle_by_id: dict[str, CommitDict],
    remote_root: pathlib.Path,
    max_depth: int = 100_000,
) -> bool:
    """Return True if *candidate* is an ancestor of (or equal to) *from_commit*.

    Walks the commit graph BFS-style starting from *from_commit*, consulting
    *bundle_by_id* first (commits included in the push bundle) and falling back
    to the existing commits on disk in *remote_root* (commits already present).

    This two-source walk is necessary because ``build_pack()`` excludes commits
    in the caller's ``have`` set from the bundle — those commits are already on
    disk at the remote and must be consulted directly.

    Args:
        candidate: The commit ID to search for (typically the remote's current HEAD).
        from_commit: Starting point of the BFS walk (typically the new tip being pushed).
        bundle_by_id: Commits included in the push bundle, keyed by commit_id.
        remote_root: Root of the remote Muse repo (for reading pre-existing commits).
        max_depth: BFS depth cap — prevents unbounded walks on corrupt graphs.

    Returns:
        ``True`` if *candidate* is reachable from *from_commit*, ``False`` otherwise.
    """
    # deque gives O(1) FIFO pops; list.pop(0) shifts every element and made
    # deep-history walks accidentally quadratic.
    from collections import deque

    # Imported locally, mirroring the original — presumably to avoid an
    # import cycle with muse.core.store.
    from muse.core.store import read_commit as _rc

    seen: set[str] = set()
    queue: deque[str] = deque((from_commit,))
    depth = 0
    while queue and depth < max_depth:
        cid = queue.popleft()
        if cid in seen:
            continue
        seen.add(cid)
        if cid == candidate:
            return True
        # Prefer bundle for unwritten commits; fall back to remote store.
        parent1: str | None
        parent2: str | None
        if cid in bundle_by_id:
            bc = bundle_by_id[cid]
            p1_raw = bc.get("parent_commit_id")
            p2_raw = bc.get("parent2_commit_id")
            parent1 = p1_raw if isinstance(p1_raw, str) else None
            parent2 = p2_raw if isinstance(p2_raw, str) else None
        else:
            rec = _rc(remote_root, cid)
            if rec is None:
                # Unknown commit (neither bundled nor on disk) — dead end.
                depth += 1
                continue
            parent1 = rec.parent_commit_id
            parent2 = rec.parent2_commit_id
        if parent1 and parent1 not in seen:
            queue.append(parent1)
        if parent2 and parent2 not in seen:
            queue.append(parent2)
        depth += 1
    return False
701
702
703 # ---------------------------------------------------------------------------
704 # LocalFileTransport — push/pull between two repos on the same filesystem
705 # ---------------------------------------------------------------------------
706
707
class LocalFileTransport:
    """Transport implementation for ``file://`` URLs.

    Allows ``muse push file:///path/to/repo`` and ``muse pull`` between two
    Muse repositories on the same filesystem (or a shared network mount)
    without requiring a MuseHub server.

    The remote path must be the root of an initialised Muse repository — it
    must contain a ``.muse/`` directory. No separate "bare repo" format is
    required; Muse repositories are self-describing.

    This transport never makes network calls. All operations are synchronous
    filesystem reads and writes, consistent with the rest of the Muse CLI.
    The ``token`` argument on every method is accepted but ignored — access
    control is the filesystem's job.
    """

    @staticmethod
    def _repo_root(url: str) -> pathlib.Path:
        """Extract and validate the filesystem path from a ``file://`` URL.

        Security guarantees:
        - Rejects non-``file://`` schemes unconditionally.
        - Calls ``.resolve()`` to canonicalize the path, dereferencing all
          symlinks before any filesystem operations. A symlink at the URL
          target that points to a directory without a ``.muse/`` subdirectory
          is rejected — the check is on the resolved, canonical path, not the
          symlink itself.
        - Verifies that ``.muse/`` exists at the resolved root, preventing
          accidental pushes to arbitrary directories.
        """
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme != "file":
            raise TransportError(
                f"LocalFileTransport requires a file:// URL, got: {url!r}", 0
            )
        # urlparse on file:///abs/path yields netloc="" path="/abs/path", so
        # netloc + path reconstructs the absolute path (and tolerates the
        # lenient file://abs/path form, where the first segment lands in
        # netloc). NOTE(review): for Windows drive URLs (file:///C:/x →
        # path "/C:/x") no leading-slash stripping actually happens here —
        # confirm pathlib resolves that form correctly on a Windows host.
        path_str = parsed.netloc + parsed.path
        # resolve() dereferences all symlinks and normalises ".." components.
        # This is the defence against symlink-based path escape attempts.
        root = pathlib.Path(path_str).resolve()
        if not (root / ".muse").is_dir():
            raise TransportError(
                f"Remote path {root!r} does not contain a .muse/ directory. "
                "Run 'muse init' in the target directory first.",
                404,
            )
        return root

    def fetch_remote_info(self, url: str, token: str | None) -> RemoteInfo:  # noqa: ARG002
        """Read branch heads directly from the remote's ref files.

        Raises:
            :class:`TransportError` if the URL is invalid, the target is not
            a Muse repo, or ``repo.json`` is missing/corrupt.
        """
        from muse.core.store import get_all_branch_heads

        remote_root = self._repo_root(url)
        # repo.json carries the repo's identity and default branch; per-branch
        # heads are read separately via get_all_branch_heads().
        repo_json_path = remote_root / ".muse" / "repo.json"
        try:
            repo_data = json.loads(repo_json_path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError) as exc:
            raise TransportError(f"Cannot read remote repo.json: {exc}", 0) from exc

        repo_id = str(repo_data.get("repo_id", ""))
        domain = str(repo_data.get("domain", "midi"))
        default_branch = str(repo_data.get("default_branch", "main"))

        branch_heads = get_all_branch_heads(remote_root)

        return RemoteInfo(
            repo_id=repo_id,
            domain=domain,
            default_branch=default_branch,
            branch_heads=branch_heads,
        )

    def fetch_pack(
        self, url: str, token: str | None, want: list[str], have: list[str]  # noqa: ARG002
    ) -> PackBundle:
        """Build a PackBundle from the remote's local store.

        Delegates entirely to ``build_pack``, which computes the transitive
        closure of *want* minus the commits in *have*.
        """
        from muse.core.pack import build_pack

        remote_root = self._repo_root(url)
        # set() dedupes the caller's "have" list before handing it on.
        have_set = set(have)
        # Build a pack containing all wanted commits and their transitive deps,
        # excluding anything the caller already has.
        bundle = build_pack(remote_root, commit_ids=want, have=list(have_set))
        return bundle

    def push_pack(
        self,
        url: str,
        token: str | None,  # noqa: ARG002
        bundle: PackBundle,
        branch: str,
        force: bool,
    ) -> PushResult:
        """Write a PackBundle directly into the remote's local store.

        Security guarantees:
        - ``branch`` is validated with :func:`~muse.core.validation.validate_branch_name`
          before any I/O. Names containing path traversal components (`..`),
          null bytes, backslashes, or other forbidden characters are rejected.
        - The ref file path is further hardened with
          :func:`~muse.core.validation.contain_path`, which resolves symlinks
          and asserts the result stays inside ``.muse/refs/heads/``. A branch
          name that ``validate_branch_name`` would allow but that resolves
          outside the expected directory (e.g. via a pre-placed symlink) is
          rejected before any write occurs.
        - A fast-forward check prevents overwriting diverged remote history
          unless ``force=True`` is explicitly passed.
        """
        from muse.core.pack import apply_pack
        from muse.core.store import get_all_branch_heads, get_head_commit_id

        remote_root = self._repo_root(url)

        try:
            validate_branch_name(branch)
        except ValueError as exc:
            # Validation failures surface as a failed PushResult, not an
            # exception — the CLI shows PushResult.message to the user.
            return PushResult(ok=False, message=str(exc), branch_heads={})

        # Determine the new tip for the branch.
        # Prefer an explicit branch_heads entry in the bundle (set by the push
        # command when it knows the local HEAD). Fall back to computing the
        # leaf commit — the commit in the bundle that is not referenced as a
        # parent of any other commit in the bundle, filtered to the branch.
        # This handles bundles produced by build_pack(), which does not
        # populate branch_heads.
        bundle_heads = bundle.get("branch_heads") or {}
        new_tip: str | None = bundle_heads.get(branch)
        if new_tip is None:
            bundle_commits_list = bundle.get("commits") or []
            all_parent_ids: set[str] = set()
            for bc in bundle_commits_list:
                pid = bc.get("parent_commit_id")
                if isinstance(pid, str):
                    all_parent_ids.add(pid)
                pid2 = bc.get("parent2_commit_id")
                if isinstance(pid2, str):
                    all_parent_ids.add(pid2)
            # Leaf = commit whose ID is not a parent of any other bundle commit.
            # Prefer commits whose branch field matches; otherwise take any leaf.
            leaves_for_branch = [
                bc["commit_id"]
                for bc in bundle_commits_list
                if bc.get("commit_id") not in all_parent_ids
                and bc.get("branch") == branch
                and isinstance(bc.get("commit_id"), str)
            ]
            any_leaves = [
                bc["commit_id"]
                for bc in bundle_commits_list
                if bc.get("commit_id") not in all_parent_ids
                and isinstance(bc.get("commit_id"), str)
            ]
            # [None] keeps the (a or b or c)[0] expression total when the
            # bundle contains no commits at all.
            fallback: list[str | None] = [None]
            new_tip = (leaves_for_branch or any_leaves or fallback)[0]

        # Fast-forward check: the remote's current HEAD for this branch must be
        # an ancestor of the tip commit in the bundle, unless --force is passed.
        if not force and new_tip:
            remote_tip = get_head_commit_id(remote_root, branch)
            if remote_tip and remote_tip != new_tip:
                # BFS from new_tip through bundle commits *and* existing remote
                # commits to find whether remote_tip is a reachable ancestor.
                # We cannot rely on bundle commits alone because build_pack()
                # excludes commits the receiver already has (the "have" set).
                bundle_by_id: dict[str, CommitDict] = {
                    c["commit_id"]: c
                    for c in (bundle.get("commits") or [])
                    if isinstance(c.get("commit_id"), str)
                }
                if not _is_ancestor(remote_tip, new_tip, bundle_by_id, remote_root):
                    return PushResult(
                        ok=False,
                        message=(
                            f"Push rejected: remote branch '{branch}' has diverged. "
                            "Pull and merge first, or use --force."
                        ),
                        branch_heads={},
                    )

        try:
            apply_pack(remote_root, bundle)
        except Exception as exc:  # noqa: BLE001
            return PushResult(ok=False, message=f"Failed to apply pack: {exc}", branch_heads={})

        # Update the remote branch ref to the new tip.
        # contain_path() resolves symlinks and asserts the result stays inside
        # .muse/refs/heads/ — defence-in-depth beyond validate_branch_name.
        if new_tip:
            heads_base = remote_root / ".muse" / "refs" / "heads"
            try:
                ref_path = contain_path(heads_base, branch)
            except ValueError as exc:
                return PushResult(
                    ok=False,
                    message=f"Rejected: branch ref path is unsafe — {exc}",
                    branch_heads={},
                )
            ref_path.parent.mkdir(parents=True, exist_ok=True)
            ref_path.write_text(new_tip, encoding="utf-8")
            logger.info("✅ local-transport: updated %s → %s", branch, new_tip[:8])

        return PushResult(
            ok=True,
            message=f"local push to {url!r} succeeded",
            branch_heads=get_all_branch_heads(remote_root),
        )
915
916
917 # ---------------------------------------------------------------------------
918 # Factory — select transport based on URL scheme
919 # ---------------------------------------------------------------------------
920
921
def make_transport(url: str) -> "HttpTransport | LocalFileTransport":
    """Pick the transport implementation matching *url*'s scheme.

    ``file://`` URLs get a :class:`LocalFileTransport` (pure filesystem, no
    server needed); every other scheme gets an :class:`HttpTransport`
    (requires a MuseHub server).

    Args:
        url: Remote repository URL.

    Returns:
        A transport instance implementing :class:`MuseTransport`.
    """
    scheme = urllib.parse.urlparse(url).scheme
    return LocalFileTransport() if scheme == "file" else HttpTransport()