test_cmd_rebase.py
python
| 1 | """Tests for ``muse rebase`` and ``muse/core/rebase.py``. |
| 2 | |
| 3 | Covers: state file load/save/clear, collect_commits_to_replay, abort, no-op |
| 4 | (already up to date), simple forward rebase, --squash, conflict detection, |
| 5 | stress: 20-commit rebase chain. |
| 6 | """ |
| 7 | |
| 8 | from __future__ import annotations |
| 9 | |
| 10 | import datetime |
| 11 | import hashlib |
| 12 | import json |
| 13 | import pathlib |
| 14 | |
| 15 | import pytest |
| 16 | from typer.testing import CliRunner |
| 17 | |
| 18 | from muse.cli.app import cli |
| 19 | from muse.core.object_store import write_object |
| 20 | from muse.core.rebase import ( |
| 21 | RebaseState, |
| 22 | clear_rebase_state, |
| 23 | collect_commits_to_replay, |
| 24 | load_rebase_state, |
| 25 | save_rebase_state, |
| 26 | ) |
| 27 | from muse.core.snapshot import compute_snapshot_id |
| 28 | from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot |
| 29 | |
# Shared Typer CLI runner used by the command-level tests below.
runner = CliRunner()

# Repo id written into repo.json by _init_repo and stamped on every
# CommitRecord created by _make_commit.
_REPO_ID = "rebase-test"
| 33 | |
| 34 | |
| 35 | # --------------------------------------------------------------------------- |
| 36 | # Helpers |
| 37 | # --------------------------------------------------------------------------- |
| 38 | |
| 39 | |
def _sha(data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
| 42 | |
| 43 | |
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal ``.muse`` directory under *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories, writes a ``HEAD`` file pointing at ``refs/heads/main`` and a
    ``repo.json`` holding the test repo id and the ``midi`` domain.
    """
    muse_dir = path / ".muse"
    for subdir in ("commits", "snapshots", "objects", "refs/heads"):
        muse_dir.joinpath(subdir).mkdir(parents=True, exist_ok=True)

    head_file = muse_dir / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": _REPO_ID, "domain": "midi"}
    (muse_dir / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
| 53 | |
| 54 | |
def _env(repo: pathlib.Path) -> dict[str, str]:
    """Build the environment mapping that points the CLI at *repo*."""
    repo_root = str(repo)
    return dict(MUSE_REPO_ROOT=repo_root)
| 57 | |
| 58 | |
# Module-wide sequence number; bumped on every _make_commit call so that each
# blob, manifest path and commit message is distinct.
_counter = 0


def _make_commit(
    root: pathlib.Path,
    parent_id: str | None = None,
    content: bytes = b"data",
    branch: str = "main",
) -> str:
    """Write one object, snapshot and commit, then point *branch* at the commit.

    The current counter value is appended to *content* and used in the single
    manifest path and the commit message. The branch ref file under
    ``.muse/refs/heads`` is overwritten with the new commit id.

    Returns:
        The id of the commit that was written.
    """
    global _counter
    _counter += 1

    # Store the blob under the SHA-256 of its (counter-suffixed) bytes.
    payload = content + str(_counter).encode()
    blob_id = _sha(payload)
    write_object(root, blob_id, payload)

    # One-file snapshot referencing that blob.
    files = {f"f_{_counter}.txt": blob_id}
    snap_id = compute_snapshot_id(files)
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=files))

    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = _sha(f"{_counter}:{snap_id}:{committed_at.isoformat()}".encode())
    record = CommitRecord(
        commit_id=commit_id,
        repo_id=_REPO_ID,
        branch=branch,
        snapshot_id=snap_id,
        message=f"commit {_counter}",
        committed_at=committed_at,
        parent_commit_id=parent_id,
    )
    write_commit(root, record)

    # Advance the branch ref to the commit just written.
    branch_ref = root / ".muse" / "refs" / "heads" / branch
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
| 89 | |
| 90 | |
| 91 | # --------------------------------------------------------------------------- |
| 92 | # Unit: state file load/save/clear |
| 93 | # --------------------------------------------------------------------------- |
| 94 | |
| 95 | |
def test_rebase_state_round_trip(tmp_path: pathlib.Path) -> None:
    """A saved rebase state can be loaded back with its fields intact."""
    _init_repo(tmp_path)
    pending = ["c" * 64, "d" * 64]
    done = ["e" * 64]
    save_rebase_state(
        tmp_path,
        RebaseState(
            original_branch="main",
            original_head="a" * 64,
            onto="b" * 64,
            remaining=pending,
            completed=done,
            squash=False,
        ),
    )

    loaded = load_rebase_state(tmp_path)

    assert loaded is not None
    # Compare against fresh literals rather than the lists handed to save.
    assert loaded["original_branch"] == "main"
    assert loaded["remaining"] == ["c" * 64, "d" * 64]
    assert loaded["completed"] == ["e" * 64]
    assert loaded["squash"] is False
| 113 | |
| 114 | |
def test_rebase_state_clear(tmp_path: pathlib.Path) -> None:
    """Clearing the rebase state makes a later load return ``None``."""
    _init_repo(tmp_path)
    save_rebase_state(
        tmp_path,
        RebaseState(
            original_branch="feat",
            original_head="a" * 64,
            onto="b" * 64,
            remaining=[],
            completed=[],
            squash=False,
        ),
    )
    # The state must be present before clearing, otherwise the test proves nothing.
    before = load_rebase_state(tmp_path)
    assert before is not None

    clear_rebase_state(tmp_path)

    after = load_rebase_state(tmp_path)
    assert after is None
| 125 | |
| 126 | |
def test_rebase_state_none_when_no_file(tmp_path: pathlib.Path) -> None:
    """Loading from a fresh repo with no saved state yields ``None``."""
    _init_repo(tmp_path)
    loaded = load_rebase_state(tmp_path)
    assert loaded is None
| 130 | |
| 131 | |
| 132 | # --------------------------------------------------------------------------- |
| 133 | # Unit: collect_commits_to_replay |
| 134 | # --------------------------------------------------------------------------- |
| 135 | |
| 136 | |
def test_collect_commits_empty_when_same_base(tmp_path: pathlib.Path) -> None:
    """When ``stop_at`` and ``tip`` are the same commit nothing is collected."""
    _init_repo(tmp_path)
    only = _make_commit(tmp_path, content=b"only")

    collected = collect_commits_to_replay(tmp_path, stop_at=only, tip=only)

    assert collected == []
| 142 | |
| 143 | |
def test_collect_commits_one_commit(tmp_path: pathlib.Path) -> None:
    """A single commit on top of the base is the only one collected."""
    _init_repo(tmp_path)
    base = _make_commit(tmp_path, content=b"base")
    tip = _make_commit(tmp_path, parent_id=base, content=b"tip")

    collected = collect_commits_to_replay(tmp_path, stop_at=base, tip=tip)

    # Exactly one record, and it is the tip.
    assert [record.commit_id for record in collected] == [tip]
| 151 | |
| 152 | |
def test_collect_commits_chain(tmp_path: pathlib.Path) -> None:
    """A three-commit chain above the base is collected oldest first."""
    _init_repo(tmp_path)
    base = _make_commit(tmp_path, content=b"base")

    # Build base <- c1 <- c2 <- c3, remembering ids in creation order.
    chain: list[str] = []
    parent = base
    for label in (b"c1", b"c2", b"c3"):
        parent = _make_commit(tmp_path, parent_id=parent, content=label)
        chain.append(parent)

    collected = collect_commits_to_replay(tmp_path, stop_at=base, tip=chain[-1])

    # Oldest first: same length and same ids in creation order.
    assert [record.commit_id for record in collected] == chain
| 165 | |
| 166 | |
| 167 | # --------------------------------------------------------------------------- |
| 168 | # CLI: muse rebase --help |
| 169 | # --------------------------------------------------------------------------- |
| 170 | |
| 171 | |
def test_rebase_help() -> None:
    """``muse rebase --help`` exits with code 0 and lists the ``--abort`` flag."""
    result = runner.invoke(cli, ["rebase", "--help"])
    assert result.exit_code == 0
    # Check the long flag only. The previous fallback (``"-a" in output``) is a
    # substring of "--abort" itself and of any other "--a..." option, so the
    # assertion could pass without the abort flag being documented at all.
    assert "--abort" in result.output
| 176 | |
| 177 | |
| 178 | # --------------------------------------------------------------------------- |
| 179 | # CLI: abort with no active rebase |
| 180 | # --------------------------------------------------------------------------- |
| 181 | |
| 182 | |
def test_rebase_abort_no_state(tmp_path: pathlib.Path) -> None:
    """``rebase --abort`` fails when no rebase is in progress."""
    _init_repo(tmp_path)
    env = _env(tmp_path)
    outcome = runner.invoke(cli, ["rebase", "--abort"], env=env)
    assert outcome.exit_code != 0
| 187 | |
| 188 | |
| 189 | # --------------------------------------------------------------------------- |
| 190 | # CLI: continue with no active rebase |
| 191 | # --------------------------------------------------------------------------- |
| 192 | |
| 193 | |
def test_rebase_continue_no_state(tmp_path: pathlib.Path) -> None:
    """``rebase --continue`` fails when no rebase is in progress."""
    _init_repo(tmp_path)
    env = _env(tmp_path)
    outcome = runner.invoke(cli, ["rebase", "--continue"], env=env)
    assert outcome.exit_code != 0
| 198 | |
| 199 | |
| 200 | # --------------------------------------------------------------------------- |
| 201 | # CLI: rebase with no upstream given |
| 202 | # --------------------------------------------------------------------------- |
| 203 | |
| 204 | |
def test_rebase_no_upstream_error(tmp_path: pathlib.Path) -> None:
    """A bare ``rebase`` with no upstream argument exits with a non-zero code."""
    _init_repo(tmp_path)
    # The repo has one commit on main; the id itself is not needed.
    _make_commit(tmp_path, content=b"single")
    env = _env(tmp_path)
    outcome = runner.invoke(cli, ["rebase"], env=env)
    assert outcome.exit_code != 0
| 210 | |
| 211 | |
| 212 | # --------------------------------------------------------------------------- |
| 213 | # CLI: already up-to-date |
| 214 | # --------------------------------------------------------------------------- |
| 215 | |
| 216 | |
def test_rebase_already_up_to_date(tmp_path: pathlib.Path) -> None:
    """Rebasing onto a branch at the same commit succeeds and reports "up to date"."""
    _init_repo(tmp_path)
    head_id = _make_commit(tmp_path, content=b"only commit")

    # Create an "upstream" branch ref that points at the very same commit.
    upstream_ref = tmp_path / ".muse" / "refs" / "heads" / "upstream"
    upstream_ref.write_text(head_id, encoding="utf-8")

    outcome = runner.invoke(cli, ["rebase", "upstream"], env=_env(tmp_path))

    # Nothing to replay, so the command should exit cleanly.
    assert outcome.exit_code == 0
    printed = outcome.output.lower()
    assert "up to date" in printed
| 228 | |
| 229 | |
| 230 | # --------------------------------------------------------------------------- |
| 231 | # CLI: abort restores original HEAD |
| 232 | # --------------------------------------------------------------------------- |
| 233 | |
| 234 | |
def test_rebase_abort_restores_head(tmp_path: pathlib.Path) -> None:
    """``rebase --abort`` clears the state and resets the branch to ``original_head``."""
    _init_repo(tmp_path)
    base = _make_commit(tmp_path, content=b"base")
    tip = _make_commit(tmp_path, parent_id=base, content=b"tip")

    # Fake an in-progress rebase whose recorded original head is the base
    # commit; the main ref currently holds the tip written just above.
    save_rebase_state(
        tmp_path,
        RebaseState(
            original_branch="main",
            original_head=base,
            onto=base,
            remaining=[tip],
            completed=[],
            squash=False,
        ),
    )

    outcome = runner.invoke(cli, ["rebase", "--abort"], env=_env(tmp_path))

    assert outcome.exit_code == 0
    assert "aborted" in outcome.output.lower()
    # The state file must no longer be loadable.
    assert load_rebase_state(tmp_path) is None
    # The branch ref must be back at original_head.
    main_ref = tmp_path / ".muse" / "refs" / "heads" / "main"
    restored = main_ref.read_text(encoding="utf-8").strip()
    assert restored == base
| 258 | |
| 259 | |
| 260 | # --------------------------------------------------------------------------- |
| 261 | # Stress: collect 20 commits |
| 262 | # --------------------------------------------------------------------------- |
| 263 | |
| 264 | |
def test_rebase_stress_collect_20_commits(tmp_path: pathlib.Path) -> None:
    """A 20-commit linear chain is collected completely and oldest first."""
    _init_repo(tmp_path)
    base = _make_commit(tmp_path, content=b"stress-base")

    # Build base <- s0 <- s1 <- ... <- s19, remembering ids in creation order.
    commits: list[str] = []
    parent = base
    for i in range(20):
        parent = _make_commit(tmp_path, parent_id=parent, content=f"s{i}".encode())
        commits.append(parent)

    result = collect_commits_to_replay(tmp_path, stop_at=base, tip=commits[-1])

    assert len(result) == 20
    # Compare the whole sequence, not just the endpoints: a mis-ordering among
    # the 18 interior commits would otherwise go unnoticed.
    assert [record.commit_id for record in result] == commits