test_provenance.py
python
| 1 | """Tests for muse.core.provenance — AgentIdentity, HMAC signing, key I/O.""" |
| 2 | |
| 3 | import pathlib |
| 4 | import tempfile |
| 5 | |
| 6 | import pytest |
| 7 | |
| 8 | from muse.core.provenance import ( |
| 9 | AgentIdentity, |
| 10 | generate_agent_key, |
| 11 | key_fingerprint, |
| 12 | make_agent_identity, |
| 13 | read_agent_key, |
| 14 | sign_commit_hmac, |
| 15 | sign_commit_record, |
| 16 | verify_commit_hmac, |
| 17 | write_agent_key, |
| 18 | ) |
| 19 | from muse.core.store import CommitRecord |
| 20 | import datetime |
| 21 | |
| 22 | |
| 23 | # --------------------------------------------------------------------------- |
| 24 | # AgentIdentity factory |
| 25 | # --------------------------------------------------------------------------- |
| 26 | |
| 27 | |
| 28 | class TestMakeAgentIdentity: |
| 29 | def test_required_fields_present(self) -> None: |
| 30 | identity = make_agent_identity( |
| 31 | agent_id="test-agent", |
| 32 | model_id="gpt-5", |
| 33 | toolchain_id="muse-v2", |
| 34 | ) |
| 35 | assert identity["agent_id"] == "test-agent" |
| 36 | assert identity.get("model_id") == "gpt-5" |
| 37 | assert identity.get("toolchain_id") == "muse-v2" |
| 38 | |
| 39 | def test_prompt_hash_is_hex(self) -> None: |
| 40 | identity = make_agent_identity( |
| 41 | agent_id="a", |
| 42 | model_id="m", |
| 43 | toolchain_id="t", |
| 44 | prompt="system: you are a music agent", |
| 45 | ) |
| 46 | prompt_hash = identity.get("prompt_hash", "") |
| 47 | assert isinstance(prompt_hash, str) |
| 48 | assert len(prompt_hash) == 64 |
| 49 | assert all(c in "0123456789abcdef" for c in prompt_hash) |
| 50 | |
| 51 | def test_no_prompt_gives_no_hash_key(self) -> None: |
| 52 | identity = make_agent_identity(agent_id="a", model_id="m", toolchain_id="t") |
| 53 | # When no prompt is provided, prompt_hash is absent (total=False TypedDict). |
| 54 | assert identity.get("prompt_hash", "") == "" |
| 55 | |
| 56 | def test_execution_context_hash_populated(self) -> None: |
| 57 | identity = make_agent_identity( |
| 58 | agent_id="a", |
| 59 | model_id="m", |
| 60 | toolchain_id="t", |
| 61 | execution_context='{"env": "ci", "version": "1.2.3"}', |
| 62 | ) |
| 63 | ec_hash = identity.get("execution_context_hash", "") |
| 64 | assert isinstance(ec_hash, str) |
| 65 | assert len(ec_hash) == 64 |
| 66 | |
| 67 | |
| 68 | # --------------------------------------------------------------------------- |
| 69 | # Key generation and fingerprinting |
| 70 | # --------------------------------------------------------------------------- |
| 71 | |
| 72 | |
| 73 | class TestKeyGeneration: |
| 74 | def test_generate_key_is_32_bytes(self) -> None: |
| 75 | key = generate_agent_key() |
| 76 | assert isinstance(key, bytes) |
| 77 | assert len(key) == 32 |
| 78 | |
| 79 | def test_keys_are_unique(self) -> None: |
| 80 | keys = {generate_agent_key() for _ in range(10)} |
| 81 | assert len(keys) == 10 |
| 82 | |
| 83 | def test_fingerprint_is_short_hex(self) -> None: |
| 84 | key = generate_agent_key() |
| 85 | fp = key_fingerprint(key) |
| 86 | assert isinstance(fp, str) |
| 87 | assert 8 <= len(fp) <= 16 |
| 88 | assert all(c in "0123456789abcdef" for c in fp) |
| 89 | |
| 90 | def test_fingerprint_is_deterministic(self) -> None: |
| 91 | key = b"\x01" * 32 |
| 92 | assert key_fingerprint(key) == key_fingerprint(key) |
| 93 | |
| 94 | |
| 95 | # --------------------------------------------------------------------------- |
| 96 | # Key I/O |
| 97 | # --------------------------------------------------------------------------- |
| 98 | |
| 99 | |
| 100 | class TestKeyIO: |
| 101 | def test_write_and_read_roundtrip(self, tmp_path: pathlib.Path) -> None: |
| 102 | key = generate_agent_key() |
| 103 | agent_id = "roundtrip-agent" |
| 104 | write_agent_key(tmp_path, agent_id, key) |
| 105 | recovered = read_agent_key(tmp_path, agent_id) |
| 106 | assert recovered == key |
| 107 | |
| 108 | def test_read_missing_key_returns_none(self, tmp_path: pathlib.Path) -> None: |
| 109 | result = read_agent_key(tmp_path, "nonexistent-agent") |
| 110 | assert result is None |
| 111 | |
| 112 | def test_key_file_is_readable(self, tmp_path: pathlib.Path) -> None: |
| 113 | key = b"\xde\xad\xbe\xef" + b"\x00" * 28 |
| 114 | write_agent_key(tmp_path, "hex-agent", key) |
| 115 | # Find the key file and verify it roundtrips correctly. |
| 116 | key_dir = tmp_path / ".muse" / "keys" |
| 117 | files = list(key_dir.rglob("*.key")) |
| 118 | assert files |
| 119 | recovered = read_agent_key(tmp_path, "hex-agent") |
| 120 | assert recovered == key |
| 121 | |
| 122 | |
| 123 | # --------------------------------------------------------------------------- |
| 124 | # HMAC signing / verification |
| 125 | # --------------------------------------------------------------------------- |
| 126 | |
| 127 | |
| 128 | class TestHMACSigning: |
| 129 | def test_sign_and_verify_succeed(self) -> None: |
| 130 | key = generate_agent_key() |
| 131 | commit_hash = "abc123def456" * 4 |
| 132 | sig = sign_commit_hmac(commit_hash, key) |
| 133 | assert verify_commit_hmac(commit_hash, sig, key) |
| 134 | |
| 135 | def test_wrong_key_fails(self) -> None: |
| 136 | key1 = generate_agent_key() |
| 137 | key2 = generate_agent_key() |
| 138 | commit_hash = "abc123" |
| 139 | sig = sign_commit_hmac(commit_hash, key1) |
| 140 | assert not verify_commit_hmac(commit_hash, sig, key2) |
| 141 | |
| 142 | def test_wrong_commit_hash_fails(self) -> None: |
| 143 | key = generate_agent_key() |
| 144 | sig = sign_commit_hmac("commit-a", key) |
| 145 | assert not verify_commit_hmac("commit-b", sig, key) |
| 146 | |
| 147 | def test_tampered_signature_fails(self) -> None: |
| 148 | key = generate_agent_key() |
| 149 | sig = sign_commit_hmac("abc", key) |
| 150 | tampered = sig[:-4] + "0000" |
| 151 | assert not verify_commit_hmac("abc", tampered, key) |
| 152 | |
| 153 | def test_signature_is_hex_string(self) -> None: |
| 154 | key = generate_agent_key() |
| 155 | sig = sign_commit_hmac("test-commit", key) |
| 156 | assert isinstance(sig, str) |
| 157 | assert all(c in "0123456789abcdef" for c in sig) |
| 158 | |
| 159 | def test_signature_length_is_64(self) -> None: |
| 160 | key = generate_agent_key() |
| 161 | sig = sign_commit_hmac("test-commit", key) |
| 162 | assert len(sig) == 64 # HMAC-SHA256 produces 32 bytes = 64 hex chars |
| 163 | |
| 164 | |
| 165 | # --------------------------------------------------------------------------- |
| 166 | # sign_commit_record |
| 167 | # --------------------------------------------------------------------------- |
| 168 | |
| 169 | |
| 170 | class TestSignCommitRecord: |
| 171 | def test_sign_commit_record_writes_signature(self, tmp_path: pathlib.Path) -> None: |
| 172 | key = generate_agent_key() |
| 173 | agent_id = "sign-test-agent" |
| 174 | write_agent_key(tmp_path, agent_id, key) |
| 175 | |
| 176 | commit_id = "deadbeef" * 8 |
| 177 | result = sign_commit_record(commit_id, agent_id, tmp_path) |
| 178 | assert result is not None |
| 179 | sig, fprint = result |
| 180 | assert sig != "" |
| 181 | assert fprint == key_fingerprint(key) |
| 182 | assert verify_commit_hmac(commit_id, sig, key) |
| 183 | |
| 184 | def test_sign_commit_record_no_key_returns_none(self, tmp_path: pathlib.Path) -> None: |
| 185 | result = sign_commit_record("aabbccdd" * 8, "ghost-agent", tmp_path) |
| 186 | assert result is None |