gabriel / musehub public
wire.py python
292 lines 10.7 KB
4a63ff4c feat(mwp): full msgpack wire protocol — replace JSON+base64 on all push… Gabriel Cardona <cgcardona@gmail.com> 12h ago
1 """Wire protocol Pydantic models — Muse CLI native format (MWP — Muse Wire Protocol).
2
3 These models match the Muse CLI ``HttpTransport`` wire format exactly.
4 All fields are snake_case to match Muse's internal CommitDict/SnapshotDict/
5 ObjectPayload TypedDicts.
6
7 The wire protocol is intentionally separate from the REST API's CamelModel:
8 Wire protocol /wire/repos/{repo_id}/ ← Muse CLI speaks here (MWP, msgpack)
9 REST API /api/repos/{id}/ ← agents and integrations speak here
10 MCP /mcp ← agents speak here too
11
12 Encoding
13 --------
14 All wire endpoints accept and return ``application/x-msgpack`` binary.
15 Objects are transported as raw ``bytes`` under the ``content`` key — no
16 base64 encoding overhead.
17
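Denial-of-Service limits
------------------------
List fields in request bodies (push bundles and fetch, filter, presign and
negotiate requests) are capped so a single large request cannot exhaust
memory or DB connections:

    MAX_COMMITS_PER_PUSH   = 10_000      — commits in one push bundle
    MAX_OBJECTS_PER_PUSH   = 1_000       — binary objects in one push bundle
                                           or one ``/push/objects`` chunk
    MAX_SNAPSHOTS_PER_PUSH = 10_000      — snapshot manifests in one push bundle
    MAX_WANT_PER_FETCH     = 1_000       — entries in each fetch ``want`` /
                                           ``have`` list
    MAX_OBJECT_BYTES       = 38_000_000  — ~38 MB of raw content per object;
                                           larger objects use presigned URLs

Some caps are set inline on the models rather than as constants:
- snapshot manifests: 10 000 entries
- filter-objects IDs: 50 000
- presign IDs: 10 000
- negotiate ``want``/``have`` lists: 256 per round

``branch_heads`` dicts and the list/dict fields nested inside ``WireCommit``
are not capped here.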
28 """
29 from __future__ import annotations
30
31 from typing import Any
32
33 from pydantic import BaseModel, Field, field_validator
34
# ── Per-request DoS limits ────────────────────────────────────────────────────
# Upper bounds on what a single wire request may carry; the module docstring
# summarises where each limit is applied.
MAX_COMMITS_PER_PUSH: int = 10_000  # commits in one push bundle
MAX_OBJECTS_PER_PUSH: int = 1_000  # objects per push bundle / objects chunk
MAX_SNAPSHOTS_PER_PUSH: int = 10_000  # snapshot manifests in one push bundle
MAX_WANT_PER_FETCH: int = 1_000  # entries in each fetch want/have list

# Raw byte ceiling for one object (38 MB). Anything larger has to be uploaded
# through a presigned URL instead of travelling in the request body.
MAX_OBJECT_BYTES: int = 38 * 1_000_000
42
43
class WireCommit(BaseModel):
    """Commit record in the Muse CLI's native shape (``CommitDict`` in muse.core.store).

    Only ``commit_id`` is required; every other field has a default, so
    payloads that omit optional fields still validate. Unknown keys are
    dropped rather than rejected.
    """

    # Ignore unrecognised keys so newer Muse clients can add fields without
    # breaking validation on this server.
    model_config = {"extra": "ignore"}

    # Identity and placement.
    commit_id: str
    repo_id: str = ""
    branch: str = ""
    snapshot_id: str | None = None
    message: str = ""
    # Documented as an ISO-8601 UTC string; not parsed or checked here.
    committed_at: str = ""
    # Lineage: first parent (linear history), second parent (merge commits).
    parent_commit_id: str | None = None
    parent2_commit_id: str | None = None
    author: str = ""
    metadata: dict[str, str] = Field(default_factory=dict)
    # Domain-specific delta blob, accepted as-is without validation.
    structured_delta: Any = None
    # Expected values "none" | "patch" | "minor" | "major" (not enforced).
    sem_ver_bump: str = "none"
    breaking_changes: list[str] = Field(default_factory=list)
    # Agent / tooling provenance.
    agent_id: str = ""
    model_id: str = ""
    toolchain_id: str = ""
    prompt_hash: str = ""
    # Signing information.
    signature: str = ""
    signer_key_id: str = ""
    format_version: int = 1
    # Review / test bookkeeping.
    reviewed_by: list[str] = Field(default_factory=list)
    test_runs: int = 0
71
72
class WireSnapshot(BaseModel):
    """Snapshot manifest in the Muse CLI's native shape (``SnapshotDict`` in muse.core.store).

    ``manifest`` maps each file path to a content-addressed object ID, for
    example ``{"muse/core/pack.py": "sha256:abc123..."}``.
    """

    # Unknown keys from newer clients are silently dropped.
    model_config = {"extra": "ignore"}

    snapshot_id: str
    # Bounded at 10 000 entries so a single request cannot force the server
    # to parse an arbitrarily large dict.
    manifest: dict[str, str] = Field(max_length=10_000, default_factory=dict)
    created_at: str = ""
87
88
class WireObject(BaseModel):
    """Content-addressed object payload — mirrors ObjectPayload from muse.core.pack.

    MWP encodes ``content`` as raw bytes (msgpack bin type) — no base64 overhead.
    ``content`` is limited to ``MAX_OBJECT_BYTES``; larger objects must be
    uploaded through a presigned URL instead.
    """

    object_id: str
    # ``max_length`` stays as a schema-visible backstop; the ``before``
    # validator below runs first so oversized payloads get the actionable
    # presigned-URL error instead of pydantic's generic "too long" message.
    content: bytes = Field(max_length=MAX_OBJECT_BYTES)
    path: str = Field(default="", max_length=4096)

    model_config = {"extra": "ignore"}

    @field_validator("content", mode="before")
    @classmethod
    def _check_content_size(cls, v: Any) -> Any:
        """Reject oversized ``content`` before pydantic's own length constraint.

        This must run in ``before`` mode. An ``after`` validator is only
        called once core validation succeeds, and the field's ``max_length``
        constraint would already have failed for oversized input. That left
        the presigned-URL hint unreachable.

        Args:
            v: Raw input value. It is normally ``bytes``; in lax mode pydantic
                may also coerce ``str``/``bytearray`` after this hook.

        Returns:
            ``v`` unchanged. Type coercion is left to pydantic.

        Raises:
            ValueError: If ``v`` is ``bytes``/``bytearray``/``str`` and its
                length exceeds ``MAX_OBJECT_BYTES``. For ``str``, the
                character count is a lower bound on the encoded byte length,
                so rejecting on it is safe. Any remaining overshoot is caught
                by ``max_length``.
        """
        if isinstance(v, (bytes, bytearray, str)) and len(v) > MAX_OBJECT_BYTES:
            raise ValueError(
                f"content exceeds maximum size ({MAX_OBJECT_BYTES} bytes). "
                "Upload large objects directly via presigned URL instead."
            )
        return v
110
111
class WireBundle(BaseModel):
    """Pack bundle carried inside a push request (mirrors ``PackBundle`` in muse.core.pack).

    Every collection defaults to empty, so a push that introduces no new
    objects can ship commits alone. The three lists are length-capped by the
    module's ``MAX_*`` constants so one oversized request is rejected up
    front. ``branch_heads`` is not capped.
    """

    commits: list[WireCommit] = Field(
        max_length=MAX_COMMITS_PER_PUSH,
        default_factory=list,
    )
    snapshots: list[WireSnapshot] = Field(
        max_length=MAX_SNAPSHOTS_PER_PUSH,
        default_factory=list,
    )
    objects: list[WireObject] = Field(
        max_length=MAX_OBJECTS_PER_PUSH,
        default_factory=list,
    )
    branch_heads: dict[str, str] = Field(default_factory=dict)
126
127
class WirePushRequest(BaseModel):
    """Request body of ``POST /wire/repos/{repo_id}/push``.

    Same shape as the payload ``HttpTransport.push_pack()`` builds::

        {"bundle": {...}, "branch": "main", "force": false}
    """

    bundle: WireBundle
    branch: str
    # Optional on the wire; False when the client omits it.
    force: bool = Field(default=False)
138
139
class WireFetchRequest(BaseModel):
    """Request body of ``POST /wire/repos/{repo_id}/fetch``.

    Same shape as the payload ``HttpTransport.fetch_pack()`` sends::

        {"want": [...sha...], "have": [...sha...]}

    * ``want`` — commit SHAs the client is asking for.
    * ``have`` — commit SHAs the client already holds (an exclusion list).

    Both lists share the ``MAX_WANT_PER_FETCH`` cap.
    """

    want: list[str] = Field(max_length=MAX_WANT_PER_FETCH, default_factory=list)
    have: list[str] = Field(max_length=MAX_WANT_PER_FETCH, default_factory=list)
152
153
class WireRefsResponse(BaseModel):
    """Response body of ``GET /wire/repos/{repo_id}/refs``.

    The Muse CLI converts it into ``RemoteInfo`` through
    ``HttpTransport._parse_remote_info()``. Every field is required.
    """

    # Repository identity.
    repo_id: str
    domain: str
    # Branch information.
    default_branch: str
    branch_heads: dict[str, str]
164
165
class WirePushResponse(BaseModel):
    """Response body of ``POST /wire/repos/{repo_id}/push``.

    The Muse CLI turns it into a ``PushResult`` via
    ``HttpTransport._parse_push_result()`` and reads ``branch_heads``.
    ``remote_head`` is extra information for MCP consumers and defaults to
    an empty string.
    """

    ok: bool
    message: str
    branch_heads: dict[str, str]
    remote_head: str = Field(default="")
178
179
class WireFetchResponse(BaseModel):
    """Response body of ``POST /wire/repos/{repo_id}/fetch``.

    Decoded client-side into a ``PackBundle`` by ``HttpTransport._parse_bundle()``.
    These lists carry no length caps; the ``MAX_*`` limits apply only to
    inbound request bodies.
    """

    commits: list[WireCommit] = Field(
        default_factory=list,
    )
    snapshots: list[WireSnapshot] = Field(
        default_factory=list,
    )
    objects: list[WireObject] = Field(
        default_factory=list,
    )
    branch_heads: dict[str, str] = Field(
        default_factory=dict,
    )
190
191
class WireObjectsRequest(BaseModel):
    """Request body of ``POST /{owner}/{slug}/push/objects`` (chunked object pre-upload).

    A large push is split into several calls to this endpoint. Afterwards the
    client calls ``POST /{owner}/{slug}/push`` with an empty objects list.
    Objects are addressed by SHA-256, so re-uploading one is harmless: the
    server skips objects it already holds.

    Each chunk carries at most ``MAX_OBJECTS_PER_PUSH`` objects.
    """

    objects: list[WireObject] = Field(
        max_length=MAX_OBJECTS_PER_PUSH,
        default_factory=list,
    )
202
203
class WireObjectsResponse(BaseModel):
    """Response body of ``POST /{owner}/{slug}/push/objects``.

    Two counters split the submitted objects into newly written ones and ones
    the server already had.
    """

    # Objects written to storage by this call.
    stored: int
    # Objects already present, so nothing was written (idempotent no-op).
    skipped: int
209
210
211 # ── MWP/2 — Phase 1: object-level deduplication ──────────────────────────────
212
213
class WireFilterRequest(BaseModel):
    """Request body of ``POST /{owner}/{slug}/filter-objects``.

    The client lists every object ID it intends to push. The server replies
    with only the IDs it does NOT already hold, so the client uploads just
    those. Anything already present from an earlier push, shared history,
    etc. is skipped.

    This is the single biggest win in MWP/2: an incremental push that touches
    two files in a 10 000-object repo uploads two objects, not 10 000.
    """

    # Capped at 50 000 IDs per request.
    object_ids: list[str] = Field(max_length=50_000, default_factory=list)
228
229
class WireFilterResponse(BaseModel):
    """Response body of ``POST /{owner}/{slug}/filter-objects``."""

    # The part of the requested object_ids that the remote does NOT hold.
    missing: list[str]
234
235
236 # ── MWP/2 — Phase 3: presigned object storage URLs ───────────────────────────
237
238
class WirePresignRequest(BaseModel):
    """Body for ``POST /{owner}/{slug}/presign``.

    For large objects (> 64 KB), the client requests presigned PUT URLs and
    uploads directly to object storage, bypassing the API server entirely.
    For the local ``local://`` backend, the server returns an empty
    ``presigned`` dict and lists all IDs in ``inline``. The client then
    sends those objects through the normal pack flow.

    ``object_ids`` — at most 10 000 IDs per request.
    ``direction`` — ``"put"`` for push, ``"get"`` for pull/fetch. Any other
    value is rejected at validation time.
    ``ttl_seconds`` — URL lifetime, between 30 s and 3 600 s (1 hour).
    """

    object_ids: list[str] = Field(default_factory=list, max_length=10_000)
    # Restrict the direction to the two documented values. Previously any
    # string was accepted from the client and passed on to the endpoint
    # handler unchecked.
    direction: str = Field(default="put", pattern=r"^(put|get)$")
    ttl_seconds: int = Field(default=300, ge=30, le=3600)
255
256
class WirePresignResponse(BaseModel):
    """Response body of ``POST /{owner}/{slug}/presign``.

    ``presigned`` holds the presigned URLs, presumably keyed by object ID.
    ``inline`` lists the object IDs whose backend cannot presign
    (``local://``); the client must put those objects in the normal pack
    body instead.
    """

    presigned: dict[str, str] = Field(
        default_factory=dict,
    )
    inline: list[str] = Field(
        default_factory=list,
    )
264
265
266 # ── MWP/2 — Phase 5: depth-limited commit negotiation ────────────────────────
267
268
class WireNegotiateRequest(BaseModel):
    """Request body of ``POST /{owner}/{slug}/negotiate``.

    This is one round of a multi-round ACK/NAK exchange, modelled on Git
    pack-protocol v2. The client sends:

    * the branch tips it *wants*;
    * a depth-limited batch of commits it already *has* (at most 256 per
      round).

    The server replies with the ``have`` commits it recognises (``ack``) and
    whether it now knows enough to compute the delta (``ready``).

    While ``ready`` is False, the client walks back another
    ``NEGOTIATE_DEPTH`` ancestors and calls again. This converges on the
    common base without ever sending the full commit history.
    """

    want: list[str] = Field(max_length=256, default_factory=list)
    have: list[str] = Field(max_length=256, default_factory=list)
285
286
class WireNegotiateResponse(BaseModel):
    """Response body of ``POST /{owner}/{slug}/negotiate``.

    All three fields are required. ``common_base`` may be ``None`` but must
    still be present.
    """

    # IDs from the request's ``have`` list that the server recognises.
    ack: list[str]
    # Deepest shared ancestor found so far, or None when there is none yet.
    common_base: str | None
    # True once the client has enough information to build the pack.
    ready: bool