plugin.py
python
| 1 | """Bitcoin domain plugin — multidimensional version control for Bitcoin state. |
| 2 | |
| 3 | This plugin implements the full :class:`~muse.domain.MuseDomainPlugin` protocol |
| 4 | plus both optional extensions: |
| 5 | |
| 6 | - :class:`~muse.domain.StructuredMergePlugin` — operation-level OT merge with |
| 7 | **double-spend detection**: when two agents concurrently mark the same UTXO |
| 8 | as spent, MUSE surfaces the conflict before any transaction hits the mempool. |
| 9 | |
| 10 | - :class:`~muse.domain.CRDTPlugin` — convergent CRDT join for high-throughput |
| 11 | multi-agent scenarios where millions of agents operate per second. ``join`` |
| 12 | always succeeds; no conflict state exists in CRDT mode. |
| 13 | |
| 14 | What is versioned |
| 15 | ----------------- |
| 16 | The plugin versions your *relationship* with Bitcoin — never private keys. |
| 17 | All descriptors are watch-only xpubs. Signing happens outside MUSE; MUSE |
| 18 | tracks the intent, the strategy, and the outcomes. |
| 19 | |
| 20 | +--------------------------+-----------------------------------------------+ |
| 21 | | Workdir path | What it tracks | |
| 22 | +==========================+===============================================+ |
| 23 | | wallet/utxos.json | Unspent transaction outputs (the coin set) | |
| 24 | | wallet/transactions.json | Confirmed transaction history | |
| 25 | | wallet/labels.json | Address semantic annotations | |
| 26 | | wallet/descriptors.json | Watch-only wallet descriptors (xpubs) | |
| 27 | | channels/channels.json | Lightning payment channel states | |
| 28 | | channels/routing.json | Lightning routing policies | |
| 29 | | strategy/agent.json | Agent DCA / fee / rebalancing configuration | |
| 30 | | strategy/execution.json | Agent decision event log (append-only) | |
| 31 | | oracles/prices.json | BTC/USD price feed | |
| 32 | | oracles/fees.json | Mempool fee surface snapshots | |
| 33 | | network/peers.json | Known P2P network peers | |
| 34 | | network/mempool.json | Local mempool view (pending transactions) | |
| 35 | +--------------------------+-----------------------------------------------+ |
| 36 | |
| 37 | Branch semantics |
| 38 | ---------------- |
| 39 | ``main`` — real wallet state: UTXOs, confirmed transactions, active channels. |
| 40 | ``feat/<name>`` — strategy experiment: ``simulation_mode: true`` in |
| 41 | ``strategy/agent.json`` means no real transactions are broadcast. |
| 42 | |
| 43 | MUSE merge checks whether the strategy branch and the main branch have |
| 44 | incompatible UTXO spends — the OT merge engine's double-spend detector runs |
| 45 | before any signing happens. |
| 46 | """ |
| 47 | |
| 48 | from __future__ import annotations |
| 49 | |
| 50 | import hashlib |
| 51 | import json |
| 52 | import logging |
| 53 | import os |
| 54 | import pathlib |
| 55 | import stat as stat_mod |
| 56 | from typing import Literal |
| 57 | |
| 58 | from muse.core.crdts import AWMap, ORSet, VectorClock |
| 59 | from muse.core.crdts.aw_map import AWMapDict |
| 60 | from muse.core.crdts.or_set import ORSetDict |
| 61 | from muse.core.object_store import object_path |
| 62 | from muse.core.op_transform import merge_op_lists |
| 63 | from muse.core.schema import ( |
| 64 | CRDTDimensionSpec, |
| 65 | DimensionSpec, |
| 66 | DomainSchema, |
| 67 | MapSchema, |
| 68 | SequenceSchema, |
| 69 | SetSchema, |
| 70 | ) |
| 71 | from muse.domain import ( |
| 72 | CRDTSnapshotManifest, |
| 73 | ConflictRecord, |
| 74 | DeleteOp, |
| 75 | DomainOp, |
| 76 | DriftReport, |
| 77 | FieldMutation, |
| 78 | InsertOp, |
| 79 | LiveState, |
| 80 | MergeResult, |
| 81 | MutateOp, |
| 82 | PatchOp, |
| 83 | ReplaceOp, |
| 84 | SnapshotManifest, |
| 85 | StateDelta, |
| 86 | StateSnapshot, |
| 87 | StructuredDelta, |
| 88 | ) |
| 89 | from muse.plugins.bitcoin._query import ( |
| 90 | channel_summary_line, |
| 91 | double_spend_candidates, |
| 92 | fee_surface_str, |
| 93 | format_sat, |
| 94 | latest_fee_estimate, |
| 95 | latest_price, |
| 96 | mempool_summary_line, |
| 97 | strategy_summary_line, |
| 98 | total_balance_sat, |
| 99 | utxo_key, |
| 100 | utxo_summary_line, |
| 101 | ) |
| 102 | from muse.plugins.bitcoin._types import ( |
| 103 | AddressLabelRecord, |
| 104 | AgentStrategyRecord, |
| 105 | DescriptorRecord, |
| 106 | ExecutionEventRecord, |
| 107 | FeeEstimateRecord, |
| 108 | LightningChannelRecord, |
| 109 | NetworkPeerRecord, |
| 110 | OraclePriceTickRecord, |
| 111 | PendingTxRecord, |
| 112 | RoutingPolicyRecord, |
| 113 | TransactionRecord, |
| 114 | UTXORecord, |
| 115 | ) |
| 116 | |
# Module-level logger; used for diff diagnostics such as blob-miss fallbacks.
logger = logging.getLogger(__name__)

# Domain identifier stamped into every SnapshotManifest this plugin produces.
_DOMAIN_NAME = "bitcoin"

# Recognized semantic file suffixes → diff handler key.
# Matching is done with ``str.endswith`` (see ``_handler_for_path``), and the
# first entry whose suffix matches wins — insertion order is significant.
_SEMANTIC_SUFFIXES: dict[str, str] = {
    "utxos.json": "utxos",
    "transactions.json": "transactions",
    "labels.json": "labels",
    "descriptors.json": "descriptors",
    "channels.json": "channels",
    "routing.json": "routing",
    "agent.json": "strategy",
    "execution.json": "execution",
    "prices.json": "prices",
    "fees.json": "fees",
    "peers.json": "peers",
    "mempool.json": "mempool",
}
| 136 | |
| 137 | |
| 138 | # --------------------------------------------------------------------------- |
| 139 | # Internal helpers — JSON loading (typed) |
| 140 | # --------------------------------------------------------------------------- |
| 141 | |
| 142 | |
| 143 | def _load_utxos(data: bytes) -> list[UTXORecord]: |
| 144 | result: list[UTXORecord] = json.loads(data) |
| 145 | return result |
| 146 | |
| 147 | |
| 148 | def _load_transactions(data: bytes) -> list[TransactionRecord]: |
| 149 | result: list[TransactionRecord] = json.loads(data) |
| 150 | return result |
| 151 | |
| 152 | |
| 153 | def _load_labels(data: bytes) -> list[AddressLabelRecord]: |
| 154 | result: list[AddressLabelRecord] = json.loads(data) |
| 155 | return result |
| 156 | |
| 157 | |
| 158 | def _load_descriptors(data: bytes) -> list[DescriptorRecord]: |
| 159 | result: list[DescriptorRecord] = json.loads(data) |
| 160 | return result |
| 161 | |
| 162 | |
| 163 | def _load_channels(data: bytes) -> list[LightningChannelRecord]: |
| 164 | result: list[LightningChannelRecord] = json.loads(data) |
| 165 | return result |
| 166 | |
| 167 | |
| 168 | def _load_routing(data: bytes) -> list[RoutingPolicyRecord]: |
| 169 | result: list[RoutingPolicyRecord] = json.loads(data) |
| 170 | return result |
| 171 | |
| 172 | |
| 173 | def _load_strategy(data: bytes) -> AgentStrategyRecord: |
| 174 | result: AgentStrategyRecord = json.loads(data) |
| 175 | return result |
| 176 | |
| 177 | |
| 178 | def _load_execution(data: bytes) -> list[ExecutionEventRecord]: |
| 179 | result: list[ExecutionEventRecord] = json.loads(data) |
| 180 | return result |
| 181 | |
| 182 | |
| 183 | def _load_prices(data: bytes) -> list[OraclePriceTickRecord]: |
| 184 | result: list[OraclePriceTickRecord] = json.loads(data) |
| 185 | return result |
| 186 | |
| 187 | |
| 188 | def _load_fees(data: bytes) -> list[FeeEstimateRecord]: |
| 189 | result: list[FeeEstimateRecord] = json.loads(data) |
| 190 | return result |
| 191 | |
| 192 | |
| 193 | def _load_peers(data: bytes) -> list[NetworkPeerRecord]: |
| 194 | result: list[NetworkPeerRecord] = json.loads(data) |
| 195 | return result |
| 196 | |
| 197 | |
| 198 | def _load_mempool(data: bytes) -> list[PendingTxRecord]: |
| 199 | result: list[PendingTxRecord] = json.loads(data) |
| 200 | return result |
| 201 | |
| 202 | |
| 203 | def _content_hash(data: bytes) -> str: |
| 204 | return hashlib.sha256(data).hexdigest() |
| 205 | |
| 206 | |
| 207 | def _hash_file(path: pathlib.Path) -> str: |
| 208 | return _content_hash(path.read_bytes()) |
| 209 | |
| 210 | |
| 211 | def _jhash(json_str: str) -> str: |
| 212 | """SHA-256 of a pre-serialized canonical JSON string.""" |
| 213 | return hashlib.sha256(json_str.encode()).hexdigest() |
| 214 | |
| 215 | |
| 216 | |
| 217 | |
| 218 | # --------------------------------------------------------------------------- |
| 219 | # Semantic diff helpers — one per recognized file type |
| 220 | # --------------------------------------------------------------------------- |
| 221 | |
| 222 | |
def _diff_utxos(
    path: str,
    old_data: bytes,
    new_data: bytes,
) -> list[DomainOp]:
    """Diff two UTXO sets at the UTXO level (txid:vout identity).

    Args:
        path: Workdir path of the UTXO file (used to build op addresses).
        old_data: JSON bytes of the ancestor UTXO list.
        new_data: JSON bytes of the later UTXO list.

    Returns:
        Insert ops for received coins, delete ops for spent coins, and
        mutate/replace ops for UTXOs whose record changed in place.
    """

    def _cid(record: UTXORecord) -> str:
        # Canonical content id: hash of sorted-key, compact JSON.
        return _jhash(json.dumps(record, sort_keys=True, separators=(",", ":")))

    base_map: dict[str, UTXORecord] = {utxo_key(u): u for u in _load_utxos(old_data)}
    target_map: dict[str, UTXORecord] = {utxo_key(u): u for u in _load_utxos(new_data)}

    ops: list[DomainOp] = []

    # New UTXOs — coins received since the base snapshot.
    for key in sorted(set(target_map) - set(base_map)):
        utxo = target_map[key]
        ops.append(
            InsertOp(
                op="insert",
                address=f"{path}::{key}",
                position=None,
                content_id=_cid(utxo),
                content_summary=(
                    f"received +{format_sat(utxo['amount_sat'])}"
                    f" at {utxo['address'][:16]}…"
                    f" ({utxo['script_type']})"
                ),
            )
        )

    # Vanished UTXOs — coins spent since the base snapshot.
    for key in sorted(set(base_map) - set(target_map)):
        utxo = base_map[key]
        ops.append(
            DeleteOp(
                op="delete",
                address=f"{path}::{key}",
                position=None,
                content_id=_cid(utxo),
                content_summary=(
                    f"spent -{format_sat(utxo['amount_sat'])}"
                    f" from {utxo['address'][:16]}…"
                ),
            )
        )

    # Surviving UTXOs — detect in-place record changes.
    for key in sorted(set(base_map) & set(target_map)):
        b = base_map[key]
        t = target_map[key]
        b_id = _cid(b)
        t_id = _cid(t)
        if b_id == t_id:
            continue  # byte-identical record, nothing to report
        fields: dict[str, FieldMutation] = {}
        if b["confirmations"] != t["confirmations"]:
            fields["confirmations"] = FieldMutation(
                old=str(b["confirmations"]),
                new=str(t["confirmations"]),
            )
        if b["label"] != t["label"]:
            fields["label"] = FieldMutation(
                old=str(b["label"] or ""),
                new=str(t["label"] or ""),
            )
        if fields:
            # Tracked fields changed → fine-grained MutateOp.
            ops.append(
                MutateOp(
                    op="mutate",
                    address=f"{path}::{key}",
                    entity_id=key,
                    old_content_id=b_id,
                    new_content_id=t_id,
                    fields=fields,
                    old_summary=f"UTXO {key} (prev)",
                    new_summary=f"UTXO {key} (updated)",
                    position=None,
                )
            )
        else:
            # Record changed in some untracked field → coarse ReplaceOp.
            ops.append(
                ReplaceOp(
                    op="replace",
                    address=f"{path}::{key}",
                    position=None,
                    old_content_id=b_id,
                    new_content_id=t_id,
                    old_summary=f"UTXO {key} (prev)",
                    new_summary=f"UTXO {key} (updated)",
                )
            )

    return ops
| 314 | |
| 315 | |
def _diff_transactions(
    path: str,
    old_data: bytes,
    new_data: bytes,
) -> list[DomainOp]:
    """Diff confirmed transaction histories (append-only by txid)."""
    known: set[str] = {rec["txid"] for rec in _load_transactions(old_data)}
    incoming: dict[str, TransactionRecord] = {
        rec["txid"]: rec for rec in _load_transactions(new_data)
    }

    ops: list[DomainOp] = []
    # History is append-only: only brand-new txids produce ops.
    for txid in sorted(set(incoming) - known):
        record = incoming[txid]
        status = "confirmed" if record["confirmed"] else "unconfirmed"
        ops.append(
            InsertOp(
                op="insert",
                address=f"{path}::{txid}",
                position=None,
                content_id=_jhash(json.dumps(record, sort_keys=True, separators=(",", ":"))),
                content_summary=(
                    f"tx {txid[:12]}…"
                    f" fee={format_sat(record['fee_sat'])}"
                    f" {status}"
                ),
            )
        )
    return ops
| 344 | |
| 345 | |
def _diff_labels(
    path: str,
    old_data: bytes,
    new_data: bytes,
) -> list[DomainOp]:
    """Diff address label sets (by address identity).

    Args:
        path: Workdir path of the labels file.
        old_data: JSON bytes of the ancestor label list.
        new_data: JSON bytes of the later label list.

    Returns:
        Insert/delete ops for added/removed labels plus field-level
        mutate ops for relabeled addresses.
    """

    def _cid(record: AddressLabelRecord) -> str:
        # Canonical content id: hash of sorted-key, compact JSON.
        return _jhash(json.dumps(record, sort_keys=True, separators=(",", ":")))

    base_map: dict[str, AddressLabelRecord] = {
        lbl["address"]: lbl for lbl in _load_labels(old_data)
    }
    target_map: dict[str, AddressLabelRecord] = {
        lbl["address"]: lbl for lbl in _load_labels(new_data)
    }

    ops: list[DomainOp] = []
    for addr in sorted(set(target_map) - set(base_map)):
        lbl = target_map[addr]
        ops.append(
            InsertOp(
                op="insert",
                address=f"{path}::{addr}",
                position=None,
                content_id=_cid(lbl),
                content_summary=f"label {lbl['label']!r} → {addr[:16]}…",
            )
        )
    for addr in sorted(set(base_map) - set(target_map)):
        lbl = base_map[addr]
        ops.append(
            DeleteOp(
                op="delete",
                address=f"{path}::{addr}",
                position=None,
                content_id=_cid(lbl),
                content_summary=f"removed label from {addr[:16]}…",
            )
        )
    for addr in sorted(set(base_map) & set(target_map)):
        b, t = base_map[addr], target_map[addr]
        b_id = _cid(b)
        t_id = _cid(t)
        if b_id != t_id:
            ops.append(
                MutateOp(
                    op="mutate",
                    address=f"{path}::{addr}",
                    entity_id=addr,
                    old_content_id=b_id,
                    new_content_id=t_id,
                    fields={"label": FieldMutation(old=b["label"], new=t["label"])},
                    old_summary=f"label {b['label']!r}",
                    new_summary=f"label {t['label']!r}",
                    position=None,
                )
            )
    return ops
| 401 | |
| 402 | |
def _diff_channels(
    path: str,
    old_data: bytes,
    new_data: bytes,
) -> list[DomainOp]:
    """Diff Lightning channel states (by channel_id identity).

    Args:
        path: Workdir path of the channels file.
        old_data: JSON bytes of the ancestor channel list.
        new_data: JSON bytes of the later channel list.

    Returns:
        Insert ops for opened channels, delete ops for closed channels,
        and mutate ops with per-field detail for channels that changed.
    """

    def _cid(record: LightningChannelRecord) -> str:
        # Canonical content id: hash of sorted-key, compact JSON.
        return _jhash(json.dumps(record, sort_keys=True, separators=(",", ":")))

    def _peer_name(record: LightningChannelRecord) -> str:
        # Prefer the human alias; fall back to a truncated pubkey.
        return record["peer_alias"] or record["peer_pubkey"][:16] + "…"

    base_map: dict[str, LightningChannelRecord] = {
        c["channel_id"]: c for c in _load_channels(old_data)
    }
    target_map: dict[str, LightningChannelRecord] = {
        c["channel_id"]: c for c in _load_channels(new_data)
    }

    ops: list[DomainOp] = []
    for cid in sorted(set(target_map) - set(base_map)):
        ch = target_map[cid]
        ops.append(
            InsertOp(
                op="insert",
                address=f"{path}::{cid}",
                position=None,
                content_id=_cid(ch),
                content_summary=(
                    f"channel opened with {_peer_name(ch)}"
                    f" capacity={format_sat(ch['capacity_sat'])}"
                ),
            )
        )
    for cid in sorted(set(base_map) - set(target_map)):
        ch = base_map[cid]
        ops.append(
            DeleteOp(
                op="delete",
                address=f"{path}::{cid}",
                position=None,
                content_id=_cid(ch),
                content_summary=f"channel closed with {_peer_name(ch)}",
            )
        )

    # Fields we report individually when a surviving channel changes.
    tracked_fields = (
        "local_balance_sat",
        "remote_balance_sat",
        "is_active",
        "htlc_count",
    )
    for cid in sorted(set(base_map) & set(target_map)):
        b, t = base_map[cid], target_map[cid]
        b_id = _cid(b)
        t_id = _cid(t)
        if b_id == t_id:
            continue  # byte-identical record, nothing to report
        ch_fields: dict[str, FieldMutation] = {}
        for field_name in tracked_fields:
            if b[field_name] != t[field_name]:
                ch_fields[field_name] = FieldMutation(
                    old=str(b[field_name]), new=str(t[field_name])
                )
        if not ch_fields:
            # Changed, but in no tracked field — record a generic state bump.
            ch_fields["state"] = FieldMutation(old="prev", new="updated")
        ops.append(
            MutateOp(
                op="mutate",
                address=f"{path}::{cid}",
                entity_id=cid,
                old_content_id=b_id,
                new_content_id=t_id,
                fields=ch_fields,
                old_summary=f"channel {cid} (prev)",
                new_summary=f"channel {cid} (updated)",
                position=None,
            )
        )
    return ops
| 483 | |
| 484 | |
def _diff_strategy(
    path: str,
    old_data: bytes,
    new_data: bytes,
) -> list[DomainOp]:
    """Diff agent strategy at field level using MutateOp per changed field.

    Args:
        path: Workdir path of the strategy file.
        old_data: JSON bytes of the ancestor strategy record.
        new_data: JSON bytes of the later strategy record.

    Returns:
        Empty list when the records are identical, otherwise a single
        ``MutateOp`` carrying one ``FieldMutation`` per changed field.
    """
    base_strat = _load_strategy(old_data)
    target_strat = _load_strategy(new_data)
    b_id = _jhash(json.dumps(base_strat, sort_keys=True, separators=(",", ":")))
    t_id = _jhash(json.dumps(target_strat, sort_keys=True, separators=(",", ":")))
    if b_id == t_id:
        return []

    # (field name, keep raw value?) — string-valued fields are recorded as-is;
    # numeric/boolean fields are stringified for the FieldMutation payload.
    # Order matters: it fixes the insertion order of ``strat_fields``.
    compared_fields = (
        ("name", True),
        ("max_fee_rate_sat_vbyte", False),
        ("min_confirmations", False),
        ("utxo_consolidation_threshold", False),
        ("dca_amount_sat", False),
        ("dca_interval_blocks", False),
        ("lightning_rebalance_threshold", False),
        ("coin_selection", True),
        ("simulation_mode", False),
    )
    strat_fields: dict[str, FieldMutation] = {}
    for field_name, keep_raw in compared_fields:
        old_val = base_strat[field_name]
        new_val = target_strat[field_name]
        if old_val == new_val:
            continue
        if keep_raw:
            strat_fields[field_name] = FieldMutation(old=old_val, new=new_val)
        else:
            strat_fields[field_name] = FieldMutation(old=str(old_val), new=str(new_val))

    return [
        MutateOp(
            op="mutate",
            address=path,
            entity_id="agent_strategy",
            old_content_id=b_id,
            new_content_id=t_id,
            fields=strat_fields,
            old_summary=strategy_summary_line(base_strat),
            new_summary=strategy_summary_line(target_strat),
            position=None,
        )
    ]
| 552 | |
| 553 | |
def _diff_execution(
    path: str,
    old_data: bytes,
    new_data: bytes,
) -> list[DomainOp]:
    """Diff execution event logs (append-only by timestamp+txid)."""
    seen: set[str] = {
        f"{rec['timestamp']}:{rec['txid'] or ''}" for rec in _load_execution(old_data)
    }
    ops: list[DomainOp] = []
    for rec in _load_execution(new_data):
        event_key = f"{rec['timestamp']}:{rec['txid'] or ''}"
        if event_key in seen:
            continue
        ops.append(
            InsertOp(
                op="insert",
                address=f"{path}::{event_key}",
                position=None,
                content_id=_jhash(json.dumps(rec, sort_keys=True, separators=(",", ":"))),
                content_summary=f"event {rec['event_type']}: {rec['note'][:60]}",
            )
        )
    return ops
| 577 | |
| 578 | |
def _diff_time_series(
    path: str,
    old_data: bytes,
    new_data: bytes,
    loader_key: Literal["prices", "fees"],
) -> list[DomainOp]:
    """Diff an append-only time-series dimension (prices or fees)."""
    # Pick the loader and the summary renderer for this dimension, then run
    # a single shared insert loop keyed on timestamp.
    if loader_key == "prices":
        seen: set[int] = {rec["timestamp"] for rec in _load_prices(old_data)}
        records = _load_prices(new_data)

        def describe(rec):
            return f"BTC/USD ${rec['price_usd']:,.2f} from {rec['source']}"
    else:
        seen = {rec["timestamp"] for rec in _load_fees(old_data)}
        records = _load_fees(new_data)
        describe = fee_surface_str

    ops: list[DomainOp] = []
    for rec in records:
        if rec["timestamp"] in seen:
            continue
        ops.append(
            InsertOp(
                op="insert",
                address=f"{path}::{rec['timestamp']}",
                position=None,
                content_id=_jhash(json.dumps(rec, sort_keys=True, separators=(",", ":"))),
                content_summary=describe(rec),
            )
        )
    return ops
| 616 | |
| 617 | |
def _diff_peers(
    path: str,
    old_data: bytes,
    new_data: bytes,
) -> list[DomainOp]:
    """Diff peer lists (by pubkey identity).

    Args:
        path: Workdir path of the peers file.
        old_data: JSON bytes of the ancestor peer list.
        new_data: JSON bytes of the later peer list.

    Returns:
        Insert ops for new peers and delete ops for departed peers.
    """
    base_map: dict[str, NetworkPeerRecord] = {
        p["pubkey"]: p for p in _load_peers(old_data)
    }
    target_map: dict[str, NetworkPeerRecord] = {
        p["pubkey"]: p for p in _load_peers(new_data)
    }
    ops: list[DomainOp] = []
    for pk in sorted(set(target_map) - set(base_map)):
        peer = target_map[pk]
        # Fix: the truncation ellipsis belongs to the pubkey fallback only —
        # previously it was appended even after a full alias (cf. _diff_channels).
        name = peer["alias"] or pk[:16] + "…"
        ops.append(
            InsertOp(
                op="insert",
                address=f"{path}::{pk}",
                position=None,
                content_id=_jhash(json.dumps(peer, sort_keys=True, separators=(",", ":"))),
                content_summary=f"peer {name} connected={peer['connected']}",
            )
        )
    for pk in sorted(set(base_map) - set(target_map)):
        peer = base_map[pk]
        name = peer["alias"] or pk[:16] + "…"
        ops.append(
            DeleteOp(
                op="delete",
                address=f"{path}::{pk}",
                position=None,
                content_id=_jhash(json.dumps(peer, sort_keys=True, separators=(",", ":"))),
                content_summary=f"peer removed: {name}",
            )
        )
    return ops
| 654 | |
| 655 | |
def _diff_mempool(
    path: str,
    old_data: bytes,
    new_data: bytes,
) -> list[DomainOp]:
    """Diff mempool snapshots (volatile set by txid)."""
    previous: set[str] = {rec["txid"] for rec in _load_mempool(old_data)}
    current: dict[str, PendingTxRecord] = {
        rec["txid"]: rec for rec in _load_mempool(new_data)
    }
    ops: list[DomainOp] = []
    # Transactions that appeared in the mempool.
    for txid in sorted(set(current) - previous):
        pending = current[txid]
        ops.append(
            InsertOp(
                op="insert",
                address=f"{path}::{txid}",
                position=None,
                content_id=_jhash(json.dumps(pending, sort_keys=True, separators=(",", ":"))),
                content_summary=mempool_summary_line([pending]),
            )
        )
    # Transactions that left the mempool (confirmed, evicted, or replaced).
    for txid in sorted(previous - set(current)):
        ops.append(
            DeleteOp(
                op="delete",
                address=f"{path}::{txid}",
                position=None,
                content_id=txid,
                content_summary=f"tx {txid[:12]}… left mempool",
            )
        )
    return ops
| 689 | |
| 690 | |
def _semantic_child_ops(
    path: str,
    old_data: bytes,
    new_data: bytes,
    handler: str,
) -> list[DomainOp]:
    """Dispatch to the correct semantic diff helper for a known file.

    Args:
        path: Workdir path being diffed.
        old_data: Ancestor blob bytes.
        new_data: Later blob bytes.
        handler: Handler key from ``_SEMANTIC_SUFFIXES``.

    Returns:
        Element-level ops, or ``[]`` for a handler key with no semantic
        differ (e.g. ``descriptors``/``routing``) — same as before.
    """
    # Dispatch table replaces a ten-branch if/elif chain; the two time-series
    # dimensions share one helper parameterized by loader key.
    dispatch = {
        "utxos": _diff_utxos,
        "transactions": _diff_transactions,
        "labels": _diff_labels,
        "channels": _diff_channels,
        "strategy": _diff_strategy,
        "execution": _diff_execution,
        "prices": lambda p, o, n: _diff_time_series(p, o, n, "prices"),
        "fees": lambda p, o, n: _diff_time_series(p, o, n, "fees"),
        "peers": _diff_peers,
        "mempool": _diff_mempool,
    }
    diff_fn = dispatch.get(handler)
    if diff_fn is None:
        return []
    return diff_fn(path, old_data, new_data)
| 719 | |
| 720 | |
def _handler_for_path(path: str) -> str | None:
    """Return the semantic handler key for a workdir path, or ``None``."""
    # First suffix match wins, in _SEMANTIC_SUFFIXES insertion order.
    return next(
        (key for suffix, key in _SEMANTIC_SUFFIXES.items() if path.endswith(suffix)),
        None,
    )
| 727 | |
| 728 | |
def _diff_modified_file(
    path: str,
    old_hash: str,
    new_hash: str,
    repo_root: pathlib.Path | None,
) -> DomainOp:
    """Produce the best available op for a single modified file.

    With ``repo_root``: load blobs, run semantic diff, return ``PatchOp``.
    Without ``repo_root``: return coarse ``ReplaceOp``.
    """
    # Coarse whole-file replacement — the fallback for every failure path.
    coarse = ReplaceOp(
        op="replace",
        address=path,
        position=None,
        old_content_id=old_hash,
        new_content_id=new_hash,
        old_summary=f"{path} (prev)",
        new_summary=f"{path} (updated)",
    )

    handler = _handler_for_path(path)
    if repo_root is None or handler is None:
        return coarse

    try:
        before = object_path(repo_root, old_hash).read_bytes()
        after = object_path(repo_root, new_hash).read_bytes()
        child_ops = _semantic_child_ops(path, before, after, handler)
    except OSError:
        logger.debug("bitcoin diff: blob not found for %s, falling back", path)
        return coarse

    if not child_ops:
        return coarse
    count = len(child_ops)
    return PatchOp(
        op="patch",
        address=path,
        child_ops=child_ops,
        child_domain=f"bitcoin.{handler}",
        child_summary=f"{count} {handler} change{'s' if count != 1 else ''}",
    )
| 767 | |
| 768 | |
| 769 | # --------------------------------------------------------------------------- |
| 770 | # CRDT state helpers |
| 771 | # --------------------------------------------------------------------------- |
| 772 | |
# Well-formed empty CRDT structures, used as defaults when a key is absent
# from the serialized CRDT state (see _load_aw / _load_or below).
_EMPTY_AW: AWMapDict = {"entries": [], "tombstones": []}
_EMPTY_OR: ORSetDict = {"entries": [], "tombstones": []}
| 775 | |
| 776 | |
def _load_aw(crdt_state: dict[str, str], key: str) -> AWMap:
    """Rehydrate the AWMap stored under *key*, defaulting to an empty map."""
    serialized = crdt_state.get(key, "{}")
    if serialized == "{}":
        # Absent (or literal "{}") slot → well-formed empty structure.
        return AWMap.from_dict(_EMPTY_AW)
    return AWMap.from_dict(json.loads(serialized))
| 781 | |
| 782 | |
def _load_or(crdt_state: dict[str, str], key: str) -> ORSet:
    """Rehydrate the ORSet stored under *key*, defaulting to an empty set."""
    serialized = crdt_state.get(key, "{}")
    if serialized == "{}":
        # Absent (or literal "{}") slot → well-formed empty structure.
        return ORSet.from_dict(_EMPTY_OR)
    return ORSet.from_dict(json.loads(serialized))
| 787 | |
| 788 | |
| 789 | # --------------------------------------------------------------------------- |
| 790 | # BitcoinPlugin |
| 791 | # --------------------------------------------------------------------------- |
| 792 | |
| 793 | |
| 794 | class BitcoinPlugin: |
| 795 | """Bitcoin domain plugin — the full MuseDomainPlugin + OT + CRDT stack. |
| 796 | |
| 797 | Implements three protocol levels: |
| 798 | |
| 799 | 1. **Core** (:class:`~muse.domain.MuseDomainPlugin`) — snapshot, diff, |
| 800 | merge, drift, apply, schema. |
| 801 | 2. **OT merge** (:class:`~muse.domain.StructuredMergePlugin`) — operation- |
| 802 | level merge with double-spend detection. |
| 803 | 3. **CRDT** (:class:`~muse.domain.CRDTPlugin`) — convergent join for |
| 804 | high-throughput multi-agent write scenarios. |
| 805 | """ |
| 806 | |
| 807 | # ------------------------------------------------------------------ |
| 808 | # 1. snapshot |
| 809 | # ------------------------------------------------------------------ |
| 810 | |
    def snapshot(self, live_state: LiveState) -> StateSnapshot:
        """Capture the working tree as a content-addressed manifest.

        Walks every non-hidden, non-ignored file in the working tree and
        records its SHA-256 digest. Private keys are never stored — the
        working tree must only contain watch-only descriptors (xpubs).

        Args:
            live_state: Repository root ``pathlib.Path`` or an existing
                ``SnapshotManifest`` dict (returned unchanged).

        Returns:
            A ``SnapshotManifest`` mapping POSIX paths to SHA-256 digests.
        """
        if isinstance(live_state, pathlib.Path):
            # Imported lazily so the plugin module stays cheap to import.
            from muse.core.ignore import is_ignored, load_ignore_config, resolve_patterns
            from muse.core.stat_cache import load_cache

            workdir = live_state
            repo_root = workdir
            patterns = resolve_patterns(load_ignore_config(repo_root), _DOMAIN_NAME)
            cache = load_cache(workdir)
            root_str = str(workdir)
            # +1 to also strip the path separator after the root.
            prefix_len = len(root_str) + 1
            files: dict[str, str] = {}

            # followlinks=False: never walk through symlinked directories.
            for dirpath, dirnames, filenames in os.walk(root_str, followlinks=False):
                # In-place mutation prunes hidden dirs BEFORE os.walk descends
                # into them, and sorting makes traversal order deterministic.
                dirnames[:] = sorted(d for d in dirnames if not d.startswith("."))
                for fname in sorted(filenames):
                    if fname.startswith("."):
                        continue  # skip hidden files
                    abs_str = os.path.join(dirpath, fname)
                    try:
                        # lstat: don't follow symlinks; a dangling link is
                        # simply skipped via the OSError path.
                        st = os.lstat(abs_str)
                    except OSError:
                        continue
                    if not stat_mod.S_ISREG(st.st_mode):
                        continue  # symlinks, FIFOs, sockets, etc.
                    rel = abs_str[prefix_len:]
                    if os.sep != "/":
                        # Manifest paths are always POSIX-style.
                        rel = rel.replace(os.sep, "/")
                    if is_ignored(rel, patterns):
                        continue
                    # NOTE(review): presumably returns the cached digest when
                    # mtime/size are unchanged and rehashes otherwise —
                    # confirm against muse.core.stat_cache.
                    files[rel] = cache.get_cached(
                        rel, abs_str, st.st_mtime, st.st_size
                    )

            # Drop cache entries for files that no longer exist, then persist.
            cache.prune(set(files))
            cache.save()
            return SnapshotManifest(files=files, domain=_DOMAIN_NAME)

        # Already a manifest dict — pass through unchanged.
        return live_state
| 863 | |
| 864 | # ------------------------------------------------------------------ |
| 865 | # 2. diff |
| 866 | # ------------------------------------------------------------------ |
| 867 | |
| 868 | def diff( |
| 869 | self, |
| 870 | base: StateSnapshot, |
| 871 | target: StateSnapshot, |
| 872 | *, |
| 873 | repo_root: pathlib.Path | None = None, |
| 874 | ) -> StateDelta: |
| 875 | """Compute a structured delta between two Bitcoin state snapshots. |
| 876 | |
| 877 | With ``repo_root``: produces semantic ``PatchOp`` entries with |
| 878 | element-level child ops (UTXO-level, channel-level, field-level). |
| 879 | |
| 880 | Without ``repo_root``: produces coarse ``InsertOp`` / ``DeleteOp`` / |
| 881 | ``ReplaceOp`` at file granularity. |
| 882 | |
| 883 | Args: |
| 884 | base: Ancestor snapshot. |
| 885 | target: Later snapshot. |
| 886 | repo_root: Repository root for object store access. |
| 887 | |
| 888 | Returns: |
| 889 | A ``StructuredDelta`` describing every change from *base* to |
| 890 | *target*. |
| 891 | """ |
| 892 | base_files = base["files"] |
| 893 | target_files = target["files"] |
| 894 | base_paths = set(base_files) |
| 895 | target_paths = set(target_files) |
| 896 | |
| 897 | ops: list[DomainOp] = [] |
| 898 | |
| 899 | for path in sorted(target_paths - base_paths): |
| 900 | ops.append( |
| 901 | InsertOp( |
| 902 | op="insert", |
| 903 | address=path, |
| 904 | position=None, |
| 905 | content_id=target_files[path], |
| 906 | content_summary=f"new: {path}", |
| 907 | ) |
| 908 | ) |
| 909 | |
| 910 | for path in sorted(base_paths - target_paths): |
| 911 | ops.append( |
| 912 | DeleteOp( |
| 913 | op="delete", |
| 914 | address=path, |
| 915 | position=None, |
| 916 | content_id=base_files[path], |
| 917 | content_summary=f"removed: {path}", |
| 918 | ) |
| 919 | ) |
| 920 | |
| 921 | for path in sorted( |
| 922 | p for p in base_paths & target_paths if base_files[p] != target_files[p] |
| 923 | ): |
| 924 | ops.append( |
| 925 | _diff_modified_file( |
| 926 | path=path, |
| 927 | old_hash=base_files[path], |
| 928 | new_hash=target_files[path], |
| 929 | repo_root=repo_root, |
| 930 | ) |
| 931 | ) |
| 932 | |
| 933 | summary = _delta_summary(ops) |
| 934 | return StructuredDelta(domain=_DOMAIN_NAME, ops=ops, summary=summary) |
| 935 | |
| 936 | # ------------------------------------------------------------------ |
| 937 | # 3. merge |
| 938 | # ------------------------------------------------------------------ |
| 939 | |
| 940 | def merge( |
| 941 | self, |
| 942 | base: StateSnapshot, |
| 943 | left: StateSnapshot, |
| 944 | right: StateSnapshot, |
| 945 | *, |
| 946 | repo_root: pathlib.Path | None = None, |
| 947 | ) -> MergeResult: |
| 948 | """Three-way merge with Bitcoin-aware double-spend detection. |
| 949 | |
| 950 | Performs standard three-way file-level merge. When ``repo_root`` is |
| 951 | available and both branches touch ``wallet/utxos.json``, loads the |
| 952 | UTXO sets to detect double-spend candidates and promotes them to |
| 953 | structured ``ConflictRecord`` entries. |
| 954 | |
| 955 | Args: |
| 956 | base: Common ancestor snapshot. |
| 957 | left: Ours (current branch) snapshot. |
| 958 | right: Theirs (incoming branch) snapshot. |
| 959 | repo_root: Repository root for double-spend analysis. |
| 960 | |
| 961 | Returns: |
| 962 | A ``MergeResult`` with the reconciled manifest and any conflicts. |
| 963 | """ |
| 964 | base_files = base["files"] |
| 965 | left_files = left["files"] |
| 966 | right_files = right["files"] |
| 967 | |
| 968 | merged: dict[str, str] = dict(base_files) |
| 969 | conflicts: list[str] = [] |
| 970 | conflict_records: list[ConflictRecord] = [] |
| 971 | |
| 972 | all_paths = set(base_files) | set(left_files) | set(right_files) |
| 973 | for path in sorted(all_paths): |
| 974 | b_val = base_files.get(path) |
| 975 | l_val = left_files.get(path) |
| 976 | r_val = right_files.get(path) |
| 977 | |
| 978 | if l_val == r_val: |
| 979 | if l_val is None: |
| 980 | merged.pop(path, None) |
| 981 | else: |
| 982 | merged[path] = l_val |
| 983 | elif b_val == l_val: |
| 984 | if r_val is None: |
| 985 | merged.pop(path, None) |
| 986 | else: |
| 987 | merged[path] = r_val |
| 988 | elif b_val == r_val: |
| 989 | if l_val is None: |
| 990 | merged.pop(path, None) |
| 991 | else: |
| 992 | merged[path] = l_val |
| 993 | else: |
| 994 | conflicts.append(path) |
| 995 | merged[path] = l_val or r_val or b_val or "" |
| 996 | |
| 997 | # Bitcoin-specific: detect UTXO double-spend for utxos.json conflicts |
| 998 | if repo_root is not None: |
| 999 | for path in list(conflicts): |
| 1000 | if not path.endswith("utxos.json"): |
| 1001 | continue |
| 1002 | b_hash = base_files.get(path) |
| 1003 | l_hash = left_files.get(path) |
| 1004 | r_hash = right_files.get(path) |
| 1005 | if b_hash is None or l_hash is None or r_hash is None: |
| 1006 | continue |
| 1007 | try: |
| 1008 | base_utxos = _load_utxos( |
| 1009 | object_path(repo_root, b_hash).read_bytes() |
| 1010 | ) |
| 1011 | left_utxos = _load_utxos( |
| 1012 | object_path(repo_root, l_hash).read_bytes() |
| 1013 | ) |
| 1014 | right_utxos = _load_utxos( |
| 1015 | object_path(repo_root, r_hash).read_bytes() |
| 1016 | ) |
| 1017 | base_keys = {utxo_key(u) for u in base_utxos} |
| 1018 | left_keys = {utxo_key(u) for u in left_utxos} |
| 1019 | right_keys = {utxo_key(u) for u in right_utxos} |
| 1020 | our_spent = base_keys - left_keys |
| 1021 | their_spent = base_keys - right_keys |
| 1022 | dsc = double_spend_candidates(base_keys, our_spent, their_spent) |
| 1023 | if dsc: |
| 1024 | conflict_records.append( |
| 1025 | ConflictRecord( |
| 1026 | path=path, |
| 1027 | conflict_type="double_spend", |
| 1028 | ours_summary=f"spent {len(our_spent)} UTXO(s)", |
| 1029 | theirs_summary=f"spent {len(their_spent)} UTXO(s)", |
| 1030 | addresses=[f"{path}::{k}" for k in dsc], |
| 1031 | ) |
| 1032 | ) |
| 1033 | except OSError: |
| 1034 | logger.debug("bitcoin merge: blob not found for %s", path) |
| 1035 | |
| 1036 | return MergeResult( |
| 1037 | merged=SnapshotManifest(files=merged, domain=_DOMAIN_NAME), |
| 1038 | conflicts=conflicts, |
| 1039 | conflict_records=conflict_records, |
| 1040 | ) |
| 1041 | |
| 1042 | # ------------------------------------------------------------------ |
| 1043 | # 4. drift |
| 1044 | # ------------------------------------------------------------------ |
| 1045 | |
| 1046 | def drift(self, committed: StateSnapshot, live: LiveState) -> DriftReport: |
| 1047 | """Compare the last committed snapshot against the current live state. |
| 1048 | |
| 1049 | Used by ``muse status``. Produces a ``DriftReport`` describing every |
| 1050 | UTXO gained or spent, every channel state change, every strategy |
| 1051 | parameter update since the last commit. |
| 1052 | |
| 1053 | Args: |
| 1054 | committed: The last committed ``StateSnapshot``. |
| 1055 | live: Current live state (path or snapshot dict). |
| 1056 | |
| 1057 | Returns: |
| 1058 | A ``DriftReport`` with ``has_drift``, ``summary``, and ``delta``. |
| 1059 | """ |
| 1060 | current = self.snapshot(live) |
| 1061 | delta = self.diff(committed, current) |
| 1062 | has_drift = len(delta["ops"]) > 0 |
| 1063 | return DriftReport( |
| 1064 | has_drift=has_drift, |
| 1065 | summary=delta.get("summary", "working tree clean"), |
| 1066 | delta=delta, |
| 1067 | ) |
| 1068 | |
| 1069 | # ------------------------------------------------------------------ |
| 1070 | # 5. apply |
| 1071 | # ------------------------------------------------------------------ |
| 1072 | |
| 1073 | def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState: |
| 1074 | """Apply a delta during ``muse checkout``. |
| 1075 | |
| 1076 | The core engine restores file-level objects from the object store. |
| 1077 | This hook exists for domain-level post-processing; Bitcoin currently |
| 1078 | requires none beyond file restoration. |
| 1079 | |
| 1080 | Args: |
| 1081 | delta: The typed operation list to apply. |
| 1082 | live_state: Current live state. |
| 1083 | |
| 1084 | Returns: |
| 1085 | The unchanged ``live_state`` (post-processing is a no-op here). |
| 1086 | """ |
| 1087 | return live_state |
| 1088 | |
| 1089 | # ------------------------------------------------------------------ |
| 1090 | # 6. schema |
| 1091 | # ------------------------------------------------------------------ |
| 1092 | |
    def schema(self) -> DomainSchema:
        """Declare the multidimensional structure of Bitcoin state.

        Ten dimensions map to the ten recognized workdir file types. The
        ``merge_mode`` is ``"crdt"`` to signal that this plugin supports the
        :class:`~muse.domain.CRDTPlugin` convergent join path for multi-agent
        scenarios.

        Returns:
            A ``DomainSchema`` with all Bitcoin dimensions declared.
        """
        return DomainSchema(
            domain=_DOMAIN_NAME,
            description=(
                "Bitcoin domain — multidimensional version control for wallet state, "
                "Lightning channels, agent strategies, and oracle data. "
                "Watch-only (no private keys). UTXO-level diff, channel-level merge, "
                "agent-strategy branching, and CRDT convergent join for multi-agent "
                "scenarios with millions of concurrent agents."
            ),
            # The top level is a set of state files, identified by content hash.
            top_level=SetSchema(
                kind="set",
                element_type="bitcoin_state_file",
                identity="by_content",
            ),
            dimensions=[
                # wallet/utxos.json
                DimensionSpec(
                    name="utxos",
                    description=(
                        "Unspent transaction outputs — the coin set. "
                        "Identity: txid:vout. Double-spend detection activates "
                        "when two branches delete the same UTXO."
                    ),
                    schema=SetSchema(
                        kind="set",
                        element_type="utxo",
                        identity="by_id",
                    ),
                    independent_merge=True,
                ),
                # wallet/transactions.json
                DimensionSpec(
                    name="transactions",
                    description=(
                        "Confirmed transaction history. Append-only: the blockchain "
                        "never removes confirmed transactions. New txids from both "
                        "branches are merged by union."
                    ),
                    schema=SequenceSchema(
                        kind="sequence",
                        element_type="transaction",
                        identity="by_id",
                        diff_algorithm="lcs",
                        alphabet=None,
                    ),
                    independent_merge=True,
                ),
                # wallet/labels.json
                DimensionSpec(
                    name="labels",
                    description=(
                        "Address semantic annotations. Additive: concurrent label "
                        "additions from multiple agents always survive. CRDT OR-Set "
                        "semantics in CRDT mode."
                    ),
                    schema=SetSchema(
                        kind="set",
                        element_type="address_label",
                        identity="by_id",
                    ),
                    independent_merge=True,
                ),
                # wallet/descriptors.json
                DimensionSpec(
                    name="descriptors",
                    description="Watch-only wallet descriptors (xpubs). Never private keys.",
                    schema=SetSchema(
                        kind="set",
                        element_type="descriptor",
                        identity="by_id",
                    ),
                    independent_merge=True,
                ),
                # channels/channels.json
                DimensionSpec(
                    name="channels",
                    description=(
                        "Lightning payment channel states. Identity: channel_id. "
                        "Balance changes are MutateOps; open/close are Insert/DeleteOps."
                    ),
                    schema=SetSchema(
                        kind="set",
                        element_type="lightning_channel",
                        identity="by_id",
                    ),
                    independent_merge=True,
                ),
                # strategy/agent.json — the only dimension that cannot merge
                # independently (fields may interact).
                DimensionSpec(
                    name="strategy",
                    description=(
                        "Agent DCA / fee / rebalancing configuration. Field-level "
                        "MutateOps enable per-parameter conflict detection and LWW "
                        "resolution in CRDT mode."
                    ),
                    schema=MapSchema(
                        kind="map",
                        key_type="strategy_field",
                        value_schema=SetSchema(
                            kind="set",
                            element_type="field_value",
                            identity="by_content",
                        ),
                        identity="by_key",
                    ),
                    independent_merge=False,
                ),
                # oracles/prices.json
                DimensionSpec(
                    name="oracle_prices",
                    description=(
                        "BTC/USD price feed. Time-ordered sequence of oracle ticks. "
                        "Concurrent ticks from different sources are merged by union "
                        "ordered by timestamp."
                    ),
                    schema=SequenceSchema(
                        kind="sequence",
                        element_type="price_tick",
                        identity="by_id",
                        diff_algorithm="lcs",
                        alphabet=None,
                    ),
                    independent_merge=True,
                ),
                # oracles/fees.json
                DimensionSpec(
                    name="oracle_fees",
                    description="Mempool fee surface time series (sat/vbyte per block target).",
                    schema=SequenceSchema(
                        kind="sequence",
                        element_type="fee_estimate",
                        identity="by_id",
                        diff_algorithm="lcs",
                        alphabet=None,
                    ),
                    independent_merge=True,
                ),
                # network/peers.json
                DimensionSpec(
                    name="network",
                    description="Known P2P peers and local mempool state.",
                    schema=SetSchema(
                        kind="set",
                        element_type="network_peer",
                        identity="by_id",
                    ),
                    independent_merge=True,
                ),
                # strategy/execution.json
                DimensionSpec(
                    name="execution",
                    description=(
                        "Agent execution event log — append-only audit trail of every "
                        "decision the agent made: DCA buys, fee bumps, channel opens, "
                        "rebalances. Never deleted."
                    ),
                    schema=SequenceSchema(
                        kind="sequence",
                        element_type="execution_event",
                        identity="by_id",
                        diff_algorithm="lcs",
                        alphabet=None,
                    ),
                    independent_merge=True,
                ),
            ],
            merge_mode="crdt",
            schema_version=1,
        )
| 1263 | |
| 1264 | # ------------------------------------------------------------------ |
| 1265 | # StructuredMergePlugin — operation-level OT merge |
| 1266 | # ------------------------------------------------------------------ |
| 1267 | |
| 1268 | def merge_ops( |
| 1269 | self, |
| 1270 | base: StateSnapshot, |
| 1271 | ours_snap: StateSnapshot, |
| 1272 | theirs_snap: StateSnapshot, |
| 1273 | ours_ops: list[DomainOp], |
| 1274 | theirs_ops: list[DomainOp], |
| 1275 | *, |
| 1276 | repo_root: pathlib.Path | None = None, |
| 1277 | ) -> MergeResult: |
| 1278 | """Operation-level three-way merge with double-spend detection. |
| 1279 | |
| 1280 | Uses the OT engine's commutativity oracle to detect op-level conflicts. |
| 1281 | Bitcoin-specific rule applied on top of standard OT: |
| 1282 | |
| 1283 | **Double-spend signal**: if both ``ours_ops`` and ``theirs_ops`` |
| 1284 | contain a ``DeleteOp`` for the same UTXO address |
| 1285 | (``"wallet/utxos.json::{txid}:{vout}"``), this is a strategy-layer |
| 1286 | double-spend attempt. The op is promoted to a structured conflict even |
| 1287 | if the standard OT oracle would allow it (since ``DeleteOp + DeleteOp`` |
| 1288 | normally commutes). |
| 1289 | |
| 1290 | Args: |
| 1291 | base: Common ancestor snapshot. |
| 1292 | ours_snap: Our branch's final snapshot. |
| 1293 | theirs_snap: Their branch's final snapshot. |
| 1294 | ours_ops: Our branch's typed operation list. |
| 1295 | theirs_ops: Their branch's typed operation list. |
| 1296 | repo_root: Repository root for ``.museattributes`` and object store. |
| 1297 | |
| 1298 | Returns: |
| 1299 | ``MergeResult`` with empty ``conflicts`` if all ops commute, or |
| 1300 | structured conflicts including double-spend records. |
| 1301 | """ |
| 1302 | result = merge_op_lists( |
| 1303 | base_ops=[], |
| 1304 | ours_ops=ours_ops, |
| 1305 | theirs_ops=theirs_ops, |
| 1306 | ) |
| 1307 | |
| 1308 | conflicts: list[str] = [] |
| 1309 | conflict_records: list[ConflictRecord] = [] |
| 1310 | |
| 1311 | # Standard OT conflicts |
| 1312 | if result.conflict_ops: |
| 1313 | seen: set[str] = set() |
| 1314 | for our_op, their_op in result.conflict_ops: |
| 1315 | addr = our_op["address"] |
| 1316 | seen.add(addr) |
| 1317 | conflict_records.append( |
| 1318 | ConflictRecord( |
| 1319 | path=addr.split("::")[0], |
| 1320 | conflict_type="symbol_edit_overlap", |
| 1321 | ours_summary=f"ours: {our_op['op']} {addr}", |
| 1322 | theirs_summary=f"theirs: {their_op['op']} {addr}", |
| 1323 | addresses=[addr], |
| 1324 | ) |
| 1325 | ) |
| 1326 | conflicts = sorted(seen) |
| 1327 | |
| 1328 | # Bitcoin-specific: double-spend detection on UTXO delete ops |
| 1329 | our_utxo_deletes: set[str] = { |
| 1330 | op["address"] |
| 1331 | for op in ours_ops |
| 1332 | if op["op"] == "delete" and "utxos.json::" in op["address"] |
| 1333 | } |
| 1334 | their_utxo_deletes: set[str] = { |
| 1335 | op["address"] |
| 1336 | for op in theirs_ops |
| 1337 | if op["op"] == "delete" and "utxos.json::" in op["address"] |
| 1338 | } |
| 1339 | double_spends = sorted(our_utxo_deletes & their_utxo_deletes) |
| 1340 | for addr in double_spends: |
| 1341 | if addr not in conflicts: |
| 1342 | conflicts.append(addr) |
| 1343 | conflict_records.append( |
| 1344 | ConflictRecord( |
| 1345 | path=addr.split("::")[0], |
| 1346 | conflict_type="double_spend", |
| 1347 | ours_summary=f"ours spent UTXO {addr.split('::')[-1]}", |
| 1348 | theirs_summary=f"theirs spent UTXO {addr.split('::')[-1]}", |
| 1349 | addresses=[addr], |
| 1350 | ) |
| 1351 | ) |
| 1352 | |
| 1353 | fallback = self.merge(base, ours_snap, theirs_snap, repo_root=repo_root) |
| 1354 | return MergeResult( |
| 1355 | merged=fallback.merged, |
| 1356 | conflicts=conflicts if conflicts else fallback.conflicts, |
| 1357 | conflict_records=conflict_records if conflict_records else fallback.conflict_records, |
| 1358 | applied_strategies=fallback.applied_strategies, |
| 1359 | dimension_reports=fallback.dimension_reports, |
| 1360 | op_log=result.merged_ops, |
| 1361 | ) |
| 1362 | |
| 1363 | # ------------------------------------------------------------------ |
| 1364 | # CRDTPlugin — convergent multi-agent join |
| 1365 | # ------------------------------------------------------------------ |
| 1366 | |
    def crdt_schema(self) -> list[CRDTDimensionSpec]:
        """Declare the CRDT primitive for each Bitcoin dimension.

        The entry names here are the keys of the ``crdt_state`` mapping that
        ``join`` / ``to_crdt_state`` read and write.

        Returns:
            Seven ``CRDTDimensionSpec`` entries, one per CRDT-backed dimension.
        """
        return [
            CRDTDimensionSpec(
                name="files_manifest",
                description=(
                    "The file manifest itself — convergent AW-Map so concurrent "
                    "file additions from any agent are always preserved."
                ),
                crdt_type="aw_map",
                independent_merge=True,
            ),
            CRDTDimensionSpec(
                name="utxos",
                description=(
                    "UTXO set as AW-Map (txid:vout → UTXO data). Add-wins: "
                    "a UTXO received by one agent is preserved even if another "
                    "agent has a stale view. Spending is a remove; two concurrent "
                    "removes of the same UTXO produce a double-spend warning at "
                    "the OT layer."
                ),
                crdt_type="aw_map",
                independent_merge=True,
            ),
            CRDTDimensionSpec(
                name="labels",
                description=(
                    "Address labels as OR-Set. Concurrent label additions from "
                    "any agent win over removes. An agent labeling an address "
                    "'cold storage' is never silently overwritten by a concurrent "
                    "remove from another agent."
                ),
                crdt_type="or_set",
                independent_merge=True,
            ),
            CRDTDimensionSpec(
                name="channels",
                description=(
                    "Lightning channels as AW-Map (channel_id → state JSON). "
                    "Add-wins: a new channel opened by one agent is preserved "
                    "under concurrent state from other agents."
                ),
                crdt_type="aw_map",
                independent_merge=True,
            ),
            # The only dimension not flagged for independent merge: strategy
            # fields may interact, mirroring the DomainSchema declaration.
            CRDTDimensionSpec(
                name="strategy",
                description=(
                    "Agent strategy as AW-Map (field_name → value). Each config "
                    "field is an independent LWW register (via token ordering). "
                    "One agent changing max_fee_rate never conflicts with another "
                    "agent changing dca_amount_sat."
                ),
                crdt_type="aw_map",
                independent_merge=False,
            ),
            CRDTDimensionSpec(
                name="transactions",
                description=(
                    "Confirmed transaction IDs as OR-Set. Append-only: once a "
                    "txid is added it is never legitimately removed. Any agent "
                    "that observes a confirmation adds it; the join is the union."
                ),
                crdt_type="or_set",
                independent_merge=True,
            ),
            CRDTDimensionSpec(
                name="mempool",
                description=(
                    "Pending txids as OR-Set. Volatile: txids are added when "
                    "seen in the mempool and removed when confirmed or evicted. "
                    "The join gives every agent the union of all pending txids "
                    "seen across the fleet."
                ),
                crdt_type="or_set",
                independent_merge=True,
            ),
        ]
| 1449 | |
| 1450 | def join( |
| 1451 | self, |
| 1452 | a: CRDTSnapshotManifest, |
| 1453 | b: CRDTSnapshotManifest, |
| 1454 | ) -> CRDTSnapshotManifest: |
| 1455 | """Convergent join of two Bitcoin CRDT snapshots. |
| 1456 | |
| 1457 | Joins each dimension independently using its declared CRDT primitive. |
| 1458 | This operation is commutative, associative, and idempotent — any two |
| 1459 | agents that have received the same set of writes converge to identical |
| 1460 | state regardless of delivery order. |
| 1461 | |
| 1462 | The file manifest is rebuilt from the joined ``files_manifest`` AW-Map |
| 1463 | so that the core engine's content-addressed store remains consistent. |
| 1464 | |
| 1465 | Args: |
| 1466 | a: First CRDT snapshot manifest. |
| 1467 | b: Second CRDT snapshot manifest. |
| 1468 | |
| 1469 | Returns: |
| 1470 | The lattice join — a new ``CRDTSnapshotManifest`` that is the |
| 1471 | least upper bound of *a* and *b*. |
| 1472 | """ |
| 1473 | vc_a = VectorClock.from_dict(a["vclock"]) |
| 1474 | vc_b = VectorClock.from_dict(b["vclock"]) |
| 1475 | merged_vc = vc_a.merge(vc_b) |
| 1476 | |
| 1477 | files_a = _load_aw(a["crdt_state"], "files_manifest") |
| 1478 | files_b = _load_aw(b["crdt_state"], "files_manifest") |
| 1479 | merged_files_map = files_a.join(files_b) |
| 1480 | |
| 1481 | utxos_a = _load_aw(a["crdt_state"], "utxos") |
| 1482 | utxos_b = _load_aw(b["crdt_state"], "utxos") |
| 1483 | merged_utxos = utxos_a.join(utxos_b) |
| 1484 | |
| 1485 | labels_a = _load_or(a["crdt_state"], "labels") |
| 1486 | labels_b = _load_or(b["crdt_state"], "labels") |
| 1487 | merged_labels = labels_a.join(labels_b) |
| 1488 | |
| 1489 | channels_a = _load_aw(a["crdt_state"], "channels") |
| 1490 | channels_b = _load_aw(b["crdt_state"], "channels") |
| 1491 | merged_channels = channels_a.join(channels_b) |
| 1492 | |
| 1493 | strategy_a = _load_aw(a["crdt_state"], "strategy") |
| 1494 | strategy_b = _load_aw(b["crdt_state"], "strategy") |
| 1495 | merged_strategy = strategy_a.join(strategy_b) |
| 1496 | |
| 1497 | txns_a = _load_or(a["crdt_state"], "transactions") |
| 1498 | txns_b = _load_or(b["crdt_state"], "transactions") |
| 1499 | merged_txns = txns_a.join(txns_b) |
| 1500 | |
| 1501 | mempool_a = _load_or(a["crdt_state"], "mempool") |
| 1502 | mempool_b = _load_or(b["crdt_state"], "mempool") |
| 1503 | merged_mempool = mempool_a.join(mempool_b) |
| 1504 | |
| 1505 | merged_files = merged_files_map.to_plain_dict() |
| 1506 | |
| 1507 | crdt_state: dict[str, str] = { |
| 1508 | "files_manifest": json.dumps(merged_files_map.to_dict()), |
| 1509 | "utxos": json.dumps(merged_utxos.to_dict()), |
| 1510 | "labels": json.dumps(merged_labels.to_dict()), |
| 1511 | "channels": json.dumps(merged_channels.to_dict()), |
| 1512 | "strategy": json.dumps(merged_strategy.to_dict()), |
| 1513 | "transactions": json.dumps(merged_txns.to_dict()), |
| 1514 | "mempool": json.dumps(merged_mempool.to_dict()), |
| 1515 | } |
| 1516 | |
| 1517 | return CRDTSnapshotManifest( |
| 1518 | files=merged_files, |
| 1519 | domain=_DOMAIN_NAME, |
| 1520 | vclock=merged_vc.to_dict(), |
| 1521 | crdt_state=crdt_state, |
| 1522 | schema_version=1, |
| 1523 | ) |
| 1524 | |
| 1525 | def to_crdt_state(self, snapshot: StateSnapshot) -> CRDTSnapshotManifest: |
| 1526 | """Lift a plain snapshot into CRDT state representation. |
| 1527 | |
| 1528 | Initialises the ``files_manifest`` AW-Map from the snapshot's ``files`` |
| 1529 | dict. All domain-specific CRDT dimensions start empty and are populated |
| 1530 | lazily as agents commit content. |
| 1531 | |
| 1532 | Args: |
| 1533 | snapshot: A plain ``StateSnapshot`` to lift. |
| 1534 | |
| 1535 | Returns: |
| 1536 | A ``CRDTSnapshotManifest`` with the snapshot's files and empty |
| 1537 | per-dimension CRDT metadata. |
| 1538 | """ |
| 1539 | files_map = AWMap() |
| 1540 | for path, content_hash in snapshot["files"].items(): |
| 1541 | files_map = files_map.set(path, content_hash) |
| 1542 | |
| 1543 | empty_aw = json.dumps(_EMPTY_AW) |
| 1544 | empty_or = json.dumps(_EMPTY_OR) |
| 1545 | |
| 1546 | crdt_state: dict[str, str] = { |
| 1547 | "files_manifest": json.dumps(files_map.to_dict()), |
| 1548 | "utxos": empty_aw, |
| 1549 | "labels": empty_or, |
| 1550 | "channels": empty_aw, |
| 1551 | "strategy": empty_aw, |
| 1552 | "transactions": empty_or, |
| 1553 | "mempool": empty_or, |
| 1554 | } |
| 1555 | |
| 1556 | return CRDTSnapshotManifest( |
| 1557 | files=snapshot["files"], |
| 1558 | domain=_DOMAIN_NAME, |
| 1559 | vclock=VectorClock().to_dict(), |
| 1560 | crdt_state=crdt_state, |
| 1561 | schema_version=1, |
| 1562 | ) |
| 1563 | |
| 1564 | def from_crdt_state(self, crdt: CRDTSnapshotManifest) -> StateSnapshot: |
| 1565 | """Materialise a CRDT manifest back into a plain snapshot. |
| 1566 | |
| 1567 | Extracts the visible (non-tombstoned) file manifest from the |
| 1568 | ``files_manifest`` AW-Map. Used by ``muse show`` and CLI commands |
| 1569 | that need a standard ``StateSnapshot`` view. |
| 1570 | |
| 1571 | Args: |
| 1572 | crdt: A ``CRDTSnapshotManifest`` to materialise. |
| 1573 | |
| 1574 | Returns: |
| 1575 | A plain ``SnapshotManifest`` with the current visible files. |
| 1576 | """ |
| 1577 | files_map = _load_aw(crdt["crdt_state"], "files_manifest") |
| 1578 | visible = files_map.to_plain_dict() |
| 1579 | files = visible if visible else crdt["files"] |
| 1580 | return SnapshotManifest(files=files, domain=_DOMAIN_NAME) |
| 1581 | |
| 1582 | |
| 1583 | # --------------------------------------------------------------------------- |
| 1584 | # Delta summary helper |
| 1585 | # --------------------------------------------------------------------------- |
| 1586 | |
| 1587 | |
def _delta_summary(ops: list[DomainOp]) -> str:
    """Produce a concise human-readable summary of a delta's operations."""
    if not ops:
        return "working tree clean"

    # Tally every op type; unknown types are counted but never reported.
    tally: dict[str, int] = {}
    for op in ops:
        kind = op["op"]
        tally[kind] = tally.get(kind, 0) + 1

    # Report in a fixed, reader-friendly order.
    labels = (
        ("insert", "added"),
        ("delete", "removed"),
        ("patch", "updated"),
        ("mutate", "mutated"),
        ("replace", "replaced"),
    )
    parts = [f"{tally[kind]} {label}" for kind, label in labels if tally.get(kind)]
    return ", ".join(parts) if parts else "changes present"