identity.py
python
| 1 | """Global identity store — ``~/.muse/identity.toml``. |
| 2 | |
| 3 | Credentials (bearer tokens) are kept here, separate from per-repository |
| 4 | configuration. This means tokens are never accidentally committed to |
| 5 | version control, and a single identity can authenticate across all |
| 6 | repositories on the same hub. |
| 7 | |
| 8 | Why global, not per-repo |
| 9 | ------------------------- |
| 10 | Git hides tokens in ``~/.netrc`` or the credential helper chain — an |
| 11 | afterthought. Muse makes identity a first-class, machine-scoped concept. |
| 12 | The repository knows *where* the hub is (``[hub] url`` in config.toml). |
| 13 | The machine knows *who you are* (this file). The two concerns are |
| 14 | deliberately separated. |
| 15 | |
| 16 | Identity types |
| 17 | -------------- |
| 18 | ``type = "human"`` |
| 19 | A person. Authenticated via OAuth or a personal access token. No |
| 20 | explicit capability list — the hub governs what humans can do via roles. |
| 21 | |
| 22 | ``type = "agent"`` |
| 23 | An autonomous process. Authenticated via a scoped capability token. |
| 24 | The ``capabilities`` field in this file reflects what the token allows, |
| 25 | enabling agents to self-inspect before attempting an operation. |
| 26 | |
| 27 | File format |
| 28 | ----------- |
| 29 | TOML with one section per hub hostname:: |
| 30 | |
| 31 | ["musehub.ai"] |
| 32 | type = "human" |
| 33 | name = "Alice" |
| 34 | id = "usr_abc123" |
| 35 | token = "eyJ..." # bearer token — NEVER logged |
| 36 | |
| 37 | ["staging.musehub.ai"] |
| 38 | type = "agent" |
| 39 | name = "composer-v2" |
| 40 | id = "agt_def456" |
| 41 | token = "eyJ..." |
| 42 | capabilities = ["read:*", "write:midi", "commit"] |
| 43 | |
| 44 | Security model |
| 45 | -------------- |
| 46 | - ``~/.muse/`` is created with mode 0o700 (user-only directory). |
| 47 | - ``~/.muse/identity.toml`` is written with mode 0o600 **from the first |
| 48 | byte** — using ``tempfile.mkstemp()`` + ``os.fchmod()`` before any data is written, |
| 49 | eliminating the TOCTOU window that ``write_text()`` + ``chmod()`` creates. |
| 50 | - Writes are atomic: data goes to a temp file in the same directory, then |
| 51 | ``os.replace()`` renames it over the target. A kill signal during write |
| 52 | leaves the old file intact, never a partial file. |
| 53 | - Symlink guard: if the target path is already a symlink, write is refused. |
| 54 | This blocks symlink-based credential-overwrite attacks. |
| 55 | - All log calls that reference a token mask it as ``"Bearer ***"``. |
| 56 | - The file is never read or written as part of a repository snapshot. |
| 57 | """ |
| 58 | |
| 59 | from __future__ import annotations |
| 60 | |
| 61 | import contextlib |
| 62 | import fcntl |
| 63 | import logging |
| 64 | import os |
| 65 | import pathlib |
| 66 | import stat |
| 67 | import tempfile |
| 68 | import tomllib |
| 69 | from collections.abc import Generator |
| 70 | from typing import TypedDict |
| 71 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Machine-scoped identity store: ``~/.muse/identity.toml`` under the current
# user's home directory (resolved once, at import time).
_IDENTITY_DIR = pathlib.Path.home().joinpath(".muse")
_IDENTITY_FILE = _IDENTITY_DIR.joinpath("identity.toml")
| 76 | |
| 77 | |
| 78 | # --------------------------------------------------------------------------- |
| 79 | # Types |
| 80 | # --------------------------------------------------------------------------- |
| 81 | |
| 82 | |
class IdentityEntry(TypedDict, total=False):
    """A single stored identity record.

    Records live in ``identity.toml`` under a section named after the hub
    hostname. Every key is optional (``total=False``), so a partially
    populated record is still a valid ``IdentityEntry``.
    """

    # Identity kind: "human" or "agent".
    type: str
    # Human-readable display name.
    name: str
    # Identity ID assigned by the hub.
    id: str
    # Bearer token — must never be written to logs.
    token: str
    # Capability strings for agent identities (empty for humans).
    capabilities: list[str]
| 91 | |
| 92 | |
| 93 | # --------------------------------------------------------------------------- |
| 94 | # Path helper |
| 95 | # --------------------------------------------------------------------------- |
| 96 | |
| 97 | |
def get_identity_path() -> pathlib.Path:
    """Return the location of the global identity store.

    Returns:
        The module-level identity file path (``~/.muse/identity.toml``).
    """
    identity_file = _IDENTITY_FILE
    return identity_file
| 101 | |
| 102 | |
| 103 | # --------------------------------------------------------------------------- |
| 104 | # URL → hostname normalisation |
| 105 | # --------------------------------------------------------------------------- |
| 106 | |
| 107 | |
def _hostname_from_url(url: str) -> str:
    """Normalise *url* to a lowercase ``host[:port]`` suitable for use as a dict key.

    The authority component is isolated *before* userinfo is stripped, so an
    ``@`` (or ``://``) that appears in the path, query or fragment can never
    be mistaken for part of the host. Without that ordering a URL such as
    ``https://evil.example/@musehub.ai`` would resolve to ``musehub.ai`` and
    the stored token could be handed to the wrong server.

    Security properties
    -------------------
    - Strips the scheme (``https://``), so different scheme representations of
      the same host resolve to the same key.
    - Strips path, query and fragment — only the authority is considered.
    - Strips userinfo (``user:password@``) — embedded credentials in a URL are
      never stored as part of the hostname key.
    - Normalises to lowercase — DNS is case-insensitive, so ``MUSEHUB.AI``
      and ``musehub.ai`` are the same host and must resolve to the same entry.

    Examples::

        "https://musehub.ai/repos/x"        → "musehub.ai"
        "https://admin:s3cr3t@musehub.ai"   → "musehub.ai"
        "MUSEHUB.AI"                        → "musehub.ai"
        "https://musehub.ai"                → "musehub.ai"
        "musehub.ai:8443"                   → "musehub.ai:8443"
        "https://evil.example/@musehub.ai"  → "evil.example"

    Args:
        url: Hub URL or bare hostname (optionally with a port).

    Returns:
        The lowercase ``host[:port]`` portion of *url*.
    """
    candidate = url.strip()

    # Remove the scheme, but only when "://" really precedes the authority.
    # A "://" that appears after a path/query/fragment delimiter belongs to
    # that later component (e.g. "host/cb?next=https://other").
    scheme, separator, rest = candidate.partition("://")
    if separator and not any(ch in scheme for ch in "/?#"):
        candidate = rest

    # Keep only the authority: it ends at the first "/", "?" or "#".
    for delimiter in "/?#":
        candidate = candidate.partition(delimiter)[0]

    # Remove userinfo (user:password@) — never embed credentials in the key.
    # The last "@" wins, so a password containing "@" is still fully dropped.
    authority = candidate.rpartition("@")[2]

    # Normalise to lowercase — DNS is case-insensitive.
    return authority.lower()
| 139 | |
| 140 | |
| 141 | # --------------------------------------------------------------------------- |
| 142 | # TOML serialiser (write-side — stdlib tomllib is read-only) |
| 143 | # --------------------------------------------------------------------------- |
| 144 | |
| 145 | |
| 146 | def _toml_escape(value: str) -> str: |
| 147 | """Escape a string value for embedding in a TOML double-quoted string.""" |
| 148 | return value.replace("\\", "\\\\").replace('"', '\\"') |
| 149 | |
| 150 | |
def _dump_identity(identities: dict[str, IdentityEntry]) -> str:
    """Render a hostname → entry mapping as TOML text.

    Sections are emitted in sorted hostname order. Each section header is a
    quoted key, so a dotted hostname (e.g. ``musehub.ai``) is one literal
    table name rather than a chain of nested tables. Empty or missing fields
    are omitted, and every string passes through ``_toml_escape``.

    Args:
        identities: Mapping of hub hostname to its identity entry.

    Returns:
        The TOML document as a string; empty string for an empty mapping.
    """
    out: list[str] = []
    for host in sorted(identities):
        record = identities[host]
        # Quoted header: dots in the hostname stay part of a single key.
        out.append(f'["{_toml_escape(host)}"]')
        # Scalar string fields, written in a fixed order when non-empty.
        for field in ("type", "name", "id", "token"):
            value = record.get(field, "")
            if value:
                out.append(f'{field} = "{_toml_escape(value)}"')
        capabilities = record.get("capabilities") or []
        if capabilities:
            # Escape each capability on its own before joining.
            quoted = [f'"{_toml_escape(cap)}"' for cap in capabilities]
            out.append("capabilities = [" + ", ".join(quoted) + "]")
        # Blank line terminates the section.
        out.append("")
    return "\n".join(out)
| 182 | |
| 183 | |
| 184 | # --------------------------------------------------------------------------- |
| 185 | # Load / save |
| 186 | # --------------------------------------------------------------------------- |
| 187 | |
| 188 | |
| 189 | def _load_all(path: pathlib.Path) -> dict[str, IdentityEntry]: |
| 190 | """Load all identity entries from *path*. Returns empty dict if absent.""" |
| 191 | if not path.is_file(): |
| 192 | return {} |
| 193 | try: |
| 194 | with path.open("rb") as fh: |
| 195 | raw = tomllib.load(fh) |
| 196 | except Exception as exc: # noqa: BLE001 |
| 197 | # Log only the exception *type*, never its message — a TOML parse |
| 198 | # error surfaced by tomllib includes the offending line, which can |
| 199 | # contain a fragment of the token being written when the file is corrupt. |
| 200 | logger.warning( |
| 201 | "⚠️ Failed to parse identity file %s (%s — run `muse auth login` to re-authenticate)", |
| 202 | path, |
| 203 | type(exc).__name__, |
| 204 | ) |
| 205 | return {} |
| 206 | |
| 207 | result: dict[str, IdentityEntry] = {} |
| 208 | for hostname, raw_entry in raw.items(): |
| 209 | if not isinstance(raw_entry, dict): |
| 210 | continue |
| 211 | entry: IdentityEntry = {} |
| 212 | t = raw_entry.get("type") |
| 213 | if isinstance(t, str): |
| 214 | entry["type"] = t |
| 215 | n = raw_entry.get("name") |
| 216 | if isinstance(n, str): |
| 217 | entry["name"] = n |
| 218 | i = raw_entry.get("id") |
| 219 | if isinstance(i, str): |
| 220 | entry["id"] = i |
| 221 | tok = raw_entry.get("token") |
| 222 | if isinstance(tok, str): |
| 223 | entry["token"] = tok |
| 224 | caps = raw_entry.get("capabilities") |
| 225 | if isinstance(caps, list): |
| 226 | entry["capabilities"] = [str(c) for c in caps if isinstance(c, str)] |
| 227 | result[hostname] = entry |
| 228 | |
| 229 | return result |
| 230 | |
| 231 | |
@contextlib.contextmanager
def _identity_write_lock() -> Generator[None, None, None]:
    """Hold an exclusive advisory lock on the identity store for the ``with`` body.

    The lock is taken on a separate file, ``.identity.lock`` inside the
    identity directory, not on ``identity.toml`` itself: the identity file is
    replaced by rename on every save, so a lock held on it would not carry
    over to the new file.

    The lock is advisory, so it only serialises code that also acquires it;
    anything editing the file directly is not blocked.

    POSIX-only (``fcntl.flock``). Acquisition blocks with no timeout.
    """
    _IDENTITY_DIR.mkdir(parents=True, exist_ok=True)

    # Owner-only lock file; O_CLOEXEC keeps the descriptor out of child
    # processes.
    open_flags = os.O_CREAT | os.O_WRONLY | os.O_CLOEXEC
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    descriptor = os.open(str(_IDENTITY_DIR / ".identity.lock"), open_flags, owner_rw)
    try:
        fcntl.flock(descriptor, fcntl.LOCK_EX)
        try:
            yield
        finally:
            # Released explicitly, then the descriptor is closed below.
            fcntl.flock(descriptor, fcntl.LOCK_UN)
    finally:
        os.close(descriptor)
| 264 | |
| 265 | |
def _save_all(identities: dict[str, IdentityEntry], path: pathlib.Path) -> None:
    """Write *identities* to *path* securely.

    Security guarantees
    -------------------
    1. **Symlink guard** — refuses to write if *path* is already a symlink,
       preventing an attacker from pre-placing a symlink to a file they want
       overwritten.
    2. **0o700 directory** — the parent directory is created owner-only and
       re-``chmod``-ed on every save (best effort; a failure is logged).
    3. **0o600 from byte zero** — the temp file is ``fchmod``-ed to 0o600
       *before* any data is written, eliminating the TOCTOU window that
       ``write_text()`` + ``chmod()`` creates.
    4. **Durable, atomic rename** — the data is flushed and ``fsync``-ed
       before ``os.replace()`` swaps the temp file over the target, so a
       crash leaves either the old file or the complete new one.
    5. **No stray temp files** — the temp file is removed on *any* failure,
       including ``KeyboardInterrupt``, and its descriptor is never leaked.

    Args:
        identities: Full hostname → entry mapping to persist.
        path: Destination file; the temp file is created next to it.

    Raises:
        OSError: If *path* is a symlink, or if any filesystem operation
            (other than the best-effort directory ``chmod``) fails.
    """
    dir_path = path.parent

    # 1. Create the directory owner-only (0o700) from the start, then chmod
    #    to tighten a directory that already existed with wider permissions.
    dir_path.mkdir(mode=stat.S_IRWXU, parents=True, exist_ok=True)
    try:
        os.chmod(dir_path, stat.S_IRWXU)  # 0o700
    except OSError as exc:
        logger.warning("⚠️ Could not set permissions on %s: %s", dir_path, exc)

    # 2. Symlink guard — never follow a symlink placed at the target path.
    if path.is_symlink():
        raise OSError(
            f"Security: {path} is a symlink. "
            "Refusing to write credentials to a symlink target."
        )

    text = _dump_identity(identities)

    # 3. Write to a temp file in the same directory (same fs → atomic rename).
    fd, tmp_path_str = tempfile.mkstemp(dir=dir_path, prefix=".identity-tmp-")
    tmp_path = pathlib.Path(tmp_path_str)
    try:
        try:
            os.fchmod(fd, stat.S_IRUSR | stat.S_IWUSR)  # 0o600 before any data
        except BaseException:
            # os.fdopen() has not taken ownership yet, so close the raw
            # descriptor here or it would leak.
            os.close(fd)
            raise
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(text)
            # Push the bytes to disk before the rename so a crash cannot
            # leave an empty or truncated identity file behind.
            fh.flush()
            os.fsync(fh.fileno())
        # 4. Atomic rename — old file stays intact if we crash before this.
        os.replace(tmp_path, path)
    except BaseException:
        # BaseException so Ctrl-C does not leave a temp file holding tokens.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
| 316 | |
| 317 | |
| 318 | # --------------------------------------------------------------------------- |
| 319 | # Public API |
| 320 | # --------------------------------------------------------------------------- |
| 321 | |
| 322 | |
def load_identity(hub_url: str) -> IdentityEntry | None:
    """Look up the identity stored for *hub_url*.

    *hub_url* is first reduced to its normalised hostname, so a full URL such
    as ``https://musehub.ai/repos/x`` and the bare host ``musehub.ai`` refer
    to the same entry.

    Args:
        hub_url: Hub URL or bare hostname.

    Returns:
        The matching :class:`IdentityEntry`, or ``None`` when nothing is
        stored for that host.
    """
    stored = _load_all(_IDENTITY_FILE)
    key = _hostname_from_url(hub_url)
    return stored.get(key)
| 338 | |
| 339 | |
def save_identity(hub_url: str, entry: IdentityEntry) -> None:
    """Persist *entry* as the identity for *hub_url*.

    Load, update and save all happen while holding the exclusive advisory
    write lock, so concurrent callers that also take the lock cannot
    overwrite each other's entries. Any existing entry for the same host is
    replaced.

    Args:
        hub_url: Hub URL or bare hostname.
        entry: Identity data to store.
    """
    key = _hostname_from_url(hub_url)
    with _identity_write_lock():
        store = _load_all(_IDENTITY_FILE)
        store.update({key: entry})
        _save_all(store, _IDENTITY_FILE)
    # The token itself is never logged — only a masked placeholder.
    logger.info("✅ Identity for %s saved (Bearer ***)", key)
| 359 | |
| 360 | |
def clear_identity(hub_url: str) -> bool:
    """Delete the identity stored for *hub_url*, if there is one.

    Load, delete and save run under the exclusive advisory write lock. When
    no entry exists for the host, the file is left untouched.

    Args:
        hub_url: Hub URL or bare hostname.

    Returns:
        ``True`` if an entry was removed, ``False`` if no entry existed.
    """
    key = _hostname_from_url(hub_url)
    with _identity_write_lock():
        store = _load_all(_IDENTITY_FILE)
        if key in store:
            store.pop(key)
            _save_all(store, _IDENTITY_FILE)
            logger.info("✅ Identity for %s cleared", key)
            return True
        return False
| 382 | |
| 383 | |
def resolve_token(hub_url: str) -> str | None:
    """Fetch the bearer token stored for *hub_url*.

    This function never logs the token.

    Args:
        hub_url: Hub URL or bare hostname.

    Returns:
        The token with surrounding whitespace removed, or ``None`` when no
        identity is stored or its token is missing, empty or whitespace-only.
    """
    identity = load_identity(hub_url)
    if identity is None:
        return None
    cleaned = identity.get("token", "").strip()
    # An empty string after stripping counts as "no token".
    return cleaned or None
| 400 | |
| 401 | |
def list_all_identities() -> dict[str, IdentityEntry]:
    """Return every stored identity, keyed by hub hostname.

    Returns:
        Mapping of hostname to identity entry; empty when the identity file
        does not exist or cannot be parsed.
    """
    everything = _load_all(_IDENTITY_FILE)
    return everything