cgcardona / muse public
test_coordination.py python
343 lines 13.0 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for muse/core/coordination.py — multi-agent coordination layer.
2
3 Coverage
4 --------
5 Directory helpers
6 - _ensure_coord_dirs creates .muse/coordination/reservations/ and intents/.
7
8 Reservation
9 - create_reservation writes a valid JSON file.
10 - Reservation.from_dict / to_dict round-trip.
11 - Reservation.is_active() returns True for non-expired, False for expired.
12 - load_all_reservations loads all files including expired.
13 - active_reservations filters out expired reservations.
14 - Corrupt reservation file is skipped with a warning.
15 - Multiple reservations can coexist for the same address.
16
17 Intent
18 - create_intent writes a valid JSON file.
19 - Intent.from_dict / to_dict round-trip.
20 - load_all_intents loads all files.
21 - Corrupt intent file is skipped.
22
23 Schema
24 - All records have schema_version == 1.
25 - created_at and expires_at are ISO 8601 strings.
26 - operation field is None-able for reservations.
27 """
28
29 import datetime
30 import json
31 import pathlib
32
33 import pytest
34
35 from muse.core.coordination import (
36 Intent,
37 Reservation,
38 active_reservations,
39 create_intent,
40 create_reservation,
41 load_all_intents,
42 load_all_reservations,
43 )
44
45
46 # ---------------------------------------------------------------------------
47 # Helpers
48 # ---------------------------------------------------------------------------
49
50
51 def _now() -> datetime.datetime:
52 return datetime.datetime.now(datetime.timezone.utc)
53
54
55 def _future(seconds: int = 3600) -> datetime.datetime:
56 return _now() + datetime.timedelta(seconds=seconds)
57
58
59 def _past(seconds: int = 60) -> datetime.datetime:
60 return _now() - datetime.timedelta(seconds=seconds)
61
62
63 # ---------------------------------------------------------------------------
64 # Reservation — create and load
65 # ---------------------------------------------------------------------------
66
67
68 class TestCreateReservation:
69 def test_creates_json_file(self, tmp_path: pathlib.Path) -> None:
70 res = create_reservation(
71 tmp_path,
72 run_id="agent-1",
73 branch="feature-x",
74 addresses=["src/billing.py::compute_total"],
75 )
76 rdir = tmp_path / ".muse" / "coordination" / "reservations"
77 assert rdir.exists()
78 files = list(rdir.glob("*.json"))
79 assert len(files) == 1
80 data = json.loads(files[0].read_text())
81 assert data["reservation_id"] == res.reservation_id
82 assert data["run_id"] == "agent-1"
83 assert data["branch"] == "feature-x"
84 assert data["addresses"] == ["src/billing.py::compute_total"]
85 assert data["schema_version"] == 1
86
87 def test_default_ttl_sets_future_expiry(self, tmp_path: pathlib.Path) -> None:
88 res = create_reservation(tmp_path, run_id="r", branch="main", addresses=[])
89 assert res.expires_at > _now()
90
91 def test_custom_ttl(self, tmp_path: pathlib.Path) -> None:
92 res = create_reservation(
93 tmp_path, run_id="r", branch="main", addresses=[], ttl_seconds=7200
94 )
95 delta = res.expires_at - res.created_at
96 assert abs(delta.total_seconds() - 7200) < 5
97
98 def test_operation_stored(self, tmp_path: pathlib.Path) -> None:
99 res = create_reservation(
100 tmp_path, run_id="r", branch="main",
101 addresses=["src/a.py::f"], operation="rename"
102 )
103 assert res.operation == "rename"
104 rdir = tmp_path / ".muse" / "coordination" / "reservations"
105 data = json.loads(list(rdir.glob("*.json"))[0].read_text())
106 assert data["operation"] == "rename"
107
108 def test_none_operation(self, tmp_path: pathlib.Path) -> None:
109 res = create_reservation(tmp_path, run_id="r", branch="main", addresses=[])
110 assert res.operation is None
111
112 def test_multiple_addresses(self, tmp_path: pathlib.Path) -> None:
113 addrs = ["src/a.py::f", "src/b.py::g", "src/c.py::h"]
114 res = create_reservation(tmp_path, run_id="r", branch="main", addresses=addrs)
115 assert res.addresses == addrs
116
117 def test_multiple_reservations_coexist(self, tmp_path: pathlib.Path) -> None:
118 create_reservation(tmp_path, run_id="a1", branch="main", addresses=["src/a.py::f"])
119 create_reservation(tmp_path, run_id="a2", branch="main", addresses=["src/a.py::f"])
120 rdir = tmp_path / ".muse" / "coordination" / "reservations"
121 assert len(list(rdir.glob("*.json"))) == 2
122
123
124 # ---------------------------------------------------------------------------
125 # Reservation — to_dict / from_dict
126 # ---------------------------------------------------------------------------
127
128
129 class TestReservationRoundTrip:
130 def test_to_dict_from_dict(self) -> None:
131 now = _now()
132 future = _future()
133 res = Reservation(
134 reservation_id="test-uuid",
135 run_id="agent-7",
136 branch="feature-y",
137 addresses=["src/x.py::func"],
138 created_at=now,
139 expires_at=future,
140 operation="move",
141 )
142 d = res.to_dict()
143 res2 = Reservation.from_dict(d)
144 assert res2.reservation_id == "test-uuid"
145 assert res2.run_id == "agent-7"
146 assert res2.branch == "feature-y"
147 assert res2.addresses == ["src/x.py::func"]
148 assert res2.operation == "move"
149 # Timestamps round-trip via ISO 8601
150 assert abs((res2.expires_at - future).total_seconds()) < 1
151
152 def test_schema_version_in_dict(self) -> None:
153 res = Reservation(
154 reservation_id="x", run_id="r", branch="b",
155 addresses=[], created_at=_now(), expires_at=_future(), operation=None
156 )
157 assert res.to_dict()["schema_version"] == 1
158
159
160 # ---------------------------------------------------------------------------
161 # Reservation — is_active
162 # ---------------------------------------------------------------------------
163
164
165 class TestReservationIsActive:
166 def test_active_when_future_expiry(self, tmp_path: pathlib.Path) -> None:
167 res = create_reservation(tmp_path, run_id="r", branch="main", addresses=[], ttl_seconds=3600)
168 assert res.is_active()
169
170 def test_inactive_when_past_expiry(self) -> None:
171 res = Reservation(
172 reservation_id="x", run_id="r", branch="b",
173 addresses=[], created_at=_past(120), expires_at=_past(60), operation=None
174 )
175 assert not res.is_active()
176
177
178 # ---------------------------------------------------------------------------
179 # load_all_reservations / active_reservations
180 # ---------------------------------------------------------------------------
181
182
183 class TestLoadReservations:
184 def test_load_all_includes_expired(self, tmp_path: pathlib.Path) -> None:
185 create_reservation(tmp_path, run_id="r1", branch="main", addresses=[], ttl_seconds=3600)
186 # Manually write an expired reservation.
187 past = _past(120)
188 expired = Reservation(
189 reservation_id="expired-uuid",
190 run_id="r2",
191 branch="main",
192 addresses=[],
193 created_at=_past(200),
194 expires_at=past,
195 operation=None,
196 )
197 rdir = tmp_path / ".muse" / "coordination" / "reservations"
198 rdir.mkdir(parents=True, exist_ok=True)
199 (rdir / "expired-uuid.json").write_text(json.dumps(expired.to_dict()) + "\n")
200
201 all_res = load_all_reservations(tmp_path)
202 assert len(all_res) == 2
203
204 def test_active_reservations_filters_expired(self, tmp_path: pathlib.Path) -> None:
205 create_reservation(tmp_path, run_id="r1", branch="main", addresses=[], ttl_seconds=3600)
206 past = _past(120)
207 expired = Reservation(
208 reservation_id="expired-uuid",
209 run_id="r2", branch="main", addresses=[],
210 created_at=_past(200), expires_at=past, operation=None,
211 )
212 rdir = tmp_path / ".muse" / "coordination" / "reservations"
213 rdir.mkdir(parents=True, exist_ok=True)
214 (rdir / "expired-uuid.json").write_text(json.dumps(expired.to_dict()) + "\n")
215
216 active = active_reservations(tmp_path)
217 assert len(active) == 1
218 assert active[0].run_id == "r1"
219
220 def test_empty_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
221 rdir = tmp_path / ".muse" / "coordination" / "reservations"
222 rdir.mkdir(parents=True, exist_ok=True)
223 assert load_all_reservations(tmp_path) == []
224
225 def test_nonexistent_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
226 assert load_all_reservations(tmp_path) == []
227
228 def test_corrupt_file_skipped(self, tmp_path: pathlib.Path) -> None:
229 rdir = tmp_path / ".muse" / "coordination" / "reservations"
230 rdir.mkdir(parents=True, exist_ok=True)
231 (rdir / "bad.json").write_text("not valid json{{{")
232 result = load_all_reservations(tmp_path)
233 assert result == []
234
235
236 # ---------------------------------------------------------------------------
237 # Intent — create and load
238 # ---------------------------------------------------------------------------
239
240
241 class TestCreateIntent:
242 def test_creates_json_file(self, tmp_path: pathlib.Path) -> None:
243 intent = create_intent(
244 tmp_path,
245 reservation_id="res-uuid",
246 run_id="agent-2",
247 branch="feature-z",
248 addresses=["src/billing.py::Invoice"],
249 operation="rename",
250 detail="rename to InvoiceRecord",
251 )
252 idir = tmp_path / ".muse" / "coordination" / "intents"
253 assert idir.exists()
254 files = list(idir.glob("*.json"))
255 assert len(files) == 1
256 data = json.loads(files[0].read_text())
257 assert data["intent_id"] == intent.intent_id
258 assert data["reservation_id"] == "res-uuid"
259 assert data["operation"] == "rename"
260 assert data["detail"] == "rename to InvoiceRecord"
261 assert data["schema_version"] == 1
262
263 def test_empty_detail_defaults_to_empty_string(self, tmp_path: pathlib.Path) -> None:
264 intent = create_intent(
265 tmp_path, reservation_id="", run_id="r", branch="main",
266 addresses=[], operation="modify",
267 )
268 assert intent.detail == ""
269
270 def test_multiple_intents(self, tmp_path: pathlib.Path) -> None:
271 create_intent(tmp_path, reservation_id="", run_id="a", branch="main",
272 addresses=["x.py::f"], operation="rename")
273 create_intent(tmp_path, reservation_id="", run_id="b", branch="main",
274 addresses=["x.py::g"], operation="delete")
275 idir = tmp_path / ".muse" / "coordination" / "intents"
276 assert len(list(idir.glob("*.json"))) == 2
277
278
279 # ---------------------------------------------------------------------------
280 # Intent — to_dict / from_dict
281 # ---------------------------------------------------------------------------
282
283
284 class TestIntentRoundTrip:
285 def test_to_dict_from_dict(self) -> None:
286 now = _now()
287 intent = Intent(
288 intent_id="intent-uuid",
289 reservation_id="res-uuid",
290 run_id="agent-3",
291 branch="dev",
292 addresses=["src/y.py::Bar"],
293 operation="extract",
294 created_at=now,
295 detail="extract helper",
296 )
297 d = intent.to_dict()
298 intent2 = Intent.from_dict(d)
299 assert intent2.intent_id == "intent-uuid"
300 assert intent2.reservation_id == "res-uuid"
301 assert intent2.operation == "extract"
302 assert intent2.detail == "extract helper"
303 assert intent2.addresses == ["src/y.py::Bar"]
304
305 def test_schema_version_in_dict(self) -> None:
306 intent = Intent(
307 intent_id="x", reservation_id="", run_id="r", branch="b",
308 addresses=[], operation="modify", created_at=_now(), detail="",
309 )
310 assert intent.to_dict()["schema_version"] == 1
311
312
313 # ---------------------------------------------------------------------------
314 # load_all_intents
315 # ---------------------------------------------------------------------------
316
317
318 class TestLoadAllIntents:
319 def test_empty_dir(self, tmp_path: pathlib.Path) -> None:
320 idir = tmp_path / ".muse" / "coordination" / "intents"
321 idir.mkdir(parents=True, exist_ok=True)
322 assert load_all_intents(tmp_path) == []
323
324 def test_nonexistent_dir(self, tmp_path: pathlib.Path) -> None:
325 assert load_all_intents(tmp_path) == []
326
327 def test_loads_created_intents(self, tmp_path: pathlib.Path) -> None:
328 create_intent(tmp_path, reservation_id="r", run_id="a", branch="main",
329 addresses=["x.py::f"], operation="rename")
330 create_intent(tmp_path, reservation_id="r", run_id="b", branch="dev",
331 addresses=["y.py::g"], operation="modify")
332 intents = load_all_intents(tmp_path)
333 assert len(intents) == 2
334 ops = {i.operation for i in intents}
335 assert "rename" in ops
336 assert "modify" in ops
337
338 def test_corrupt_intent_skipped(self, tmp_path: pathlib.Path) -> None:
339 idir = tmp_path / ".muse" / "coordination" / "intents"
340 idir.mkdir(parents=True, exist_ok=True)
341 (idir / "bad.json").write_text("{invalid")
342 result = load_all_intents(tmp_path)
343 assert result == []