gabriel / muse public
test_cmd_merge.py python
154 lines 6.3 KB
95b86799 feat: add --format json to all porcelain commands for agent-first output Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """Comprehensive tests for ``muse merge``.
2
3 Covers:
4 - E2E: fast-forward merge, unknown and invalid branch names, --message, --format json
5 - Output: merge output contains no ANSI escape character
6 - Stress: merge with many files
7 """
8
9 from __future__ import annotations
10
11 import datetime
12 import json
13 import pathlib
14 import uuid
15
16 import pytest
17 from typer.testing import CliRunner
18
19 from muse.cli.app import cli
20
21 runner = CliRunner()
22
23
24 # ---------------------------------------------------------------------------
25 # Shared helpers
26 # ---------------------------------------------------------------------------
27
def _env(root: pathlib.Path) -> dict[str, str]:
    """Build the environment mapping passed to ``runner.invoke``.

    The mapping has a single entry, ``MUSE_REPO_ROOT``, set to *root*
    rendered as a string.
    """
    environment: dict[str, str] = {}
    environment["MUSE_REPO_ROOT"] = str(root)
    return environment
30
31
def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out a bare ``.muse`` directory tree under *tmp_path*.

    Writes ``repo.json`` (with a freshly generated UUID4 ``repo_id``) and a
    ``HEAD`` file pointing at ``refs/heads/main``, then creates the
    ``refs/heads``, ``snapshots``, ``commits`` and ``objects`` directories.

    Returns:
        ``(tmp_path, repo_id)``.

    Raises:
        FileExistsError: if ``.muse`` already exists under *tmp_path*.
    """
    dot_muse = tmp_path / ".muse"
    dot_muse.mkdir()

    repo_id = str(uuid.uuid4())
    metadata = {
        "repo_id": repo_id,
        "domain": "midi",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(metadata), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    (dot_muse / "refs" / "heads").mkdir(parents=True)
    for store_dir in ("snapshots", "commits", "objects"):
        (dot_muse / store_dir).mkdir()

    return tmp_path, repo_id
48
49
def _make_commit(root: pathlib.Path, repo_id: str, branch: str = "main",
                 message: str = "test",
                 manifest: dict[str, str] | None = None) -> str:
    """Write a snapshot and a commit for *branch*, then advance the branch ref.

    The parent of the new commit is the commit ID currently stored in the
    branch's ref file. When that file is missing or blank, the commit is
    created without a parent.

    Args:
        root: Repository root that contains the ``.muse`` directory.
        repo_id: Repository ID recorded on the commit.
        branch: Branch whose ref file supplies the parent and is overwritten
            with the new commit ID.
        message: Commit message.
        manifest: Mapping stored as the snapshot manifest and used to compute
            the snapshot ID; ``None`` (or an empty dict) means an empty
            manifest.

    Returns:
        The ID of the newly written commit.
    """
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
    from muse.core.snapshot import compute_snapshot_id, compute_commit_id

    ref_file = root / ".muse" / "refs" / "heads" / branch

    parent_id: str | None = None
    if ref_file.exists():
        # Read with the same encoding the ref is written with below. A blank
        # ref is normalised to None so the commit record and the parent list
        # fed into the commit ID agree that there is no parent.
        parent_id = ref_file.read_text(encoding="utf-8").strip() or None

    m = manifest or {}
    snap_id = compute_snapshot_id(m)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        repo_id=repo_id,
        branch=branch,
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))

    # The heads directory may not contain this branch's parent path yet.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
75
76
def _write_object(root: pathlib.Path, content: bytes) -> str:
    """Store *content* under ``.muse/objects`` keyed by its SHA-256 hex digest.

    The file is written to ``objects/<first two hex chars>/<remaining chars>``;
    the two-character directory is created when it does not exist yet.

    Returns:
        The full hex digest of *content*.
    """
    from hashlib import sha256

    digest = sha256(content).hexdigest()
    prefix, rest = digest[:2], digest[2:]
    bucket = root / ".muse" / "objects" / prefix
    bucket.mkdir(parents=True, exist_ok=True)
    (bucket / rest).write_bytes(content)
    return digest
84
85
86 # ---------------------------------------------------------------------------
87 # Tests
88 # ---------------------------------------------------------------------------
89
class TestMergeCLI:
    """CLI-level checks for ``muse merge`` driven through the shared ``runner``."""

    @staticmethod
    def _repo_with_feature_ref(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
        """Create a repo with one commit on ``main`` and a ``feature`` ref at it.

        Returns ``(root, repo_id)`` so the caller can add commits to
        ``feature`` before invoking the merge.
        """
        root, repo_id = _init_repo(tmp_path)
        base_id = _make_commit(root, repo_id, branch="main", message="base")
        # Written as UTF-8 to match how _make_commit writes ref files.
        (root / ".muse" / "refs" / "heads" / "feature").write_text(
            base_id, encoding="utf-8"
        )
        return root, repo_id

    def test_merge_branch_into_main(self, tmp_path: pathlib.Path) -> None:
        """Merging a feature branch that adds one file exits with code 0."""
        root, repo_id = self._repo_with_feature_ref(tmp_path)
        obj = _write_object(root, b"feature content")
        _make_commit(root, repo_id, branch="feature", message="feature work",
                     manifest={"new_track.mid": obj})
        result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0

    def test_merge_nonexistent_branch_fails(self, tmp_path: pathlib.Path) -> None:
        """Merging a branch that has no ref exits with a non-zero code."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["merge", "does-not-exist"], env=_env(root))
        assert result.exit_code != 0

    def test_merge_format_json(self, tmp_path: pathlib.Path) -> None:
        """``--format json`` produces output that parses as a JSON object."""
        root, repo_id = self._repo_with_feature_ref(tmp_path)
        _make_commit(root, repo_id, branch="feature", message="feat")
        result = runner.invoke(
            cli, ["merge", "--format", "json", "feature"], env=_env(root), catch_exceptions=False
        )
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert isinstance(data, dict)

    def test_merge_message_flag(self, tmp_path: pathlib.Path) -> None:
        """Passing ``--message`` is accepted and the merge exits with code 0."""
        root, repo_id = self._repo_with_feature_ref(tmp_path)
        _make_commit(root, repo_id, branch="feature", message="feat")
        result = runner.invoke(
            cli, ["merge", "--message", "Merge feature", "feature"],
            env=_env(root), catch_exceptions=False
        )
        assert result.exit_code == 0

    def test_merge_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
        """A branch argument of ``../evil`` exits with a non-zero code."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["merge", "../evil"], env=_env(root))
        assert result.exit_code != 0

    def test_merge_output_sanitized(self, tmp_path: pathlib.Path) -> None:
        """Merge output contains no ESC (``\\x1b``) character."""
        root, repo_id = self._repo_with_feature_ref(tmp_path)
        _make_commit(root, repo_id, branch="feature", message="feat")
        result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False)
        assert "\x1b" not in result.output
143
144
class TestMergeStress:
    """Merge scenarios involving a larger number of files."""

    def test_merge_feature_with_many_files(self, tmp_path: pathlib.Path) -> None:
        """Merging a feature commit whose manifest lists 30 files exits with code 0."""
        root, repo_id = _init_repo(tmp_path)
        base_id = _make_commit(root, repo_id, branch="main", message="base")
        # Written as UTF-8 to match how _make_commit writes ref files.
        (root / ".muse" / "refs" / "heads" / "feature").write_text(
            base_id, encoding="utf-8"
        )
        manifest = {
            f"track_{i:03d}.mid": _write_object(root, f"data {i}".encode())
            for i in range(30)
        }
        _make_commit(root, repo_id, branch="feature", message="many files", manifest=manifest)
        result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0