test_core_transport.py
python
| 1 | """Tests for muse.core.transport — HttpTransport and response parsers.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import json |
| 6 | import unittest.mock |
| 7 | import urllib.error |
| 8 | import urllib.request |
| 9 | from io import BytesIO |
| 10 | |
| 11 | import pytest |
| 12 | |
| 13 | from muse.core.pack import PackBundle, RemoteInfo |
| 14 | from muse.core.transport import ( |
| 15 | HttpTransport, |
| 16 | TransportError, |
| 17 | _parse_bundle, |
| 18 | _parse_push_result, |
| 19 | _parse_remote_info, |
| 20 | ) |
| 21 | |
| 22 | |
| 23 | # --------------------------------------------------------------------------- |
| 24 | # Helpers |
| 25 | # --------------------------------------------------------------------------- |
| 26 | |
| 27 | |
| 28 | def _mock_response(body: bytes, status: int = 200) -> unittest.mock.MagicMock: |
| 29 | """Return a mock urllib response context manager.""" |
| 30 | resp = unittest.mock.MagicMock() |
| 31 | resp.read.return_value = body |
| 32 | resp.__enter__ = lambda s: s |
| 33 | resp.__exit__ = unittest.mock.MagicMock(return_value=False) |
| 34 | return resp |
| 35 | |
| 36 | |
| 37 | def _http_error(code: int, body: bytes = b"") -> urllib.error.HTTPError: |
| 38 | return urllib.error.HTTPError( |
| 39 | url="https://example.com", |
| 40 | code=code, |
| 41 | msg=str(code), |
| 42 | hdrs=None, |
| 43 | fp=BytesIO(body), |
| 44 | ) |
| 45 | |
| 46 | |
| 47 | # --------------------------------------------------------------------------- |
| 48 | # _parse_remote_info |
| 49 | # --------------------------------------------------------------------------- |
| 50 | |
| 51 | |
class TestParseRemoteInfo:
    """Unit tests for ``_parse_remote_info`` applied to raw response bytes."""

    def test_valid_response(self) -> None:
        """A fully populated JSON object is returned field for field."""
        raw = json.dumps(
            {
                "repo_id": "r123",
                "domain": "midi",
                "default_branch": "main",
                "branch_heads": {"main": "abc123", "dev": "def456"},
            }
        ).encode()
        info = _parse_remote_info(raw)
        assert info["repo_id"] == "r123"
        assert info["domain"] == "midi"
        assert info["default_branch"] == "main"
        assert info["branch_heads"] == {"main": "abc123", "dev": "def456"}

    def test_invalid_json_raises_json_error(self) -> None:
        """Bytes that are not JSON raise ``json.JSONDecodeError``."""
        # The module-level ``json`` import is used directly; re-importing it
        # locally under an alias was redundant.
        with pytest.raises(json.JSONDecodeError):
            _parse_remote_info(b"not json")

    def test_non_dict_response_returns_defaults(self) -> None:
        """A JSON array yields an empty repo id and no branch heads."""
        raw = json.dumps([1, 2, 3]).encode()
        info = _parse_remote_info(raw)
        assert info["repo_id"] == ""
        assert info["branch_heads"] == {}

    def test_missing_fields_get_defaults(self) -> None:
        """Absent keys fall back to ``midi`` / ``main`` / empty heads."""
        raw = json.dumps({"repo_id": "x"}).encode()
        info = _parse_remote_info(raw)
        assert info["repo_id"] == "x"
        assert info["domain"] == "midi"
        assert info["default_branch"] == "main"
        assert info["branch_heads"] == {}

    def test_non_string_branch_heads_excluded(self) -> None:
        """Branch heads whose value is not a string are dropped."""
        raw = json.dumps(
            {"branch_heads": {"main": "abc", "bad": 123}}
        ).encode()
        info = _parse_remote_info(raw)
        assert "main" in info["branch_heads"]
        assert "bad" not in info["branch_heads"]
| 94 | |
| 95 | |
| 96 | # --------------------------------------------------------------------------- |
| 97 | # _parse_bundle |
| 98 | # --------------------------------------------------------------------------- |
| 99 | |
| 100 | |
class TestParseBundle:
    """Unit tests for ``_parse_bundle`` applied to raw response bytes."""

    def test_empty_json_object_returns_empty_bundle(self) -> None:
        """An empty JSON object parses to an empty bundle."""
        assert _parse_bundle(b"{}") == {}

    def test_non_dict_returns_empty_bundle(self) -> None:
        """A JSON array parses to an empty bundle."""
        assert _parse_bundle(b"[]") == {}

    def test_commits_extracted(self) -> None:
        """A single commit record survives parsing with its id intact."""
        commit = {
            "commit_id": "c1",
            "repo_id": "r1",
            "branch": "main",
            "snapshot_id": "s1",
            "message": "test",
            "committed_at": "2026-01-01T00:00:00+00:00",
            "parent_commit_id": None,
            "parent2_commit_id": None,
            "author": "bob",
            "metadata": {},
        }
        payload = json.dumps({"commits": [commit]}).encode()
        parsed = _parse_bundle(payload)
        found = parsed.get("commits") or []
        assert len(found) == 1
        assert found[0]["commit_id"] == "c1"

    def test_objects_extracted(self) -> None:
        """An object with both id and base64 content is kept."""
        import base64

        encoded = base64.b64encode(b"hello").decode()
        entry = {"object_id": "abc123", "content_b64": encoded}
        payload = json.dumps({"objects": [entry]}).encode()
        parsed = _parse_bundle(payload)
        found = parsed.get("objects") or []
        assert len(found) == 1
        assert found[0]["object_id"] == "abc123"

    def test_object_missing_fields_excluded(self) -> None:
        """An object lacking ``content_b64`` is left out of the bundle."""
        incomplete = {"object_id": "abc"}  # missing content_b64
        payload = json.dumps({"objects": [incomplete]}).encode()
        parsed = _parse_bundle(payload)
        found = parsed.get("objects") or []
        assert found == []

    def test_branch_heads_extracted(self) -> None:
        """Branch heads are passed through unchanged."""
        heads = {"main": "abc123"}
        payload = json.dumps({"branch_heads": heads}).encode()
        parsed = _parse_bundle(payload)
        assert parsed.get("branch_heads") == {"main": "abc123"}
| 162 | |
| 163 | |
| 164 | # --------------------------------------------------------------------------- |
| 165 | # _parse_push_result |
| 166 | # --------------------------------------------------------------------------- |
| 167 | |
| 168 | |
class TestParsePushResult:
    """Unit tests for ``_parse_push_result`` applied to raw response bytes."""

    def test_success_response(self) -> None:
        """An accepted push reports ok, its message, and the new heads."""
        payload = {"ok": True, "message": "pushed", "branch_heads": {"main": "abc"}}
        outcome = _parse_push_result(json.dumps(payload).encode())
        assert outcome["ok"] is True
        assert outcome["message"] == "pushed"
        assert outcome["branch_heads"] == {"main": "abc"}

    def test_failure_response(self) -> None:
        """A rejected push reports not-ok together with its message."""
        payload = {"ok": False, "message": "rejected", "branch_heads": {}}
        outcome = _parse_push_result(json.dumps(payload).encode())
        assert outcome["ok"] is False
        assert outcome["message"] == "rejected"

    def test_non_dict_returns_not_ok(self) -> None:
        """A JSON ``null`` body is treated as a failed push."""
        outcome = _parse_push_result(b"null")
        assert outcome["ok"] is False

    def test_missing_ok_defaults_false(self) -> None:
        """A payload without an ``ok`` key is treated as a failed push."""
        payload = {"message": "hm", "branch_heads": {}}
        outcome = _parse_push_result(json.dumps(payload).encode())
        assert outcome["ok"] is False
| 193 | |
| 194 | |
| 195 | # --------------------------------------------------------------------------- |
| 196 | # HttpTransport — mocked urlopen |
| 197 | # --------------------------------------------------------------------------- |
| 198 | |
| 199 | |
class TestHttpTransportFetchRemoteInfo:
    """``HttpTransport.fetch_remote_info`` exercised with ``urlopen`` patched."""

    def test_calls_correct_endpoint(self) -> None:
        """The request goes to ``<repo url>/refs`` and the body is parsed."""
        payload = {
            "repo_id": "r1",
            "domain": "midi",
            "default_branch": "main",
            "branch_heads": {"main": "abc"},
        }
        fake = _mock_response(json.dumps(payload).encode())
        with unittest.mock.patch("urllib.request.urlopen", return_value=fake) as urlopen:
            client = HttpTransport()
            info = client.fetch_remote_info("https://hub.example.com/repos/r1", None)
        sent_request = urlopen.call_args[0][0]
        assert sent_request.full_url == "https://hub.example.com/repos/r1/refs"
        assert info["repo_id"] == "r1"

    def test_bearer_token_sent(self) -> None:
        """A supplied token is sent as a ``Bearer`` Authorization header."""
        payload = {"repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {}}
        fake = _mock_response(json.dumps(payload).encode())
        with unittest.mock.patch("urllib.request.urlopen", return_value=fake) as urlopen:
            HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", "my-token")
        sent_request = urlopen.call_args[0][0]
        assert sent_request.get_header("Authorization") == "Bearer my-token"

    def test_no_token_no_auth_header(self) -> None:
        """Without a token the request carries no Authorization header."""
        payload = {"repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {}}
        fake = _mock_response(json.dumps(payload).encode())
        with unittest.mock.patch("urllib.request.urlopen", return_value=fake) as urlopen:
            HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
        sent_request = urlopen.call_args[0][0]
        assert sent_request.get_header("Authorization") is None

    def test_http_401_raises_transport_error(self) -> None:
        """An HTTP 401 surfaces as ``TransportError`` with status 401."""
        failure = _http_error(401, b"Unauthorized")
        with unittest.mock.patch("urllib.request.urlopen", side_effect=failure):
            with pytest.raises(TransportError) as exc_info:
                HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
        # Checked after the ``raises`` block so the assertion actually runs.
        assert exc_info.value.status_code == 401

    def test_http_404_raises_transport_error(self) -> None:
        """An HTTP 404 surfaces as ``TransportError`` with status 404."""
        failure = _http_error(404)
        with unittest.mock.patch("urllib.request.urlopen", side_effect=failure):
            with pytest.raises(TransportError) as exc_info:
                HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
        assert exc_info.value.status_code == 404

    def test_http_500_raises_transport_error(self) -> None:
        """An HTTP 500 surfaces as ``TransportError`` with status 500."""
        failure = _http_error(500, b"Internal Error")
        with unittest.mock.patch("urllib.request.urlopen", side_effect=failure):
            with pytest.raises(TransportError) as exc_info:
                HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
        assert exc_info.value.status_code == 500

    def test_url_error_raises_transport_error_with_code_0(self) -> None:
        """A ``URLError`` surfaces as ``TransportError`` with status 0."""
        failure = urllib.error.URLError("Name or service not known")
        with unittest.mock.patch("urllib.request.urlopen", side_effect=failure):
            with pytest.raises(TransportError) as exc_info:
                HttpTransport().fetch_remote_info("https://bad.host/r", None)
        assert exc_info.value.status_code == 0

    def test_trailing_slash_stripped_from_url(self) -> None:
        """A trailing slash on the repo URL does not produce ``//refs``."""
        payload = {"repo_id": "r", "domain": "midi", "default_branch": "main", "branch_heads": {}}
        fake = _mock_response(json.dumps(payload).encode())
        with unittest.mock.patch("urllib.request.urlopen", return_value=fake) as urlopen:
            HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1/", None)
        sent_request = urlopen.call_args[0][0]
        assert sent_request.full_url == "https://hub.example.com/repos/r1/refs"
| 280 | |
| 281 | |
class TestHttpTransportFetchPack:
    """``HttpTransport.fetch_pack`` exercised with ``urlopen`` patched."""

    def test_posts_to_fetch_endpoint(self) -> None:
        """A POST to ``<repo url>/fetch`` carries want/have; the bundle is parsed."""
        bundle_body = json.dumps(
            {
                "commits": [],
                "snapshots": [],
                "objects": [],
                "branch_heads": {"main": "abc"},
            }
        ).encode()
        mock_resp = _mock_response(bundle_body)
        with unittest.mock.patch("urllib.request.urlopen", return_value=mock_resp) as m:
            transport = HttpTransport()
            bundle = transport.fetch_pack(
                "https://hub.example.com/repos/r1",
                "tok",
                want=["abc"],
                have=["def"],
            )
        req = m.call_args[0][0]
        assert req.full_url == "https://hub.example.com/repos/r1/fetch"
        # The test name promises a POST; pin the method explicitly rather than
        # relying on the presence of a request body alone.
        assert req.get_method() == "POST"
        sent = json.loads(req.data)
        assert sent["want"] == ["abc"]
        assert sent["have"] == ["def"]
        assert bundle.get("branch_heads") == {"main": "abc"}

    def test_http_409_raises_transport_error(self) -> None:
        """An HTTP 409 surfaces as ``TransportError`` with status 409."""
        with unittest.mock.patch(
            "urllib.request.urlopen", side_effect=_http_error(409)
        ):
            with pytest.raises(TransportError) as exc_info:
                HttpTransport().fetch_pack("https://hub.example.com/r", None, [], [])
        # Checked after the ``raises`` block so the assertion actually runs.
        assert exc_info.value.status_code == 409
| 315 | |
| 316 | |
class TestHttpTransportPushPack:
    """``HttpTransport.push_pack`` exercised with ``urlopen`` patched."""

    def test_posts_to_push_endpoint(self) -> None:
        """A POST to ``<repo url>/push`` carries branch and force; the result is parsed."""
        push_body = json.dumps(
            {"ok": True, "message": "ok", "branch_heads": {"main": "new"}}
        ).encode()
        mock_resp = _mock_response(push_body)
        bundle: PackBundle = {"commits": [], "snapshots": [], "objects": []}
        with unittest.mock.patch("urllib.request.urlopen", return_value=mock_resp) as m:
            result = HttpTransport().push_pack(
                "https://hub.example.com/repos/r1", "tok", bundle, "main", False
            )
        req = m.call_args[0][0]
        assert req.full_url == "https://hub.example.com/repos/r1/push"
        # The test name promises a POST; pin the method explicitly rather than
        # relying on the presence of a request body alone.
        assert req.get_method() == "POST"
        sent = json.loads(req.data)
        assert sent["branch"] == "main"
        assert sent["force"] is False
        assert result["ok"] is True

    def test_force_flag_sent(self) -> None:
        """``force=True`` is forwarded in the request body."""
        push_body = json.dumps(
            {"ok": True, "message": "", "branch_heads": {}}
        ).encode()
        mock_resp = _mock_response(push_body)
        bundle: PackBundle = {}
        with unittest.mock.patch("urllib.request.urlopen", return_value=mock_resp) as m:
            HttpTransport().push_pack("https://hub.example.com/r", None, bundle, "main", True)
        req = m.call_args[0][0]
        sent = json.loads(req.data)
        assert sent["force"] is True

    def test_push_rejected_raises_transport_error(self) -> None:
        """An HTTP 409 on push surfaces as ``TransportError`` with status 409."""
        with unittest.mock.patch(
            "urllib.request.urlopen", side_effect=_http_error(409, b"non-fast-forward")
        ):
            with pytest.raises(TransportError) as exc_info:
                HttpTransport().push_pack(
                    "https://hub.example.com/r", None, {}, "main", False
                )
        # Checked after the ``raises`` block so the assertion actually runs.
        assert exc_info.value.status_code == 409