musehub_events.py
python
| 1 | """MuseHub activity event service — single point of access for the event stream. |
| 2 | |
| 3 | This module is the ONLY place that touches the ``musehub_events`` table. |
| 4 | Route handlers record events atomically alongside their primary action (e.g. |
| 5 | a commit push records both the commit row and a ``commit_pushed`` event in the |
| 6 | same DB transaction). |
| 7 | |
| 8 | Boundary rules: |
| 9 | - Must NOT import state stores, SSE queues, or LLM clients. |
| 10 | - Must NOT import musehub.core.* modules. |
| 11 | - May import ORM models from musehub.db.musehub_models. |
| 12 | - May import Pydantic response models from musehub.models.musehub. |
| 13 | |
| 14 | Event type vocabulary |
| 15 | --------------------- |
| 16 | commit_pushed — a commit was pushed to the repo |
| 17 | pr_opened — a pull request was opened |
| 18 | pr_merged — a pull request was merged |
| 19 | pr_closed — a pull request was closed without merge |
| 20 | issue_opened — an issue was opened |
| 21 | issue_closed — an issue was closed |
| 22 | branch_created — a new branch was created |
| 23 | branch_deleted — a branch was deleted |
| 24 | tag_pushed — a tag was pushed |
| 25 | session_started — a recording session was started |
| 26 | session_ended — a recording session was ended |
| 27 | agent_action — an AI agent performed a tracked MCP tool call |
| 28 | |
| 29 | Agent events carry `_is_agent: true` and optionally `_agent_name` in `event_metadata`. |
| 30 | The activity feed surfaces agent events with a distinct badge. |
| 31 | """ |
| 32 | |
| 33 | import logging |
| 34 | |
| 35 | from sqlalchemy import and_, func, or_, select |
| 36 | from sqlalchemy.ext.asyncio import AsyncSession |
| 37 | |
| 38 | from musehub.db import musehub_models as db |
| 39 | from musehub.models.musehub import ( |
| 40 | ActivityEventResponse, |
| 41 | ActivityFeedResponse, |
| 42 | UserActivityEventItem, |
| 43 | UserActivityFeedResponse, |
| 44 | ) |
| 45 | |
logger = logging.getLogger(__name__)

# Closed vocabulary of activity event types. ``record_event`` checks incoming
# types against this set so that a misspelled type is at least logged.
KNOWN_EVENT_TYPES: frozenset[str] = frozenset(
    (
        # Commits, branches and tags
        "commit_pushed",
        "branch_created",
        "branch_deleted",
        "tag_pushed",
        # Pull requests
        "pr_opened",
        "pr_merged",
        "pr_closed",
        # Issues
        "issue_opened",
        "issue_closed",
        # Recording sessions
        "session_started",
        "session_ended",
        # AI agent tool calls
        "agent_action",
    )
)
| 65 | |
| 66 | |
def _to_response(row: db.MusehubEvent) -> ActivityEventResponse:
    """Build the API response model for one ``musehub_events`` ORM row.

    The row's ``event_metadata`` is shallow-copied with ``dict(...)`` so the
    response carries its own mapping rather than the object held by the row.
    """
    metadata_copy = dict(row.event_metadata)
    return ActivityEventResponse(
        event_id=row.event_id,
        repo_id=row.repo_id,
        event_type=row.event_type,
        actor=row.actor,
        description=row.description,
        metadata=metadata_copy,
        created_at=row.created_at,
    )
| 77 | |
| 78 | |
async def record_event(
    session: AsyncSession,
    *,
    repo_id: str,
    event_type: str,
    actor: str,
    description: str,
    metadata: dict[str, object] | None = None,
    is_agent: bool = False,
    agent_name: str | None = None,
) -> ActivityEventResponse:
    """Add one event row for ``repo_id`` to the session and return it as a response.

    The row is added and flushed but never committed here: invoke this inside
    the transaction of the action being described and let the caller run
    ``await session.commit()`` so both land atomically.

    An ``event_type`` outside ``KNOWN_EVENT_TYPES`` is not rejected; it is
    logged at warning level and stored regardless.

    ``metadata`` is copied, never mutated. With ``is_agent=True`` the copy gains
    ``_is_agent: True`` and, when ``agent_name`` is truthy, ``_agent_name``.
    ``agent_name`` is ignored when ``is_agent`` is false.
    """
    if event_type not in KNOWN_EVENT_TYPES:
        logger.warning("⚠️ Unknown event_type %r recorded for repo %s", event_type, repo_id)

    # Work on a private copy so the caller's dict is left untouched.
    event_meta: dict[str, object] = dict(metadata) if metadata else {}
    if is_agent:
        event_meta["_is_agent"] = True
    if is_agent and agent_name:
        event_meta["_agent_name"] = agent_name

    event_row = db.MusehubEvent(
        repo_id=repo_id,
        event_type=event_type,
        actor=actor,
        description=description,
        event_metadata=event_meta,
    )
    session.add(event_row)
    # Flush (not commit) so event_id is populated before building the response.
    await session.flush()
    logger.debug("✅ Queued event %s (%s) for repo %s", event_row.event_id, event_type, repo_id)
    return _to_response(event_row)
| 124 | |
| 125 | |
async def list_events(
    session: AsyncSession,
    repo_id: str,
    *,
    event_type: str | None = None,
    page: int = 1,
    page_size: int = 30,
) -> ActivityFeedResponse:
    """Return a paginated, newest-first slice of the activity feed for ``repo_id``.

    ``event_type`` restricts the feed to a single event type; ``None`` includes
    every type. ``page`` is 1-indexed and values below 1 are treated as 1.
    ``page_size`` is clamped to the range 1..100.

    The response carries the page of events, the total number of rows matching
    the filters (across all pages), the effective ``page`` / ``page_size`` after
    clamping, and the ``event_type`` filter that was applied.
    """
    page = max(1, page)
    page_size = max(1, min(page_size, 100))

    # Build the filter list once so the count and page queries cannot drift apart.
    conditions = [db.MusehubEvent.repo_id == repo_id]
    if event_type is not None:
        conditions.append(db.MusehubEvent.event_type == event_type)

    # Total matching rows, independent of pagination.
    count_stmt = select(func.count()).select_from(db.MusehubEvent).where(*conditions)
    total: int = (await session.execute(count_stmt)).scalar_one()

    # Requested page, newest first. ``event_id`` is a tiebreaker: with OFFSET
    # pagination, rows sharing a ``created_at`` value have no guaranteed relative
    # order, so without it the same event could appear on two pages or on none.
    page_stmt = (
        select(db.MusehubEvent)
        .where(*conditions)
        .order_by(db.MusehubEvent.created_at.desc(), db.MusehubEvent.event_id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    rows = (await session.execute(page_stmt)).scalars().all()

    return ActivityFeedResponse(
        events=[_to_response(r) for r in rows],
        total=total,
        page=page,
        page_size=page_size,
        event_type_filter=event_type,
    )
| 174 | |
| 175 | |
| 176 | # --------------------------------------------------------------------------- |
| 177 | # User public activity feed |
| 178 | # --------------------------------------------------------------------------- |
| 179 | |
# Maps public API type vocabulary → internal DB event_type values.
# Types with no DB equivalent (star, fork, comment) map to empty lists and
# will always return an empty result when used as a filter.
#
# Invariant: this map and ``_DB_TYPE_TO_USER_TYPE`` below must round-trip, i.e.
# every DB type labelled "X" in a response is also matched by the "X" filter.
# Session events are reported as "push", so they are listed under "push" here;
# otherwise a ``push``-filtered feed would omit events the unfiltered feed
# labels as "push".
_USER_TYPE_TO_DB_TYPES: dict[str, list[str]] = {
    "push": [
        "commit_pushed",
        "branch_created",
        "branch_deleted",
        "session_started",
        "session_ended",
    ],
    "pull_request": ["pr_opened", "pr_merged", "pr_closed"],
    "issue": ["issue_opened", "issue_closed"],
    "release": ["tag_pushed"],
    "star": [],
    "fork": [],
    "comment": [],
}

# Maps DB event_type → public API type vocabulary for the response payload.
# DB types absent from this map (currently ``agent_action``) are surfaced under
# their raw DB name by the ``.get`` fallback in ``_to_user_activity_item``.
# NOTE(review): confirm the response model accepts that raw value.
_DB_TYPE_TO_USER_TYPE: dict[str, str] = {
    "commit_pushed": "push",
    "branch_created": "push",
    "branch_deleted": "push",
    "pr_opened": "pull_request",
    "pr_merged": "pull_request",
    "pr_closed": "pull_request",
    "issue_opened": "issue",
    "issue_closed": "issue",
    "tag_pushed": "release",
    "session_started": "push",
    "session_ended": "push",
}
| 207 | |
| 208 | |
def _to_user_activity_item(
    event: db.MusehubEvent,
    repo: db.MusehubRepo,
) -> UserActivityEventItem:
    """Shape an event row and the repo it belongs to into a public feed item.

    The DB ``event_type`` is translated through ``_DB_TYPE_TO_USER_TYPE``; a
    type with no entry there is passed through unchanged. The repo is rendered
    as ``owner/slug`` and the event metadata is shallow-copied into ``payload``.
    """
    db_type = event.event_type
    public_type = _DB_TYPE_TO_USER_TYPE.get(db_type, db_type)
    repo_full_name = f"{repo.owner}/{repo.slug}"
    return UserActivityEventItem(
        id=event.event_id,
        type=public_type,
        actor=event.actor,
        repo=repo_full_name,
        payload=dict(event.event_metadata),
        created_at=event.created_at,
    )
| 222 | |
| 223 | |
async def list_user_activity(
    session: AsyncSession,
    username: str,
    *,
    caller_user_id: str | None = None,
    type_filter: str | None = None,
    limit: int = 30,
    before_id: str | None = None,
) -> UserActivityFeedResponse:
    """Return a cursor-paginated, newest-first activity feed for ``username``.

    Only events whose ``actor`` equals ``username`` are considered, and only
    from repos the caller may see:
    - repos with ``visibility == "public"`` are always included;
    - any other repo is included only when ``caller_user_id`` is given and
      equals the repo's ``owner_user_id``.

    ``type_filter`` uses the public vocabulary (push, pull_request, issue,
    release, star, fork, comment) and is translated to DB event types via
    ``_USER_TYPE_TO_DB_TYPES``. A value that maps to no DB types — including
    any unrecognised value — yields an empty feed without querying events.

    ``limit`` is clamped to 1..100.

    Cursor pagination: pass a previous response's ``next_cursor`` as
    ``before_id``. The anchor event is looked up by id and the page contains
    events ordered strictly after it in ``(created_at DESC, event_id DESC)``
    order, i.e. an earlier timestamp, or the same timestamp with a smaller
    ``event_id``. If ``before_id`` matches no event, no cursor constraint is
    applied and the feed starts from the newest event.
    """
    limit = max(1, min(limit, 100))

    # Translate the public type filter into DB event types (None = no filter).
    wanted_db_types: list[str] | None = None
    if type_filter is not None:
        wanted_db_types = _USER_TYPE_TO_DB_TYPES.get(type_filter, [])
        if not wanted_db_types:
            # star/fork/comment or an unknown type: nothing can match.
            return UserActivityFeedResponse(
                events=[], next_cursor=None, type_filter=type_filter
            )

    # Resolve the cursor event into its (timestamp, id) sort position.
    anchor_ts = None
    anchor_id = ""
    if before_id is not None:
        anchor_lookup = select(db.MusehubEvent).where(
            db.MusehubEvent.event_id == before_id
        )
        anchor_row = (await session.execute(anchor_lookup)).scalar_one_or_none()
        if anchor_row is not None:
            anchor_ts = anchor_row.created_at
            anchor_id = anchor_row.event_id or ""

    # Visibility: public repos for everyone, plus the caller's own repos.
    is_public = db.MusehubRepo.visibility == "public"
    if caller_user_id is None:
        visible = is_public
    else:
        visible = or_(is_public, db.MusehubRepo.owner_user_id == caller_user_id)

    predicates = [db.MusehubEvent.actor == username, visible]
    if wanted_db_types is not None:
        predicates.append(db.MusehubEvent.event_type.in_(wanted_db_types))
    if anchor_ts is not None:
        # Keyset condition matching the ORDER BY below.
        predicates.append(
            or_(
                db.MusehubEvent.created_at < anchor_ts,
                and_(
                    db.MusehubEvent.created_at == anchor_ts,
                    db.MusehubEvent.event_id < anchor_id,
                ),
            )
        )

    feed_stmt = (
        select(db.MusehubEvent, db.MusehubRepo)
        .join(db.MusehubRepo, db.MusehubEvent.repo_id == db.MusehubRepo.repo_id)
        .where(and_(*predicates))
        .order_by(db.MusehubEvent.created_at.desc(), db.MusehubEvent.event_id.desc())
        .limit(limit + 1)  # one extra row signals that another page exists
    )
    fetched = (await session.execute(feed_stmt)).all()

    has_more = len(fetched) > limit
    items = [
        _to_user_activity_item(event_row, repo_row)
        for event_row, repo_row in fetched[:limit]
    ]
    # The cursor is the id of the oldest item on this page.
    if has_more and items:
        next_cursor = items[-1].id
    else:
        next_cursor = None

    logger.debug(
        "✅ User activity feed username=%s count=%d has_more=%s",
        username,
        len(items),
        has_more,
    )
    return UserActivityFeedResponse(
        events=items,
        next_cursor=next_cursor,
        type_filter=type_filter,
    )