cgcardona / muse public
muse_log_render.py python
210 lines 6.8 KB
d87ef453 Introduce Muse v2 architecture: domain-agnostic VCS with plugin interface Gabriel Cardona <gabriel@tellurstori.com> 4d ago
1 """Muse Log Renderer — ``git log --graph`` style ASCII visualization.
2
3 Takes a ``MuseLogGraph`` and produces:
4 1. ASCII graph with branch/merge lines
5 2. Pretty-printed JSON
6 3. Summary table
7
8 Pure rendering — no I/O, no DB, no mutations.
9 """
10
11 from __future__ import annotations
12
13 import json
14 from collections import defaultdict
15
16 from muse.plugins.music.services.muse_log_graph import MuseLogGraph, MuseLogNode
17
18
def render_ascii_graph(graph: MuseLogGraph) -> str:
    """Render a ``git log --graph --oneline`` style ASCII visualization.

    Nodes are walked in the reverse of ``graph.nodes`` order (newest-first,
    assuming ``graph.nodes`` is stored oldest-first). ``columns`` holds one
    entry per drawn lane: the variation_id expected next in that lane (the
    parent of the last commit drawn there), or ``None`` for a dead lane.

    Args:
        graph: Object exposing ``nodes``; each node is read for
            ``variation_id``, ``parent``, ``parent2``, ``intent`` and
            ``is_head``.

    Returns:
        The rendered rows joined with newlines, or ``"(empty graph)"`` when
        ``graph.nodes`` is empty.
    """
    if not graph.nodes:
        return "(empty graph)"

    lines: list[str] = []
    columns: list[str | None] = []

    for node in reversed(graph.nodes):
        vid = node.variation_id

        # Reuse the lane already waiting for this commit, else open a new one.
        col = _index_of(columns, vid)
        if col is None:
            col = len(columns)
            columns.append(vid)

        head = " (HEAD)" if node.is_head else ""
        intent = node.intent or ""
        label = f"{vid[:8]} {intent}{head}"

        # Commit row: merge and non-merge commits are drawn the same way.
        parts = _col_chars(columns, len(columns), active=col)
        lines.append(" ".join(parts) + f" {label}")

        # The primary parent always continues in the commit's own lane.
        columns[col] = node.parent

        p2 = node.parent2
        if p2 is not None:
            p2_col = _index_of(columns, p2)
            if p2_col is None or p2_col == col:
                # Parent2 not tracked in another lane — open a new lane.
                columns.append(p2)
                fork = _connector_parts(columns, col, len(columns) - 1, "\\")
                lines.append(" ".join(fork))
            elif columns[col] != p2:
                # Parent2 is already awaited in another lane: draw the
                # connector but keep BOTH lanes alive.  This lane still
                # awaits the primary parent and the other still awaits
                # parent2; dropping either would make that commit reappear
                # later in a fresh, disconnected lane.
                lo, hi = min(col, p2_col), max(col, p2_col)
                lines.append(" ".join(_connector_parts(columns, lo, hi, "/")))
            # Otherwise both parents are the same id: the duplicate pass
            # below draws the identical connector and collapses this lane.

        # Draw convergence lines when multiple lanes point to the same
        # parent, then collapse the duplicate lanes.
        _draw_and_collapse_duplicates(columns, lines)

        # Trim trailing dead lanes.
        while columns and columns[-1] is None:
            columns.pop()

    return "\n".join(lines)


def _connector_parts(
    columns: list[str | None], straight: int, slanted: int, glyph: str,
) -> list[str]:
    """Build one connector row: ``|`` at *straight*, *glyph* at *slanted*,
    ``|`` for every other live lane and a blank for dead lanes."""
    parts: list[str] = []
    for i, tracked in enumerate(columns):
        if i == straight:
            parts.append("|")
        elif i == slanted:
            parts.append(glyph)
        elif tracked is not None:
            parts.append("|")
        else:
            parts.append(" ")
    return parts
106
107
def render_json(graph: MuseLogGraph) -> str:
    """Serialize the graph's response payload as 2-space-indented JSON.

    The payload is ``graph.to_response().model_dump()``; values the JSON
    encoder cannot handle natively are converted with ``str``.
    """
    payload = graph.to_response().model_dump()
    return json.dumps(payload, indent=2, default=str)
111
112
def render_summary_table(
    graph: MuseLogGraph,
    *,
    checkouts_executed: int = 0,
    drift_blocks: int = 0,
    conflict_merges: int = 0,
    forced_ops: int = 0,
) -> str:
    """Render a summary statistics table as a box-drawn two-column grid.

    Commit, merge and branch-head counts are derived from ``graph.nodes``;
    the remaining counters are passed in by the caller and printed as-is.

    Args:
        graph: Object exposing ``nodes``; each node is read for
            ``variation_id``, ``parent`` and ``parent2``.
        checkouts_executed: Value shown in the "Checkouts executed" row.
        drift_blocks: Value shown in the "Drift blocks" row.
        conflict_merges: Value shown in the "Conflict merges attempted" row.
        forced_ops: Value shown in the "Forced operations" row.

    Returns:
        The table as newline-joined rows.  The value column is at least
        4 characters wide and grows to fit the longest value, so the
        borders stay aligned for large counters.
    """
    total_commits = len(graph.nodes)
    merges = sum(1 for n in graph.nodes if n.parent2 is not None)

    # Every id that some node names as a parent; a node whose id is absent
    # from this set has no children and is counted as a branch head.
    parent_ids: set[str] = set()
    for n in graph.nodes:
        if n.parent:
            parent_ids.add(n.parent)
        if n.parent2:
            parent_ids.add(n.parent2)
    branch_heads = sum(
        1 for n in graph.nodes if n.variation_id not in parent_ids
    )

    rows = [
        ("Commits", str(total_commits)),
        ("Merges", str(merges)),
        ("Branch heads", str(branch_heads)),
        ("Conflict merges attempted", str(conflict_merges)),
        ("Checkouts executed", str(checkouts_executed)),
        ("Drift blocks", str(drift_blocks)),
        ("Forced operations", str(forced_ops)),
    ]
    label_width = max(len(label) for label, _ in rows)
    # Keep the historical minimum of 4 but widen for longer values so the
    # right-hand border never drifts out of alignment.
    value_width = max(4, max(len(value) for _, value in rows))

    label_rule = "─" * (label_width + 2)
    value_rule = "─" * (value_width + 2)
    lines = ["┌" + label_rule + "┬" + value_rule + "┐"]
    for label, value in rows:
        lines.append(f"│ {label:<{label_width}} │ {value:>{value_width}} │")
    lines.append("└" + label_rule + "┴" + value_rule + "┘")
    return "\n".join(lines)
149
150
151 # ── Private helpers ───────────────────────────────────────────────────────
152
153
154 def _index_of(columns: list[str | None], vid: str | None) -> int | None:
155 if vid is None:
156 return None
157 for i, c in enumerate(columns):
158 if c == vid:
159 return i
160 return None
161
162
163 def _col_chars(columns: list[str | None], n: int, active: int) -> list[str]:
164 parts: list[str] = []
165 for i in range(n):
166 if i == active:
167 parts.append("*")
168 elif columns[i] is not None:
169 parts.append("|")
170 else:
171 parts.append(" ")
172 return parts
173
174
175 def _draw_and_collapse_duplicates(
176 columns: list[str | None], lines: list[str],
177 ) -> None:
178 """When multiple columns track the same parent, draw ``/`` convergence
179 lines and collapse the rightmost duplicates."""
180 changed = True
181 while changed:
182 changed = False
183 seen: dict[str, int] = {}
184 for i, c in enumerate(columns):
185 if c is None:
186 continue
187 if c in seen:
188 keep, remove = seen[c], i
189 lo, hi = min(keep, remove), max(keep, remove)
190 n = len(columns)
191 parts: list[str] = []
192 for j in range(n):
193 if j == lo:
194 parts.append("|")
195 elif j == hi:
196 parts.append("/")
197 elif columns[j] is not None:
198 parts.append("|")
199 else:
200 parts.append(" ")
201 lines.append(" ".join(parts))
202 columns[hi] = None
203 # Trim trailing Nones immediately so subsequent
204 # iterations see a clean column list.
205 while columns and columns[-1] is None:
206 columns.pop()
207 changed = True
208 break
209 else:
210 seen[c] = i