cgcardona / muse public
test_code_query.py python
172 lines 5.8 KB
8d5137ed fix(security): full surface hardening — validation, path containment, p… Gabriel Cardona <cgcardona@gmail.com> 10h ago
1 """Tests for the code domain query evaluator."""
2
3 import pathlib
4
5 import pytest
6
7 from muse.domain import SemVerBump
8 from muse.plugins.code._code_query import (
9 AndExpr,
10 Comparison,
11 OrExpr,
12 build_evaluator,
13 _parse_query,
14 )
15
16
17 # ---------------------------------------------------------------------------
18 # Parser
19 # ---------------------------------------------------------------------------
20
21
22 class TestParseQuery:
23 def test_simple_equality(self) -> None:
24 q = _parse_query("author == 'gabriel'")
25 assert isinstance(q, OrExpr)
26 assert len(q.clauses) == 1
27 and_expr = q.clauses[0]
28 assert isinstance(and_expr, AndExpr)
29 cmp = and_expr.clauses[0]
30 assert cmp.field == "author"
31 assert cmp.op == "=="
32 assert cmp.value == "gabriel"
33
34 def test_and_expression(self) -> None:
35 q = _parse_query("author == 'x' and language == 'Python'")
36 assert len(q.clauses[0].clauses) == 2
37
38 def test_or_expression(self) -> None:
39 q = _parse_query("author == 'x' or author == 'y'")
40 assert len(q.clauses) == 2
41
42 def test_contains_operator(self) -> None:
43 q = _parse_query("agent_id contains claude")
44 cmp = q.clauses[0].clauses[0]
45 assert cmp.op == "contains"
46 assert cmp.value == "claude"
47
48 def test_startswith_operator(self) -> None:
49 q = _parse_query("symbol startswith my_")
50 cmp = q.clauses[0].clauses[0]
51 assert cmp.op == "startswith"
52
53 def test_unknown_field_raises(self) -> None:
54 with pytest.raises(ValueError, match="Unknown field"):
55 _parse_query("nonexistent == 'x'")
56
57 def test_double_quoted_string(self) -> None:
58 q = _parse_query('author == "gabriel"')
59 cmp = q.clauses[0].clauses[0]
60 assert cmp.value == "gabriel"
61
62
63 # ---------------------------------------------------------------------------
64 # build_evaluator + evaluator logic
65 # ---------------------------------------------------------------------------
66
67
68 import datetime
69 from muse.core.query_engine import QueryMatch
70 from muse.core.store import CommitRecord
71 from muse.domain import StructuredDelta, InsertOp
72
73
74 def _make_commit(
75 author: str = "alice",
76 agent_id: str = "",
77 model_id: str = "",
78 branch: str = "main",
79 sem_ver_bump: SemVerBump = "none",
80 delta: StructuredDelta | None = None,
81 ) -> CommitRecord:
82 return CommitRecord(
83 commit_id="abc1234",
84 repo_id="repo",
85 branch=branch,
86 snapshot_id="s" * 64,
87 message="test commit",
88 committed_at=datetime.datetime.now(datetime.timezone.utc),
89 author=author,
90 agent_id=agent_id,
91 model_id=model_id,
92 sem_ver_bump=sem_ver_bump,
93 structured_delta=delta,
94 )
95
96
97 class TestBuildEvaluator:
98 def test_author_match(self) -> None:
99 evaluator = build_evaluator("author == 'alice'")
100 commit = _make_commit(author="alice")
101 results = evaluator(commit, {}, pathlib.Path("."))
102 assert len(results) == 1
103
104 def test_author_no_match(self) -> None:
105 evaluator = build_evaluator("author == 'bob'")
106 commit = _make_commit(author="alice")
107 results = evaluator(commit, {}, pathlib.Path("."))
108 assert results == []
109
110 def test_agent_id_contains(self) -> None:
111 evaluator = build_evaluator("agent_id contains claude")
112 commit = _make_commit(agent_id="claude-v4")
113 results = evaluator(commit, {}, pathlib.Path("."))
114 assert len(results) == 1
115
116 def test_sem_ver_bump_match(self) -> None:
117 evaluator = build_evaluator("sem_ver_bump == 'major'")
118 commit = _make_commit(sem_ver_bump="major")
119 results = evaluator(commit, {}, pathlib.Path("."))
120 assert len(results) == 1
121
122 def test_and_both_must_match(self) -> None:
123 evaluator = build_evaluator("author == 'alice' and agent_id == 'bot'")
124 # Only author matches, not agent_id.
125 commit = _make_commit(author="alice", agent_id="human")
126 results = evaluator(commit, {}, pathlib.Path("."))
127 assert results == []
128
129 def test_or_one_match_sufficient(self) -> None:
130 evaluator = build_evaluator("author == 'alice' or author == 'bob'")
131 commit_alice = _make_commit(author="alice")
132 commit_bob = _make_commit(author="bob")
133 assert len(evaluator(commit_alice, {}, pathlib.Path("."))) >= 1
134 assert len(evaluator(commit_bob, {}, pathlib.Path("."))) >= 1
135
136 def test_branch_match(self) -> None:
137 evaluator = build_evaluator("branch == 'dev'")
138 commit = _make_commit(branch="dev")
139 assert len(evaluator(commit, {}, pathlib.Path("."))) >= 1
140
141 def test_symbol_match_from_delta(self) -> None:
142 op = InsertOp(
143 op="insert",
144 address="src/utils.py::my_func",
145 position=None,
146 content_id="hash1",
147 content_summary="added my_func",
148 )
149 delta = StructuredDelta(domain="code", ops=[op], summary="1 symbol added")
150 commit = _make_commit(delta=delta)
151 evaluator = build_evaluator("symbol == 'my_func'")
152 results = evaluator(commit, {}, pathlib.Path("."))
153 assert len(results) >= 1
154 assert any("my_func" in r.get("detail", "") for r in results)
155
156 def test_change_added_match(self) -> None:
157 op = InsertOp(
158 op="insert",
159 address="src/foo.py::bar",
160 position=None,
161 content_id="h1",
162 content_summary="bar added",
163 )
164 delta = StructuredDelta(domain="code", ops=[op], summary="added bar")
165 commit = _make_commit(delta=delta)
166 evaluator = build_evaluator("change == 'added'")
167 results = evaluator(commit, {}, pathlib.Path("."))
168 assert len(results) >= 1
169
170 def test_invalid_query_raises(self) -> None:
171 with pytest.raises(ValueError):
172 build_evaluator("badfield == 'x'")