test_cmd_tag.py
python
| 1 | """Comprehensive tests for ``muse tag``. |
| 2 | |
| 3 | Covers: |
| 4 | - Unit: write_tag, delete_tag, get_tags_for_commit, get_all_tags |
| 5 | - Integration: add → list → remove round-trip |
| 6 | - E2E: full CLI via CliRunner |
| 7 | - Security: tag names sanitized, ref validation |
| 8 | - Stress: many tags on many commits |
| 9 | """ |
| 10 | |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import datetime |
| 14 | import json |
| 15 | import pathlib |
| 16 | import uuid |
| 17 | |
| 18 | import pytest |
| 19 | from typer.testing import CliRunner |
| 20 | |
| 21 | from muse.cli.app import cli |
| 22 | |
| 23 | runner = CliRunner() |
| 24 | |
| 25 | |
| 26 | # --------------------------------------------------------------------------- |
| 27 | # Helpers |
| 28 | # --------------------------------------------------------------------------- |
| 29 | |
def _env(root: pathlib.Path) -> dict[str, str]:
    """Return the environment overrides handed to ``runner.invoke``.

    The mapping has a single entry, ``MUSE_REPO_ROOT``, whose value is
    ``str(root)``.
    """
    repo_root = str(root)
    return dict(MUSE_REPO_ROOT=repo_root)
| 32 | |
| 33 | |
def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out a minimal ``.muse`` directory under *tmp_path*.

    Writes ``repo.json`` (fresh UUID4 ``repo_id``, ``midi`` domain, ``main``
    default branch, fixed ``created_at``), a ``HEAD`` file containing
    ``ref: refs/heads/main``, and the empty ``refs/heads``, ``snapshots``,
    ``commits`` and ``objects`` directories.

    Returns:
        ``(tmp_path, repo_id)`` — the same path that was passed in, plus the
        generated repository id.

    Raises:
        FileExistsError: if ``.muse`` already exists under *tmp_path*.
    """
    repo_id = str(uuid.uuid4())
    dot_muse = tmp_path / ".muse"
    dot_muse.mkdir()

    metadata = {
        "repo_id": repo_id,
        "domain": "midi",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(metadata), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    (dot_muse / "refs" / "heads").mkdir(parents=True)
    for store_name in ("snapshots", "commits", "objects"):
        (dot_muse / store_name).mkdir()

    return tmp_path, repo_id
| 50 | |
| 51 | |
def _make_commit(
    root: pathlib.Path, repo_id: str, branch: str = "main", message: str = "test"
) -> str:
    """Write an empty-manifest commit on *branch* and advance the branch ref.

    The current content of ``.muse/refs/heads/<branch>`` (if the file exists
    and is non-empty) becomes the parent of the new commit. A snapshot record
    with an empty manifest and a commit record are written through
    ``muse.core.store``, then the ref file is overwritten with the new
    commit id.

    Args:
        root: Repository root containing the ``.muse`` directory.
        repo_id: Repository id stored on the commit record.
        branch: Branch whose ref is read and then updated.
        message: Commit message.

    Returns:
        The id of the newly written commit.
    """
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
    from muse.core.snapshot import compute_snapshot_id, compute_commit_id

    ref_file = root / ".muse" / "refs" / "heads" / branch
    parent_id: str | None = None
    if ref_file.exists():
        # Read with the same encoding the ref is written with below instead of
        # the platform default; treat an empty ref as "no parent" so that
        # ``parent_ids`` and ``parent_commit_id`` always agree.
        parent_id = ref_file.read_text(encoding="utf-8").strip() or None

    manifest: dict[str, str] = {}
    snap_id = compute_snapshot_id(manifest)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        repo_id=repo_id,
        branch=branch,
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))

    # The heads directory may not exist yet for a branch other than the default.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
| 77 | |
| 78 | |
| 79 | # --------------------------------------------------------------------------- |
| 80 | # Unit tests |
| 81 | # --------------------------------------------------------------------------- |
| 82 | |
class TestTagUnit:
    """Direct tests of the tag helpers in ``muse.core.store``."""

    def test_write_and_read_tag(self, tmp_path: pathlib.Path) -> None:
        """A tag written for a commit is the only tag read back for it."""
        from muse.core.store import TagRecord, write_tag, get_tags_for_commit

        root, repo_id = _init_repo(tmp_path)
        commit_id = _make_commit(root, repo_id)

        record = TagRecord(
            tag_id=str(uuid.uuid4()),
            repo_id=repo_id,
            commit_id=commit_id,
            tag="emotion:joyful",
        )
        write_tag(root, record)

        found = get_tags_for_commit(root, repo_id, commit_id)
        assert len(found) == 1
        only = found[0]
        assert only.tag == "emotion:joyful"

    def test_delete_tag(self, tmp_path: pathlib.Path) -> None:
        """Deleting an existing tag returns ``True`` and empties the commit's tag list."""
        from muse.core.store import TagRecord, write_tag, get_tags_for_commit, delete_tag

        root, repo_id = _init_repo(tmp_path)
        commit_id = _make_commit(root, repo_id)

        tag_id = str(uuid.uuid4())
        record = TagRecord(
            tag_id=tag_id,
            repo_id=repo_id,
            commit_id=commit_id,
            tag="section:chorus",
        )
        write_tag(root, record)

        before = get_tags_for_commit(root, repo_id, commit_id)
        assert len(before) == 1

        deleted = delete_tag(root, repo_id, tag_id)
        assert deleted is True

        after = get_tags_for_commit(root, repo_id, commit_id)
        assert after == []

    def test_delete_nonexistent_tag_returns_false(self, tmp_path: pathlib.Path) -> None:
        """Deleting an unknown tag id returns ``False``."""
        from muse.core.store import delete_tag

        root, repo_id = _init_repo(tmp_path)
        unknown_tag_id = str(uuid.uuid4())
        outcome = delete_tag(root, repo_id, unknown_tag_id)
        assert outcome is False

    def test_get_all_tags_empty(self, tmp_path: pathlib.Path) -> None:
        """A freshly initialised repository has no tags."""
        from muse.core.store import get_all_tags

        root, repo_id = _init_repo(tmp_path)
        everything = get_all_tags(root, repo_id)
        assert everything == []
| 115 | |
| 116 | |
| 117 | # --------------------------------------------------------------------------- |
| 118 | # Integration tests |
| 119 | # --------------------------------------------------------------------------- |
| 120 | |
class TestTagIntegration:
    """CLI round trips for ``tag add``, ``tag list`` and ``tag remove``.

    Every invocation that is expected to succeed has its exit code asserted,
    so a failing setup step or a failing ``tag list`` cannot let a later
    assertion (in particular a ``not in`` check on the output) pass
    vacuously.
    """

    def test_add_and_list_tag(self, tmp_path: pathlib.Path) -> None:
        """``tag add`` reports success and the tag then shows up in ``tag list``."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        env = _env(root)

        added = runner.invoke(cli, ["tag", "add", "emotion:joyful"], env=env, catch_exceptions=False)
        assert added.exit_code == 0
        assert "Tagged" in added.output

        listed = runner.invoke(cli, ["tag", "list"], env=env, catch_exceptions=False)
        assert listed.exit_code == 0
        assert "emotion:joyful" in listed.output

    def test_list_tags_for_specific_commit(self, tmp_path: pathlib.Path) -> None:
        """``tag list <ref>`` accepts a 12-character commit id prefix."""
        root, repo_id = _init_repo(tmp_path)
        commit_id = _make_commit(root, repo_id)
        env = _env(root)

        added = runner.invoke(cli, ["tag", "add", "section:verse"], env=env, catch_exceptions=False)
        assert added.exit_code == 0

        listed = runner.invoke(cli, ["tag", "list", commit_id[:12]], env=env, catch_exceptions=False)
        assert listed.exit_code == 0
        assert "section:verse" in listed.output

    def test_remove_tag(self, tmp_path: pathlib.Path) -> None:
        """A removed tag no longer appears in a successful ``tag list``."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        env = _env(root)

        added = runner.invoke(cli, ["tag", "add", "emotion:tense"], env=env, catch_exceptions=False)
        assert added.exit_code == 0

        removed = runner.invoke(cli, ["tag", "remove", "emotion:tense"], env=env, catch_exceptions=False)
        assert removed.exit_code == 0
        assert "Removed" in removed.output

        listed = runner.invoke(cli, ["tag", "list"], env=env, catch_exceptions=False)
        # Without this check an erroring ``tag list`` (empty output) would
        # satisfy the absence assertion below.
        assert listed.exit_code == 0
        assert "emotion:tense" not in listed.output

    def test_remove_nonexistent_tag_fails(self, tmp_path: pathlib.Path) -> None:
        """Removing a tag that was never added exits non-zero."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["tag", "remove", "ghost:tag"], env=_env(root))
        assert result.exit_code != 0

    def test_add_multiple_tags_same_commit(self, tmp_path: pathlib.Path) -> None:
        """Two tags added to the same commit are both listed."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        env = _env(root)

        for tag in ("key:Am", "tempo:120bpm"):
            added = runner.invoke(cli, ["tag", "add", tag], env=env, catch_exceptions=False)
            assert added.exit_code == 0

        listed = runner.invoke(cli, ["tag", "list"], env=env, catch_exceptions=False)
        assert listed.exit_code == 0
        assert "key:Am" in listed.output
        assert "tempo:120bpm" in listed.output

    def test_tag_on_invalid_ref_fails(self, tmp_path: pathlib.Path) -> None:
        """Tagging a 64-character ref that matches no commit exits non-zero."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["tag", "add", "emotion:sad", "deadbeef" * 8], env=_env(root))
        assert result.exit_code != 0
| 169 | |
| 170 | |
| 171 | # --------------------------------------------------------------------------- |
| 172 | # Security tests |
| 173 | # --------------------------------------------------------------------------- |
| 174 | |
class TestTagSecurity:
    """Checks that terminal escape sequences in tag names never reach the output."""

    def test_tag_with_control_characters_sanitized_in_output(
        self, tmp_path: pathlib.Path
    ) -> None:
        """After adding an ANSI-coloured tag, ``tag list`` exits 0 with no ESC byte."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)

        hostile_tag = "emotion:\x1b[31mred\x1b[0m"
        add_args = ["tag", "add", hostile_tag]
        runner.invoke(cli, add_args, env=_env(root), catch_exceptions=False)

        listing = runner.invoke(cli, ["tag", "list"], env=_env(root), catch_exceptions=False)
        assert listing.exit_code == 0
        printed = listing.output
        assert "\x1b" not in printed
| 186 | |
| 187 | |
| 188 | # --------------------------------------------------------------------------- |
| 189 | # Stress tests |
| 190 | # --------------------------------------------------------------------------- |
| 191 | |
class TestTagStress:
    """Volume test: one tag on each of thirty commits."""

    def test_many_tags_on_many_commits(self, tmp_path: pathlib.Path) -> None:
        """Thirty tags are stored and ``tag list`` shows each of the five tag values."""
        root, repo_id = _init_repo(tmp_path)
        commit_ids = []
        for number in range(30):
            commit_ids.append(_make_commit(root, repo_id, message=f"commit {number}"))

        from muse.core.store import TagRecord, write_tag, get_all_tags

        tag_types = ["emotion:joyful", "section:chorus", "key:Am", "tempo:120bpm", "stage:master"]
        variety = len(tag_types)
        # Cycle through the tag values so each one is used on several commits.
        for position, commit_id in enumerate(commit_ids):
            record = TagRecord(
                tag_id=str(uuid.uuid4()),
                repo_id=repo_id,
                commit_id=commit_id,
                tag=tag_types[position % variety],
            )
            write_tag(root, record)

        stored = get_all_tags(root, repo_id)
        assert len(stored) == 30

        listing = runner.invoke(cli, ["tag", "list"], env=_env(root), catch_exceptions=False)
        assert listing.exit_code == 0
        printed = listing.output
        for expected in tag_types:
            assert expected in printed