gabriel / musehub public
musehub_events.py python
332 lines 11.1 KB
7f1d07e8 feat: domains, MCP expansion, MIDI player, and production hardening (#8) Gabriel Cardona <cgcardona@gmail.com> 4d ago
1 """MuseHub activity event service — single point of access for the event stream.
2
3 This module is the ONLY place that touches the ``musehub_events`` table.
4 Route handlers record events atomically alongside their primary action (e.g.
5 a commit push records both the commit row and a ``commit_pushed`` event in the
6 same DB transaction).
7
8 Boundary rules:
9 - Must NOT import state stores, SSE queues, or LLM clients.
10 - Must NOT import musehub.core.* modules.
11 - May import ORM models from musehub.db.musehub_models.
12 - May import Pydantic response models from musehub.models.musehub.
13
14 Event type vocabulary
15 ---------------------
16 commit_pushed — a commit was pushed to the repo
17 pr_opened — a pull request was opened
18 pr_merged — a pull request was merged
19 pr_closed — a pull request was closed without merge
20 issue_opened — an issue was opened
21 issue_closed — an issue was closed
22 branch_created — a new branch was created
23 branch_deleted — a branch was deleted
24 tag_pushed — a tag was pushed
25 session_started — a recording session was started
26 session_ended — a recording session was ended
27 agent_action — an AI agent performed a tracked MCP tool call
28
29 Agent events carry `_is_agent: true` and optionally `_agent_name` in `event_metadata`.
30 The activity feed surfaces agent events with a distinct badge.
31 """
32
33 import logging
34
35 from sqlalchemy import and_, func, or_, select
36 from sqlalchemy.ext.asyncio import AsyncSession
37
38 from musehub.db import musehub_models as db
39 from musehub.models.musehub import (
40 ActivityEventResponse,
41 ActivityFeedResponse,
42 UserActivityEventItem,
43 UserActivityFeedResponse,
44 )
45
logger = logging.getLogger(__name__)

# Closed vocabulary of event types. ``record_event`` checks membership here and
# logs a warning for anything else, so a misspelled type does not pass silently.
KNOWN_EVENT_TYPES: frozenset[str] = frozenset(
    (
        # commits, branches and tags
        "commit_pushed",
        "branch_created",
        "branch_deleted",
        "tag_pushed",
        # pull requests
        "pr_opened",
        "pr_merged",
        "pr_closed",
        # issues
        "issue_opened",
        "issue_closed",
        # recording sessions
        "session_started",
        "session_ended",
        # AI agent MCP tool calls
        "agent_action",
    )
)
65
66
def _to_response(row: db.MusehubEvent) -> ActivityEventResponse:
    """Build the API response model for a single ``musehub_events`` ORM row.

    ``row.event_metadata`` is copied into a new ``dict`` and exposed on the
    response as ``metadata``; all other fields are passed through unchanged.
    """
    fields = {
        "event_id": row.event_id,
        "repo_id": row.repo_id,
        "event_type": row.event_type,
        "actor": row.actor,
        "description": row.description,
        # Shallow copy so the response does not share the row's mapping object.
        "metadata": dict(row.event_metadata),
        "created_at": row.created_at,
    }
    return ActivityEventResponse(**fields)
77
78
async def record_event(
    session: AsyncSession,
    *,
    repo_id: str,
    event_type: str,
    actor: str,
    description: str,
    metadata: dict[str, object] | None = None,
    is_agent: bool = False,
    agent_name: str | None = None,
) -> ActivityEventResponse:
    """Add one event row for ``repo_id`` to the session and return it as a response.

    The row is added and flushed but never committed here: the caller is
    expected to run this inside the transaction of the primary action and to
    ``await session.commit()`` itself, so the event and the action land together.

    An ``event_type`` outside ``KNOWN_EVENT_TYPES`` only produces a warning log;
    the event is still stored.

    Agent tagging: when ``is_agent`` is true, ``_is_agent: True`` is written into
    the stored metadata, and ``_agent_name`` is added as well when ``agent_name``
    is truthy. The caller's ``metadata`` mapping is copied, not mutated.
    """
    if event_type not in KNOWN_EVENT_TYPES:
        logger.warning("⚠️ Unknown event_type %r recorded for repo %s", event_type, repo_id)

    # Work on a private copy so the caller's dict is left untouched.
    stored_metadata: dict[str, object] = dict(metadata) if metadata else {}
    if is_agent:
        agent_tags: dict[str, object] = {"_is_agent": True}
        if agent_name:
            agent_tags["_agent_name"] = agent_name
        stored_metadata.update(agent_tags)

    event_row = db.MusehubEvent(
        repo_id=repo_id,
        event_type=event_type,
        actor=actor,
        description=description,
        event_metadata=stored_metadata,
    )
    session.add(event_row)
    # Flush (not commit) so event_id is populated while the transaction stays open.
    await session.flush()

    logger.debug("✅ Queued event %s (%s) for repo %s", event_row.event_id, event_type, repo_id)
    return _to_response(event_row)
124
125
async def list_events(
    session: AsyncSession,
    repo_id: str,
    *,
    event_type: str | None = None,
    page: int = 1,
    page_size: int = 30,
) -> ActivityFeedResponse:
    """Return one page of the activity feed for ``repo_id``, newest first.

    Args:
        session: Async SQLAlchemy session used for both queries.
        repo_id: Repository whose events are listed.
        event_type: When not ``None``, only events of exactly this type are
            counted and returned.
        page: 1-indexed page number; values below 1 are clamped to 1.
        page_size: Rows per page; clamped to the range 1..100.

    Returns:
        An ``ActivityFeedResponse`` holding the page of events, the total
        number of matching rows, the effective (clamped) ``page`` and
        ``page_size``, and the ``event_type`` filter that was applied.

    Rows are ordered by ``created_at`` descending with ``event_id`` descending
    as a tie-breaker. Without the tie-breaker, events sharing a timestamp have
    no defined relative order, so OFFSET/LIMIT paging could repeat or skip
    them between pages.
    """
    page = max(1, page)
    page_size = max(1, min(page_size, 100))

    # One filter list shared by the count and the page query keeps them in sync.
    filters = [db.MusehubEvent.repo_id == repo_id]
    if event_type is not None:
        filters.append(db.MusehubEvent.event_type == event_type)

    # Total number of matching rows (independent of the requested page).
    count_stmt = select(func.count()).select_from(db.MusehubEvent).where(*filters)
    total: int = (await session.execute(count_stmt)).scalar_one()

    # Requested page, newest first, with a deterministic secondary sort key.
    page_stmt = (
        select(db.MusehubEvent)
        .where(*filters)
        .order_by(
            db.MusehubEvent.created_at.desc(),
            db.MusehubEvent.event_id.desc(),
        )
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    rows = (await session.execute(page_stmt)).scalars().all()

    return ActivityFeedResponse(
        events=[_to_response(r) for r in rows],
        total=total,
        page=page,
        page_size=page_size,
        event_type_filter=event_type,
    )
174
175
176 # ---------------------------------------------------------------------------
177 # User public activity feed
178 # ---------------------------------------------------------------------------
179
# Public API activity type → the internal DB ``event_type`` values it selects.
# ``star``, ``fork`` and ``comment`` have no DB equivalent: they map to empty
# lists, so filtering on them always produces an empty result.
_USER_TYPE_TO_DB_TYPES: dict[str, list[str]] = dict(
    push=["commit_pushed", "branch_created", "branch_deleted"],
    pull_request=["pr_opened", "pr_merged", "pr_closed"],
    issue=["issue_opened", "issue_closed"],
    release=["tag_pushed"],
    star=[],
    fork=[],
    comment=[],
)

# Internal DB ``event_type`` → public API activity type used in response payloads.
# NOTE(review): session_started / session_ended are reported as "push" here but
# are not part of the "push" filter list above — confirm this asymmetry is intended.
_DB_TYPE_TO_USER_TYPE: dict[str, str] = {
    **dict.fromkeys(("commit_pushed", "branch_created", "branch_deleted"), "push"),
    **dict.fromkeys(("pr_opened", "pr_merged", "pr_closed"), "pull_request"),
    **dict.fromkeys(("issue_opened", "issue_closed"), "issue"),
    **dict.fromkeys(("tag_pushed",), "release"),
    **dict.fromkeys(("session_started", "session_ended"), "push"),
}
207
208
def _to_user_activity_item(
    event: db.MusehubEvent,
    repo: db.MusehubRepo,
) -> UserActivityEventItem:
    """Shape an ORM event row and its repo into a public user-activity item.

    The internal ``event_type`` is translated through ``_DB_TYPE_TO_USER_TYPE``;
    a type with no entry there is passed through unchanged. The repo is
    rendered as ``"<owner>/<slug>"`` and the event metadata is copied into
    ``payload``.
    """
    db_type = event.event_type
    public_type = _DB_TYPE_TO_USER_TYPE.get(db_type, db_type)
    repo_full_name = f"{repo.owner}/{repo.slug}"

    return UserActivityEventItem(
        id=event.event_id,
        type=public_type,
        actor=event.actor,
        repo=repo_full_name,
        payload=dict(event.event_metadata),
        created_at=event.created_at,
    )
222
223
async def list_user_activity(
    session: AsyncSession,
    username: str,
    *,
    caller_user_id: str | None = None,
    type_filter: str | None = None,
    limit: int = 30,
    before_id: str | None = None,
) -> UserActivityFeedResponse:
    """Return a cursor-paginated, newest-first activity feed for ``username``.

    Only events whose ``actor`` equals ``username`` are considered, and only
    from repos the caller may see:
    - repos with ``visibility == "public"`` are always included;
    - other repos are included only when ``caller_user_id`` is given and equals
      the repo's ``owner_user_id``.

    ``type_filter`` uses the public API vocabulary (push, pull_request, issue,
    release, star, fork, comment) and is translated to internal event types.
    A filter that resolves to no internal types (star, fork, comment, or an
    unrecognised value) returns an empty feed without querying events.

    ``limit`` is clamped to the range 1..100.

    Cursor pagination: pass a previous response's ``next_cursor`` as
    ``before_id``. The feed then continues after that event in the sort order
    (``created_at`` descending, then ``event_id`` descending): events with an
    earlier timestamp, or the same timestamp and a smaller ``event_id``. If no
    event with that id exists, the cursor is ignored and the feed starts from
    the newest event. ``next_cursor`` is the id of the last returned event when
    more rows exist, otherwise ``None``.
    """
    limit = max(1, min(limit, 100))

    # Translate the public filter into internal event types; bail out early
    # when it cannot match anything.
    db_types: list[str] | None = None
    if type_filter is not None:
        db_types = _USER_TYPE_TO_DB_TYPES.get(type_filter, [])
        if not db_types:
            return UserActivityFeedResponse(
                events=[], next_cursor=None, type_filter=type_filter
            )

    # Look up the cursor's anchor event, if a cursor was supplied.
    anchor = None
    if before_id is not None:
        anchor_stmt = select(db.MusehubEvent).where(
            db.MusehubEvent.event_id == before_id
        )
        anchor = (await session.execute(anchor_stmt)).scalar_one_or_none()

    Event = db.MusehubEvent
    Repo = db.MusehubRepo

    # Visibility: public repos for everyone, plus the caller's own repos.
    is_public = Repo.visibility == "public"
    if caller_user_id is None:
        visible = is_public
    else:
        visible = or_(is_public, Repo.owner_user_id == caller_user_id)

    conditions = [Event.actor == username, visible]

    if db_types is not None:
        conditions.append(Event.event_type.in_(db_types))

    if anchor is not None and anchor.created_at is not None:
        # Keyset condition matching the ORDER BY below: strictly older, or the
        # same timestamp with a smaller event_id.
        conditions.append(
            or_(
                Event.created_at < anchor.created_at,
                and_(
                    Event.created_at == anchor.created_at,
                    Event.event_id < (anchor.event_id or ""),
                ),
            )
        )

    stmt = (
        select(Event, Repo)
        .join(Repo, Event.repo_id == Repo.repo_id)
        .where(*conditions)
        .order_by(Event.created_at.desc(), Event.event_id.desc())
        .limit(limit + 1)  # one extra row reveals whether another page exists
    )
    fetched = (await session.execute(stmt)).all()

    has_more = len(fetched) > limit
    items = [_to_user_activity_item(ev, repo) for ev, repo in fetched[:limit]]
    next_cursor = items[-1].id if has_more and items else None

    logger.debug(
        "✅ User activity feed username=%s count=%d has_more=%s",
        username,
        len(items),
        has_more,
    )
    return UserActivityFeedResponse(
        events=items,
        next_cursor=next_cursor,
        type_filter=type_filter,
    )