gabriel / muse public
test_plumbing_commit_graph_enhancements.py python
224 lines 7.8 KB
86000da9 fix: replace typer CliRunner with argparse-compatible test helper Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Tests for the new commit-graph flags: --count, --first-parent, --ancestry-path."""
2
3 from __future__ import annotations
4
5 import datetime
6 import hashlib
7 import json
8 import pathlib
9
10 import pytest
11 from tests.cli_test_helper import CliRunner
12
13 cli = None # argparse migration — CliRunner ignores this arg
14 from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
15
16 runner = CliRunner()
17
18
19 def _sha(tag: str) -> str:
20 return hashlib.sha256(tag.encode()).hexdigest()
21
22
23 def _init_repo(path: pathlib.Path) -> pathlib.Path:
24 muse = path / ".muse"
25 (muse / "commits").mkdir(parents=True)
26 (muse / "snapshots").mkdir(parents=True)
27 (muse / "objects").mkdir(parents=True)
28 (muse / "refs" / "heads").mkdir(parents=True)
29 (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
30 (muse / "repo.json").write_text(
31 json.dumps({"repo_id": "test-repo", "domain": "midi"}), encoding="utf-8"
32 )
33 return path
34
35
36 def _env(repo: pathlib.Path) -> dict[str, str]:
37 return {"MUSE_REPO_ROOT": str(repo)}
38
39
40 def _snap(repo: pathlib.Path, tag: str) -> str:
41 sid = _sha(tag + ":snap")
42 write_snapshot(
43 repo,
44 SnapshotRecord(
45 snapshot_id=sid,
46 manifest={},
47 created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
48 ),
49 )
50 return sid
51
52
53 def _commit(
54 repo: pathlib.Path,
55 tag: str,
56 branch: str = "main",
57 parent: str | None = None,
58 parent2: str | None = None,
59 ) -> str:
60 sid = _snap(repo, tag)
61 cid = _sha(tag)
62 write_commit(
63 repo,
64 CommitRecord(
65 commit_id=cid,
66 repo_id="test-repo",
67 branch=branch,
68 snapshot_id=sid,
69 message=tag,
70 committed_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
71 author="tester",
72 parent_commit_id=parent,
73 parent2_commit_id=parent2,
74 ),
75 )
76 ref_path = repo / ".muse" / "refs" / "heads" / branch
77 ref_path.parent.mkdir(parents=True, exist_ok=True)
78 ref_path.write_text(cid, encoding="utf-8")
79 return cid
80
81
82 class TestCommitGraphCount:
83 def test_count_returns_integer(self, tmp_path: pathlib.Path) -> None:
84 _init_repo(tmp_path)
85 c1 = _commit(tmp_path, "a", parent=None)
86 _commit(tmp_path, "b", parent=c1)
87 result = runner.invoke(cli, ["plumbing", "commit-graph", "--count"], env=_env(tmp_path))
88 assert result.exit_code == 0
89 data = json.loads(result.output)
90 assert "count" in data
91 assert data["count"] == 2
92 assert "commits" not in data # full node list suppressed
93
94 def test_count_no_commits_returns_error(self, tmp_path: pathlib.Path) -> None:
95 _init_repo(tmp_path)
96 result = runner.invoke(cli, ["plumbing", "commit-graph", "--count"], env=_env(tmp_path))
97 assert result.exit_code != 0
98
99 def test_count_with_stop_at(self, tmp_path: pathlib.Path) -> None:
100 _init_repo(tmp_path)
101 base = _commit(tmp_path, "base", parent=None)
102 _commit(tmp_path, "feature", parent=base)
103 result = runner.invoke(
104 cli, ["plumbing", "commit-graph", "--stop-at", base, "--count"], env=_env(tmp_path)
105 )
106 assert result.exit_code == 0
107 data = json.loads(result.output)
108 assert data["count"] == 1 # only "feature", base excluded
109
110 def test_count_short_flag(self, tmp_path: pathlib.Path) -> None:
111 _init_repo(tmp_path)
112 _commit(tmp_path, "a")
113 result = runner.invoke(cli, ["plumbing", "commit-graph", "-c"], env=_env(tmp_path))
114 assert result.exit_code == 0
115 data = json.loads(result.output)
116 assert "count" in data
117
118
119 class TestCommitGraphFirstParent:
120 def test_first_parent_only(self, tmp_path: pathlib.Path) -> None:
121 _init_repo(tmp_path)
122 c1 = _commit(tmp_path, "c1", parent=None)
123 c2 = _commit(tmp_path, "c2", parent=c1)
124 result = runner.invoke(
125 cli, ["plumbing", "commit-graph", "--first-parent", "--count"], env=_env(tmp_path)
126 )
127 assert result.exit_code == 0
128 data = json.loads(result.output)
129 assert data["count"] == 2
130
131 def test_first_parent_excludes_merge_parent(self, tmp_path: pathlib.Path) -> None:
132 """With --first-parent, second parents of merges are not followed."""
133 _init_repo(tmp_path)
134 c1 = _commit(tmp_path, "c1", parent=None)
135 c2 = _commit(tmp_path, "branch_tip", "feat", parent=c1)
136 # merge commit with c1 as first parent, c2 as second parent
137 _commit(tmp_path, "merge", parent=c1, parent2=c2)
138 result = runner.invoke(
139 cli, ["plumbing", "commit-graph", "--first-parent", "--count"], env=_env(tmp_path)
140 )
141 assert result.exit_code == 0
142 data = json.loads(result.output)
143 # Should NOT follow c2 branch; only main chain: merge → c1
144 assert data["count"] == 2 # merge + c1
145
146 def test_first_parent_short_flag(self, tmp_path: pathlib.Path) -> None:
147 _init_repo(tmp_path)
148 _commit(tmp_path, "c1")
149 result = runner.invoke(
150 cli, ["plumbing", "commit-graph", "-1", "--count"], env=_env(tmp_path)
151 )
152 assert result.exit_code == 0
153
154
155 class TestCommitGraphAncestryPath:
156 def test_ancestry_path_requires_stop_at(self, tmp_path: pathlib.Path) -> None:
157 _init_repo(tmp_path)
158 _commit(tmp_path, "c1")
159 result = runner.invoke(
160 cli, ["plumbing", "commit-graph", "--ancestry-path"], env=_env(tmp_path)
161 )
162 assert result.exit_code != 0
163 data = json.loads(result.output)
164 assert "error" in data
165
166 def test_ancestry_path_with_stop_at_runs(self, tmp_path: pathlib.Path) -> None:
167 _init_repo(tmp_path)
168 base = _commit(tmp_path, "base", parent=None)
169 _commit(tmp_path, "feature", parent=base)
170 result = runner.invoke(
171 cli,
172 ["plumbing", "commit-graph", "--stop-at", base, "--ancestry-path"],
173 env=_env(tmp_path),
174 )
175 assert result.exit_code == 0
176 data = json.loads(result.output)
177 assert "commits" in data
178
179 def test_ancestry_path_short_flag(self, tmp_path: pathlib.Path) -> None:
180 _init_repo(tmp_path)
181 base = _commit(tmp_path, "base", parent=None)
182 _commit(tmp_path, "next", parent=base)
183 result = runner.invoke(
184 cli,
185 ["plumbing", "commit-graph", "--stop-at", base, "-a"],
186 env=_env(tmp_path),
187 )
188 assert result.exit_code == 0
189
190
191 class TestCommitGraphCombined:
192 def test_first_parent_and_count(self, tmp_path: pathlib.Path) -> None:
193 _init_repo(tmp_path)
194 c1 = _commit(tmp_path, "c1", parent=None)
195 _commit(tmp_path, "c2", parent=c1)
196 result = runner.invoke(
197 cli, ["plumbing", "commit-graph", "--first-parent", "--count"], env=_env(tmp_path)
198 )
199 assert result.exit_code == 0
200 data = json.loads(result.output)
201 assert data["count"] == 2
202
203 def test_count_always_emits_json(self, tmp_path: pathlib.Path) -> None:
204 """--count emits JSON even when --format text is specified."""
205 _init_repo(tmp_path)
206 _commit(tmp_path, "c1")
207 result = runner.invoke(
208 cli,
209 ["plumbing", "commit-graph", "--count", "--format", "text"],
210 env=_env(tmp_path),
211 )
212 assert result.exit_code == 0
213 data = json.loads(result.output)
214 assert "count" in data
215
216 def test_invalid_format_errors(self, tmp_path: pathlib.Path) -> None:
217 _init_repo(tmp_path)
218 _commit(tmp_path, "c1")
219 result = runner.invoke(
220 cli, ["plumbing", "commit-graph", "--format", "csv"], env=_env(tmp_path)
221 )
222 assert result.exit_code != 0
223 data = json.loads(result.output)
224 assert "error" in data