gabriel / muse public
bisect.py python
421 lines 12.3 KB
faec8c4d feat(hardening): add config/bisect CLI tests and fix bisect convergence bug Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """Bisect engine — binary search through commit history to locate regressions.
2
3 ``muse bisect`` is a power-tool for both human engineers and autonomous agents.
4 Given a known-bad commit and a known-good commit, it performs a binary search
5 through the commits between them, asking at each midpoint "is this good or
6 bad?" until the first bad commit is isolated.
7
8 Agent-safety
9 ------------
10 The ``run`` subcommand accepts an arbitrary shell command. The command is
11 executed in a subprocess; the exit code determines the verdict:
12
13 0 → good (the bug is NOT present)
14 125 → skip (cannot test this commit — e.g. build fails)
15 any other → bad (the bug IS present)
16
17 This mirrors Git's bisect protocol so any existing test harness works without
18 modification.
19
20 State file
21 ----------
22 Bisect state is stored at ``.muse/BISECT_STATE.toml``::
23
24 bad_id = "<sha256>"
25 good_ids = ["<sha256>", …]
26 skipped_ids = ["<sha256>", …]
27 remaining = ["<sha256>", …] # sorted oldest-first
28 log = ["<sha256> <verdict> <ts>", …]
29
30 The remaining list is rebuilt at every step so it tolerates interruptions.
31 """
32
33 from __future__ import annotations
34
35 import datetime
36 import logging
37 import pathlib
38 import shutil
39 import subprocess
40 import sys
41 from dataclasses import dataclass, field
42 from typing import TypedDict
43
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Location of the bisect state file, relative to the repository root.
_BISECT_STATE_FILE = ".muse/BISECT_STATE.toml"
# Exit code with which a ``bisect run`` command signals "skip this commit".
_SKIP_EXIT_CODE = 125
48
49
50 # ---------------------------------------------------------------------------
51 # State persistence
52 # ---------------------------------------------------------------------------
53
54
class BisectStateDict(TypedDict, total=False):
    """Typed view of the data persisted in the bisect state file.

    Every key is optional (``total=False``), so code reading a state must
    tolerate a partially populated mapping.
    """

    # Commit currently acting as the known-bad boundary.
    bad_id: str
    # Commits that have been marked good.
    good_ids: list[str]
    # Commits that have been skipped.
    skipped_ids: list[str]
    # Candidate commits that still need a verdict.
    remaining: list[str]
    # Log entries formatted as "<commit id> <verdict> <timestamp>".
    log: list[str]
    # Branch name recorded when the session was started.
    branch: str
64
65
def _state_path(repo_root: pathlib.Path) -> pathlib.Path:
    """Return the location of the bisect state file inside *repo_root*.

    The relative location is taken from ``_BISECT_STATE_FILE`` so that the
    path is spelled in exactly one place in this module.
    """
    return repo_root / _BISECT_STATE_FILE
68
69
def _load_state(repo_root: pathlib.Path) -> BisectStateDict | None:
    """Return the persisted bisect state, or ``None`` when none is usable.

    ``None`` is returned both when the state file does not exist and when it
    exists but cannot be read or parsed (a warning is logged in that case).
    Keys whose value has an unexpected type are dropped, so the returned
    mapping may be missing any key.
    """
    import tomllib

    path = _state_path(repo_root)
    if not path.exists():
        return None
    try:
        raw = tomllib.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # OSError: the file could not be read.  ValueError covers both
        # tomllib.TOMLDecodeError and UnicodeDecodeError.  Anything else is
        # a programming error and is allowed to propagate.
        logger.warning("⚠️ Could not read bisect state: %s", exc)
        return None

    def _str_list(key: str) -> list[str] | None:
        """Return a copy of ``raw[key]`` if it is a list of strings."""
        value = raw.get(key)
        if isinstance(value, list) and all(isinstance(item, str) for item in value):
            return list(value)
        return None

    state: BisectStateDict = {}

    bad_id = raw.get("bad_id")
    if isinstance(bad_id, str):
        state["bad_id"] = bad_id
    branch = raw.get("branch")
    if isinstance(branch, str):
        state["branch"] = branch

    # Literal keys are used on purpose so the TypedDict assignments stay
    # checkable by a static type checker.
    good_ids = _str_list("good_ids")
    if good_ids is not None:
        state["good_ids"] = good_ids
    skipped_ids = _str_list("skipped_ids")
    if skipped_ids is not None:
        state["skipped_ids"] = skipped_ids
    remaining = _str_list("remaining")
    if remaining is not None:
        state["remaining"] = remaining
    log = _str_list("log")
    if log is not None:
        state["log"] = log
    return state
100
101
102 def _save_state(repo_root: pathlib.Path, state: BisectStateDict) -> None:
103 """Write the bisect state to disk (TOML, atomic)."""
104 path = _state_path(repo_root)
105 lines = [f'bad_id = "{state.get("bad_id", "")}"']
106 if "branch" in state:
107 lines.append(f'branch = "{state["branch"]}"')
108 for key, items in (
109 ("good_ids", state.get("good_ids", [])),
110 ("skipped_ids", state.get("skipped_ids", [])),
111 ("remaining", state.get("remaining", [])),
112 ("log", state.get("log", [])),
113 ):
114 formatted = ", ".join(f'"{v}"' for v in items)
115 lines.append(f"{key} = [{formatted}]")
116 tmp = path.with_suffix(".tmp")
117 tmp.write_text("\n".join(lines) + "\n", encoding="utf-8")
118 tmp.replace(path)
119
120
121 # ---------------------------------------------------------------------------
122 # Commit graph helpers
123 # ---------------------------------------------------------------------------
124
125
def _ancestors(
    repo_root: pathlib.Path,
    start_id: str,
) -> list[str]:
    """Return *start_id* and all of its ancestors, oldest first.

    The result is a topological order of the commit graph: every commit is
    listed after all of its parents, and *start_id* comes last.  This holds
    for merge commits too, where simply reversing a depth-first visit order
    could place a commit ahead of its own parent.

    Commits that ``read_commit`` cannot load are left out and their
    parents are not followed.

    Args:
        repo_root: Repository root.
        start_id: Commit at which the walk starts.
    """
    from muse.core.store import read_commit

    ordered: list[str] = []
    seen: set[str] = set()
    # Stack entries are (commit id, parents_done).  A commit is pushed a
    # second time with parents_done=True beneath its parents, so it is only
    # emitted once everything it descends from has been emitted.
    stack: list[tuple[str, bool]] = [(start_id, False)]
    while stack:
        cid, parents_done = stack.pop()
        if parents_done:
            ordered.append(cid)
            continue
        if cid in seen:
            continue
        seen.add(cid)
        commit = read_commit(repo_root, cid)
        if commit is None:
            continue
        stack.append((cid, True))
        # The first parent is pushed last so that its lineage is walked,
        # and therefore emitted, before the second parent's.
        for parent_id in (commit.parent2_commit_id, commit.parent_commit_id):
            if parent_id and parent_id not in seen:
                stack.append((parent_id, False))
    return ordered
151
152
def _reachable_from_good(
    repo_root: pathlib.Path,
    good_ids: list[str],
) -> set[str]:
    """Collect every commit ID reachable from the known-good commits.

    The good commits themselves are part of the result.  An ID that
    ``read_commit`` cannot resolve is still recorded, but nothing is
    followed from it.
    """
    from muse.core.store import read_commit

    found: set[str] = set()
    pending = [*good_ids]
    while pending:
        current = pending.pop()
        if current in found:
            continue
        found.add(current)
        commit = read_commit(repo_root, current)
        if commit is None:
            continue
        # Follow both parent links; absent parents are falsy and dropped.
        pending.extend(
            parent_id
            for parent_id in (commit.parent_commit_id, commit.parent2_commit_id)
            if parent_id
        )
    return found
175
176
def _build_remaining(
    repo_root: pathlib.Path,
    bad_id: str,
    good_ids: list[str],
    skipped_ids: list[str],
) -> list[str]:
    """Return the commits that still need a verdict.

    Starting from the ancestry of ``bad_id`` (in the order produced by
    ``_ancestors``), three groups are removed:

    * ``bad_id`` itself, which is the known-bad boundary;
    * everything reachable from a good commit;
    * every skipped commit.

    An empty result means no untested candidate is left between the good
    commits and ``bad_id``.
    """
    candidates = _ancestors(repo_root, bad_id)
    excluded = _reachable_from_good(repo_root, good_ids)
    excluded = excluded | set(skipped_ids) | {bad_id}
    return [cid for cid in candidates if cid not in excluded]
201
202
203 def _midpoint(remaining: list[str]) -> str | None:
204 if not remaining:
205 return None
206 return remaining[len(remaining) // 2]
207
208
209 # ---------------------------------------------------------------------------
210 # Result type
211 # ---------------------------------------------------------------------------
212
213
214 @dataclass
215 class BisectResult:
216 """Result of a single bisect step."""
217
218 done: bool = False
219 """True when we have isolated the first bad commit."""
220
221 first_bad: str | None = None
222 """Commit ID of the isolated first-bad commit."""
223
224 next_to_test: str | None = None
225 """Commit ID to test next (None when done)."""
226
227 remaining_count: int = 0
228 """How many commits remain to test."""
229
230 steps_remaining: int = 0
231 """Approximate remaining binary-search steps."""
232
233 verdict: str = ""
234 """The verdict just applied: 'good', 'bad', 'skip'."""
235
236
237 # ---------------------------------------------------------------------------
238 # Public API
239 # ---------------------------------------------------------------------------
240
241
def start_bisect(
    repo_root: pathlib.Path,
    bad_id: str,
    good_ids: list[str],
    branch: str = "",
) -> BisectResult:
    """Begin a bisect session and report the first commit to test.

    The initial state (boundaries, candidate list and a log entry per
    boundary) is written to the state file before returning.

    Args:
        repo_root: Repository root.
        bad_id: Known-bad commit.
        good_ids: One or more known-good commits.
        branch: Current branch name (for display only); stored only when
            non-empty.

    Returns:
        A result with verdict ``"started"``.  It is already ``done`` when
        there is no candidate between the good commits and ``bad_id``.
    """
    import math

    remaining = _build_remaining(repo_root, bad_id, good_ids, skipped_ids=[])

    log_entries = [f"{bad_id} bad {_ts()}"]
    log_entries.extend(f"{good} good {_ts()}" for good in good_ids)

    state: BisectStateDict = {
        "bad_id": bad_id,
        "good_ids": good_ids,
        "skipped_ids": [],
        "remaining": remaining,
        "log": log_entries,
    }
    if branch:
        state["branch"] = branch
    _save_state(repo_root, state)

    if not remaining:
        # Nothing lies between good and bad: bad_id is the first bad commit.
        return BisectResult(
            done=True,
            first_bad=bad_id,
            next_to_test=None,
            remaining_count=0,
            steps_remaining=0,
            verdict="started",
        )

    candidate_count = len(remaining)
    return BisectResult(
        done=False,
        first_bad=None,
        next_to_test=_midpoint(remaining),
        remaining_count=candidate_count,
        steps_remaining=int(math.log2(candidate_count + 1)),
        verdict="started",
    )
292
293
294 def _ts() -> str:
295 return datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
296
297
298 def _apply_verdict(
299 repo_root: pathlib.Path,
300 commit_id: str,
301 verdict: str,
302 ) -> BisectResult:
303 """Apply a 'good', 'bad', or 'skip' verdict and return the next step."""
304 import math
305
306 state = _load_state(repo_root)
307 if state is None:
308 raise RuntimeError("No bisect session in progress. Run 'muse bisect start' first.")
309
310 bad_id = state.get("bad_id", "")
311 good_ids = list(state.get("good_ids", []))
312 skipped_ids = list(state.get("skipped_ids", []))
313 log = list(state.get("log", []))
314
315 if verdict == "good":
316 good_ids.append(commit_id)
317 elif verdict == "bad":
318 bad_id = commit_id
319 else:
320 skipped_ids.append(commit_id)
321
322 log.append(f"{commit_id} {verdict} {_ts()}")
323
324 remaining = _build_remaining(repo_root, bad_id, good_ids, skipped_ids)
325 new_state: BisectStateDict = {
326 "bad_id": bad_id,
327 "good_ids": good_ids,
328 "skipped_ids": skipped_ids,
329 "remaining": remaining,
330 "log": log,
331 }
332 if "branch" in state:
333 new_state["branch"] = state["branch"]
334 _save_state(repo_root, new_state)
335
336 if len(remaining) == 0:
337 # No unknowns remain — bad_id is confirmed as the first bad commit.
338 return BisectResult(
339 done=True,
340 first_bad=bad_id,
341 next_to_test=None,
342 remaining_count=0,
343 steps_remaining=0,
344 verdict=verdict,
345 )
346
347 next_id = _midpoint(remaining)
348 steps = int(math.log2(len(remaining) + 1))
349 return BisectResult(
350 done=False,
351 first_bad=None,
352 next_to_test=next_id,
353 remaining_count=len(remaining),
354 steps_remaining=steps,
355 verdict=verdict,
356 )
357
358
def mark_good(repo_root: pathlib.Path, commit_id: str) -> BisectResult:
    """Record *commit_id* as good and return the next bisect step."""
    return _apply_verdict(repo_root, commit_id, verdict="good")
362
363
def mark_bad(repo_root: pathlib.Path, commit_id: str) -> BisectResult:
    """Record *commit_id* as bad and return the next bisect step."""
    return _apply_verdict(repo_root, commit_id, verdict="bad")
367
368
def skip_commit(repo_root: pathlib.Path, commit_id: str) -> BisectResult:
    """Record *commit_id* as skipped (e.g. build fails) and return the next step."""
    return _apply_verdict(repo_root, commit_id, verdict="skip")
372
373
def reset_bisect(repo_root: pathlib.Path) -> None:
    """End the bisect session by deleting the state file.

    Calling this when no state file exists is a no-op.
    """
    # missing_ok avoids the check-then-delete race of exists() + unlink():
    # the file may disappear between the two calls.
    _state_path(repo_root).unlink(missing_ok=True)
379
380
def get_bisect_log(repo_root: pathlib.Path) -> list[str]:
    """Return a copy of the stored bisect log, oldest entry first.

    An empty list is returned when no state can be loaded.
    """
    state = _load_state(repo_root)
    return [] if state is None else list(state.get("log", []))
387
388
def is_bisect_active(repo_root: pathlib.Path) -> bool:
    """Report whether a bisect state file exists under *repo_root*."""
    state_file = _state_path(repo_root)
    return state_file.exists()
392
393
def run_bisect_command(
    repo_root: pathlib.Path,
    command: str,
    current_commit_id: str,
) -> BisectResult:
    """Run *command* in a shell, interpret exit code, and apply verdict.

    Exit codes::

        0 → good
        125 → skip
        anything else → bad

    "Anything else" includes the negative return codes that ``subprocess``
    reports when the command is terminated by a signal.

    The command is executed with the repository root as the working directory.

    Args:
        repo_root: Repository root; also the command's working directory.
        command: Shell command line to execute.
        current_commit_id: Commit the resulting verdict is applied to.

    Raises:
        ValueError: If *command* is empty or only whitespace.  Such a
            command exits with status 0 in the shell and would mark every
            tested commit as good.
    """
    if not command.strip():
        raise ValueError("bisect run requires a non-empty command.")

    # SECURITY: shell=True is deliberate -- the caller supplies a full shell
    # command line.  Never pass text from an untrusted source.
    completed = subprocess.run(
        command,
        shell=True,
        cwd=str(repo_root),
    )
    code = completed.returncode
    if code == 0:
        verdict = "good"
    elif code == _SKIP_EXIT_CODE:
        verdict = "skip"
    else:
        verdict = "bad"
    return _apply_verdict(repo_root, current_commit_id, verdict)