cgcardona / muse public
test_cli_plumbing.py python
739 lines 25.3 KB
189a2e45 feat: three-tier CLI architecture — plumbing, core porcelain, semantic … Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for all Tier 1 plumbing commands under ``muse plumbing …``.
2
3 Each plumbing command is tested via the Typer CliRunner so tests exercise the
4 full CLI stack including argument parsing, error handling, and JSON output
5 format. All commands are accessed through the ``plumbing`` sub-namespace.
6
7 The ``MUSE_REPO_ROOT`` env-var is used to point repo-discovery at the test
8 fixture without requiring ``os.chdir``.
9 """
10
11 from __future__ import annotations
12
13 import datetime
14 import json
15 import pathlib
16
17 import pytest
18 from typer.testing import CliRunner
19
20 from muse.cli.app import cli
21 from muse.core.errors import ExitCode
22 from muse.core.object_store import write_object
23 from muse.core.pack import build_pack
24 from muse.core.store import (
25 CommitRecord,
26 SnapshotRecord,
27 write_commit,
28 write_snapshot,
29 )
30
# Shared in-process CLI runner; the tests below call ``runner.invoke(cli, ...)``.
runner = CliRunner()
32
33 # ---------------------------------------------------------------------------
34 # Helpers
35 # ---------------------------------------------------------------------------
36
37
38 def _init_repo(path: pathlib.Path) -> pathlib.Path:
39 """Create a minimal .muse/ directory structure."""
40 muse = path / ".muse"
41 (muse / "commits").mkdir(parents=True)
42 (muse / "snapshots").mkdir(parents=True)
43 (muse / "objects").mkdir(parents=True)
44 (muse / "refs" / "heads").mkdir(parents=True)
45 muse.joinpath("HEAD").write_text("refs/heads/main")
46 muse.joinpath("repo.json").write_text(
47 json.dumps({"repo_id": "test-repo-id", "domain": "generic"})
48 )
49 return path
50
51
def _make_object(repo: pathlib.Path, content: bytes) -> str:
    """Write *content* through ``write_object`` and return its object id.

    The object id is the SHA-256 hex digest of *content*.
    """
    import hashlib

    digest = hashlib.sha256(content)
    object_id = digest.hexdigest()
    write_object(repo, object_id, content)
    return object_id
58
59
def _make_snapshot(
    repo: pathlib.Path, snap_id: str, manifest: dict[str, str]
) -> SnapshotRecord:
    """Build a ``SnapshotRecord`` with a fixed timestamp, persist it, and return it.

    The record is written to *repo* with ``write_snapshot``; ``created_at`` is
    always 2026-03-18 (UTC).
    """
    created = datetime.datetime(2026, 3, 18, tzinfo=datetime.timezone.utc)
    record = SnapshotRecord(
        snapshot_id=snap_id,
        manifest=manifest,
        created_at=created,
    )
    write_snapshot(repo, record)
    return record
70
71
def _make_commit(
    repo: pathlib.Path,
    commit_id: str,
    snapshot_id: str,
    *,
    branch: str = "main",
    parent_commit_id: str | None = None,
    message: str = "test commit",
) -> CommitRecord:
    """Build a ``CommitRecord`` for the fixture repo, persist it, and return it.

    The record always uses repo id ``test-repo-id``, author ``tester`` and a
    fixed commit time of 2026-03-18 (UTC); it is written to *repo* with
    ``write_commit``. Branch refs are not touched here.
    """
    when = datetime.datetime(2026, 3, 18, tzinfo=datetime.timezone.utc)
    record = CommitRecord(
        commit_id=commit_id,
        repo_id="test-repo-id",
        branch=branch,
        snapshot_id=snapshot_id,
        message=message,
        committed_at=when,
        author="tester",
        parent_commit_id=parent_commit_id,
    )
    write_commit(repo, record)
    return record
93
94
95 def _set_head(repo: pathlib.Path, branch: str, commit_id: str) -> None:
96 ref = repo / ".muse" / "refs" / "heads" / branch
97 ref.parent.mkdir(parents=True, exist_ok=True)
98 ref.write_text(commit_id)
99
100
101 def _repo_env(repo: pathlib.Path) -> dict[str, str]:
102 """Return env dict that sets MUSE_REPO_ROOT to the given path."""
103 return {"MUSE_REPO_ROOT": str(repo)}
104
105
106 # ---------------------------------------------------------------------------
107 # hash-object
108 # ---------------------------------------------------------------------------
109
110
class TestHashObject:
    """Tests for ``muse plumbing hash-object``."""

    def test_hash_file_json_output(self, tmp_path: pathlib.Path) -> None:
        """Default output parses as JSON with a 64-char id and ``stored`` false."""
        target = tmp_path / "test.txt"
        target.write_bytes(b"hello world")
        argv = ["plumbing", "hash-object", str(target)]
        result = runner.invoke(cli, argv)
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert "object_id" in payload
        assert len(payload["object_id"]) == 64
        assert payload["stored"] is False

    def test_hash_file_text_format(self, tmp_path: pathlib.Path) -> None:
        """``--format text`` prints 64 characters once whitespace is stripped."""
        target = tmp_path / "data.bin"
        target.write_bytes(b"test bytes")
        argv = ["plumbing", "hash-object", "--format", "text", str(target)]
        result = runner.invoke(cli, argv)
        assert result.exit_code == 0, result.output
        printed = result.stdout.strip()
        assert len(printed) == 64

    def test_hash_and_write(self, tmp_path: pathlib.Path) -> None:
        """``--write`` inside a repo reports ``stored`` as true."""
        repo = _init_repo(tmp_path / "repo")
        target = repo / "sample.txt"
        target.write_bytes(b"write me")
        argv = ["plumbing", "hash-object", "--write", str(target)]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["stored"] is True

    def test_missing_file_errors(self, tmp_path: pathlib.Path) -> None:
        """A path that does not exist exits with ``ExitCode.USER_ERROR``."""
        absent = tmp_path / "no.txt"
        result = runner.invoke(cli, ["plumbing", "hash-object", str(absent)])
        assert result.exit_code == ExitCode.USER_ERROR

    def test_directory_errors(self, tmp_path: pathlib.Path) -> None:
        """Passing a directory exits with ``ExitCode.USER_ERROR``."""
        argv = ["plumbing", "hash-object", str(tmp_path)]
        result = runner.invoke(cli, argv)
        assert result.exit_code == ExitCode.USER_ERROR
152
153
154 # ---------------------------------------------------------------------------
155 # cat-object
156 # ---------------------------------------------------------------------------
157
158
class TestCatObject:
    """Tests for ``muse plumbing cat-object``."""

    def test_cat_raw_bytes(self, tmp_path: pathlib.Path) -> None:
        """Default output is exactly the stored object's bytes."""
        repo = _init_repo(tmp_path)
        payload = b"raw content data"
        object_id = _make_object(repo, payload)
        argv = ["plumbing", "cat-object", object_id]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        assert result.stdout_bytes == payload

    def test_cat_info_format(self, tmp_path: pathlib.Path) -> None:
        """``--format info`` reports the id, presence and byte size as JSON."""
        repo = _init_repo(tmp_path)
        payload = b"info content"
        object_id = _make_object(repo, payload)
        argv = ["plumbing", "cat-object", "--format", "info", object_id]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        info = json.loads(result.stdout)
        assert info["object_id"] == object_id
        assert info["present"] is True
        assert info["size_bytes"] == len(payload)

    def test_missing_object_errors(self, tmp_path: pathlib.Path) -> None:
        """An id with no stored object exits with ``ExitCode.USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "cat-object", "a" * 64]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR

    def test_missing_object_info_format(self, tmp_path: pathlib.Path) -> None:
        """``--format info`` on a missing id still emits JSON with ``present`` false."""
        repo = _init_repo(tmp_path)
        object_id = "b" * 64
        argv = ["plumbing", "cat-object", "--format", "info", object_id]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR
        info = json.loads(result.stdout)
        assert info["present"] is False
204
205
206 # ---------------------------------------------------------------------------
207 # rev-parse
208 # ---------------------------------------------------------------------------
209
210
class TestRevParse:
    """Tests for ``muse plumbing rev-parse``."""

    def test_resolve_branch(self, tmp_path: pathlib.Path) -> None:
        """A branch name resolves to the commit its ref points at."""
        repo = _init_repo(tmp_path)
        commit_id = "c" * 64
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "snap1", {"f": object_id})
        _make_commit(repo, commit_id, "snap1")
        _set_head(repo, "main", commit_id)

        argv = ["plumbing", "rev-parse", "main"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["commit_id"] == commit_id
        assert payload["ref"] == "main"

    def test_resolve_head(self, tmp_path: pathlib.Path) -> None:
        """``HEAD`` resolves to the commit on the ``main`` branch ref."""
        repo = _init_repo(tmp_path)
        commit_id = "d" * 64
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "snap1", {"f": object_id})
        _make_commit(repo, commit_id, "snap1")
        _set_head(repo, "main", commit_id)

        argv = ["plumbing", "rev-parse", "HEAD"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["commit_id"] == commit_id

    def test_resolve_text_format(self, tmp_path: pathlib.Path) -> None:
        """``--format text`` prints just the commit id."""
        repo = _init_repo(tmp_path)
        commit_id = "e" * 64
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "snap1", {"f": object_id})
        _make_commit(repo, commit_id, "snap1")
        _set_head(repo, "main", commit_id)

        argv = ["plumbing", "rev-parse", "--format", "text", "main"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        assert result.stdout.strip() == commit_id

    def test_unknown_ref_errors(self, tmp_path: pathlib.Path) -> None:
        """An unknown ref exits with ``USER_ERROR`` and a null ``commit_id``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "rev-parse", "nonexistent"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR
        payload = json.loads(result.stdout)
        assert payload["commit_id"] is None
266
267
268 # ---------------------------------------------------------------------------
269 # ls-files
270 # ---------------------------------------------------------------------------
271
272
class TestLsFiles:
    """Tests for ``muse plumbing ls-files``."""

    def test_lists_files_json(self, tmp_path: pathlib.Path) -> None:
        """JSON output lists the single tracked path with its object id."""
        repo = _init_repo(tmp_path)
        commit_id = "f" * 64
        object_id = _make_object(repo, b"track data")
        _make_snapshot(repo, "snap1", {"tracks/drums.mid": object_id})
        _make_commit(repo, commit_id, "snap1")
        _set_head(repo, "main", commit_id)

        result = runner.invoke(
            cli, ["plumbing", "ls-files"], env=_repo_env(repo)
        )
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["file_count"] == 1
        first = payload["files"][0]
        assert first["path"] == "tracks/drums.mid"
        assert first["object_id"] == object_id

    def test_lists_files_text(self, tmp_path: pathlib.Path) -> None:
        """``--format text`` output mentions the tracked path."""
        repo = _init_repo(tmp_path)
        commit_id = "a" * 64
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "snap1", {"a.txt": object_id})
        _make_commit(repo, commit_id, "snap1")
        _set_head(repo, "main", commit_id)

        argv = ["plumbing", "ls-files", "--format", "text"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        assert "a.txt" in result.stdout

    def test_with_explicit_commit(self, tmp_path: pathlib.Path) -> None:
        """``--commit`` works without any branch ref and echoes the commit id."""
        repo = _init_repo(tmp_path)
        commit_id = "1" * 64
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "snap1", {"x.mid": object_id})
        _make_commit(repo, commit_id, "snap1")

        argv = ["plumbing", "ls-files", "--commit", commit_id]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["commit_id"] == commit_id

    def test_no_commits_errors(self, tmp_path: pathlib.Path) -> None:
        """A freshly initialised repo exits with ``ExitCode.USER_ERROR``."""
        repo = _init_repo(tmp_path)
        result = runner.invoke(
            cli, ["plumbing", "ls-files"], env=_repo_env(repo)
        )
        assert result.exit_code == ExitCode.USER_ERROR
326
327
328 # ---------------------------------------------------------------------------
329 # read-commit
330 # ---------------------------------------------------------------------------
331
332
class TestReadCommit:
    """Tests for ``muse plumbing read-commit``."""

    def test_reads_commit_json(self, tmp_path: pathlib.Path) -> None:
        """Output JSON carries the commit's id, message and snapshot id."""
        repo = _init_repo(tmp_path)
        commit_id = "2" * 64
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "snap1", {"f": object_id})
        _make_commit(repo, commit_id, "snap1", message="my message")

        argv = ["plumbing", "read-commit", commit_id]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["commit_id"] == commit_id
        assert payload["message"] == "my message"
        assert payload["snapshot_id"] == "snap1"

    def test_missing_commit_errors(self, tmp_path: pathlib.Path) -> None:
        """An unknown commit id exits with ``USER_ERROR`` and an ``error`` key."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "read-commit", "z" * 64]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR
        payload = json.loads(result.stdout)
        assert "error" in payload
360
361
362 # ---------------------------------------------------------------------------
363 # read-snapshot
364 # ---------------------------------------------------------------------------
365
366
class TestReadSnapshot:
    """Tests for ``muse plumbing read-snapshot``."""

    def test_reads_snapshot_json(self, tmp_path: pathlib.Path) -> None:
        """Output JSON carries the snapshot id, file count and manifest."""
        repo = _init_repo(tmp_path)
        object_id = _make_object(repo, b"snap data")
        _make_snapshot(repo, "mysnapshot", {"track.mid": object_id})

        argv = ["plumbing", "read-snapshot", "mysnapshot"]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["snapshot_id"] == "mysnapshot"
        assert payload["file_count"] == 1
        assert "track.mid" in payload["manifest"]

    def test_missing_snapshot_errors(self, tmp_path: pathlib.Path) -> None:
        """An unknown snapshot id exits with ``ExitCode.USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "read-snapshot", "nothere"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR
391
392
393 # ---------------------------------------------------------------------------
394 # commit-tree
395 # ---------------------------------------------------------------------------
396
397
class TestCommitTree:
    """Tests for ``muse plumbing commit-tree``."""

    def test_creates_commit_from_snapshot(self, tmp_path: pathlib.Path) -> None:
        """A snapshot plus message and author yields a 64-char ``commit_id``."""
        repo = _init_repo(tmp_path)
        object_id = _make_object(repo, b"content")
        _make_snapshot(repo, "snap1", {"file.txt": object_id})

        argv = [
            "plumbing", "commit-tree",
            "--snapshot", "snap1",
            "--message", "plumbing commit",
            "--author", "bot",
        ]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert "commit_id" in payload
        assert len(payload["commit_id"]) == 64

    def test_with_parent(self, tmp_path: pathlib.Path) -> None:
        """``--parent`` pointing at an existing commit is accepted."""
        repo = _init_repo(tmp_path)
        parent_id = "p" * 64
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "snap1", {"a": object_id})
        _make_commit(repo, parent_id, "snap1")

        _make_snapshot(repo, "snap2", {"b": object_id})
        argv = [
            "plumbing", "commit-tree",
            "--snapshot", "snap2",
            "--parent", parent_id,
            "--message", "child",
        ]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert "commit_id" in payload

    def test_missing_snapshot_errors(self, tmp_path: pathlib.Path) -> None:
        """An unknown snapshot id exits with ``ExitCode.USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "commit-tree", "--snapshot", "nosuch"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR
450
451
452 # ---------------------------------------------------------------------------
453 # update-ref
454 # ---------------------------------------------------------------------------
455
456
class TestUpdateRef:
    """Tests for ``muse plumbing update-ref``."""

    def test_creates_branch_ref(self, tmp_path: pathlib.Path) -> None:
        """A new branch ref file is written and reported in the JSON output."""
        repo = _init_repo(tmp_path)
        commit_id = "3" * 64
        object_id = _make_object(repo, b"x")
        _make_snapshot(repo, "s1", {"x": object_id})
        _make_commit(repo, commit_id, "s1")

        argv = ["plumbing", "update-ref", "feature", commit_id]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["branch"] == "feature"
        assert payload["commit_id"] == commit_id
        ref_file = repo / ".muse" / "refs" / "heads" / "feature"
        assert ref_file.read_text() == commit_id

    def test_updates_existing_ref(self, tmp_path: pathlib.Path) -> None:
        """Moving an existing ref reports both the previous and the new commit."""
        repo = _init_repo(tmp_path)
        old_id = "4" * 64
        new_id = "5" * 64
        object_id = _make_object(repo, b"y")
        _make_snapshot(repo, "s1", {"y": object_id})
        _make_commit(repo, old_id, "s1")
        _make_commit(repo, new_id, "s1", parent_commit_id=old_id)
        _set_head(repo, "main", old_id)

        argv = ["plumbing", "update-ref", "main", new_id]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["previous"] == old_id
        assert payload["commit_id"] == new_id

    def test_delete_ref(self, tmp_path: pathlib.Path) -> None:
        """``--delete`` removes the ref file and reports ``deleted`` true."""
        repo = _init_repo(tmp_path)
        _set_head(repo, "todelete", "x" * 64)
        argv = ["plumbing", "update-ref", "--delete", "todelete"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["deleted"] is True
        ref_file = repo / ".muse" / "refs" / "heads" / "todelete"
        assert not ref_file.exists()

    def test_verify_commit_not_found_errors(self, tmp_path: pathlib.Path) -> None:
        """Pointing a ref at an unknown commit exits with ``USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "update-ref", "main", "0" * 64]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR

    def test_no_verify_skips_commit_check(self, tmp_path: pathlib.Path) -> None:
        """``--no-verify`` writes the ref even though no such commit exists."""
        repo = _init_repo(tmp_path)
        commit_id = "9" * 64
        argv = ["plumbing", "update-ref", "--no-verify", "feature", commit_id]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        ref_file = repo / ".muse" / "refs" / "heads" / "feature"
        assert ref_file.read_text() == commit_id
528
529
530 # ---------------------------------------------------------------------------
531 # commit-graph
532 # ---------------------------------------------------------------------------
533
534
class TestCommitGraph:
    """Tests for ``muse plumbing commit-graph``."""

    def test_linear_graph(self, tmp_path: pathlib.Path) -> None:
        """A two-commit chain is reported with both commit ids and count 2."""
        repo = _init_repo(tmp_path)
        first_id = "c1" + "0" * 62
        second_id = "c2" + "0" * 62
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "s1", {"f": object_id})
        _make_commit(repo, first_id, "s1", message="first")
        _make_commit(
            repo,
            second_id,
            "s1",
            message="second",
            parent_commit_id=first_id,
        )
        _set_head(repo, "main", second_id)

        argv = ["plumbing", "commit-graph"]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["count"] == 2
        seen = [entry["commit_id"] for entry in payload["commits"]]
        assert second_id in seen
        assert first_id in seen

    def test_text_format(self, tmp_path: pathlib.Path) -> None:
        """``--format text`` output contains the commit id."""
        repo = _init_repo(tmp_path)
        commit_id = "e1" + "0" * 62
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "s1", {"f": object_id})
        _make_commit(repo, commit_id, "s1")
        _set_head(repo, "main", commit_id)

        argv = ["plumbing", "commit-graph", "--format", "text"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        assert commit_id in result.stdout

    def test_explicit_tip(self, tmp_path: pathlib.Path) -> None:
        """``--tip`` works without a branch ref and is echoed back as ``tip``."""
        repo = _init_repo(tmp_path)
        tip_id = "t1" + "0" * 62
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "s1", {"f": object_id})
        _make_commit(repo, tip_id, "s1")

        argv = ["plumbing", "commit-graph", "--tip", tip_id]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == 0, result.output
        payload = json.loads(result.stdout)
        assert payload["tip"] == tip_id

    def test_no_commits_errors(self, tmp_path: pathlib.Path) -> None:
        """A freshly initialised repo exits with ``ExitCode.USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "commit-graph"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR
597
598
599 # ---------------------------------------------------------------------------
600 # pack-objects
601 # ---------------------------------------------------------------------------
602
603
class TestPackObjects:
    """Tests for ``muse plumbing pack-objects``."""

    def test_packs_head(self, tmp_path: pathlib.Path) -> None:
        """Packing ``HEAD`` yields JSON with at least one entry under ``commits``."""
        repo = _init_repo(tmp_path)
        commit_id = "p" + "0" * 63
        object_id = _make_object(repo, b"pack me")
        _make_snapshot(repo, "s1", {"f.mid": object_id})
        _make_commit(repo, commit_id, "s1")
        _set_head(repo, "main", commit_id)

        argv = ["plumbing", "pack-objects", "HEAD"]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        bundle = json.loads(result.stdout)
        assert "commits" in bundle
        assert len(bundle["commits"]) >= 1

    def test_packs_explicit_commit(self, tmp_path: pathlib.Path) -> None:
        """Packing a commit id directly includes that commit in the bundle."""
        repo = _init_repo(tmp_path)
        object_id = _make_object(repo, b"explicit")
        _make_snapshot(repo, "s1", {"g": object_id})
        commit_id = "q" + "0" * 63
        _make_commit(repo, commit_id, "s1")

        argv = ["plumbing", "pack-objects", commit_id]
        result = runner.invoke(
            cli, argv, env=_repo_env(repo), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        bundle = json.loads(result.stdout)
        packed = [entry["commit_id"] for entry in bundle["commits"]]
        assert commit_id in packed

    def test_no_commits_on_head_errors(self, tmp_path: pathlib.Path) -> None:
        """Packing ``HEAD`` in a freshly initialised repo exits with ``USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "pack-objects", "HEAD"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR
646
647
648 # ---------------------------------------------------------------------------
649 # unpack-objects
650 # ---------------------------------------------------------------------------
651
652
class TestUnpackObjects:
    """Tests for ``muse plumbing unpack-objects``."""

    def test_unpacks_valid_bundle(self, tmp_path: pathlib.Path) -> None:
        """A bundle built from one repo unpacks into another, writing one commit and one object."""
        source = _init_repo(tmp_path / "src")
        dest = _init_repo(tmp_path / "dst")

        object_id = _make_object(source, b"unpack me")
        _make_snapshot(source, "s1", {"h.mid": object_id})
        commit_id = "u" + "0" * 63
        _make_commit(source, commit_id, "s1")

        # The bundle is fed to the command on stdin as JSON text.
        stdin_text = json.dumps(build_pack(source, [commit_id]))

        result = runner.invoke(
            cli,
            ["plumbing", "unpack-objects"],
            input=stdin_text,
            env=_repo_env(dest),
            catch_exceptions=False,
        )
        assert result.exit_code == 0, result.output
        summary = json.loads(result.stdout)
        assert "objects_written" in summary
        assert summary["commits_written"] == 1
        assert summary["objects_written"] == 1

    def test_invalid_json_errors(self, tmp_path: pathlib.Path) -> None:
        """Non-JSON input on stdin exits with ``ExitCode.USER_ERROR``."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "unpack-objects"]
        result = runner.invoke(
            cli, argv, input="NOT_VALID_JSON", env=_repo_env(repo)
        )
        assert result.exit_code == ExitCode.USER_ERROR

    def test_idempotent_unpack(self, tmp_path: pathlib.Path) -> None:
        """Unpacking a bundle into its own source repo twice succeeds; the second run skips the object."""
        repo = _init_repo(tmp_path)
        object_id = _make_object(repo, b"idempotent")
        _make_snapshot(repo, "s1", {"i.txt": object_id})
        commit_id = "i" + "0" * 63
        _make_commit(repo, commit_id, "s1")

        stdin_text = json.dumps(build_pack(repo, [commit_id]))
        argv = ["plumbing", "unpack-objects"]

        first = runner.invoke(cli, argv, input=stdin_text, env=_repo_env(repo))
        assert first.exit_code == 0, first.output

        second = runner.invoke(cli, argv, input=stdin_text, env=_repo_env(repo))
        assert second.exit_code == 0, second.output
        summary = json.loads(second.stdout)
        assert summary["objects_written"] == 0
        assert summary["objects_skipped"] == 1
715
716
717 # ---------------------------------------------------------------------------
718 # ls-remote (moved to plumbing namespace)
719 # ---------------------------------------------------------------------------
720
721
class TestLsRemote:
    """Tests for ``muse plumbing ls-remote``."""

    def test_bare_url_transport_error(self) -> None:
        """A bare URL pointing at an unreachable server exits with INTERNAL_ERROR."""
        argv = ["plumbing", "ls-remote", "https://localhost:0/no-such-server"]
        result = runner.invoke(cli, argv)
        assert result.exit_code == ExitCode.INTERNAL_ERROR

    def test_non_url_non_remote_errors(self, tmp_path: pathlib.Path) -> None:
        """An argument that is neither a URL nor a configured remote exits with USER_ERROR."""
        repo = _init_repo(tmp_path)
        argv = ["plumbing", "ls-remote", "not-a-url-or-remote"]
        result = runner.invoke(cli, argv, env=_repo_env(repo))
        assert result.exit_code == ExitCode.USER_ERROR