gabriel / muse public
reconcile.py python
210 lines 6.9 KB
8aa515d5 refactor: consolidate schema_version to single source of truth Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """muse reconcile — recommend merge ordering and integration strategy.
2
3 Reads active reservations, intents, and branch divergence to recommend:
4
5 1. **Merge ordering** — which branches should be merged first to minimize
6 downstream conflicts.
7 2. **Integration strategy** — fast-forward, squash, or rebase for each branch.
8 3. **Conflict hotspots** — symbols reserved by multiple agents that need
9 special attention.
10
11 ``muse reconcile`` is a *read-only* planning command. It does not write to
12 branches, commit history, or the coordination layer. It provides the plan;
13 agents execute it.
14
15 Why this exists
16 ---------------
17 In a system with millions of concurrent agents, merges happen constantly.
18 Without coordination, every merge introduces friction. ``muse reconcile``
19 gives an orchestration agent a complete picture of the current coordination
20 state and a recommended action plan.
21
22 Usage::
23
24 muse reconcile
25 muse reconcile --json
26
27 Output::
28
29 Reconciliation report
30 ──────────────────────────────────────────────────────────────
31
32 Active reservations: 3 Active intents: 2
33 Conflict hotspots: 1
34
35 Recommended merge order:
36 1. feature/billing (3 symbols, 0 predicted conflicts)
37 2. feature/auth (5 symbols, 1 predicted conflict)
38
39 Conflict hotspot:
40 src/billing.py::compute_total
41 reserved by: agent-41 (feature/billing), agent-42 (feature/auth)
42 recommendation: resolve feature/billing first; agent-42 must rebase
43
44 Integration strategy:
45 feature/billing → fast-forward (no conflicts predicted)
46 feature/auth → rebase onto main after feature/billing lands
47
48 Flags:
49
50 ``--json``
51 Emit the reconciliation report as JSON.
52 """
53
54 from __future__ import annotations
55
56 import json
57 import logging
58
59 import typer
60
61 from muse._version import __version__
62 from muse.core.coordination import active_reservations, load_all_intents
63 from muse.core.repo import require_repo
64
65 logger = logging.getLogger(__name__)
66
67 app = typer.Typer()
68
69
70 class _BranchSummary:
71 def __init__(self, branch: str) -> None:
72 self.branch = branch
73 self.reserved_addresses: list[str] = []
74 self.intents: list[str] = []
75 self.run_ids: set[str] = set()
76 self.conflict_count: int = 0
77
78 def to_dict(self) -> dict[str, str | int | list[str]]:
79 return {
80 "branch": self.branch,
81 "reserved_addresses": self.reserved_addresses,
82 "intents": self.intents,
83 "run_ids": sorted(self.run_ids),
84 "predicted_conflicts": self.conflict_count,
85 }
86
87
@app.callback(invoke_without_command=True)
def reconcile(
    ctx: typer.Context,
    as_json: bool = typer.Option(False, "--json", help="Emit report as JSON."),
) -> None:
    """Recommend merge ordering and integration strategy.

    Reads coordination state (reservations + intents) and produces a
    recommended action plan: which branches to merge first, what strategy
    to use, and which conflict hotspots need manual attention.

    Does not write anything — purely advisory output.
    """
    # NOTE: the docstring above is kept short on purpose; further detail
    # lives in comments. ``ctx`` is accepted for the Typer callback signature
    # and is not read here.
    root = require_repo()
    reservations = active_reservations(root)
    intents = load_all_intents(root)

    # Aggregate by branch, and record which branches reserved each address.
    branch_map: dict[str, _BranchSummary] = {}
    addr_branches: dict[str, list[str]] = {}
    for res in reservations:
        summary = branch_map.setdefault(res.branch, _BranchSummary(res.branch))
        summary.reserved_addresses.extend(res.addresses)
        summary.run_ids.add(res.run_id)
        for addr in res.addresses:
            addr_branches.setdefault(addr, []).append(res.branch)

    for it in intents:
        summary = branch_map.setdefault(it.branch, _BranchSummary(it.branch))
        summary.intents.append(it.operation)
        summary.run_ids.add(it.run_id)

    # Detect conflict hotspots: addresses reserved on more than one distinct
    # branch. Each participating branch is charged one predicted conflict
    # per hotspot it takes part in.
    contested: dict[str, list[str]] = {}
    for addr, branches in addr_branches.items():
        unique_branches = list(dict.fromkeys(branches))
        if len(unique_branches) > 1:
            contested[addr] = unique_branches
            for b in unique_branches:
                branch_map[b].conflict_count += 1

    # Recommend merge order: fewer conflicts → merge first, then fewer
    # reserved addresses. The branch name is the final tie-breaker so the
    # plan does not depend on the order reservations/intents were loaded.
    ordered = sorted(
        branch_map.values(),
        key=lambda bs: (bs.conflict_count, len(bs.reserved_addresses), bs.branch),
    )

    # List each hotspot's branches in recommended merge order, so the
    # per-hotspot "resolve X first" advice never contradicts the merge order
    # reported for the same run. Hotspots themselves are sorted by address.
    merge_rank = {bs.branch: position for position, bs in enumerate(ordered)}
    hotspots: dict[str, list[str]] = {
        addr: sorted(branches, key=merge_rank.__getitem__)
        for addr, branches in sorted(contested.items())
    }

    # Recommend integration strategies from the predicted conflict count.
    strategies: dict[str, str] = {}
    for bs in ordered:
        if bs.conflict_count == 0:
            strategies[bs.branch] = "fast-forward (no conflicts predicted)"
        elif bs.conflict_count <= 2:
            strategies[bs.branch] = "rebase onto main before merging"
        else:
            strategies[bs.branch] = "manual conflict resolution required"

    if as_json:
        typer.echo(json.dumps(
            {
                "schema_version": __version__,
                "active_reservations": len(reservations),
                "active_intents": len(intents),
                "conflict_hotspots": len(hotspots),
                "branches": [bs.to_dict() for bs in ordered],
                "recommended_merge_order": [bs.branch for bs in ordered],
                "strategies": strategies,
                "hotspots": [
                    {"address": addr, "branches": branches}
                    for addr, branches in hotspots.items()
                ],
            },
            indent=2,
        ))
        return

    typer.echo("\nReconciliation report")
    typer.echo("─" * 62)
    typer.echo(
        f" Active reservations: {len(reservations)} "
        f"Active intents: {len(intents)} "
        f"Conflict hotspots: {len(hotspots)}"
    )

    if not reservations and not intents:
        typer.echo(
            "\n (no active coordination data — run 'muse reserve' or 'muse intent' first)"
        )
        return

    if ordered:
        typer.echo("\n Recommended merge order:")
        for rank, bs in enumerate(ordered, 1):
            c = bs.conflict_count
            typer.echo(
                f" {rank}. {bs.branch:<30} ({len(bs.reserved_addresses)} addresses, "
                f"{c} conflict(s))"
            )

    if hotspots:
        typer.echo("\n Conflict hotspot(s):")
        for addr, branches in hotspots.items():
            typer.echo(f" {addr}")
            typer.echo(f" reserved by: {', '.join(branches)}")
            # A hotspot always has at least two branches; the first one is
            # the earliest in the recommended merge order.
            first = branches[0]
            rest = ", ".join(branches[1:])
            typer.echo(f" → resolve {first!r} first; {rest} must rebase")

    typer.echo("\n Integration strategies:")
    for bs in ordered:
        typer.echo(f" {bs.branch:<30} → {strategies[bs.branch]}")