bisect.py
python
| 1 | """Bisect engine — binary search through commit history to locate regressions. |
| 2 | |
| 3 | ``muse bisect`` is a power-tool for both human engineers and autonomous agents. |
| 4 | Given a known-bad commit and a known-good commit, it performs a binary search |
| 5 | through the commits between them, asking at each midpoint "is this good or |
| 6 | bad?" until the first bad commit is isolated. |
| 7 | |
| 8 | Agent-safety |
| 9 | ------------ |
| 10 | The ``run`` subcommand accepts an arbitrary shell command. The command is |
| 11 | executed in a subprocess; the exit code determines the verdict: |
| 12 | |
| 13 | 0 → good (the bug is NOT present) |
| 14 | 125 → skip (cannot test this commit — e.g. build fails) |
| 15 | any other → bad (the bug IS present) |
| 16 | |
| 17 | This mirrors Git's bisect protocol so any existing test harness works without |
| 18 | modification. |
| 19 | |
| 20 | State file |
| 21 | ---------- |
| 22 | Bisect state is stored at ``.muse/BISECT_STATE.toml``:: |
| 23 | |
| 24 | bad_id = "<sha256>" |
| 25 | good_ids = ["<sha256>", …] |
| 26 | skipped_ids = ["<sha256>", …] |
| 27 | remaining = ["<sha256>", …] # sorted oldest-first |
| 28 | log = ["<sha256> <verdict> <ts>", …] |
| 29 | |
| 30 | The remaining list is rebuilt at every step so it tolerates interruptions. |
| 31 | """ |
| 32 | |
| 33 | from __future__ import annotations |
| 34 | |
| 35 | import datetime |
| 36 | import logging |
| 37 | import pathlib |
| 38 | import shutil |
| 39 | import subprocess |
| 40 | import sys |
| 41 | from dataclasses import dataclass, field |
| 42 | from typing import TypedDict |
| 43 | |
| 44 | logger = logging.getLogger(__name__) |
| 45 | |
| 46 | _BISECT_STATE_FILE = ".muse/BISECT_STATE.toml" |
| 47 | _SKIP_EXIT_CODE = 125 |
| 48 | |
| 49 | |
| 50 | # --------------------------------------------------------------------------- |
| 51 | # State persistence |
| 52 | # --------------------------------------------------------------------------- |
| 53 | |
| 54 | |
class BisectStateDict(TypedDict, total=False):
    """Typed view of the bisect session as it is persisted on disk.

    Declared with ``total=False``: every key is optional, so a partially
    populated (or freshly created, empty) dict is still a valid instance.
    """

    # Commit currently recorded as the known-bad boundary.
    bad_id: str
    # Commits that have been marked good.
    good_ids: list[str]
    # Commits that have been skipped.
    skipped_ids: list[str]
    # Candidate commits that are still untested.
    remaining: list[str]
    # One "<commit> <verdict> <timestamp>" entry per recorded verdict.
    log: list[str]
    # Branch name supplied when the session was started (stored only if set).
    branch: str
| 64 | |
| 65 | |
| 66 | def _state_path(repo_root: pathlib.Path) -> pathlib.Path: |
| 67 | return repo_root / ".muse" / "BISECT_STATE.toml" |
| 68 | |
| 69 | |
def _load_state(repo_root: pathlib.Path) -> BisectStateDict | None:
    """Read the persisted bisect session for *repo_root*.

    Returns:
        ``None`` when the state file does not exist or cannot be read or
        parsed (a warning is logged in the latter case). Otherwise a dict
        holding only the keys whose stored value has the expected type:
        a string for ``bad_id`` / ``branch``, a list of strings for the
        list-valued keys. Anything else in the file is dropped.
    """
    import tomllib

    state_file = _state_path(repo_root)
    if not state_file.exists():
        return None

    try:
        text = state_file.read_text(encoding="utf-8")
        parsed = tomllib.loads(text)
    except Exception as exc:
        logger.warning("⚠️ Could not read bisect state: %s", exc)
        return None

    def _string_list(key: str) -> list[str] | None:
        """Return a copy of ``parsed[key]`` if it is a list of strings."""
        candidate = parsed.get(key)
        if not isinstance(candidate, list):
            return None
        if any(not isinstance(item, str) for item in candidate):
            return None
        return list(candidate)

    loaded: BisectStateDict = {}

    bad = parsed.get("bad_id")
    if isinstance(bad, str):
        loaded["bad_id"] = bad

    branch_name = parsed.get("branch")
    if isinstance(branch_name, str):
        loaded["branch"] = branch_name

    # Each key is assigned explicitly so the TypedDict keys stay literal.
    if (good := _string_list("good_ids")) is not None:
        loaded["good_ids"] = good
    if (skipped := _string_list("skipped_ids")) is not None:
        loaded["skipped_ids"] = skipped
    if (pending := _string_list("remaining")) is not None:
        loaded["remaining"] = pending
    if (entries := _string_list("log")) is not None:
        loaded["log"] = entries

    return loaded
| 100 | |
| 101 | |
| 102 | def _save_state(repo_root: pathlib.Path, state: BisectStateDict) -> None: |
| 103 | """Write the bisect state to disk (TOML, atomic).""" |
| 104 | path = _state_path(repo_root) |
| 105 | lines = [f'bad_id = "{state.get("bad_id", "")}"'] |
| 106 | if "branch" in state: |
| 107 | lines.append(f'branch = "{state["branch"]}"') |
| 108 | for key, items in ( |
| 109 | ("good_ids", state.get("good_ids", [])), |
| 110 | ("skipped_ids", state.get("skipped_ids", [])), |
| 111 | ("remaining", state.get("remaining", [])), |
| 112 | ("log", state.get("log", [])), |
| 113 | ): |
| 114 | formatted = ", ".join(f'"{v}"' for v in items) |
| 115 | lines.append(f"{key} = [{formatted}]") |
| 116 | tmp = path.with_suffix(".tmp") |
| 117 | tmp.write_text("\n".join(lines) + "\n", encoding="utf-8") |
| 118 | tmp.replace(path) |
| 119 | |
| 120 | |
| 121 | # --------------------------------------------------------------------------- |
| 122 | # Commit graph helpers |
| 123 | # --------------------------------------------------------------------------- |
| 124 | |
| 125 | |
| 126 | def _ancestors( |
| 127 | repo_root: pathlib.Path, |
| 128 | start_id: str, |
| 129 | ) -> list[str]: |
| 130 | """Return commit IDs from start_id back to root, oldest first.""" |
| 131 | from muse.core.store import read_commit |
| 132 | |
| 133 | visited: list[str] = [] |
| 134 | queue = [start_id] |
| 135 | seen: set[str] = set() |
| 136 | while queue: |
| 137 | cid = queue.pop() |
| 138 | if cid in seen: |
| 139 | continue |
| 140 | seen.add(cid) |
| 141 | commit = read_commit(repo_root, cid) |
| 142 | if commit is None: |
| 143 | continue |
| 144 | visited.append(cid) |
| 145 | if commit.parent_commit_id: |
| 146 | queue.append(commit.parent_commit_id) |
| 147 | if commit.parent2_commit_id: |
| 148 | queue.append(commit.parent2_commit_id) |
| 149 | # Reverse to get oldest-first chronological order. |
| 150 | return list(reversed(visited)) |
| 151 | |
| 152 | |
| 153 | def _reachable_from_good( |
| 154 | repo_root: pathlib.Path, |
| 155 | good_ids: list[str], |
| 156 | ) -> set[str]: |
| 157 | """Return all commits reachable (inclusive) from any good commit.""" |
| 158 | from muse.core.store import read_commit |
| 159 | |
| 160 | reachable: set[str] = set() |
| 161 | queue = list(good_ids) |
| 162 | while queue: |
| 163 | cid = queue.pop() |
| 164 | if cid in reachable: |
| 165 | continue |
| 166 | reachable.add(cid) |
| 167 | commit = read_commit(repo_root, cid) |
| 168 | if commit is None: |
| 169 | continue |
| 170 | if commit.parent_commit_id: |
| 171 | queue.append(commit.parent_commit_id) |
| 172 | if commit.parent2_commit_id: |
| 173 | queue.append(commit.parent2_commit_id) |
| 174 | return reachable |
| 175 | |
| 176 | |
def _build_remaining(
    repo_root: pathlib.Path,
    bad_id: str,
    good_ids: list[str],
    skipped_ids: list[str],
) -> list[str]:
    """List the commits that still need a verdict.

    Starting from the ancestors of ``bad_id`` (in the order ``_ancestors``
    returns them), three groups are filtered out:

    * ``bad_id`` itself, which is the known-bad boundary;
    * everything reachable from a good commit, which is already cleared;
    * every skipped commit.

    An empty result means ``bad_id`` is the first bad commit. A single entry
    means one test is left: if it turns out bad it becomes the new
    ``bad_id`` and nothing remains; if it turns out good, ``bad_id`` is
    confirmed as the first bad commit.
    """
    candidates = _ancestors(repo_root, bad_id)
    cleared = _reachable_from_good(repo_root, good_ids)
    excluded = cleared | set(skipped_ids) | {bad_id}
    return [cid for cid in candidates if cid not in excluded]
| 201 | |
| 202 | |
| 203 | def _midpoint(remaining: list[str]) -> str | None: |
| 204 | if not remaining: |
| 205 | return None |
| 206 | return remaining[len(remaining) // 2] |
| 207 | |
| 208 | |
| 209 | # --------------------------------------------------------------------------- |
| 210 | # Result type |
| 211 | # --------------------------------------------------------------------------- |
| 212 | |
| 213 | |
@dataclass
class BisectResult:
    """Outcome of one bisect step, as handed back to the caller."""

    # True once the first bad commit has been isolated.
    done: bool = False
    # ID of the isolated first-bad commit.
    first_bad: str | None = None
    # ID of the commit to test next (None when done).
    next_to_test: str | None = None
    # Number of commits that remain to be tested.
    remaining_count: int = 0
    # Approximate number of binary-search steps still needed.
    steps_remaining: int = 0
    # Verdict that was just applied: 'good', 'bad', 'skip'.
    verdict: str = ""
| 235 | |
| 236 | |
| 237 | # --------------------------------------------------------------------------- |
| 238 | # Public API |
| 239 | # --------------------------------------------------------------------------- |
| 240 | |
| 241 | |
def start_bisect(
    repo_root: pathlib.Path,
    bad_id: str,
    good_ids: list[str],
    branch: str = "",
) -> BisectResult:
    """Begin a new bisect session and persist its initial state.

    Args:
        repo_root: Repository root.
        bad_id: Known-bad commit.
        good_ids: One or more known-good commits.
        branch: Current branch name (for display only); stored only when
            non-empty.

    Returns:
        A result whose ``verdict`` is ``"started"``. If no candidate commits
        lie between the good and bad boundaries the result is already
        ``done`` with ``first_bad`` set to *bad_id*; otherwise it names the
        first commit to test.
    """
    import math

    candidates = _build_remaining(repo_root, bad_id, good_ids, skipped_ids=[])

    # Seed the log with the boundaries supplied by the caller.
    history = [f"{bad_id} bad {_ts()}"]
    history.extend(f"{good} good {_ts()}" for good in good_ids)

    session: BisectStateDict = {
        "bad_id": bad_id,
        "good_ids": good_ids,
        "skipped_ids": [],
        "remaining": candidates,
        "log": history,
    }
    if branch:
        session["branch"] = branch
    _save_state(repo_root, session)

    outcome = BisectResult(verdict="started", remaining_count=len(candidates))
    if candidates:
        outcome.next_to_test = _midpoint(candidates)
        outcome.steps_remaining = int(math.log2(len(candidates) + 1))
    else:
        # Nothing unknown between the boundaries: bad_id is the answer.
        outcome.done = True
        outcome.first_bad = bad_id
    return outcome
| 292 | |
| 293 | |
| 294 | def _ts() -> str: |
| 295 | return datetime.datetime.now(tz=datetime.timezone.utc).isoformat() |
| 296 | |
| 297 | |
| 298 | def _apply_verdict( |
| 299 | repo_root: pathlib.Path, |
| 300 | commit_id: str, |
| 301 | verdict: str, |
| 302 | ) -> BisectResult: |
| 303 | """Apply a 'good', 'bad', or 'skip' verdict and return the next step.""" |
| 304 | import math |
| 305 | |
| 306 | state = _load_state(repo_root) |
| 307 | if state is None: |
| 308 | raise RuntimeError("No bisect session in progress. Run 'muse bisect start' first.") |
| 309 | |
| 310 | bad_id = state.get("bad_id", "") |
| 311 | good_ids = list(state.get("good_ids", [])) |
| 312 | skipped_ids = list(state.get("skipped_ids", [])) |
| 313 | log = list(state.get("log", [])) |
| 314 | |
| 315 | if verdict == "good": |
| 316 | good_ids.append(commit_id) |
| 317 | elif verdict == "bad": |
| 318 | bad_id = commit_id |
| 319 | else: |
| 320 | skipped_ids.append(commit_id) |
| 321 | |
| 322 | log.append(f"{commit_id} {verdict} {_ts()}") |
| 323 | |
| 324 | remaining = _build_remaining(repo_root, bad_id, good_ids, skipped_ids) |
| 325 | new_state: BisectStateDict = { |
| 326 | "bad_id": bad_id, |
| 327 | "good_ids": good_ids, |
| 328 | "skipped_ids": skipped_ids, |
| 329 | "remaining": remaining, |
| 330 | "log": log, |
| 331 | } |
| 332 | if "branch" in state: |
| 333 | new_state["branch"] = state["branch"] |
| 334 | _save_state(repo_root, new_state) |
| 335 | |
| 336 | if len(remaining) == 0: |
| 337 | # No unknowns remain — bad_id is confirmed as the first bad commit. |
| 338 | return BisectResult( |
| 339 | done=True, |
| 340 | first_bad=bad_id, |
| 341 | next_to_test=None, |
| 342 | remaining_count=0, |
| 343 | steps_remaining=0, |
| 344 | verdict=verdict, |
| 345 | ) |
| 346 | |
| 347 | next_id = _midpoint(remaining) |
| 348 | steps = int(math.log2(len(remaining) + 1)) |
| 349 | return BisectResult( |
| 350 | done=False, |
| 351 | first_bad=None, |
| 352 | next_to_test=next_id, |
| 353 | remaining_count=len(remaining), |
| 354 | steps_remaining=steps, |
| 355 | verdict=verdict, |
| 356 | ) |
| 357 | |
| 358 | |
def mark_good(repo_root: pathlib.Path, commit_id: str) -> BisectResult:
    """Record a ``good`` verdict for *commit_id* and return the next step."""
    return _apply_verdict(repo_root, commit_id, verdict="good")
| 362 | |
| 363 | |
def mark_bad(repo_root: pathlib.Path, commit_id: str) -> BisectResult:
    """Record a ``bad`` verdict for *commit_id* and return the next step."""
    return _apply_verdict(repo_root, commit_id, verdict="bad")
| 367 | |
| 368 | |
def skip_commit(repo_root: pathlib.Path, commit_id: str) -> BisectResult:
    """Record a ``skip`` verdict for *commit_id* (e.g. build fails) and return the next step."""
    return _apply_verdict(repo_root, commit_id, verdict="skip")
| 372 | |
| 373 | |
def reset_bisect(repo_root: pathlib.Path) -> None:
    """End the bisect session by deleting its state file.

    Does nothing when no state file exists. The removal is a single call
    rather than an existence check followed by a delete, so a file that
    disappears in between (another process resetting at the same time)
    cannot raise ``FileNotFoundError``.
    """
    _state_path(repo_root).unlink(missing_ok=True)
| 379 | |
| 380 | |
def get_bisect_log(repo_root: pathlib.Path) -> list[str]:
    """Return a copy of the recorded bisect log, oldest entry first.

    The result is an empty list when no session state can be loaded.
    """
    session = _load_state(repo_root)
    return [] if session is None else list(session.get("log", []))
| 387 | |
| 388 | |
def is_bisect_active(repo_root: pathlib.Path) -> bool:
    """Report whether a bisect state file exists for *repo_root*.

    Only the presence of the file is checked; its contents are not read.
    """
    state_file = _state_path(repo_root)
    return state_file.exists()
| 392 | |
| 393 | |
def run_bisect_command(
    repo_root: pathlib.Path,
    command: str,
    current_commit_id: str,
) -> BisectResult:
    """Run *command* in a shell, interpret exit code, and apply verdict.

    Exit codes::

        0               → good
        125             → skip
        1-124, 126-255  → bad

    The command is executed with the repository root as the working directory.

    Args:
        repo_root: Repository root; also the working directory of the command.
        command: Shell command line to execute.
        current_commit_id: Commit the resulting verdict is recorded against.

    Raises:
        RuntimeError: If the command was terminated by a signal (negative
            return code). A killed test run says nothing about the commit,
            so no verdict is recorded instead of marking the commit bad.
            Also raised by the verdict step when no session is in progress.
    """
    # NOTE: shell=True is deliberate — the command is an arbitrary shell
    # command line supplied by the operator, and must never be assembled
    # from untrusted input.
    completed = subprocess.run(
        command,
        shell=True,
        cwd=str(repo_root),
    )
    code = completed.returncode
    if code < 0:
        # subprocess reports "killed by signal N" as -N.
        raise RuntimeError(
            f"Bisect command was terminated by signal {-code}; "
            f"no verdict recorded for {current_commit_id}."
        )

    if code == 0:
        verdict = "good"
    elif code == _SKIP_EXIT_CODE:
        verdict = "skip"
    else:
        verdict = "bad"
    return _apply_verdict(repo_root, current_commit_id, verdict)