test_cmd_snapshot.py
python
| 1 | """Tests for ``muse snapshot`` subcommands. |
| 2 | |
| 3 | Covers: create (text/json), list (empty/populated/limit), show (text/json/prefix), |
| 4 | export (tar.gz/zip), short flags, stress: 50 files. |
| 5 | """ |
| 6 | |
| 7 | from __future__ import annotations |
| 8 | |
| 9 | import json |
| 10 | import pathlib |
| 11 | import tarfile |
| 12 | import zipfile |
| 13 | |
| 14 | import pytest |
| 15 | from tests.cli_test_helper import CliRunner |
| 16 | |
| 17 | cli = None # argparse migration — CliRunner ignores this arg |
| 18 | |
| 19 | runner = CliRunner() |
| 20 | |
| 21 | |
| 22 | # --------------------------------------------------------------------------- |
| 23 | # Helpers |
| 24 | # --------------------------------------------------------------------------- |
| 25 | |
| 26 | |
| 27 | def _init_repo(path: pathlib.Path) -> pathlib.Path: |
| 28 | muse = path / ".muse" |
| 29 | for d in ("commits", "snapshots", "objects", "refs/heads"): |
| 30 | (muse / d).mkdir(parents=True, exist_ok=True) |
| 31 | (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 32 | (muse / "repo.json").write_text( |
| 33 | json.dumps({"repo_id": "snap-test", "domain": "midi"}), encoding="utf-8" |
| 34 | ) |
| 35 | return path |
| 36 | |
| 37 | |
| 38 | def _env(repo: pathlib.Path) -> dict[str, str]: |
| 39 | return {"MUSE_REPO_ROOT": str(repo)} |
| 40 | |
| 41 | |
| 42 | def _create_files(root: pathlib.Path, count: int = 3) -> list[str]: |
| 43 | names: list[str] = [] |
| 44 | for i in range(count): |
| 45 | name = f"file_{i}.txt" |
| 46 | (root / name).write_text(f"content {i}", encoding="utf-8") |
| 47 | names.append(name) |
| 48 | return names |
| 49 | |
| 50 | |
| 51 | # --------------------------------------------------------------------------- |
| 52 | # Unit: snapshot create |
| 53 | # --------------------------------------------------------------------------- |
| 54 | |
| 55 | |
| 56 | def test_snapshot_create_help() -> None: |
| 57 | result = runner.invoke(cli, ["snapshot", "--help"]) |
| 58 | assert result.exit_code == 0 |
| 59 | |
| 60 | |
| 61 | def test_snapshot_create_text(tmp_path: pathlib.Path) -> None: |
| 62 | _init_repo(tmp_path) |
| 63 | _create_files(tmp_path, 3) |
| 64 | result = runner.invoke(cli, ["snapshot", "create"], env=_env(tmp_path)) |
| 65 | assert result.exit_code == 0 |
| 66 | assert "file(s)" in result.output |
| 67 | |
| 68 | |
| 69 | def test_snapshot_create_json(tmp_path: pathlib.Path) -> None: |
| 70 | _init_repo(tmp_path) |
| 71 | _create_files(tmp_path, 2) |
| 72 | result = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(tmp_path)) |
| 73 | assert result.exit_code == 0 |
| 74 | data = json.loads(result.output) |
| 75 | assert "snapshot_id" in data |
| 76 | assert data["file_count"] >= 1 |
| 77 | |
| 78 | |
| 79 | def test_snapshot_create_with_note(tmp_path: pathlib.Path) -> None: |
| 80 | _init_repo(tmp_path) |
| 81 | _create_files(tmp_path, 1) |
| 82 | result = runner.invoke(cli, ["snapshot", "create", "-m", "WIP note"], env=_env(tmp_path)) |
| 83 | assert result.exit_code == 0 |
| 84 | assert "WIP note" in result.output |
| 85 | |
| 86 | |
| 87 | def test_snapshot_create_short_flags(tmp_path: pathlib.Path) -> None: |
| 88 | _init_repo(tmp_path) |
| 89 | _create_files(tmp_path, 1) |
| 90 | result = runner.invoke(cli, ["snapshot", "create", "-m", "test", "-f", "json"], env=_env(tmp_path)) |
| 91 | assert result.exit_code == 0 |
| 92 | data = json.loads(result.output) |
| 93 | assert "snapshot_id" in data |
| 94 | |
| 95 | |
| 96 | # --------------------------------------------------------------------------- |
| 97 | # Unit: snapshot list |
| 98 | # --------------------------------------------------------------------------- |
| 99 | |
| 100 | |
| 101 | def test_snapshot_list_empty(tmp_path: pathlib.Path) -> None: |
| 102 | _init_repo(tmp_path) |
| 103 | result = runner.invoke(cli, ["snapshot", "list"], env=_env(tmp_path)) |
| 104 | assert result.exit_code == 0 |
| 105 | assert "no snapshots" in result.output.lower() |
| 106 | |
| 107 | |
| 108 | def test_snapshot_list_after_create(tmp_path: pathlib.Path) -> None: |
| 109 | _init_repo(tmp_path) |
| 110 | _create_files(tmp_path, 2) |
| 111 | runner.invoke(cli, ["snapshot", "create"], env=_env(tmp_path)) |
| 112 | result = runner.invoke(cli, ["snapshot", "list"], env=_env(tmp_path)) |
| 113 | assert result.exit_code == 0 |
| 114 | lines = [l for l in result.output.strip().split("\n") if l.strip()] |
| 115 | assert len(lines) >= 1 |
| 116 | |
| 117 | |
| 118 | def test_snapshot_list_json(tmp_path: pathlib.Path) -> None: |
| 119 | _init_repo(tmp_path) |
| 120 | _create_files(tmp_path, 2) |
| 121 | runner.invoke(cli, ["snapshot", "create"], env=_env(tmp_path)) |
| 122 | result = runner.invoke(cli, ["snapshot", "list", "-f", "json"], env=_env(tmp_path)) |
| 123 | assert result.exit_code == 0 |
| 124 | data = json.loads(result.output) |
| 125 | assert isinstance(data, list) |
| 126 | assert len(data) >= 1 |
| 127 | assert "snapshot_id" in data[0] |
| 128 | |
| 129 | |
| 130 | def test_snapshot_list_limit(tmp_path: pathlib.Path) -> None: |
| 131 | _init_repo(tmp_path) |
| 132 | _create_files(tmp_path, 1) |
| 133 | for _ in range(5): |
| 134 | runner.invoke(cli, ["snapshot", "create"], env=_env(tmp_path)) |
| 135 | result = runner.invoke(cli, ["snapshot", "list", "-n", "3", "-f", "json"], env=_env(tmp_path)) |
| 136 | assert result.exit_code == 0 |
| 137 | data = json.loads(result.output) |
| 138 | assert len(data) <= 3 |
| 139 | |
| 140 | |
| 141 | # --------------------------------------------------------------------------- |
| 142 | # Unit: snapshot show |
| 143 | # --------------------------------------------------------------------------- |
| 144 | |
| 145 | |
| 146 | def test_snapshot_show_json(tmp_path: pathlib.Path) -> None: |
| 147 | _init_repo(tmp_path) |
| 148 | _create_files(tmp_path, 2) |
| 149 | create_result = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(tmp_path)) |
| 150 | snap_id = json.loads(create_result.output)["snapshot_id"] |
| 151 | result = runner.invoke(cli, ["snapshot", "show", snap_id], env=_env(tmp_path)) |
| 152 | assert result.exit_code == 0 |
| 153 | data = json.loads(result.output) |
| 154 | assert data["snapshot_id"] == snap_id |
| 155 | assert "manifest" in data |
| 156 | |
| 157 | |
| 158 | def test_snapshot_show_prefix(tmp_path: pathlib.Path) -> None: |
| 159 | _init_repo(tmp_path) |
| 160 | _create_files(tmp_path, 1) |
| 161 | create_result = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(tmp_path)) |
| 162 | snap_id = json.loads(create_result.output)["snapshot_id"] |
| 163 | # Use first 12 chars as prefix. |
| 164 | result = runner.invoke(cli, ["snapshot", "show", snap_id[:12]], env=_env(tmp_path)) |
| 165 | assert result.exit_code == 0 |
| 166 | |
| 167 | |
| 168 | def test_snapshot_show_not_found(tmp_path: pathlib.Path) -> None: |
| 169 | _init_repo(tmp_path) |
| 170 | result = runner.invoke(cli, ["snapshot", "show", "nonexistent"], env=_env(tmp_path)) |
| 171 | assert result.exit_code != 0 |
| 172 | |
| 173 | |
| 174 | # --------------------------------------------------------------------------- |
| 175 | # Unit: snapshot export |
| 176 | # --------------------------------------------------------------------------- |
| 177 | |
| 178 | |
| 179 | def test_snapshot_export_tar_gz(tmp_path: pathlib.Path) -> None: |
| 180 | _init_repo(tmp_path) |
| 181 | _create_files(tmp_path, 2) |
| 182 | create_result = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(tmp_path)) |
| 183 | snap_id = json.loads(create_result.output)["snapshot_id"] |
| 184 | out_file = tmp_path / "snap.tar.gz" |
| 185 | result = runner.invoke( |
| 186 | cli, |
| 187 | ["snapshot", "export", snap_id, "--output", str(out_file)], |
| 188 | env=_env(tmp_path), |
| 189 | ) |
| 190 | assert result.exit_code == 0 |
| 191 | assert out_file.exists() |
| 192 | assert tarfile.is_tarfile(str(out_file)) |
| 193 | |
| 194 | |
| 195 | def test_snapshot_export_zip(tmp_path: pathlib.Path) -> None: |
| 196 | _init_repo(tmp_path) |
| 197 | _create_files(tmp_path, 2) |
| 198 | create_result = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(tmp_path)) |
| 199 | snap_id = json.loads(create_result.output)["snapshot_id"] |
| 200 | out_file = tmp_path / "snap.zip" |
| 201 | result = runner.invoke( |
| 202 | cli, |
| 203 | ["snapshot", "export", snap_id, "--format", "zip", "--output", str(out_file)], |
| 204 | env=_env(tmp_path), |
| 205 | ) |
| 206 | assert result.exit_code == 0 |
| 207 | assert out_file.exists() |
| 208 | assert zipfile.is_zipfile(str(out_file)) |
| 209 | |
| 210 | |
| 211 | # --------------------------------------------------------------------------- |
| 212 | # Stress: 50 files |
| 213 | # --------------------------------------------------------------------------- |
| 214 | |
| 215 | |
| 216 | def test_snapshot_create_stress_50_files(tmp_path: pathlib.Path) -> None: |
| 217 | _init_repo(tmp_path) |
| 218 | for i in range(50): |
| 219 | (tmp_path / f"stress_{i}.txt").write_bytes(b"x" * 1024) |
| 220 | result = runner.invoke(cli, ["snapshot", "create", "-f", "json"], env=_env(tmp_path)) |
| 221 | assert result.exit_code == 0 |
| 222 | data = json.loads(result.output) |
| 223 | assert data["file_count"] >= 50 |