gabriel / muse public
reconcile.py python
207 lines 6.8 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 5d ago
1 """muse reconcile — recommend merge ordering and integration strategy.
2
3 Reads active reservations, intents, and branch divergence to recommend:
4
5 1. **Merge ordering** — which branches should be merged first to minimize
6 downstream conflicts.
7 2. **Integration strategy** — fast-forward, squash, or rebase for each branch.
8 3. **Conflict hotspots** — symbols reserved by multiple agents that need
9 special attention.
10
11 ``muse reconcile`` is a *read-only* planning command. It does not write to
12 branches, commit history, or the coordination layer. It provides the plan;
13 agents execute it.
14
15 Why this exists
16 ---------------
17 In a system with millions of concurrent agents, merges happen constantly.
18 Without coordination, every merge introduces friction. ``muse reconcile``
19 gives an orchestration agent a complete picture of the current coordination
20 state and a recommended action plan.
21
22 Usage::
23
24 muse reconcile
25 muse reconcile --json
26
27 Output::
28
29 Reconciliation report
30 ──────────────────────────────────────────────────────────────
31
32 Active reservations: 3 Active intents: 2
33 Conflict hotspots: 1
34
35 Recommended merge order:
36 1. feature/billing (3 addresses, 1 conflict(s))
37 2. feature/auth (5 addresses, 1 conflict(s))
38
39 Conflict hotspot(s):
40 src/billing.py::compute_total
41 reserved by: feature/billing, feature/auth
42 → resolve 'feature/billing' first; feature/auth must rebase
43
44 Integration strategies:
45 feature/billing → rebase onto main before merging
46 feature/auth → rebase onto main before merging
47
48 Flags:
49
50 ``--json``
51 Emit the reconciliation report as JSON.
52 """
53
54 import json
55 import logging
56
57 import typer
58
59 from muse.core.coordination import active_reservations, load_all_intents
60 from muse.core.repo import require_repo
61
# Module-level logger named after this module (standard ``logging`` convention).
logger = logging.getLogger(__name__)

# Typer application object; the ``reconcile`` callback below is registered on it.
app = typer.Typer()
65
66
67 class _BranchSummary:
68 def __init__(self, branch: str) -> None:
69 self.branch = branch
70 self.reserved_addresses: list[str] = []
71 self.intents: list[str] = []
72 self.run_ids: set[str] = set()
73 self.conflict_count: int = 0
74
75 def to_dict(self) -> dict[str, str | int | list[str]]:
76 return {
77 "branch": self.branch,
78 "reserved_addresses": self.reserved_addresses,
79 "intents": self.intents,
80 "run_ids": sorted(self.run_ids),
81 "predicted_conflicts": self.conflict_count,
82 }
83
84
def _strategy_for(conflict_count: int) -> str:
    """Map a branch's predicted-conflict count to an integration strategy label."""
    if conflict_count == 0:
        return "fast-forward (no conflicts predicted)"
    if conflict_count <= 2:
        return "rebase onto main before merging"
    return "manual conflict resolution required"


@app.callback(invoke_without_command=True)
def reconcile(
    ctx: typer.Context,
    as_json: bool = typer.Option(False, "--json", help="Emit report as JSON."),
) -> None:
    """Recommend merge ordering and integration strategy.

    Reads coordination state (reservations + intents) and produces a
    recommended action plan: which branches to merge first, what strategy
    to use, and which conflict hotspots need manual attention.

    Does not write anything — purely advisory output.

    A *hotspot* is an address reserved by more than one distinct branch.
    Every branch taking part in a hotspot gets one predicted conflict for
    it. Branches are ordered by (predicted conflicts, number of reserved
    addresses, branch name); the name is the final tie-breaker so the
    order does not depend on the order in which reservations were loaded.
    The branches listed for each hotspot follow that same merge order, so
    the "resolve ... first" advice always agrees with the recommended
    merge order.

    Args:
        ctx: Typer context. Not used by this command; kept so the callback
            signature is unchanged.
        as_json: When true, emit the report as a JSON document instead of
            the human-readable text report.
    """
    root = require_repo()
    reservations = active_reservations(root)
    intents = load_all_intents(root)

    # Aggregate by branch, creating each summary the first time a branch is seen.
    branch_map: dict[str, _BranchSummary] = {}

    def summary_for(branch: str) -> _BranchSummary:
        summary = branch_map.get(branch)
        if summary is None:
            summary = branch_map[branch] = _BranchSummary(branch)
        return summary

    # address -> distinct branches that reserved it. A dict is used as an
    # insertion-ordered set so each branch is recorded once per address.
    addr_branches: dict[str, dict[str, None]] = {}
    for res in reservations:
        summary = summary_for(res.branch)
        summary.reserved_addresses.extend(res.addresses)
        summary.run_ids.add(res.run_id)
        for addr in res.addresses:
            addr_branches.setdefault(addr, {})[res.branch] = None

    for it in intents:
        summary = summary_for(it.branch)
        summary.intents.append(it.operation)
        summary.run_ids.add(it.run_id)

    # Detect conflict hotspots: addresses reserved by more than one branch.
    contested = {
        addr: list(branches)
        for addr, branches in addr_branches.items()
        if len(branches) > 1
    }

    # Each hotspot counts once against every branch that participates in it.
    # Every such branch came from a reservation, so it is already in branch_map.
    for branches in contested.values():
        for branch in branches:
            branch_map[branch].conflict_count += 1

    # Recommend merge order: fewer conflicts → merge first; branch name breaks ties.
    ordered = sorted(
        branch_map.values(),
        key=lambda bs: (bs.conflict_count, len(bs.reserved_addresses), bs.branch),
    )
    merge_rank = {bs.branch: rank for rank, bs in enumerate(ordered)}

    # Hotspots sorted by address, with each branch list in recommended merge
    # order so the first entry is the branch that should land first.
    hotspots: dict[str, list[str]] = {
        addr: sorted(contested[addr], key=merge_rank.__getitem__)
        for addr in sorted(contested)
    }

    # Recommend integration strategies.
    strategies: dict[str, str] = {
        bs.branch: _strategy_for(bs.conflict_count) for bs in ordered
    }

    if as_json:
        report = {
            "schema_version": 1,
            "active_reservations": len(reservations),
            "active_intents": len(intents),
            "conflict_hotspots": len(hotspots),
            "branches": [bs.to_dict() for bs in ordered],
            "recommended_merge_order": [bs.branch for bs in ordered],
            "strategies": strategies,
            "hotspots": [
                {"address": addr, "branches": branches}
                for addr, branches in hotspots.items()
            ],
        }
        typer.echo(json.dumps(report, indent=2))
        return

    typer.echo("\nReconciliation report")
    typer.echo("─" * 62)
    typer.echo(
        f" Active reservations: {len(reservations)} "
        f"Active intents: {len(intents)} "
        f"Conflict hotspots: {len(hotspots)}"
    )

    if not reservations and not intents:
        typer.echo(
            "\n (no active coordination data — run 'muse reserve' or 'muse intent' first)"
        )
        return

    if ordered:
        typer.echo("\n Recommended merge order:")
        for rank, bs in enumerate(ordered, 1):
            typer.echo(
                f" {rank}. {bs.branch:<30} ({len(bs.reserved_addresses)} addresses, "
                f"{bs.conflict_count} conflict(s))"
            )

    if hotspots:
        typer.echo("\n Conflict hotspot(s):")
        for addr, contenders in hotspots.items():
            # A hotspot always has at least two branches, so `later` is non-empty.
            first, *later = contenders
            typer.echo(f" {addr}")
            typer.echo(f" reserved by: {', '.join(contenders)}")
            typer.echo(f" → resolve {first!r} first; {', '.join(later)} must rebase")

    typer.echo("\n Integration strategies:")
    for bs in ordered:
        typer.echo(f" {bs.branch:<30} → {strategies[bs.branch]}")