cgcardona / muse public
test_provenance.py python
186 lines 6.4 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for muse.core.provenance — AgentIdentity, HMAC signing, key I/O."""
2
3 import pathlib
4 import tempfile
5
6 import pytest
7
8 from muse.core.provenance import (
9 AgentIdentity,
10 generate_agent_key,
11 key_fingerprint,
12 make_agent_identity,
13 read_agent_key,
14 sign_commit_hmac,
15 sign_commit_record,
16 verify_commit_hmac,
17 write_agent_key,
18 )
19 from muse.core.store import CommitRecord
20 import datetime
21
22
23 # ---------------------------------------------------------------------------
24 # AgentIdentity factory
25 # ---------------------------------------------------------------------------
26
27
class TestMakeAgentIdentity:
    """Checks on the mapping returned by ``make_agent_identity``."""

    def test_required_fields_present(self) -> None:
        """The identifying arguments come back under their own keys."""
        built = make_agent_identity(
            agent_id="test-agent",
            model_id="gpt-5",
            toolchain_id="muse-v2",
        )
        assert built["agent_id"] == "test-agent"
        assert built.get("model_id") == "gpt-5"
        assert built.get("toolchain_id") == "muse-v2"

    def test_prompt_hash_is_hex(self) -> None:
        """Supplying a prompt yields a 64-character lowercase-hex ``prompt_hash``."""
        built = make_agent_identity(
            agent_id="a",
            model_id="m",
            toolchain_id="t",
            prompt="system: you are a music agent",
        )
        digest = built.get("prompt_hash", "")
        assert isinstance(digest, str)
        assert len(digest) == 64
        assert set(digest) <= set("0123456789abcdef")

    def test_no_prompt_gives_no_hash_key(self) -> None:
        """Without a prompt, looking up ``prompt_hash`` falls back to ``""``."""
        built = make_agent_identity(agent_id="a", model_id="m", toolchain_id="t")
        # Expected to be absent when no prompt is given (total=False TypedDict).
        # NOTE(review): this lookup also passes if the key is present but
        # holds an empty string — confirm which of the two is intended.
        looked_up = built.get("prompt_hash", "")
        assert looked_up == ""

    def test_execution_context_hash_populated(self) -> None:
        """Supplying an execution context yields a 64-character hash string."""
        built = make_agent_identity(
            agent_id="a",
            model_id="m",
            toolchain_id="t",
            execution_context='{"env": "ci", "version": "1.2.3"}',
        )
        context_digest = built.get("execution_context_hash", "")
        assert isinstance(context_digest, str)
        assert len(context_digest) == 64
66
67
68 # ---------------------------------------------------------------------------
69 # Key generation and fingerprinting
70 # ---------------------------------------------------------------------------
71
72
class TestKeyGeneration:
    """Checks on ``generate_agent_key`` and ``key_fingerprint``."""

    def test_generate_key_is_32_bytes(self) -> None:
        """A generated key is a ``bytes`` object of length 32."""
        fresh = generate_agent_key()
        assert isinstance(fresh, bytes)
        assert len(fresh) == 32

    def test_keys_are_unique(self) -> None:
        """Ten consecutive generations produce ten distinct keys."""
        sample_size = 10
        distinct = {generate_agent_key() for _ in range(sample_size)}
        assert len(distinct) == sample_size

    def test_fingerprint_is_short_hex(self) -> None:
        """A fingerprint is a lowercase-hex string of 8 to 16 characters."""
        fingerprint = key_fingerprint(generate_agent_key())
        assert isinstance(fingerprint, str)
        assert len(fingerprint) in range(8, 17)
        assert set(fingerprint) <= set("0123456789abcdef")

    def test_fingerprint_is_deterministic(self) -> None:
        """Fingerprinting the same key twice gives the same result."""
        fixed_key = bytes([0x01]) * 32
        first = key_fingerprint(fixed_key)
        second = key_fingerprint(fixed_key)
        assert first == second
93
94
95 # ---------------------------------------------------------------------------
96 # Key I/O
97 # ---------------------------------------------------------------------------
98
99
class TestKeyIO:
    """Round-trip checks for ``write_agent_key`` / ``read_agent_key``."""

    def test_write_and_read_roundtrip(self, tmp_path: pathlib.Path) -> None:
        """A key written for an agent is read back unchanged."""
        original = generate_agent_key()
        write_agent_key(tmp_path, "roundtrip-agent", original)
        assert read_agent_key(tmp_path, "roundtrip-agent") == original

    def test_read_missing_key_returns_none(self, tmp_path: pathlib.Path) -> None:
        """Reading a key that was never written yields ``None``."""
        assert read_agent_key(tmp_path, "nonexistent-agent") is None

    def test_key_file_is_readable(self, tmp_path: pathlib.Path) -> None:
        """Writing leaves a ``*.key`` file under ``.muse/keys`` and it reads back."""
        # 0xDEADBEEF followed by 28 zero bytes: a fixed, recognisable 32-byte key.
        fixed_key = bytes.fromhex("deadbeef") + bytes(28)
        write_agent_key(tmp_path, "hex-agent", fixed_key)

        # At least one key file must exist somewhere below the keys directory.
        key_files = [*(tmp_path / ".muse" / "keys").rglob("*.key")]
        assert key_files

        assert read_agent_key(tmp_path, "hex-agent") == fixed_key
121
122
123 # ---------------------------------------------------------------------------
124 # HMAC signing / verification
125 # ---------------------------------------------------------------------------
126
127
class TestHMACSigning:
    """Checks on ``sign_commit_hmac`` and ``verify_commit_hmac``."""

    def test_sign_and_verify_succeed(self) -> None:
        """A signature verifies against the same commit hash and key."""
        key = generate_agent_key()
        commit_hash = "abc123def456" * 4
        sig = sign_commit_hmac(commit_hash, key)
        assert verify_commit_hmac(commit_hash, sig, key)

    def test_wrong_key_fails(self) -> None:
        """A signature made with one key does not verify under another key."""
        key1 = generate_agent_key()
        key2 = generate_agent_key()
        commit_hash = "abc123"
        sig = sign_commit_hmac(commit_hash, key1)
        assert not verify_commit_hmac(commit_hash, sig, key2)

    def test_wrong_commit_hash_fails(self) -> None:
        """A signature for one commit does not verify for a different commit."""
        key = generate_agent_key()
        sig = sign_commit_hmac("commit-a", key)
        assert not verify_commit_hmac("commit-b", sig, key)

    def test_tampered_signature_fails(self) -> None:
        """Altering the tail of a signature makes verification fail."""
        key = generate_agent_key()
        sig = sign_commit_hmac("abc", key)
        # Blindly overwriting the tail with "0000" is a no-op whenever the
        # genuine signature already ends in "0000", which would make this
        # test fail at random. Choose a suffix guaranteed to differ.
        replacement = "ffff" if sig.endswith("0000") else "0000"
        tampered = sig[:-4] + replacement
        assert tampered != sig
        assert not verify_commit_hmac("abc", tampered, key)

    def test_signature_is_hex_string(self) -> None:
        """The signature is a string made only of lowercase hex digits."""
        key = generate_agent_key()
        sig = sign_commit_hmac("test-commit", key)
        assert isinstance(sig, str)
        assert all(c in "0123456789abcdef" for c in sig)

    def test_signature_length_is_64(self) -> None:
        """The signature is 64 characters long."""
        key = generate_agent_key()
        sig = sign_commit_hmac("test-commit", key)
        assert len(sig) == 64  # HMAC-SHA256 produces 32 bytes = 64 hex chars
163
164
165 # ---------------------------------------------------------------------------
166 # sign_commit_record
167 # ---------------------------------------------------------------------------
168
169
class TestSignCommitRecord:
    """Checks on ``sign_commit_record`` with and without a stored key."""

    def test_sign_commit_record_writes_signature(self, tmp_path: pathlib.Path) -> None:
        """With a stored key, signing returns a verifiable signature and fingerprint."""
        stored_key = generate_agent_key()
        write_agent_key(tmp_path, "sign-test-agent", stored_key)

        commit_id = "deadbeef" * 8
        outcome = sign_commit_record(commit_id, "sign-test-agent", tmp_path)
        assert outcome is not None

        signature, fingerprint = outcome
        assert signature != ""
        assert fingerprint == key_fingerprint(stored_key)
        assert verify_commit_hmac(commit_id, signature, stored_key)

    def test_sign_commit_record_no_key_returns_none(self, tmp_path: pathlib.Path) -> None:
        """With no key stored for the agent, signing returns ``None``."""
        outcome = sign_commit_record("aabbccdd" * 8, "ghost-agent", tmp_path)
        assert outcome is None