test_cmd_merge.py
python
| 1 | """Comprehensive tests for ``muse merge``. |
| 2 | |
| 3 | Covers: |
| 4 | - E2E: merge fast-forward, merge with conflicts, --format json |
| 5 | - Integration: HEAD updated after merge, conflict state written |
| 6 | - Stress: merge with many files |
| 7 | """ |
| 8 | |
| 9 | from __future__ import annotations |
| 10 | |
| 11 | import datetime |
| 12 | import json |
| 13 | import pathlib |
| 14 | import uuid |
| 15 | |
| 16 | import pytest |
| 17 | from typer.testing import CliRunner |
| 18 | |
| 19 | from muse.cli.app import cli |
| 20 | |
| 21 | runner = CliRunner() |
| 22 | |
| 23 | |
| 24 | # --------------------------------------------------------------------------- |
| 25 | # Shared helpers |
| 26 | # --------------------------------------------------------------------------- |
| 27 | |
| 28 | def _env(root: pathlib.Path) -> dict[str, str]: |
| 29 | return {"MUSE_REPO_ROOT": str(root)} |
| 30 | |
| 31 | |
| 32 | def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: |
| 33 | muse_dir = tmp_path / ".muse" |
| 34 | muse_dir.mkdir() |
| 35 | repo_id = str(uuid.uuid4()) |
| 36 | (muse_dir / "repo.json").write_text(json.dumps({ |
| 37 | "repo_id": repo_id, |
| 38 | "domain": "midi", |
| 39 | "default_branch": "main", |
| 40 | "created_at": "2025-01-01T00:00:00+00:00", |
| 41 | }), encoding="utf-8") |
| 42 | (muse_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 43 | (muse_dir / "refs" / "heads").mkdir(parents=True) |
| 44 | (muse_dir / "snapshots").mkdir() |
| 45 | (muse_dir / "commits").mkdir() |
| 46 | (muse_dir / "objects").mkdir() |
| 47 | return tmp_path, repo_id |
| 48 | |
| 49 | |
| 50 | def _make_commit(root: pathlib.Path, repo_id: str, branch: str = "main", |
| 51 | message: str = "test", |
| 52 | manifest: dict[str, str] | None = None) -> str: |
| 53 | from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot |
| 54 | from muse.core.snapshot import compute_snapshot_id, compute_commit_id |
| 55 | |
| 56 | ref_file = root / ".muse" / "refs" / "heads" / branch |
| 57 | parent_id = ref_file.read_text().strip() if ref_file.exists() else None |
| 58 | m = manifest or {} |
| 59 | snap_id = compute_snapshot_id(m) |
| 60 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 61 | commit_id = compute_commit_id( |
| 62 | parent_ids=[parent_id] if parent_id else [], |
| 63 | snapshot_id=snap_id, message=message, |
| 64 | committed_at_iso=committed_at.isoformat(), |
| 65 | ) |
| 66 | write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m)) |
| 67 | write_commit(root, CommitRecord( |
| 68 | commit_id=commit_id, repo_id=repo_id, branch=branch, |
| 69 | snapshot_id=snap_id, message=message, committed_at=committed_at, |
| 70 | parent_commit_id=parent_id, |
| 71 | )) |
| 72 | ref_file.parent.mkdir(parents=True, exist_ok=True) |
| 73 | ref_file.write_text(commit_id, encoding="utf-8") |
| 74 | return commit_id |
| 75 | |
| 76 | |
| 77 | def _write_object(root: pathlib.Path, content: bytes) -> str: |
| 78 | import hashlib |
| 79 | obj_id = hashlib.sha256(content).hexdigest() |
| 80 | obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:] |
| 81 | obj_path.parent.mkdir(parents=True, exist_ok=True) |
| 82 | obj_path.write_bytes(content) |
| 83 | return obj_id |
| 84 | |
| 85 | |
| 86 | # --------------------------------------------------------------------------- |
| 87 | # Tests |
| 88 | # --------------------------------------------------------------------------- |
| 89 | |
| 90 | class TestMergeCLI: |
| 91 | def test_merge_branch_into_main(self, tmp_path: pathlib.Path) -> None: |
| 92 | root, repo_id = _init_repo(tmp_path) |
| 93 | base_id = _make_commit(root, repo_id, branch="main", message="base") |
| 94 | (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) |
| 95 | obj = _write_object(root, b"feature content") |
| 96 | _make_commit(root, repo_id, branch="feature", message="feature work", |
| 97 | manifest={"new_track.mid": obj}) |
| 98 | result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False) |
| 99 | assert result.exit_code == 0 |
| 100 | |
| 101 | def test_merge_nonexistent_branch_fails(self, tmp_path: pathlib.Path) -> None: |
| 102 | root, repo_id = _init_repo(tmp_path) |
| 103 | _make_commit(root, repo_id) |
| 104 | result = runner.invoke(cli, ["merge", "does-not-exist"], env=_env(root)) |
| 105 | assert result.exit_code != 0 |
| 106 | |
| 107 | def test_merge_format_json(self, tmp_path: pathlib.Path) -> None: |
| 108 | root, repo_id = _init_repo(tmp_path) |
| 109 | base_id = _make_commit(root, repo_id, branch="main", message="base") |
| 110 | (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) |
| 111 | _make_commit(root, repo_id, branch="feature", message="feat") |
| 112 | result = runner.invoke( |
| 113 | cli, ["merge", "--format", "json", "feature"], env=_env(root), catch_exceptions=False |
| 114 | ) |
| 115 | assert result.exit_code == 0 |
| 116 | data = json.loads(result.output) |
| 117 | assert isinstance(data, dict) |
| 118 | |
| 119 | def test_merge_message_flag(self, tmp_path: pathlib.Path) -> None: |
| 120 | root, repo_id = _init_repo(tmp_path) |
| 121 | base_id = _make_commit(root, repo_id, branch="main", message="base") |
| 122 | (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) |
| 123 | _make_commit(root, repo_id, branch="feature", message="feat") |
| 124 | result = runner.invoke( |
| 125 | cli, ["merge", "--message", "Merge feature", "feature"], |
| 126 | env=_env(root), catch_exceptions=False |
| 127 | ) |
| 128 | assert result.exit_code == 0 |
| 129 | |
| 130 | def test_merge_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None: |
| 131 | root, repo_id = _init_repo(tmp_path) |
| 132 | _make_commit(root, repo_id) |
| 133 | result = runner.invoke(cli, ["merge", "../evil"], env=_env(root)) |
| 134 | assert result.exit_code != 0 |
| 135 | |
| 136 | def test_merge_output_sanitized(self, tmp_path: pathlib.Path) -> None: |
| 137 | root, repo_id = _init_repo(tmp_path) |
| 138 | base_id = _make_commit(root, repo_id, branch="main", message="base") |
| 139 | (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) |
| 140 | _make_commit(root, repo_id, branch="feature", message="feat") |
| 141 | result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False) |
| 142 | assert "\x1b" not in result.output |
| 143 | |
| 144 | |
| 145 | class TestMergeStress: |
| 146 | def test_merge_feature_with_many_files(self, tmp_path: pathlib.Path) -> None: |
| 147 | root, repo_id = _init_repo(tmp_path) |
| 148 | base_id = _make_commit(root, repo_id, branch="main", message="base") |
| 149 | (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) |
| 150 | manifest = {f"track_{i:03d}.mid": _write_object(root, f"data {i}".encode()) |
| 151 | for i in range(30)} |
| 152 | _make_commit(root, repo_id, branch="feature", message="many files", manifest=manifest) |
| 153 | result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False) |
| 154 | assert result.exit_code == 0 |