gabriel / muse public
log.py python
223 lines 8.0 KB
e8c4265e feat(hardening): final sweep — security, performance, API consistency, docs Gabriel Cardona <gabriel@tellurstori.com> 2d ago
1 """muse log — display commit history.
2
3 Output modes
4 ------------
5
6 Default::
7
8 commit a1b2c3d4 (HEAD -> main)
9 Author: gabriel
10 Date: 2026-03-16 12:00:00 UTC
11
12 Add verse melody
13
14 --oneline::
15
16 a1b2c3d4 (HEAD -> main) Add verse melody
17 f9e8d7c6 Initial commit
18
19 --graph::
20
21 * a1b2c3d4 (HEAD -> main) Add verse melody
22 * f9e8d7c6 Initial commit
23
24 --stat::
25
26 commit a1b2c3d4 (HEAD -> main)
27 Date: 2026-03-16 12:00:00 UTC
28
29 Add verse melody
30
    + tracks/drums.mid
    1 added, 0 removed
33
34 Filters: --since, --until, --author, --section, --track, --emotion
35 """
36
37 from __future__ import annotations
38
39 import json
40 import logging
41 import pathlib
42 import re
43 from datetime import datetime, timedelta, timezone
44
45 import typer
46
47 from muse.core.errors import ExitCode
48 from muse.core.repo import require_repo
49 from muse.core.store import CommitRecord, get_commit_snapshot_manifest, get_commits_for_branch, read_current_branch
50 from muse.core.validation import sanitize_display
51
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Typer application object; the ``log`` function in this module is registered
# on it as the callback.
app = typer.Typer()

# Default for ``-n`` / ``--max-count`` when the caller does not supply one.
_DEFAULT_LIMIT = 1000
57
58
def _read_branch(root: pathlib.Path) -> str:
    """Return the value ``read_current_branch`` reports for the repo at *root*.

    Kept as a module-local indirection so the rest of this file has a single
    place to obtain the current branch name.
    """
    current_branch = read_current_branch(root)
    return current_branch
61
62
def _read_repo_id(root: pathlib.Path) -> str:
    """Return the ``repo_id`` stored in ``<root>/.muse/repo.json`` as a string.

    The file is decoded explicitly as UTF-8 instead of with the platform's
    locale encoding, so the result does not depend on the machine's locale
    settings.

    Args:
        root: Repository root directory (the one containing ``.muse``).

    Returns:
        The ``repo_id`` value, coerced to ``str``.

    Raises:
        OSError: If ``repo.json`` cannot be read.
        json.JSONDecodeError: If the file does not contain valid JSON.
        KeyError: If the JSON document has no ``repo_id`` key.
    """
    repo_file = root / ".muse" / "repo.json"
    document = json.loads(repo_file.read_text(encoding="utf-8"))
    return str(document["repo_id"])
65
66
67 def _parse_date(text: str) -> datetime:
68 text = text.strip().lower()
69 now = datetime.now(timezone.utc)
70 if text == "today":
71 return now.replace(hour=0, minute=0, second=0, microsecond=0)
72 if text == "yesterday":
73 return (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
74 m = re.match(r"^(\d+)\s+(day|week|month|year)s?\s+ago$", text)
75 if m:
76 n = int(m.group(1))
77 unit = m.group(2)
78 deltas = {"day": timedelta(days=n), "week": timedelta(weeks=n),
79 "month": timedelta(days=n * 30), "year": timedelta(days=n * 365)}
80 return now - deltas[unit]
81 for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
82 try:
83 return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
84 except ValueError:
85 continue
86 raise ValueError(f"Cannot parse date: {text!r}")
87
88
def _file_diff(root: pathlib.Path, commit: CommitRecord) -> tuple[list[str], list[str]]:
    """Compare a commit's snapshot manifest with its parent's.

    Returns a ``(added, removed)`` pair of sorted path lists: entries present
    only in the commit's manifest, and entries present only in the parent's.
    A commit without a parent is compared against an empty manifest, and a
    missing manifest is treated as empty.
    """
    present_now = set(get_commit_snapshot_manifest(root, commit.commit_id) or {})

    parent_id = commit.parent_commit_id
    if parent_id:
        present_before = set(get_commit_snapshot_manifest(root, parent_id) or {})
    else:
        present_before = set()

    return sorted(present_now - present_before), sorted(present_before - present_now)
99
100
def _format_date(dt: datetime) -> str:
    """Render *dt* for display in the commit header.

    Timezone-aware values are converted to UTC before formatting so that the
    ``UTC`` suffix is always truthful, even when *dt* carries another offset.
    Naive values (no usable UTC offset) fall back to ``str(dt)`` because
    their zone is unknown.

    Args:
        dt: The datetime to render.

    Returns:
        ``"YYYY-MM-DD HH:MM:SS UTC"`` for aware input, ``str(dt)`` otherwise.
    """
    if dt.tzinfo is None or dt.utcoffset() is None:
        return str(dt)
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
103
104
@app.callback(invoke_without_command=True)
def log(
    ctx: typer.Context,
    ref: str | None = typer.Argument(None, help="Branch or commit to start from."),
    oneline: bool = typer.Option(False, "--oneline", help="One line per commit."),
    graph: bool = typer.Option(False, "--graph", help="ASCII graph."),
    stat: bool = typer.Option(False, "--stat", help="Show file change summary."),
    patch: bool = typer.Option(False, "--patch", "-p", help="Show file change summary (added/removed/modified counts) alongside each commit."),
    limit: int = typer.Option(_DEFAULT_LIMIT, "-n", "--max-count", help="Limit number of commits."),
    since: str | None = typer.Option(None, "--since", help="Show commits after date."),
    until: str | None = typer.Option(None, "--until", help="Show commits before date."),
    author: str | None = typer.Option(None, "--author", help="Filter by author."),
    section: str | None = typer.Option(None, "--section", help="Filter by section metadata."),
    track: str | None = typer.Option(None, "--track", help="Filter by track metadata."),
    emotion: str | None = typer.Option(None, "--emotion", help="Filter by emotion metadata."),
    fmt: str = typer.Option("text", "--format", "-f", help="Output format: text or json."),
) -> None:
    """Display commit history.

    Agents should pass ``--format json`` to receive a JSON array where each
    element is a commit object with fields: ``commit_id``, ``branch``,
    ``message``, ``author``, ``committed_at``, ``parent_commit_id``,
    ``snapshot_id``, ``metadata``, and ``sem_ver_bump``.
    """
    # The docstring above is kept verbatim because Typer renders it as the
    # command's --help text; implementation notes therefore live in comments.
    #
    # Flow: validate options -> resolve repo/branch -> parse date bounds ->
    # filter the branch history -> emit JSON or one of the text layouts.
    # Every user-input problem exits with ExitCode.USER_ERROR.

    # --- Argument validation -------------------------------------------------
    if fmt not in ("text", "json"):
        typer.echo(f"❌ Unknown --format '{sanitize_display(fmt)}'. Choose text or json.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)
    if limit < 1:
        typer.echo("❌ --max-count must be at least 1.", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR)

    root = require_repo()
    repo_id = _read_repo_id(root)
    branch = ref or _read_branch(root)

    # An unparseable --since/--until is a user error, not a crash: report it in
    # the same style as the checks above instead of letting a traceback escape.
    try:
        since_dt = _parse_date(since) if since else None
        until_dt = _parse_date(until) if until else None
    except (ValueError, OverflowError) as exc:
        typer.echo(f"❌ {sanitize_display(str(exc))}", err=True)
        raise typer.Exit(code=ExitCode.USER_ERROR) from None

    commits = get_commits_for_branch(root, repo_id, branch)

    # --- Filtering -----------------------------------------------------------
    # The HEAD label belongs to the first commit of the *unfiltered* history,
    # so remember it before any filter can discard it. Otherwise a filter that
    # excludes the tip would label an older commit as HEAD.
    # NOTE(review): assumes get_commits_for_branch yields newest first -- confirm.
    head_commit_id: str | None = None
    filtered: list[CommitRecord] = []
    for c in commits:
        if head_commit_id is None:
            head_commit_id = c.commit_id
        if since_dt and c.committed_at < since_dt:
            continue
        if until_dt and c.committed_at > until_dt:
            continue
        if author and author.lower() not in c.author.lower():
            continue
        if section and c.metadata.get("section") != section:
            continue
        if track and c.metadata.get("track") != track:
            continue
        if emotion and c.metadata.get("emotion") != emotion:
            continue
        filtered.append(c)
        # limit was validated as >= 1 above, so this always terminates the scan
        # once enough matches have been collected.
        if len(filtered) >= limit:
            break

    if not filtered:
        typer.echo("[]" if fmt == "json" else "(no commits)")
        return

    # --- JSON output ---------------------------------------------------------
    if fmt == "json":
        typer.echo(json.dumps([{
            "commit_id": c.commit_id,
            "branch": c.branch,
            "message": c.message,
            "author": c.author,
            "committed_at": c.committed_at.isoformat(),
            "parent_commit_id": c.parent_commit_id,
            "snapshot_id": c.snapshot_id,
            "metadata": c.metadata,
            "sem_ver_bump": c.sem_ver_bump,
        } for c in filtered], indent=2, default=str))
        return

    # --- Text output ---------------------------------------------------------
    for c in filtered:
        is_head = c.commit_id == head_commit_id
        ref_label = f" (HEAD -> {branch})" if is_head else ""

        # Commit-supplied strings are sanitized before they reach the terminal.
        msg = sanitize_display(c.message)
        # Separate name so the --author filter parameter is not shadowed.
        author_display = sanitize_display(c.author)

        if oneline:
            typer.echo(f"{c.commit_id[:8]}{ref_label} {msg}")

        elif graph:
            typer.echo(f"* {c.commit_id[:8]}{ref_label} {msg}")

        else:
            typer.echo(f"commit {c.commit_id[:8]}{ref_label}")
            if author_display:
                typer.echo(f"Author: {author_display}")
            typer.echo(f"Date: {_format_date(c.committed_at)}")
            if c.sem_ver_bump and c.sem_ver_bump != "none":
                typer.echo(f"SemVer: {c.sem_ver_bump.upper()}")
            if c.breaking_changes:
                # Show at most three breaking changes, then a "+N more" tail.
                safe_breaks = [sanitize_display(b) for b in c.breaking_changes[:3]]
                typer.echo(f"Breaking: {', '.join(safe_breaks)}"
                           + (f" +{len(c.breaking_changes) - 3} more" if len(c.breaking_changes) > 3 else ""))
            if c.metadata:
                meta_parts = [f"{sanitize_display(k)}: {sanitize_display(v)}" for k, v in sorted(c.metadata.items())]
                typer.echo(f"Meta: {', '.join(meta_parts)}")
            typer.echo(f"\n {msg}\n")

        if stat or patch:
            # NOTE(review): the --patch help text mentions "modified" counts,
            # but _file_diff only reports added and removed paths.
            added, removed = _file_diff(root, c)
            for p in added:
                typer.echo(f" + {p}")
            for p in removed:
                typer.echo(f" - {p}")
            if added or removed:
                typer.echo(f" {len(added)} added, {len(removed)} removed\n")