gabriel / muse public
test_cmd_snapshot.py python
223 lines 7.6 KB
86000da9 fix: replace typer CliRunner with argparse-compatible test helper Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Tests for ``muse snapshot`` subcommands.
2
3 Covers: create (text/json/note/short flags), list (empty/populated/json/limit),
4 show (json/prefix/not found), export (tar.gz/zip), stress: 50 files.
5 """
6
7 from __future__ import annotations
8
9 import json
10 import pathlib
11 import tarfile
12 import zipfile
13
14 import pytest
15 from tests.cli_test_helper import CliRunner
16
# Placeholder for the first positional argument of ``runner.invoke``.
# Argparse migration: CliRunner ignores this argument.
cli = None

# One runner instance shared by every test in this module.
runner = CliRunner()
20
21
22 # ---------------------------------------------------------------------------
23 # Helpers
24 # ---------------------------------------------------------------------------
25
26
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal ``.muse`` directory under *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories, a ``HEAD`` file pointing at ``refs/heads/main``, and a
    ``repo.json`` file with a fixed repo id and domain.
    """
    dot_muse = path / ".muse"
    subdirs = ("commits", "snapshots", "objects", "refs/heads")
    for subdir in subdirs:
        dot_muse.joinpath(subdir).mkdir(parents=True, exist_ok=True)

    head_file = dot_muse / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": "snap-test", "domain": "midi"}
    meta_file = dot_muse / "repo.json"
    meta_file.write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
36
37
def _env(repo: pathlib.Path) -> dict[str, str]:
    """Build the environment mapping that sets ``MUSE_REPO_ROOT`` to *repo*."""
    repo_root = str(repo)
    return dict(MUSE_REPO_ROOT=repo_root)
40
41
def _create_files(root: pathlib.Path, count: int = 3) -> list[str]:
    """Write *count* small text files directly under *root*.

    File ``file_<i>.txt`` contains the text ``content <i>``.

    Returns:
        The file names (not full paths), in creation order.
    """
    names = [f"file_{index}.txt" for index in range(count)]
    for index, filename in enumerate(names):
        target = root / filename
        target.write_text(f"content {index}", encoding="utf-8")
    return names
49
50
51 # ---------------------------------------------------------------------------
52 # Unit: snapshot create
53 # ---------------------------------------------------------------------------
54
55
def test_snapshot_create_help() -> None:
    """``snapshot --help`` exits with status 0."""
    # NOTE(review): the test name mentions ``create`` but the invocation asks
    # for the ``snapshot`` group help; confirm which one is intended.
    help_argv = ["snapshot", "--help"]
    help_result = runner.invoke(cli, help_argv)
    assert help_result.exit_code == 0
59
60
def test_snapshot_create_text(tmp_path: pathlib.Path) -> None:
    """``snapshot create`` with no format flag succeeds and mentions ``file(s)``."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 3)

    argv = ["snapshot", "create"]
    outcome = runner.invoke(cli, argv, env=_env(repo))

    assert outcome.exit_code == 0
    assert "file(s)" in outcome.output
67
68
def test_snapshot_create_json(tmp_path: pathlib.Path) -> None:
    """``snapshot create -f json`` emits a JSON object that counts every file.

    The payload must carry a ``snapshot_id`` and a ``file_count`` at least as
    large as the number of files written into the repo before the snapshot.
    """
    repo = _init_repo(tmp_path)
    created = _create_files(repo, 2)

    result = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(repo))

    assert result.exit_code == 0
    data = json.loads(result.output)
    assert "snapshot_id" in data
    # Compare against the number of files actually written rather than a
    # constant 1, so a snapshot that drops a file is caught.
    assert data["file_count"] >= len(created)
77
78
def test_snapshot_create_with_note(tmp_path: pathlib.Path) -> None:
    """A note passed with ``-m`` is echoed in the ``snapshot create`` output."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 1)

    argv = ["snapshot", "create", "-m", "WIP note"]
    outcome = runner.invoke(cli, argv, env=_env(repo))

    assert outcome.exit_code == 0
    assert "WIP note" in outcome.output
85
86
def test_snapshot_create_short_flags(tmp_path: pathlib.Path) -> None:
    """``-m`` and ``-f`` can be combined; the JSON output has a ``snapshot_id``."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 1)

    argv = ["snapshot", "create", "-m", "test", "-f", "json"]
    outcome = runner.invoke(cli, argv, env=_env(repo))

    assert outcome.exit_code == 0
    payload = json.loads(outcome.output)
    assert "snapshot_id" in payload
94
95
96 # ---------------------------------------------------------------------------
97 # Unit: snapshot list
98 # ---------------------------------------------------------------------------
99
100
def test_snapshot_list_empty(tmp_path: pathlib.Path) -> None:
    """Listing a repo without snapshots succeeds and reports "no snapshots"."""
    repo = _init_repo(tmp_path)

    outcome = runner.invoke(cli, ["snapshot", "list"], env=_env(repo))

    assert outcome.exit_code == 0
    # Case-insensitive match on the message text.
    lowered_output = outcome.output.lower()
    assert "no snapshots" in lowered_output
106
107
def test_snapshot_list_after_create(tmp_path: pathlib.Path) -> None:
    """``snapshot list`` shows at least one entry once a snapshot exists."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 2)

    # Fail here, with the command output, if the setup step itself breaks;
    # otherwise the list assertions below would report a misleading failure.
    created = runner.invoke(cli, ["snapshot", "create"], env=_env(repo))
    assert created.exit_code == 0, created.output

    result = runner.invoke(cli, ["snapshot", "list"], env=_env(repo))
    assert result.exit_code == 0
    rows = [line for line in result.output.splitlines() if line.strip()]
    assert len(rows) >= 1
    # The empty-repo message is itself a non-blank line, so the row count
    # alone cannot tell a populated listing from an empty one.
    assert "no snapshots" not in result.output.lower()
116
117
def test_snapshot_list_json(tmp_path: pathlib.Path) -> None:
    """``snapshot list -f json`` emits a non-empty list of snapshot records."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 2)

    # Check the setup command so a broken ``create`` is reported as such
    # rather than as an empty listing.
    created = runner.invoke(cli, ["snapshot", "create"], env=_env(repo))
    assert created.exit_code == 0, created.output

    result = runner.invoke(cli, ["snapshot", "list", "-f", "json"], env=_env(repo))
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert isinstance(data, list)
    assert len(data) >= 1
    assert "snapshot_id" in data[0]
128
129
def test_snapshot_list_limit(tmp_path: pathlib.Path) -> None:
    """``snapshot list -n 3`` returns at most three records, and at least one."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 1)

    for _ in range(5):
        created = runner.invoke(cli, ["snapshot", "create"], env=_env(repo))
        # Without this check a failing setup leaves an empty list, which
        # would satisfy the upper bound below vacuously.
        assert created.exit_code == 0, created.output

    result = runner.invoke(
        cli, ["snapshot", "list", "-n", "3", "-f", "json"], env=_env(repo)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert isinstance(data, list)
    # Only a lower bound of 1 is required: whether repeated snapshots of an
    # unchanged tree yield distinct records is not visible from this test.
    assert 1 <= len(data) <= 3
139
140
141 # ---------------------------------------------------------------------------
142 # Unit: snapshot show
143 # ---------------------------------------------------------------------------
144
145
def test_snapshot_show_json(tmp_path: pathlib.Path) -> None:
    """``snapshot show <id>`` prints JSON with the same id and a ``manifest``."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 2)

    created = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(repo))
    snapshot_id = json.loads(created.output)["snapshot_id"]

    # NOTE(review): no ``-f json`` is passed here, so this relies on the
    # default output of ``show`` being JSON. Confirm that is intended.
    shown = runner.invoke(cli, ["snapshot", "show", snapshot_id], env=_env(repo))

    assert shown.exit_code == 0
    payload = json.loads(shown.output)
    assert payload["snapshot_id"] == snapshot_id
    assert "manifest" in payload
156
157
def test_snapshot_show_prefix(tmp_path: pathlib.Path) -> None:
    """``snapshot show`` accepts an id prefix and resolves it to the full id."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 1)

    create_result = runner.invoke(
        cli, ["snapshot", "create", "-f", "json"], env=_env(repo)
    )
    snap_id = json.loads(create_result.output)["snapshot_id"]

    # Use first 12 chars as prefix.
    result = runner.invoke(cli, ["snapshot", "show", snap_id[:12]], env=_env(repo))
    assert result.exit_code == 0
    # A zero exit code does not prove the prefix resolved to the right
    # snapshot. Check the reported id, parsing the output the same way
    # test_snapshot_show_json does for a full id.
    data = json.loads(result.output)
    assert data["snapshot_id"] == snap_id
166
167
def test_snapshot_show_not_found(tmp_path: pathlib.Path) -> None:
    """``snapshot show`` with an unknown id exits with a non-zero status."""
    repo = _init_repo(tmp_path)

    argv = ["snapshot", "show", "nonexistent"]
    outcome = runner.invoke(cli, argv, env=_env(repo))

    assert outcome.exit_code != 0
172
173
174 # ---------------------------------------------------------------------------
175 # Unit: snapshot export
176 # ---------------------------------------------------------------------------
177
178
def test_snapshot_export_tar_gz(tmp_path: pathlib.Path) -> None:
    """Exporting without ``--format`` writes a tar archive at ``--output``."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 2)

    created = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(repo))
    snapshot_id = json.loads(created.output)["snapshot_id"]

    archive = repo / "snap.tar.gz"
    export_argv = ["snapshot", "export", snapshot_id, "--output", str(archive)]
    exported = runner.invoke(cli, export_argv, env=_env(repo))

    assert exported.exit_code == 0
    assert archive.exists()
    assert tarfile.is_tarfile(str(archive))
193
194
def test_snapshot_export_zip(tmp_path: pathlib.Path) -> None:
    """Exporting with ``--format zip`` writes a zip archive at ``--output``."""
    repo = _init_repo(tmp_path)
    _create_files(repo, 2)

    created = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(repo))
    snapshot_id = json.loads(created.output)["snapshot_id"]

    archive = repo / "snap.zip"
    export_argv = [
        "snapshot",
        "export",
        snapshot_id,
        "--format",
        "zip",
        "--output",
        str(archive),
    ]
    exported = runner.invoke(cli, export_argv, env=_env(repo))

    assert exported.exit_code == 0
    assert archive.exists()
    assert zipfile.is_zipfile(str(archive))
209
210
211 # ---------------------------------------------------------------------------
212 # Stress: 50 files
213 # ---------------------------------------------------------------------------
214
215
def test_snapshot_create_stress_50_files(tmp_path: pathlib.Path) -> None:
    """A snapshot of 50 one-KiB files reports a ``file_count`` of at least 50."""
    repo = _init_repo(tmp_path)

    # Every file gets the same 1024-byte payload.
    payload = b"x" * 1024
    for index in range(50):
        repo.joinpath(f"stress_{index}.txt").write_bytes(payload)

    argv = ["snapshot", "create", "-f", "json"]
    outcome = runner.invoke(cli, argv, env=_env(repo))

    assert outcome.exit_code == 0
    summary = json.loads(outcome.output)
    assert summary["file_count"] >= 50