test_coordination.py
python
| 1 | """Tests for muse/core/coordination.py — multi-agent coordination layer. |
| 2 | |
| 3 | Coverage |
| 4 | -------- |
| 5 | Directory helpers |
| 6 | - _ensure_coord_dirs creates .muse/coordination/reservations/ and intents/. |
| 7 | |
| 8 | Reservation |
| 9 | - create_reservation writes a valid JSON file. |
| 10 | - Reservation.from_dict / to_dict round-trip. |
| 11 | - Reservation.is_active() returns True for non-expired, False for expired. |
| 12 | - load_all_reservations loads all files including expired. |
| 13 | - active_reservations filters out expired reservations. |
| 14 | - Corrupt reservation file is skipped with a warning. |
| 15 | - Multiple reservations can coexist for the same address. |
| 16 | |
| 17 | Intent |
| 18 | - create_intent writes a valid JSON file. |
| 19 | - Intent.from_dict / to_dict round-trip. |
| 20 | - load_all_intents loads all files. |
| 21 | - Corrupt intent file is skipped. |
| 22 | |
| 23 | Schema |
| 24 | - All records have schema_version == 1. |
| 25 | - created_at and expires_at are ISO 8601 strings. |
| 26 | - operation field is None-able for reservations. |
| 27 | """ |
| 28 | |
| 29 | import datetime |
| 30 | import json |
| 31 | import pathlib |
| 32 | |
| 33 | import pytest |
| 34 | |
| 35 | from muse.core.coordination import ( |
| 36 | Intent, |
| 37 | Reservation, |
| 38 | active_reservations, |
| 39 | create_intent, |
| 40 | create_reservation, |
| 41 | load_all_intents, |
| 42 | load_all_reservations, |
| 43 | ) |
| 44 | |
| 45 | |
| 46 | # --------------------------------------------------------------------------- |
| 47 | # Helpers |
| 48 | # --------------------------------------------------------------------------- |
| 49 | |
| 50 | |
| 51 | def _now() -> datetime.datetime: |
| 52 | return datetime.datetime.now(datetime.timezone.utc) |
| 53 | |
| 54 | |
| 55 | def _future(seconds: int = 3600) -> datetime.datetime: |
| 56 | return _now() + datetime.timedelta(seconds=seconds) |
| 57 | |
| 58 | |
| 59 | def _past(seconds: int = 60) -> datetime.datetime: |
| 60 | return _now() - datetime.timedelta(seconds=seconds) |
| 61 | |
| 62 | |
| 63 | # --------------------------------------------------------------------------- |
| 64 | # Reservation — create and load |
| 65 | # --------------------------------------------------------------------------- |
| 66 | |
| 67 | |
| 68 | class TestCreateReservation: |
| 69 | def test_creates_json_file(self, tmp_path: pathlib.Path) -> None: |
| 70 | res = create_reservation( |
| 71 | tmp_path, |
| 72 | run_id="agent-1", |
| 73 | branch="feature-x", |
| 74 | addresses=["src/billing.py::compute_total"], |
| 75 | ) |
| 76 | rdir = tmp_path / ".muse" / "coordination" / "reservations" |
| 77 | assert rdir.exists() |
| 78 | files = list(rdir.glob("*.json")) |
| 79 | assert len(files) == 1 |
| 80 | data = json.loads(files[0].read_text()) |
| 81 | assert data["reservation_id"] == res.reservation_id |
| 82 | assert data["run_id"] == "agent-1" |
| 83 | assert data["branch"] == "feature-x" |
| 84 | assert data["addresses"] == ["src/billing.py::compute_total"] |
| 85 | assert data["schema_version"] == 1 |
| 86 | |
| 87 | def test_default_ttl_sets_future_expiry(self, tmp_path: pathlib.Path) -> None: |
| 88 | res = create_reservation(tmp_path, run_id="r", branch="main", addresses=[]) |
| 89 | assert res.expires_at > _now() |
| 90 | |
| 91 | def test_custom_ttl(self, tmp_path: pathlib.Path) -> None: |
| 92 | res = create_reservation( |
| 93 | tmp_path, run_id="r", branch="main", addresses=[], ttl_seconds=7200 |
| 94 | ) |
| 95 | delta = res.expires_at - res.created_at |
| 96 | assert abs(delta.total_seconds() - 7200) < 5 |
| 97 | |
| 98 | def test_operation_stored(self, tmp_path: pathlib.Path) -> None: |
| 99 | res = create_reservation( |
| 100 | tmp_path, run_id="r", branch="main", |
| 101 | addresses=["src/a.py::f"], operation="rename" |
| 102 | ) |
| 103 | assert res.operation == "rename" |
| 104 | rdir = tmp_path / ".muse" / "coordination" / "reservations" |
| 105 | data = json.loads(list(rdir.glob("*.json"))[0].read_text()) |
| 106 | assert data["operation"] == "rename" |
| 107 | |
| 108 | def test_none_operation(self, tmp_path: pathlib.Path) -> None: |
| 109 | res = create_reservation(tmp_path, run_id="r", branch="main", addresses=[]) |
| 110 | assert res.operation is None |
| 111 | |
| 112 | def test_multiple_addresses(self, tmp_path: pathlib.Path) -> None: |
| 113 | addrs = ["src/a.py::f", "src/b.py::g", "src/c.py::h"] |
| 114 | res = create_reservation(tmp_path, run_id="r", branch="main", addresses=addrs) |
| 115 | assert res.addresses == addrs |
| 116 | |
| 117 | def test_multiple_reservations_coexist(self, tmp_path: pathlib.Path) -> None: |
| 118 | create_reservation(tmp_path, run_id="a1", branch="main", addresses=["src/a.py::f"]) |
| 119 | create_reservation(tmp_path, run_id="a2", branch="main", addresses=["src/a.py::f"]) |
| 120 | rdir = tmp_path / ".muse" / "coordination" / "reservations" |
| 121 | assert len(list(rdir.glob("*.json"))) == 2 |
| 122 | |
| 123 | |
| 124 | # --------------------------------------------------------------------------- |
| 125 | # Reservation — to_dict / from_dict |
| 126 | # --------------------------------------------------------------------------- |
| 127 | |
| 128 | |
| 129 | class TestReservationRoundTrip: |
| 130 | def test_to_dict_from_dict(self) -> None: |
| 131 | now = _now() |
| 132 | future = _future() |
| 133 | res = Reservation( |
| 134 | reservation_id="test-uuid", |
| 135 | run_id="agent-7", |
| 136 | branch="feature-y", |
| 137 | addresses=["src/x.py::func"], |
| 138 | created_at=now, |
| 139 | expires_at=future, |
| 140 | operation="move", |
| 141 | ) |
| 142 | d = res.to_dict() |
| 143 | res2 = Reservation.from_dict(d) |
| 144 | assert res2.reservation_id == "test-uuid" |
| 145 | assert res2.run_id == "agent-7" |
| 146 | assert res2.branch == "feature-y" |
| 147 | assert res2.addresses == ["src/x.py::func"] |
| 148 | assert res2.operation == "move" |
| 149 | # Timestamps round-trip via ISO 8601 |
| 150 | assert abs((res2.expires_at - future).total_seconds()) < 1 |
| 151 | |
| 152 | def test_schema_version_in_dict(self) -> None: |
| 153 | res = Reservation( |
| 154 | reservation_id="x", run_id="r", branch="b", |
| 155 | addresses=[], created_at=_now(), expires_at=_future(), operation=None |
| 156 | ) |
| 157 | assert res.to_dict()["schema_version"] == 1 |
| 158 | |
| 159 | |
| 160 | # --------------------------------------------------------------------------- |
| 161 | # Reservation — is_active |
| 162 | # --------------------------------------------------------------------------- |
| 163 | |
| 164 | |
| 165 | class TestReservationIsActive: |
| 166 | def test_active_when_future_expiry(self, tmp_path: pathlib.Path) -> None: |
| 167 | res = create_reservation(tmp_path, run_id="r", branch="main", addresses=[], ttl_seconds=3600) |
| 168 | assert res.is_active() |
| 169 | |
| 170 | def test_inactive_when_past_expiry(self) -> None: |
| 171 | res = Reservation( |
| 172 | reservation_id="x", run_id="r", branch="b", |
| 173 | addresses=[], created_at=_past(120), expires_at=_past(60), operation=None |
| 174 | ) |
| 175 | assert not res.is_active() |
| 176 | |
| 177 | |
| 178 | # --------------------------------------------------------------------------- |
| 179 | # load_all_reservations / active_reservations |
| 180 | # --------------------------------------------------------------------------- |
| 181 | |
| 182 | |
| 183 | class TestLoadReservations: |
| 184 | def test_load_all_includes_expired(self, tmp_path: pathlib.Path) -> None: |
| 185 | create_reservation(tmp_path, run_id="r1", branch="main", addresses=[], ttl_seconds=3600) |
| 186 | # Manually write an expired reservation. |
| 187 | past = _past(120) |
| 188 | expired = Reservation( |
| 189 | reservation_id="expired-uuid", |
| 190 | run_id="r2", |
| 191 | branch="main", |
| 192 | addresses=[], |
| 193 | created_at=_past(200), |
| 194 | expires_at=past, |
| 195 | operation=None, |
| 196 | ) |
| 197 | rdir = tmp_path / ".muse" / "coordination" / "reservations" |
| 198 | rdir.mkdir(parents=True, exist_ok=True) |
| 199 | (rdir / "expired-uuid.json").write_text(json.dumps(expired.to_dict()) + "\n") |
| 200 | |
| 201 | all_res = load_all_reservations(tmp_path) |
| 202 | assert len(all_res) == 2 |
| 203 | |
| 204 | def test_active_reservations_filters_expired(self, tmp_path: pathlib.Path) -> None: |
| 205 | create_reservation(tmp_path, run_id="r1", branch="main", addresses=[], ttl_seconds=3600) |
| 206 | past = _past(120) |
| 207 | expired = Reservation( |
| 208 | reservation_id="expired-uuid", |
| 209 | run_id="r2", branch="main", addresses=[], |
| 210 | created_at=_past(200), expires_at=past, operation=None, |
| 211 | ) |
| 212 | rdir = tmp_path / ".muse" / "coordination" / "reservations" |
| 213 | rdir.mkdir(parents=True, exist_ok=True) |
| 214 | (rdir / "expired-uuid.json").write_text(json.dumps(expired.to_dict()) + "\n") |
| 215 | |
| 216 | active = active_reservations(tmp_path) |
| 217 | assert len(active) == 1 |
| 218 | assert active[0].run_id == "r1" |
| 219 | |
| 220 | def test_empty_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None: |
| 221 | rdir = tmp_path / ".muse" / "coordination" / "reservations" |
| 222 | rdir.mkdir(parents=True, exist_ok=True) |
| 223 | assert load_all_reservations(tmp_path) == [] |
| 224 | |
| 225 | def test_nonexistent_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None: |
| 226 | assert load_all_reservations(tmp_path) == [] |
| 227 | |
| 228 | def test_corrupt_file_skipped(self, tmp_path: pathlib.Path) -> None: |
| 229 | rdir = tmp_path / ".muse" / "coordination" / "reservations" |
| 230 | rdir.mkdir(parents=True, exist_ok=True) |
| 231 | (rdir / "bad.json").write_text("not valid json{{{") |
| 232 | result = load_all_reservations(tmp_path) |
| 233 | assert result == [] |
| 234 | |
| 235 | |
| 236 | # --------------------------------------------------------------------------- |
| 237 | # Intent — create and load |
| 238 | # --------------------------------------------------------------------------- |
| 239 | |
| 240 | |
| 241 | class TestCreateIntent: |
| 242 | def test_creates_json_file(self, tmp_path: pathlib.Path) -> None: |
| 243 | intent = create_intent( |
| 244 | tmp_path, |
| 245 | reservation_id="res-uuid", |
| 246 | run_id="agent-2", |
| 247 | branch="feature-z", |
| 248 | addresses=["src/billing.py::Invoice"], |
| 249 | operation="rename", |
| 250 | detail="rename to InvoiceRecord", |
| 251 | ) |
| 252 | idir = tmp_path / ".muse" / "coordination" / "intents" |
| 253 | assert idir.exists() |
| 254 | files = list(idir.glob("*.json")) |
| 255 | assert len(files) == 1 |
| 256 | data = json.loads(files[0].read_text()) |
| 257 | assert data["intent_id"] == intent.intent_id |
| 258 | assert data["reservation_id"] == "res-uuid" |
| 259 | assert data["operation"] == "rename" |
| 260 | assert data["detail"] == "rename to InvoiceRecord" |
| 261 | assert data["schema_version"] == 1 |
| 262 | |
| 263 | def test_empty_detail_defaults_to_empty_string(self, tmp_path: pathlib.Path) -> None: |
| 264 | intent = create_intent( |
| 265 | tmp_path, reservation_id="", run_id="r", branch="main", |
| 266 | addresses=[], operation="modify", |
| 267 | ) |
| 268 | assert intent.detail == "" |
| 269 | |
| 270 | def test_multiple_intents(self, tmp_path: pathlib.Path) -> None: |
| 271 | create_intent(tmp_path, reservation_id="", run_id="a", branch="main", |
| 272 | addresses=["x.py::f"], operation="rename") |
| 273 | create_intent(tmp_path, reservation_id="", run_id="b", branch="main", |
| 274 | addresses=["x.py::g"], operation="delete") |
| 275 | idir = tmp_path / ".muse" / "coordination" / "intents" |
| 276 | assert len(list(idir.glob("*.json"))) == 2 |
| 277 | |
| 278 | |
| 279 | # --------------------------------------------------------------------------- |
| 280 | # Intent — to_dict / from_dict |
| 281 | # --------------------------------------------------------------------------- |
| 282 | |
| 283 | |
| 284 | class TestIntentRoundTrip: |
| 285 | def test_to_dict_from_dict(self) -> None: |
| 286 | now = _now() |
| 287 | intent = Intent( |
| 288 | intent_id="intent-uuid", |
| 289 | reservation_id="res-uuid", |
| 290 | run_id="agent-3", |
| 291 | branch="dev", |
| 292 | addresses=["src/y.py::Bar"], |
| 293 | operation="extract", |
| 294 | created_at=now, |
| 295 | detail="extract helper", |
| 296 | ) |
| 297 | d = intent.to_dict() |
| 298 | intent2 = Intent.from_dict(d) |
| 299 | assert intent2.intent_id == "intent-uuid" |
| 300 | assert intent2.reservation_id == "res-uuid" |
| 301 | assert intent2.operation == "extract" |
| 302 | assert intent2.detail == "extract helper" |
| 303 | assert intent2.addresses == ["src/y.py::Bar"] |
| 304 | |
| 305 | def test_schema_version_in_dict(self) -> None: |
| 306 | intent = Intent( |
| 307 | intent_id="x", reservation_id="", run_id="r", branch="b", |
| 308 | addresses=[], operation="modify", created_at=_now(), detail="", |
| 309 | ) |
| 310 | assert intent.to_dict()["schema_version"] == 1 |
| 311 | |
| 312 | |
| 313 | # --------------------------------------------------------------------------- |
| 314 | # load_all_intents |
| 315 | # --------------------------------------------------------------------------- |
| 316 | |
| 317 | |
| 318 | class TestLoadAllIntents: |
| 319 | def test_empty_dir(self, tmp_path: pathlib.Path) -> None: |
| 320 | idir = tmp_path / ".muse" / "coordination" / "intents" |
| 321 | idir.mkdir(parents=True, exist_ok=True) |
| 322 | assert load_all_intents(tmp_path) == [] |
| 323 | |
| 324 | def test_nonexistent_dir(self, tmp_path: pathlib.Path) -> None: |
| 325 | assert load_all_intents(tmp_path) == [] |
| 326 | |
| 327 | def test_loads_created_intents(self, tmp_path: pathlib.Path) -> None: |
| 328 | create_intent(tmp_path, reservation_id="r", run_id="a", branch="main", |
| 329 | addresses=["x.py::f"], operation="rename") |
| 330 | create_intent(tmp_path, reservation_id="r", run_id="b", branch="dev", |
| 331 | addresses=["y.py::g"], operation="modify") |
| 332 | intents = load_all_intents(tmp_path) |
| 333 | assert len(intents) == 2 |
| 334 | ops = {i.operation for i in intents} |
| 335 | assert "rename" in ops |
| 336 | assert "modify" in ops |
| 337 | |
| 338 | def test_corrupt_intent_skipped(self, tmp_path: pathlib.Path) -> None: |
| 339 | idir = tmp_path / ".muse" / "coordination" / "intents" |
| 340 | idir.mkdir(parents=True, exist_ok=True) |
| 341 | (idir / "bad.json").write_text("{invalid") |
| 342 | result = load_all_intents(tmp_path) |
| 343 | assert result == [] |