gabriel / muse public
test_cmd_merge.py python
154 lines 6.3 KB
86000da9 fix: replace typer CliRunner with argparse-compatible test helper Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Comprehensive tests for ``muse merge``.
2
3 Covers:
4 - E2E: merge fast-forward, merge with conflicts, --format json
5 - Integration: HEAD updated after merge, conflict state written
6 - Stress: merge with many files
7 """
8
9 from __future__ import annotations
10
11 import datetime
12 import json
13 import pathlib
14 import uuid
15
16 import pytest
17 from tests.cli_test_helper import CliRunner
18
# Placeholder left over from the argparse migration: the test helper's
# CliRunner ignores the first argument of ``invoke``, so tests pass ``None``.
cli = None

# Single runner instance shared by every test in this module.
runner = CliRunner()
22
23
24 # ---------------------------------------------------------------------------
25 # Shared helpers
26 # ---------------------------------------------------------------------------
27
def _env(root: pathlib.Path) -> dict[str, str]:
    """Return the environment overrides that point the CLI at *root*.

    The only variable set is ``MUSE_REPO_ROOT``, holding ``str(root)``.
    """
    overrides: dict[str, str] = {}
    overrides["MUSE_REPO_ROOT"] = str(root)
    return overrides
30
31
def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out a minimal ``.muse`` directory under *tmp_path*.

    Writes ``repo.json`` and ``HEAD`` (pointing at ``refs/heads/main``) and
    creates the ``refs/heads``, ``snapshots``, ``commits`` and ``objects``
    directories.

    Returns:
        ``(tmp_path, repo_id)`` where ``repo_id`` is a freshly generated
        UUID4 string that is also recorded in ``repo.json``.
    """
    repo_id = str(uuid.uuid4())
    dot_muse = tmp_path / ".muse"
    dot_muse.mkdir()

    repo_meta = {
        "repo_id": repo_id,
        "domain": "midi",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    (dot_muse / "refs" / "heads").mkdir(parents=True)
    for store_name in ("snapshots", "commits", "objects"):
        (dot_muse / store_name).mkdir()

    return tmp_path, repo_id
48
49
def _make_commit(root: pathlib.Path, repo_id: str, branch: str = "main",
                 message: str = "test",
                 manifest: dict[str, str] | None = None) -> str:
    """Write a snapshot and a commit for *branch*, then advance its ref.

    The commit ID currently stored in ``.muse/refs/heads/<branch>`` becomes
    the parent of the new commit. A missing or blank ref file means the new
    commit has no parent.

    Args:
        root: Repository root containing the ``.muse`` directory.
        repo_id: Repository identifier recorded on the commit.
        branch: Branch whose ref is read for the parent and then overwritten.
        message: Commit message.
        manifest: Snapshot manifest; ``None`` is treated as an empty manifest.

    Returns:
        The ID of the commit that was written.
    """
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
    from muse.core.snapshot import compute_snapshot_id, compute_commit_id

    ref_file = root / ".muse" / "refs" / "heads" / branch
    try:
        # Read with the same encoding the ref is written with below, and
        # normalise a blank ref to None so the parent list and the stored
        # parent_commit_id always agree.
        parent_id = ref_file.read_text(encoding="utf-8").strip() or None
    except FileNotFoundError:
        parent_id = None

    m = manifest or {}
    snap_id = compute_snapshot_id(m)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        repo_id=repo_id,
        branch=branch,
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))

    # Point the branch at the new commit, creating refs/heads if needed.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
75
76
def _write_object(root: pathlib.Path, content: bytes) -> str:
    """Store *content* in the object directory and return its SHA-256 hex ID.

    The object is written to ``.muse/objects/<first two hex chars>/<rest>``;
    the shard directory is created when it does not exist yet.
    """
    from hashlib import sha256

    digest = sha256(content).hexdigest()
    shard, remainder = digest[:2], digest[2:]
    shard_dir = root / ".muse" / "objects" / shard
    shard_dir.mkdir(parents=True, exist_ok=True)
    (shard_dir / remainder).write_bytes(content)
    return digest
84
85
86 # ---------------------------------------------------------------------------
87 # Tests
88 # ---------------------------------------------------------------------------
89
class TestMergeCLI:
    """End-to-end checks of ``muse merge`` driven through the CLI test runner."""

    @staticmethod
    def _repo_with_feature(
        tmp_path: pathlib.Path,
        files: dict[str, bytes] | None = None,
        message: str = "feat",
    ) -> pathlib.Path:
        """Create a repo in which ``feature`` is one commit ahead of ``main``.

        ``main`` gets a single "base" commit, ``feature`` is branched from it
        by copying the commit ID into its ref, and one further commit is made
        on ``feature``.

        Args:
            tmp_path: Empty directory that becomes the repository root.
            files: Optional mapping of manifest path to file content; each
                content blob is stored as an object and listed in the
                feature commit's manifest.
            message: Message of the feature commit.

        Returns:
            The repository root.
        """
        root, repo_id = _init_repo(tmp_path)
        base_id = _make_commit(root, repo_id, branch="main", message="base")
        feature_ref = root / ".muse" / "refs" / "heads" / "feature"
        # Same encoding as the refs written by _make_commit.
        feature_ref.write_text(base_id, encoding="utf-8")
        manifest = {path: _write_object(root, blob) for path, blob in (files or {}).items()}
        _make_commit(root, repo_id, branch="feature", message=message, manifest=manifest)
        return root

    def test_merge_branch_into_main(self, tmp_path: pathlib.Path) -> None:
        """Merging a feature branch that adds a file exits with status 0."""
        root = self._repo_with_feature(
            tmp_path, files={"new_track.mid": b"feature content"}, message="feature work"
        )
        result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0, result.output

    def test_merge_nonexistent_branch_fails(self, tmp_path: pathlib.Path) -> None:
        """Merging a branch that has no ref exits with a non-zero status."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["merge", "does-not-exist"], env=_env(root))
        assert result.exit_code != 0

    def test_merge_format_json(self, tmp_path: pathlib.Path) -> None:
        """``--format json`` prints a single JSON object."""
        root = self._repo_with_feature(tmp_path)
        result = runner.invoke(
            cli, ["merge", "--format", "json", "feature"], env=_env(root), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert isinstance(data, dict)

    def test_merge_message_flag(self, tmp_path: pathlib.Path) -> None:
        """``--message`` is accepted and the merge exits with status 0."""
        root = self._repo_with_feature(tmp_path)
        result = runner.invoke(
            cli, ["merge", "--message", "Merge feature", "feature"],
            env=_env(root), catch_exceptions=False
        )
        assert result.exit_code == 0, result.output

    def test_merge_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
        """A branch name containing a parent-directory component is rejected."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["merge", "../evil"], env=_env(root))
        assert result.exit_code != 0

    def test_merge_output_sanitized(self, tmp_path: pathlib.Path) -> None:
        """A successful merge prints no ESC (``\\x1b``) characters."""
        root = self._repo_with_feature(tmp_path)
        result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False)
        # Without this check the test would pass vacuously if the merge
        # failed and printed nothing.
        assert result.exit_code == 0, result.output
        assert "\x1b" not in result.output
143
144
class TestMergeStress:
    """Merge scenario with a larger number of tracked files."""

    def test_merge_feature_with_many_files(self, tmp_path: pathlib.Path) -> None:
        """Merge a feature branch whose manifest lists 30 stored objects."""
        root, repo_id = _init_repo(tmp_path)
        heads_dir = root / ".muse" / "refs" / "heads"

        # Branch ``feature`` off the base commit on ``main``.
        base_commit = _make_commit(root, repo_id, branch="main", message="base")
        (heads_dir / "feature").write_text(base_commit)

        manifest: dict[str, str] = {}
        for index in range(30):
            manifest[f"track_{index:03d}.mid"] = _write_object(root, f"data {index}".encode())
        _make_commit(root, repo_id, branch="feature", message="many files", manifest=manifest)

        outcome = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False)
        assert outcome.exit_code == 0