gabriel / muse public
plugin.py python
1610 lines 58.9 KB
73edf876 feat(bitcoin): add Bitcoin domain plugin — multidimensional VCS for on-… Gabriel Cardona <gabriel@tellurstori.com> 4d ago
1 """Bitcoin domain plugin — multidimensional version control for Bitcoin state.
2
3 This plugin implements the full :class:`~muse.domain.MuseDomainPlugin` protocol
4 plus both optional extensions:
5
6 - :class:`~muse.domain.StructuredMergePlugin` — operation-level OT merge with
7 **double-spend detection**: when two agents concurrently mark the same UTXO
8 as spent, MUSE surfaces the conflict before any transaction hits the mempool.
9
10 - :class:`~muse.domain.CRDTPlugin` — convergent CRDT join for high-throughput
11 multi-agent scenarios where millions of agents operate per second. ``join``
12 always succeeds; no conflict state exists in CRDT mode.
13
14 What is versioned
15 -----------------
16 The plugin versions your *relationship* with Bitcoin — never private keys.
17 All descriptors are watch-only xpubs. Signing happens outside MUSE; MUSE
18 tracks the intent, the strategy, and the outcomes.
19
20 +--------------------------+-----------------------------------------------+
21 | Workdir path | What it tracks |
22 +==========================+===============================================+
23 | wallet/utxos.json | Unspent transaction outputs (the coin set) |
24 | wallet/transactions.json | Confirmed transaction history |
25 | wallet/labels.json | Address semantic annotations |
26 | wallet/descriptors.json | Watch-only wallet descriptors (xpubs) |
27 | channels/channels.json | Lightning payment channel states |
28 | channels/routing.json | Lightning routing policies |
29 | strategy/agent.json | Agent DCA / fee / rebalancing configuration |
30 | strategy/execution.json | Agent decision event log (append-only) |
31 | oracles/prices.json | BTC/USD price feed |
32 | oracles/fees.json | Mempool fee surface snapshots |
33 | network/peers.json | Known P2P network peers |
34 | network/mempool.json | Local mempool view (pending transactions) |
35 +--------------------------+-----------------------------------------------+
36
37 Branch semantics
38 ----------------
39 ``main`` — real wallet state: UTXOs, confirmed transactions, active channels.
40 ``feat/<name>`` — strategy experiment: ``simulation_mode: true`` in
41 ``strategy/agent.json`` means no real transactions are broadcast.
42
43 MUSE merge checks whether the strategy branch and the main branch have
44 incompatible UTXO spends — the OT merge engine's double-spend detector runs
45 before any signing happens.
46 """
47
48 from __future__ import annotations
49
50 import hashlib
51 import json
52 import logging
53 import os
54 import pathlib
55 import stat as stat_mod
56 from typing import Literal
57
58 from muse.core.crdts import AWMap, ORSet, VectorClock
59 from muse.core.crdts.aw_map import AWMapDict
60 from muse.core.crdts.or_set import ORSetDict
61 from muse.core.object_store import object_path
62 from muse.core.op_transform import merge_op_lists
63 from muse.core.schema import (
64 CRDTDimensionSpec,
65 DimensionSpec,
66 DomainSchema,
67 MapSchema,
68 SequenceSchema,
69 SetSchema,
70 )
71 from muse.domain import (
72 CRDTSnapshotManifest,
73 ConflictRecord,
74 DeleteOp,
75 DomainOp,
76 DriftReport,
77 FieldMutation,
78 InsertOp,
79 LiveState,
80 MergeResult,
81 MutateOp,
82 PatchOp,
83 ReplaceOp,
84 SnapshotManifest,
85 StateDelta,
86 StateSnapshot,
87 StructuredDelta,
88 )
89 from muse.plugins.bitcoin._query import (
90 channel_summary_line,
91 double_spend_candidates,
92 fee_surface_str,
93 format_sat,
94 latest_fee_estimate,
95 latest_price,
96 mempool_summary_line,
97 strategy_summary_line,
98 total_balance_sat,
99 utxo_key,
100 utxo_summary_line,
101 )
102 from muse.plugins.bitcoin._types import (
103 AddressLabelRecord,
104 AgentStrategyRecord,
105 DescriptorRecord,
106 ExecutionEventRecord,
107 FeeEstimateRecord,
108 LightningChannelRecord,
109 NetworkPeerRecord,
110 OraclePriceTickRecord,
111 PendingTxRecord,
112 RoutingPolicyRecord,
113 TransactionRecord,
114 UTXORecord,
115 )
116
117 logger = logging.getLogger(__name__)
118
119 _DOMAIN_NAME = "bitcoin"
120
121 # Recognized semantic file suffixes → diff handler key
122 _SEMANTIC_SUFFIXES: dict[str, str] = {
123 "utxos.json": "utxos",
124 "transactions.json": "transactions",
125 "labels.json": "labels",
126 "descriptors.json": "descriptors",
127 "channels.json": "channels",
128 "routing.json": "routing",
129 "agent.json": "strategy",
130 "execution.json": "execution",
131 "prices.json": "prices",
132 "fees.json": "fees",
133 "peers.json": "peers",
134 "mempool.json": "mempool",
135 }
136
137
138 # ---------------------------------------------------------------------------
139 # Internal helpers — JSON loading (typed)
140 # ---------------------------------------------------------------------------
141
142
143 def _load_utxos(data: bytes) -> list[UTXORecord]:
144 result: list[UTXORecord] = json.loads(data)
145 return result
146
147
148 def _load_transactions(data: bytes) -> list[TransactionRecord]:
149 result: list[TransactionRecord] = json.loads(data)
150 return result
151
152
153 def _load_labels(data: bytes) -> list[AddressLabelRecord]:
154 result: list[AddressLabelRecord] = json.loads(data)
155 return result
156
157
158 def _load_descriptors(data: bytes) -> list[DescriptorRecord]:
159 result: list[DescriptorRecord] = json.loads(data)
160 return result
161
162
163 def _load_channels(data: bytes) -> list[LightningChannelRecord]:
164 result: list[LightningChannelRecord] = json.loads(data)
165 return result
166
167
168 def _load_routing(data: bytes) -> list[RoutingPolicyRecord]:
169 result: list[RoutingPolicyRecord] = json.loads(data)
170 return result
171
172
173 def _load_strategy(data: bytes) -> AgentStrategyRecord:
174 result: AgentStrategyRecord = json.loads(data)
175 return result
176
177
178 def _load_execution(data: bytes) -> list[ExecutionEventRecord]:
179 result: list[ExecutionEventRecord] = json.loads(data)
180 return result
181
182
183 def _load_prices(data: bytes) -> list[OraclePriceTickRecord]:
184 result: list[OraclePriceTickRecord] = json.loads(data)
185 return result
186
187
188 def _load_fees(data: bytes) -> list[FeeEstimateRecord]:
189 result: list[FeeEstimateRecord] = json.loads(data)
190 return result
191
192
193 def _load_peers(data: bytes) -> list[NetworkPeerRecord]:
194 result: list[NetworkPeerRecord] = json.loads(data)
195 return result
196
197
198 def _load_mempool(data: bytes) -> list[PendingTxRecord]:
199 result: list[PendingTxRecord] = json.loads(data)
200 return result
201
202
203 def _content_hash(data: bytes) -> str:
204 return hashlib.sha256(data).hexdigest()
205
206
207 def _hash_file(path: pathlib.Path) -> str:
208 return _content_hash(path.read_bytes())
209
210
211 def _jhash(json_str: str) -> str:
212 """SHA-256 of a pre-serialized canonical JSON string."""
213 return hashlib.sha256(json_str.encode()).hexdigest()
214
215
216
217
218 # ---------------------------------------------------------------------------
219 # Semantic diff helpers — one per recognized file type
220 # ---------------------------------------------------------------------------
221
222
223 def _diff_utxos(
224 path: str,
225 old_data: bytes,
226 new_data: bytes,
227 ) -> list[DomainOp]:
228 """Diff two UTXO sets at the UTXO level (txid:vout identity)."""
229 base_list = _load_utxos(old_data)
230 target_list = _load_utxos(new_data)
231
232 base_map: dict[str, UTXORecord] = {utxo_key(u): u for u in base_list}
233 target_map: dict[str, UTXORecord] = {utxo_key(u): u for u in target_list}
234
235 ops: list[DomainOp] = []
236
237 for key in sorted(set(target_map) - set(base_map)):
238 utxo = target_map[key]
239 ops.append(
240 InsertOp(
241 op="insert",
242 address=f"{path}::{key}",
243 position=None,
244 content_id=_jhash(json.dumps(utxo, sort_keys=True, separators=(",", ":"))),
245 content_summary=(
246 f"received +{format_sat(utxo['amount_sat'])}"
247 f" at {utxo['address'][:16]}…"
248 f" ({utxo['script_type']})"
249 ),
250 )
251 )
252
253 for key in sorted(set(base_map) - set(target_map)):
254 utxo = base_map[key]
255 ops.append(
256 DeleteOp(
257 op="delete",
258 address=f"{path}::{key}",
259 position=None,
260 content_id=_jhash(json.dumps(utxo, sort_keys=True, separators=(",", ":"))),
261 content_summary=(
262 f"spent -{format_sat(utxo['amount_sat'])}"
263 f" from {utxo['address'][:16]}…"
264 ),
265 )
266 )
267
268 for key in sorted(set(base_map) & set(target_map)):
269 b = base_map[key]
270 t = target_map[key]
271 b_id = _jhash(json.dumps(b, sort_keys=True, separators=(",", ":")))
272 t_id = _jhash(json.dumps(t, sort_keys=True, separators=(",", ":")))
273 if b_id == t_id:
274 continue
275 fields: dict[str, FieldMutation] = {}
276 if b["confirmations"] != t["confirmations"]:
277 fields["confirmations"] = FieldMutation(
278 old=str(b["confirmations"]),
279 new=str(t["confirmations"]),
280 )
281 if b["label"] != t["label"]:
282 fields["label"] = FieldMutation(
283 old=str(b["label"] or ""),
284 new=str(t["label"] or ""),
285 )
286 if fields:
287 ops.append(
288 MutateOp(
289 op="mutate",
290 address=f"{path}::{key}",
291 entity_id=key,
292 old_content_id=b_id,
293 new_content_id=t_id,
294 fields=fields,
295 old_summary=f"UTXO {key} (prev)",
296 new_summary=f"UTXO {key} (updated)",
297 position=None,
298 )
299 )
300 else:
301 ops.append(
302 ReplaceOp(
303 op="replace",
304 address=f"{path}::{key}",
305 position=None,
306 old_content_id=b_id,
307 new_content_id=t_id,
308 old_summary=f"UTXO {key} (prev)",
309 new_summary=f"UTXO {key} (updated)",
310 )
311 )
312
313 return ops
314
315
316 def _diff_transactions(
317 path: str,
318 old_data: bytes,
319 new_data: bytes,
320 ) -> list[DomainOp]:
321 """Diff confirmed transaction histories (append-only by txid)."""
322 base_txids: set[str] = {t["txid"] for t in _load_transactions(old_data)}
323 target_map: dict[str, TransactionRecord] = {
324 t["txid"]: t for t in _load_transactions(new_data)
325 }
326
327 ops: list[DomainOp] = []
328 for txid in sorted(set(target_map) - base_txids):
329 tx = target_map[txid]
330 ops.append(
331 InsertOp(
332 op="insert",
333 address=f"{path}::{txid}",
334 position=None,
335 content_id=_jhash(json.dumps(tx, sort_keys=True, separators=(",", ":"))),
336 content_summary=(
337 f"tx {txid[:12]}…"
338 f" fee={format_sat(tx['fee_sat'])}"
339 f" {'confirmed' if tx['confirmed'] else 'unconfirmed'}"
340 ),
341 )
342 )
343 return ops
344
345
346 def _diff_labels(
347 path: str,
348 old_data: bytes,
349 new_data: bytes,
350 ) -> list[DomainOp]:
351 """Diff address label sets (by address identity)."""
352 base_map: dict[str, AddressLabelRecord] = {
353 lbl["address"]: lbl for lbl in _load_labels(old_data)
354 }
355 target_map: dict[str, AddressLabelRecord] = {
356 lbl["address"]: lbl for lbl in _load_labels(new_data)
357 }
358
359 ops: list[DomainOp] = []
360 for addr in sorted(set(target_map) - set(base_map)):
361 lbl = target_map[addr]
362 ops.append(
363 InsertOp(
364 op="insert",
365 address=f"{path}::{addr}",
366 position=None,
367 content_id=_jhash(json.dumps(lbl, sort_keys=True, separators=(",", ":"))),
368 content_summary=f"label {lbl['label']!r} → {addr[:16]}…",
369 )
370 )
371 for addr in sorted(set(base_map) - set(target_map)):
372 lbl = base_map[addr]
373 ops.append(
374 DeleteOp(
375 op="delete",
376 address=f"{path}::{addr}",
377 position=None,
378 content_id=_jhash(json.dumps(lbl, sort_keys=True, separators=(",", ":"))),
379 content_summary=f"removed label from {addr[:16]}…",
380 )
381 )
382 for addr in sorted(set(base_map) & set(target_map)):
383 b, t = base_map[addr], target_map[addr]
384 b_id = _jhash(json.dumps(b, sort_keys=True, separators=(",", ":")))
385 t_id = _jhash(json.dumps(t, sort_keys=True, separators=(",", ":")))
386 if b_id != t_id:
387 ops.append(
388 MutateOp(
389 op="mutate",
390 address=f"{path}::{addr}",
391 entity_id=addr,
392 old_content_id=b_id,
393 new_content_id=t_id,
394 fields={"label": FieldMutation(old=b["label"], new=t["label"])},
395 old_summary=f"label {b['label']!r}",
396 new_summary=f"label {t['label']!r}",
397 position=None,
398 )
399 )
400 return ops
401
402
403 def _diff_channels(
404 path: str,
405 old_data: bytes,
406 new_data: bytes,
407 ) -> list[DomainOp]:
408 """Diff Lightning channel states (by channel_id identity)."""
409 base_map: dict[str, LightningChannelRecord] = {
410 c["channel_id"]: c for c in _load_channels(old_data)
411 }
412 target_map: dict[str, LightningChannelRecord] = {
413 c["channel_id"]: c for c in _load_channels(new_data)
414 }
415
416 ops: list[DomainOp] = []
417 for cid in sorted(set(target_map) - set(base_map)):
418 ch = target_map[cid]
419 ops.append(
420 InsertOp(
421 op="insert",
422 address=f"{path}::{cid}",
423 position=None,
424 content_id=_jhash(json.dumps(ch, sort_keys=True, separators=(",", ":"))),
425 content_summary=(
426 f"channel opened with {(ch['peer_alias'] or ch['peer_pubkey'][:16] + '…')}"
427 f" capacity={format_sat(ch['capacity_sat'])}"
428 ),
429 )
430 )
431 for cid in sorted(set(base_map) - set(target_map)):
432 ch = base_map[cid]
433 ops.append(
434 DeleteOp(
435 op="delete",
436 address=f"{path}::{cid}",
437 position=None,
438 content_id=_jhash(json.dumps(ch, sort_keys=True, separators=(",", ":"))),
439 content_summary=(
440 f"channel closed with {(ch['peer_alias'] or ch['peer_pubkey'][:16] + '…')}"
441 ),
442 )
443 )
444 for cid in sorted(set(base_map) & set(target_map)):
445 b, t = base_map[cid], target_map[cid]
446 b_id = _jhash(json.dumps(b, sort_keys=True, separators=(",", ":")))
447 t_id = _jhash(json.dumps(t, sort_keys=True, separators=(",", ":")))
448 if b_id == t_id:
449 continue
450 ch_fields: dict[str, FieldMutation] = {}
451 if b["local_balance_sat"] != t["local_balance_sat"]:
452 ch_fields["local_balance_sat"] = FieldMutation(
453 old=str(b["local_balance_sat"]), new=str(t["local_balance_sat"])
454 )
455 if b["remote_balance_sat"] != t["remote_balance_sat"]:
456 ch_fields["remote_balance_sat"] = FieldMutation(
457 old=str(b["remote_balance_sat"]), new=str(t["remote_balance_sat"])
458 )
459 if b["is_active"] != t["is_active"]:
460 ch_fields["is_active"] = FieldMutation(
461 old=str(b["is_active"]), new=str(t["is_active"])
462 )
463 if b["htlc_count"] != t["htlc_count"]:
464 ch_fields["htlc_count"] = FieldMutation(
465 old=str(b["htlc_count"]), new=str(t["htlc_count"])
466 )
467 if not ch_fields:
468 ch_fields["state"] = FieldMutation(old="prev", new="updated")
469 ops.append(
470 MutateOp(
471 op="mutate",
472 address=f"{path}::{cid}",
473 entity_id=cid,
474 old_content_id=b_id,
475 new_content_id=t_id,
476 fields=ch_fields,
477 old_summary=f"channel {cid} (prev)",
478 new_summary=f"channel {cid} (updated)",
479 position=None,
480 )
481 )
482 return ops
483
484
485 def _diff_strategy(
486 path: str,
487 old_data: bytes,
488 new_data: bytes,
489 ) -> list[DomainOp]:
490 """Diff agent strategy at field level using MutateOp per changed field."""
491 base_strat = _load_strategy(old_data)
492 target_strat = _load_strategy(new_data)
493 b_id = _jhash(json.dumps(base_strat, sort_keys=True, separators=(",", ":")))
494 t_id = _jhash(json.dumps(target_strat, sort_keys=True, separators=(",", ":")))
495 if b_id == t_id:
496 return []
497 strat_fields: dict[str, FieldMutation] = {}
498 if base_strat["name"] != target_strat["name"]:
499 strat_fields["name"] = FieldMutation(old=base_strat["name"], new=target_strat["name"])
500 if base_strat["max_fee_rate_sat_vbyte"] != target_strat["max_fee_rate_sat_vbyte"]:
501 strat_fields["max_fee_rate_sat_vbyte"] = FieldMutation(
502 old=str(base_strat["max_fee_rate_sat_vbyte"]),
503 new=str(target_strat["max_fee_rate_sat_vbyte"]),
504 )
505 if base_strat["min_confirmations"] != target_strat["min_confirmations"]:
506 strat_fields["min_confirmations"] = FieldMutation(
507 old=str(base_strat["min_confirmations"]),
508 new=str(target_strat["min_confirmations"]),
509 )
510 if base_strat["utxo_consolidation_threshold"] != target_strat["utxo_consolidation_threshold"]:
511 strat_fields["utxo_consolidation_threshold"] = FieldMutation(
512 old=str(base_strat["utxo_consolidation_threshold"]),
513 new=str(target_strat["utxo_consolidation_threshold"]),
514 )
515 if base_strat["dca_amount_sat"] != target_strat["dca_amount_sat"]:
516 strat_fields["dca_amount_sat"] = FieldMutation(
517 old=str(base_strat["dca_amount_sat"]),
518 new=str(target_strat["dca_amount_sat"]),
519 )
520 if base_strat["dca_interval_blocks"] != target_strat["dca_interval_blocks"]:
521 strat_fields["dca_interval_blocks"] = FieldMutation(
522 old=str(base_strat["dca_interval_blocks"]),
523 new=str(target_strat["dca_interval_blocks"]),
524 )
525 if base_strat["lightning_rebalance_threshold"] != target_strat["lightning_rebalance_threshold"]:
526 strat_fields["lightning_rebalance_threshold"] = FieldMutation(
527 old=str(base_strat["lightning_rebalance_threshold"]),
528 new=str(target_strat["lightning_rebalance_threshold"]),
529 )
530 if base_strat["coin_selection"] != target_strat["coin_selection"]:
531 strat_fields["coin_selection"] = FieldMutation(
532 old=base_strat["coin_selection"], new=target_strat["coin_selection"]
533 )
534 if base_strat["simulation_mode"] != target_strat["simulation_mode"]:
535 strat_fields["simulation_mode"] = FieldMutation(
536 old=str(base_strat["simulation_mode"]),
537 new=str(target_strat["simulation_mode"]),
538 )
539 return [
540 MutateOp(
541 op="mutate",
542 address=path,
543 entity_id="agent_strategy",
544 old_content_id=b_id,
545 new_content_id=t_id,
546 fields=strat_fields,
547 old_summary=strategy_summary_line(base_strat),
548 new_summary=strategy_summary_line(target_strat),
549 position=None,
550 )
551 ]
552
553
554 def _diff_execution(
555 path: str,
556 old_data: bytes,
557 new_data: bytes,
558 ) -> list[DomainOp]:
559 """Diff execution event logs (append-only by timestamp+txid)."""
560 base_set: set[str] = {
561 f"{e['timestamp']}:{e['txid'] or ''}" for e in _load_execution(old_data)
562 }
563 ops: list[DomainOp] = []
564 for ev in _load_execution(new_data):
565 key = f"{ev['timestamp']}:{ev['txid'] or ''}"
566 if key not in base_set:
567 ops.append(
568 InsertOp(
569 op="insert",
570 address=f"{path}::{key}",
571 position=None,
572 content_id=_jhash(json.dumps(ev, sort_keys=True, separators=(",", ":"))),
573 content_summary=f"event {ev['event_type']}: {ev['note'][:60]}",
574 )
575 )
576 return ops
577
578
579 def _diff_time_series(
580 path: str,
581 old_data: bytes,
582 new_data: bytes,
583 loader_key: Literal["prices", "fees"],
584 ) -> list[DomainOp]:
585 """Diff an append-only time-series dimension (prices or fees)."""
586 ops: list[DomainOp] = []
587
588 if loader_key == "prices":
589 base_tss: set[int] = {p["timestamp"] for p in _load_prices(old_data)}
590 for tick in _load_prices(new_data):
591 if tick["timestamp"] not in base_tss:
592 price_str = f"${tick['price_usd']:,.2f}"
593 ops.append(
594 InsertOp(
595 op="insert",
596 address=f"{path}::{tick['timestamp']}",
597 position=None,
598 content_id=_jhash(json.dumps(tick, sort_keys=True, separators=(",", ":"))),
599 content_summary=f"BTC/USD {price_str} from {tick['source']}",
600 )
601 )
602 else:
603 base_tss_f: set[int] = {e["timestamp"] for e in _load_fees(old_data)}
604 for est in _load_fees(new_data):
605 if est["timestamp"] not in base_tss_f:
606 ops.append(
607 InsertOp(
608 op="insert",
609 address=f"{path}::{est['timestamp']}",
610 position=None,
611 content_id=_jhash(json.dumps(est, sort_keys=True, separators=(",", ":"))),
612 content_summary=fee_surface_str(est),
613 )
614 )
615 return ops
616
617
618 def _diff_peers(
619 path: str,
620 old_data: bytes,
621 new_data: bytes,
622 ) -> list[DomainOp]:
623 """Diff peer lists (by pubkey identity)."""
624 base_map: dict[str, NetworkPeerRecord] = {
625 p["pubkey"]: p for p in _load_peers(old_data)
626 }
627 target_map: dict[str, NetworkPeerRecord] = {
628 p["pubkey"]: p for p in _load_peers(new_data)
629 }
630 ops: list[DomainOp] = []
631 for pk in sorted(set(target_map) - set(base_map)):
632 peer = target_map[pk]
633 ops.append(
634 InsertOp(
635 op="insert",
636 address=f"{path}::{pk}",
637 position=None,
638 content_id=_jhash(json.dumps(peer, sort_keys=True, separators=(",", ":"))),
639 content_summary=f"peer {peer['alias'] or pk[:16]}… connected={peer['connected']}",
640 )
641 )
642 for pk in sorted(set(base_map) - set(target_map)):
643 peer = base_map[pk]
644 ops.append(
645 DeleteOp(
646 op="delete",
647 address=f"{path}::{pk}",
648 position=None,
649 content_id=_jhash(json.dumps(peer, sort_keys=True, separators=(",", ":"))),
650 content_summary=f"peer removed: {peer['alias'] or pk[:16]}…",
651 )
652 )
653 return ops
654
655
656 def _diff_mempool(
657 path: str,
658 old_data: bytes,
659 new_data: bytes,
660 ) -> list[DomainOp]:
661 """Diff mempool snapshots (volatile set by txid)."""
662 base_txids: set[str] = {t["txid"] for t in _load_mempool(old_data)}
663 target_map: dict[str, PendingTxRecord] = {
664 t["txid"]: t for t in _load_mempool(new_data)
665 }
666 ops: list[DomainOp] = []
667 for txid in sorted(set(target_map) - base_txids):
668 tx = target_map[txid]
669 ops.append(
670 InsertOp(
671 op="insert",
672 address=f"{path}::{txid}",
673 position=None,
674 content_id=_jhash(json.dumps(tx, sort_keys=True, separators=(",", ":"))),
675 content_summary=mempool_summary_line([tx]),
676 )
677 )
678 for txid in sorted(base_txids - set(target_map)):
679 ops.append(
680 DeleteOp(
681 op="delete",
682 address=f"{path}::{txid}",
683 position=None,
684 content_id=txid,
685 content_summary=f"tx {txid[:12]}… left mempool",
686 )
687 )
688 return ops
689
690
691 def _semantic_child_ops(
692 path: str,
693 old_data: bytes,
694 new_data: bytes,
695 handler: str,
696 ) -> list[DomainOp]:
697 """Dispatch to the correct semantic diff helper for a known file."""
698 if handler == "utxos":
699 return _diff_utxos(path, old_data, new_data)
700 if handler == "transactions":
701 return _diff_transactions(path, old_data, new_data)
702 if handler == "labels":
703 return _diff_labels(path, old_data, new_data)
704 if handler == "channels":
705 return _diff_channels(path, old_data, new_data)
706 if handler == "strategy":
707 return _diff_strategy(path, old_data, new_data)
708 if handler == "execution":
709 return _diff_execution(path, old_data, new_data)
710 if handler == "prices":
711 return _diff_time_series(path, old_data, new_data, "prices")
712 if handler == "fees":
713 return _diff_time_series(path, old_data, new_data, "fees")
714 if handler == "peers":
715 return _diff_peers(path, old_data, new_data)
716 if handler == "mempool":
717 return _diff_mempool(path, old_data, new_data)
718 return []
719
720
721 def _handler_for_path(path: str) -> str | None:
722 """Return the semantic handler key for a workdir path, or ``None``."""
723 for suffix, handler in _SEMANTIC_SUFFIXES.items():
724 if path.endswith(suffix):
725 return handler
726 return None
727
728
729 def _diff_modified_file(
730 path: str,
731 old_hash: str,
732 new_hash: str,
733 repo_root: pathlib.Path | None,
734 ) -> DomainOp:
735 """Produce the best available op for a single modified file.
736
737 With ``repo_root``: load blobs, run semantic diff, return ``PatchOp``.
738 Without ``repo_root``: return coarse ``ReplaceOp``.
739 """
740 handler = _handler_for_path(path)
741 if repo_root is not None and handler is not None:
742 try:
743 old_bytes = object_path(repo_root, old_hash).read_bytes()
744 new_bytes = object_path(repo_root, new_hash).read_bytes()
745 child_ops = _semantic_child_ops(path, old_bytes, new_bytes, handler)
746 if child_ops:
747 n = len(child_ops)
748 return PatchOp(
749 op="patch",
750 address=path,
751 child_ops=child_ops,
752 child_domain=f"bitcoin.{handler}",
753 child_summary=f"{n} {handler} change{'s' if n != 1 else ''}",
754 )
755 except OSError:
756 logger.debug("bitcoin diff: blob not found for %s, falling back", path)
757
758 return ReplaceOp(
759 op="replace",
760 address=path,
761 position=None,
762 old_content_id=old_hash,
763 new_content_id=new_hash,
764 old_summary=f"{path} (prev)",
765 new_summary=f"{path} (updated)",
766 )
767
768
769 # ---------------------------------------------------------------------------
770 # CRDT state helpers
771 # ---------------------------------------------------------------------------
772
773 _EMPTY_AW: AWMapDict = {"entries": [], "tombstones": []}
774 _EMPTY_OR: ORSetDict = {"entries": [], "tombstones": []}
775
776
777 def _load_aw(crdt_state: dict[str, str], key: str) -> AWMap:
778 raw = crdt_state.get(key, "{}")
779 data: AWMapDict = json.loads(raw) if raw != "{}" else _EMPTY_AW
780 return AWMap.from_dict(data)
781
782
783 def _load_or(crdt_state: dict[str, str], key: str) -> ORSet:
784 raw = crdt_state.get(key, "{}")
785 data: ORSetDict = json.loads(raw) if raw != "{}" else _EMPTY_OR
786 return ORSet.from_dict(data)
787
788
789 # ---------------------------------------------------------------------------
790 # BitcoinPlugin
791 # ---------------------------------------------------------------------------
792
793
794 class BitcoinPlugin:
795 """Bitcoin domain plugin — the full MuseDomainPlugin + OT + CRDT stack.
796
797 Implements three protocol levels:
798
799 1. **Core** (:class:`~muse.domain.MuseDomainPlugin`) — snapshot, diff,
800 merge, drift, apply, schema.
801 2. **OT merge** (:class:`~muse.domain.StructuredMergePlugin`) — operation-
802 level merge with double-spend detection.
803 3. **CRDT** (:class:`~muse.domain.CRDTPlugin`) — convergent join for
804 high-throughput multi-agent write scenarios.
805 """
806
807 # ------------------------------------------------------------------
808 # 1. snapshot
809 # ------------------------------------------------------------------
810
811 def snapshot(self, live_state: LiveState) -> StateSnapshot:
812 """Capture the working tree as a content-addressed manifest.
813
814 Walks every non-hidden, non-ignored file in the working tree and
815 records its SHA-256 digest. Private keys are never stored — the
816 working tree must only contain watch-only descriptors (xpubs).
817
818 Args:
819 live_state: Repository root ``pathlib.Path`` or an existing
820 ``SnapshotManifest`` dict (returned unchanged).
821
822 Returns:
823 A ``SnapshotManifest`` mapping POSIX paths to SHA-256 digests.
824 """
825 if isinstance(live_state, pathlib.Path):
826 from muse.core.ignore import is_ignored, load_ignore_config, resolve_patterns
827 from muse.core.stat_cache import load_cache
828
829 workdir = live_state
830 repo_root = workdir
831 patterns = resolve_patterns(load_ignore_config(repo_root), _DOMAIN_NAME)
832 cache = load_cache(workdir)
833 root_str = str(workdir)
834 prefix_len = len(root_str) + 1
835 files: dict[str, str] = {}
836
837 for dirpath, dirnames, filenames in os.walk(root_str, followlinks=False):
838 dirnames[:] = sorted(d for d in dirnames if not d.startswith("."))
839 for fname in sorted(filenames):
840 if fname.startswith("."):
841 continue
842 abs_str = os.path.join(dirpath, fname)
843 try:
844 st = os.lstat(abs_str)
845 except OSError:
846 continue
847 if not stat_mod.S_ISREG(st.st_mode):
848 continue
849 rel = abs_str[prefix_len:]
850 if os.sep != "/":
851 rel = rel.replace(os.sep, "/")
852 if is_ignored(rel, patterns):
853 continue
854 files[rel] = cache.get_cached(
855 rel, abs_str, st.st_mtime, st.st_size
856 )
857
858 cache.prune(set(files))
859 cache.save()
860 return SnapshotManifest(files=files, domain=_DOMAIN_NAME)
861
862 return live_state
863
864 # ------------------------------------------------------------------
865 # 2. diff
866 # ------------------------------------------------------------------
867
868 def diff(
869 self,
870 base: StateSnapshot,
871 target: StateSnapshot,
872 *,
873 repo_root: pathlib.Path | None = None,
874 ) -> StateDelta:
875 """Compute a structured delta between two Bitcoin state snapshots.
876
877 With ``repo_root``: produces semantic ``PatchOp`` entries with
878 element-level child ops (UTXO-level, channel-level, field-level).
879
880 Without ``repo_root``: produces coarse ``InsertOp`` / ``DeleteOp`` /
881 ``ReplaceOp`` at file granularity.
882
883 Args:
884 base: Ancestor snapshot.
885 target: Later snapshot.
886 repo_root: Repository root for object store access.
887
888 Returns:
889 A ``StructuredDelta`` describing every change from *base* to
890 *target*.
891 """
892 base_files = base["files"]
893 target_files = target["files"]
894 base_paths = set(base_files)
895 target_paths = set(target_files)
896
897 ops: list[DomainOp] = []
898
899 for path in sorted(target_paths - base_paths):
900 ops.append(
901 InsertOp(
902 op="insert",
903 address=path,
904 position=None,
905 content_id=target_files[path],
906 content_summary=f"new: {path}",
907 )
908 )
909
910 for path in sorted(base_paths - target_paths):
911 ops.append(
912 DeleteOp(
913 op="delete",
914 address=path,
915 position=None,
916 content_id=base_files[path],
917 content_summary=f"removed: {path}",
918 )
919 )
920
921 for path in sorted(
922 p for p in base_paths & target_paths if base_files[p] != target_files[p]
923 ):
924 ops.append(
925 _diff_modified_file(
926 path=path,
927 old_hash=base_files[path],
928 new_hash=target_files[path],
929 repo_root=repo_root,
930 )
931 )
932
933 summary = _delta_summary(ops)
934 return StructuredDelta(domain=_DOMAIN_NAME, ops=ops, summary=summary)
935
936 # ------------------------------------------------------------------
937 # 3. merge
938 # ------------------------------------------------------------------
939
940 def merge(
941 self,
942 base: StateSnapshot,
943 left: StateSnapshot,
944 right: StateSnapshot,
945 *,
946 repo_root: pathlib.Path | None = None,
947 ) -> MergeResult:
948 """Three-way merge with Bitcoin-aware double-spend detection.
949
950 Performs standard three-way file-level merge. When ``repo_root`` is
951 available and both branches touch ``wallet/utxos.json``, loads the
952 UTXO sets to detect double-spend candidates and promotes them to
953 structured ``ConflictRecord`` entries.
954
955 Args:
956 base: Common ancestor snapshot.
957 left: Ours (current branch) snapshot.
958 right: Theirs (incoming branch) snapshot.
959 repo_root: Repository root for double-spend analysis.
960
961 Returns:
962 A ``MergeResult`` with the reconciled manifest and any conflicts.
963 """
964 base_files = base["files"]
965 left_files = left["files"]
966 right_files = right["files"]
967
968 merged: dict[str, str] = dict(base_files)
969 conflicts: list[str] = []
970 conflict_records: list[ConflictRecord] = []
971
972 all_paths = set(base_files) | set(left_files) | set(right_files)
973 for path in sorted(all_paths):
974 b_val = base_files.get(path)
975 l_val = left_files.get(path)
976 r_val = right_files.get(path)
977
978 if l_val == r_val:
979 if l_val is None:
980 merged.pop(path, None)
981 else:
982 merged[path] = l_val
983 elif b_val == l_val:
984 if r_val is None:
985 merged.pop(path, None)
986 else:
987 merged[path] = r_val
988 elif b_val == r_val:
989 if l_val is None:
990 merged.pop(path, None)
991 else:
992 merged[path] = l_val
993 else:
994 conflicts.append(path)
995 merged[path] = l_val or r_val or b_val or ""
996
997 # Bitcoin-specific: detect UTXO double-spend for utxos.json conflicts
998 if repo_root is not None:
999 for path in list(conflicts):
1000 if not path.endswith("utxos.json"):
1001 continue
1002 b_hash = base_files.get(path)
1003 l_hash = left_files.get(path)
1004 r_hash = right_files.get(path)
1005 if b_hash is None or l_hash is None or r_hash is None:
1006 continue
1007 try:
1008 base_utxos = _load_utxos(
1009 object_path(repo_root, b_hash).read_bytes()
1010 )
1011 left_utxos = _load_utxos(
1012 object_path(repo_root, l_hash).read_bytes()
1013 )
1014 right_utxos = _load_utxos(
1015 object_path(repo_root, r_hash).read_bytes()
1016 )
1017 base_keys = {utxo_key(u) for u in base_utxos}
1018 left_keys = {utxo_key(u) for u in left_utxos}
1019 right_keys = {utxo_key(u) for u in right_utxos}
1020 our_spent = base_keys - left_keys
1021 their_spent = base_keys - right_keys
1022 dsc = double_spend_candidates(base_keys, our_spent, their_spent)
1023 if dsc:
1024 conflict_records.append(
1025 ConflictRecord(
1026 path=path,
1027 conflict_type="double_spend",
1028 ours_summary=f"spent {len(our_spent)} UTXO(s)",
1029 theirs_summary=f"spent {len(their_spent)} UTXO(s)",
1030 addresses=[f"{path}::{k}" for k in dsc],
1031 )
1032 )
1033 except OSError:
1034 logger.debug("bitcoin merge: blob not found for %s", path)
1035
1036 return MergeResult(
1037 merged=SnapshotManifest(files=merged, domain=_DOMAIN_NAME),
1038 conflicts=conflicts,
1039 conflict_records=conflict_records,
1040 )
1041
1042 # ------------------------------------------------------------------
1043 # 4. drift
1044 # ------------------------------------------------------------------
1045
1046 def drift(self, committed: StateSnapshot, live: LiveState) -> DriftReport:
1047 """Compare the last committed snapshot against the current live state.
1048
1049 Used by ``muse status``. Produces a ``DriftReport`` describing every
1050 UTXO gained or spent, every channel state change, every strategy
1051 parameter update since the last commit.
1052
1053 Args:
1054 committed: The last committed ``StateSnapshot``.
1055 live: Current live state (path or snapshot dict).
1056
1057 Returns:
1058 A ``DriftReport`` with ``has_drift``, ``summary``, and ``delta``.
1059 """
1060 current = self.snapshot(live)
1061 delta = self.diff(committed, current)
1062 has_drift = len(delta["ops"]) > 0
1063 return DriftReport(
1064 has_drift=has_drift,
1065 summary=delta.get("summary", "working tree clean"),
1066 delta=delta,
1067 )
1068
1069 # ------------------------------------------------------------------
1070 # 5. apply
1071 # ------------------------------------------------------------------
1072
1073 def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState:
1074 """Apply a delta during ``muse checkout``.
1075
1076 The core engine restores file-level objects from the object store.
1077 This hook exists for domain-level post-processing; Bitcoin currently
1078 requires none beyond file restoration.
1079
1080 Args:
1081 delta: The typed operation list to apply.
1082 live_state: Current live state.
1083
1084 Returns:
1085 The unchanged ``live_state`` (post-processing is a no-op here).
1086 """
1087 return live_state
1088
1089 # ------------------------------------------------------------------
1090 # 6. schema
1091 # ------------------------------------------------------------------
1092
1093 def schema(self) -> DomainSchema:
1094 """Declare the multidimensional structure of Bitcoin state.
1095
1096 Ten dimensions map to the ten recognized workdir file types. The
1097 ``merge_mode`` is ``"crdt"`` to signal that this plugin supports the
1098 :class:`~muse.domain.CRDTPlugin` convergent join path for multi-agent
1099 scenarios.
1100
1101 Returns:
1102 A ``DomainSchema`` with all Bitcoin dimensions declared.
1103 """
1104 return DomainSchema(
1105 domain=_DOMAIN_NAME,
1106 description=(
1107 "Bitcoin domain — multidimensional version control for wallet state, "
1108 "Lightning channels, agent strategies, and oracle data. "
1109 "Watch-only (no private keys). UTXO-level diff, channel-level merge, "
1110 "agent-strategy branching, and CRDT convergent join for multi-agent "
1111 "scenarios with millions of concurrent agents."
1112 ),
1113 top_level=SetSchema(
1114 kind="set",
1115 element_type="bitcoin_state_file",
1116 identity="by_content",
1117 ),
1118 dimensions=[
1119 DimensionSpec(
1120 name="utxos",
1121 description=(
1122 "Unspent transaction outputs — the coin set. "
1123 "Identity: txid:vout. Double-spend detection activates "
1124 "when two branches delete the same UTXO."
1125 ),
1126 schema=SetSchema(
1127 kind="set",
1128 element_type="utxo",
1129 identity="by_id",
1130 ),
1131 independent_merge=True,
1132 ),
1133 DimensionSpec(
1134 name="transactions",
1135 description=(
1136 "Confirmed transaction history. Append-only: the blockchain "
1137 "never removes confirmed transactions. New txids from both "
1138 "branches are merged by union."
1139 ),
1140 schema=SequenceSchema(
1141 kind="sequence",
1142 element_type="transaction",
1143 identity="by_id",
1144 diff_algorithm="lcs",
1145 alphabet=None,
1146 ),
1147 independent_merge=True,
1148 ),
1149 DimensionSpec(
1150 name="labels",
1151 description=(
1152 "Address semantic annotations. Additive: concurrent label "
1153 "additions from multiple agents always survive. CRDT OR-Set "
1154 "semantics in CRDT mode."
1155 ),
1156 schema=SetSchema(
1157 kind="set",
1158 element_type="address_label",
1159 identity="by_id",
1160 ),
1161 independent_merge=True,
1162 ),
1163 DimensionSpec(
1164 name="descriptors",
1165 description="Watch-only wallet descriptors (xpubs). Never private keys.",
1166 schema=SetSchema(
1167 kind="set",
1168 element_type="descriptor",
1169 identity="by_id",
1170 ),
1171 independent_merge=True,
1172 ),
1173 DimensionSpec(
1174 name="channels",
1175 description=(
1176 "Lightning payment channel states. Identity: channel_id. "
1177 "Balance changes are MutateOps; open/close are Insert/DeleteOps."
1178 ),
1179 schema=SetSchema(
1180 kind="set",
1181 element_type="lightning_channel",
1182 identity="by_id",
1183 ),
1184 independent_merge=True,
1185 ),
1186 DimensionSpec(
1187 name="strategy",
1188 description=(
1189 "Agent DCA / fee / rebalancing configuration. Field-level "
1190 "MutateOps enable per-parameter conflict detection and LWW "
1191 "resolution in CRDT mode."
1192 ),
1193 schema=MapSchema(
1194 kind="map",
1195 key_type="strategy_field",
1196 value_schema=SetSchema(
1197 kind="set",
1198 element_type="field_value",
1199 identity="by_content",
1200 ),
1201 identity="by_key",
1202 ),
1203 independent_merge=False,
1204 ),
1205 DimensionSpec(
1206 name="oracle_prices",
1207 description=(
1208 "BTC/USD price feed. Time-ordered sequence of oracle ticks. "
1209 "Concurrent ticks from different sources are merged by union "
1210 "ordered by timestamp."
1211 ),
1212 schema=SequenceSchema(
1213 kind="sequence",
1214 element_type="price_tick",
1215 identity="by_id",
1216 diff_algorithm="lcs",
1217 alphabet=None,
1218 ),
1219 independent_merge=True,
1220 ),
1221 DimensionSpec(
1222 name="oracle_fees",
1223 description="Mempool fee surface time series (sat/vbyte per block target).",
1224 schema=SequenceSchema(
1225 kind="sequence",
1226 element_type="fee_estimate",
1227 identity="by_id",
1228 diff_algorithm="lcs",
1229 alphabet=None,
1230 ),
1231 independent_merge=True,
1232 ),
1233 DimensionSpec(
1234 name="network",
1235 description="Known P2P peers and local mempool state.",
1236 schema=SetSchema(
1237 kind="set",
1238 element_type="network_peer",
1239 identity="by_id",
1240 ),
1241 independent_merge=True,
1242 ),
1243 DimensionSpec(
1244 name="execution",
1245 description=(
1246 "Agent execution event log — append-only audit trail of every "
1247 "decision the agent made: DCA buys, fee bumps, channel opens, "
1248 "rebalances. Never deleted."
1249 ),
1250 schema=SequenceSchema(
1251 kind="sequence",
1252 element_type="execution_event",
1253 identity="by_id",
1254 diff_algorithm="lcs",
1255 alphabet=None,
1256 ),
1257 independent_merge=True,
1258 ),
1259 ],
1260 merge_mode="crdt",
1261 schema_version=1,
1262 )
1263
1264 # ------------------------------------------------------------------
1265 # StructuredMergePlugin — operation-level OT merge
1266 # ------------------------------------------------------------------
1267
1268 def merge_ops(
1269 self,
1270 base: StateSnapshot,
1271 ours_snap: StateSnapshot,
1272 theirs_snap: StateSnapshot,
1273 ours_ops: list[DomainOp],
1274 theirs_ops: list[DomainOp],
1275 *,
1276 repo_root: pathlib.Path | None = None,
1277 ) -> MergeResult:
1278 """Operation-level three-way merge with double-spend detection.
1279
1280 Uses the OT engine's commutativity oracle to detect op-level conflicts.
1281 Bitcoin-specific rule applied on top of standard OT:
1282
1283 **Double-spend signal**: if both ``ours_ops`` and ``theirs_ops``
1284 contain a ``DeleteOp`` for the same UTXO address
1285 (``"wallet/utxos.json::{txid}:{vout}"``), this is a strategy-layer
1286 double-spend attempt. The op is promoted to a structured conflict even
1287 if the standard OT oracle would allow it (since ``DeleteOp + DeleteOp``
1288 normally commutes).
1289
1290 Args:
1291 base: Common ancestor snapshot.
1292 ours_snap: Our branch's final snapshot.
1293 theirs_snap: Their branch's final snapshot.
1294 ours_ops: Our branch's typed operation list.
1295 theirs_ops: Their branch's typed operation list.
1296 repo_root: Repository root for ``.museattributes`` and object store.
1297
1298 Returns:
1299 ``MergeResult`` with empty ``conflicts`` if all ops commute, or
1300 structured conflicts including double-spend records.
1301 """
1302 result = merge_op_lists(
1303 base_ops=[],
1304 ours_ops=ours_ops,
1305 theirs_ops=theirs_ops,
1306 )
1307
1308 conflicts: list[str] = []
1309 conflict_records: list[ConflictRecord] = []
1310
1311 # Standard OT conflicts
1312 if result.conflict_ops:
1313 seen: set[str] = set()
1314 for our_op, their_op in result.conflict_ops:
1315 addr = our_op["address"]
1316 seen.add(addr)
1317 conflict_records.append(
1318 ConflictRecord(
1319 path=addr.split("::")[0],
1320 conflict_type="symbol_edit_overlap",
1321 ours_summary=f"ours: {our_op['op']} {addr}",
1322 theirs_summary=f"theirs: {their_op['op']} {addr}",
1323 addresses=[addr],
1324 )
1325 )
1326 conflicts = sorted(seen)
1327
1328 # Bitcoin-specific: double-spend detection on UTXO delete ops
1329 our_utxo_deletes: set[str] = {
1330 op["address"]
1331 for op in ours_ops
1332 if op["op"] == "delete" and "utxos.json::" in op["address"]
1333 }
1334 their_utxo_deletes: set[str] = {
1335 op["address"]
1336 for op in theirs_ops
1337 if op["op"] == "delete" and "utxos.json::" in op["address"]
1338 }
1339 double_spends = sorted(our_utxo_deletes & their_utxo_deletes)
1340 for addr in double_spends:
1341 if addr not in conflicts:
1342 conflicts.append(addr)
1343 conflict_records.append(
1344 ConflictRecord(
1345 path=addr.split("::")[0],
1346 conflict_type="double_spend",
1347 ours_summary=f"ours spent UTXO {addr.split('::')[-1]}",
1348 theirs_summary=f"theirs spent UTXO {addr.split('::')[-1]}",
1349 addresses=[addr],
1350 )
1351 )
1352
1353 fallback = self.merge(base, ours_snap, theirs_snap, repo_root=repo_root)
1354 return MergeResult(
1355 merged=fallback.merged,
1356 conflicts=conflicts if conflicts else fallback.conflicts,
1357 conflict_records=conflict_records if conflict_records else fallback.conflict_records,
1358 applied_strategies=fallback.applied_strategies,
1359 dimension_reports=fallback.dimension_reports,
1360 op_log=result.merged_ops,
1361 )
1362
1363 # ------------------------------------------------------------------
1364 # CRDTPlugin — convergent multi-agent join
1365 # ------------------------------------------------------------------
1366
1367 def crdt_schema(self) -> list[CRDTDimensionSpec]:
1368 """Declare the CRDT primitive for each Bitcoin dimension.
1369
1370 Returns:
1371 Seven ``CRDTDimensionSpec`` entries, one per CRDT-backed dimension.
1372 """
1373 return [
1374 CRDTDimensionSpec(
1375 name="files_manifest",
1376 description=(
1377 "The file manifest itself — convergent AW-Map so concurrent "
1378 "file additions from any agent are always preserved."
1379 ),
1380 crdt_type="aw_map",
1381 independent_merge=True,
1382 ),
1383 CRDTDimensionSpec(
1384 name="utxos",
1385 description=(
1386 "UTXO set as AW-Map (txid:vout → UTXO data). Add-wins: "
1387 "a UTXO received by one agent is preserved even if another "
1388 "agent has a stale view. Spending is a remove; two concurrent "
1389 "removes of the same UTXO produce a double-spend warning at "
1390 "the OT layer."
1391 ),
1392 crdt_type="aw_map",
1393 independent_merge=True,
1394 ),
1395 CRDTDimensionSpec(
1396 name="labels",
1397 description=(
1398 "Address labels as OR-Set. Concurrent label additions from "
1399 "any agent win over removes. An agent labeling an address "
1400 "'cold storage' is never silently overwritten by a concurrent "
1401 "remove from another agent."
1402 ),
1403 crdt_type="or_set",
1404 independent_merge=True,
1405 ),
1406 CRDTDimensionSpec(
1407 name="channels",
1408 description=(
1409 "Lightning channels as AW-Map (channel_id → state JSON). "
1410 "Add-wins: a new channel opened by one agent is preserved "
1411 "under concurrent state from other agents."
1412 ),
1413 crdt_type="aw_map",
1414 independent_merge=True,
1415 ),
1416 CRDTDimensionSpec(
1417 name="strategy",
1418 description=(
1419 "Agent strategy as AW-Map (field_name → value). Each config "
1420 "field is an independent LWW register (via token ordering). "
1421 "One agent changing max_fee_rate never conflicts with another "
1422 "agent changing dca_amount_sat."
1423 ),
1424 crdt_type="aw_map",
1425 independent_merge=False,
1426 ),
1427 CRDTDimensionSpec(
1428 name="transactions",
1429 description=(
1430 "Confirmed transaction IDs as OR-Set. Append-only: once a "
1431 "txid is added it is never legitimately removed. Any agent "
1432 "that observes a confirmation adds it; the join is the union."
1433 ),
1434 crdt_type="or_set",
1435 independent_merge=True,
1436 ),
1437 CRDTDimensionSpec(
1438 name="mempool",
1439 description=(
1440 "Pending txids as OR-Set. Volatile: txids are added when "
1441 "seen in the mempool and removed when confirmed or evicted. "
1442 "The join gives every agent the union of all pending txids "
1443 "seen across the fleet."
1444 ),
1445 crdt_type="or_set",
1446 independent_merge=True,
1447 ),
1448 ]
1449
1450 def join(
1451 self,
1452 a: CRDTSnapshotManifest,
1453 b: CRDTSnapshotManifest,
1454 ) -> CRDTSnapshotManifest:
1455 """Convergent join of two Bitcoin CRDT snapshots.
1456
1457 Joins each dimension independently using its declared CRDT primitive.
1458 This operation is commutative, associative, and idempotent — any two
1459 agents that have received the same set of writes converge to identical
1460 state regardless of delivery order.
1461
1462 The file manifest is rebuilt from the joined ``files_manifest`` AW-Map
1463 so that the core engine's content-addressed store remains consistent.
1464
1465 Args:
1466 a: First CRDT snapshot manifest.
1467 b: Second CRDT snapshot manifest.
1468
1469 Returns:
1470 The lattice join — a new ``CRDTSnapshotManifest`` that is the
1471 least upper bound of *a* and *b*.
1472 """
1473 vc_a = VectorClock.from_dict(a["vclock"])
1474 vc_b = VectorClock.from_dict(b["vclock"])
1475 merged_vc = vc_a.merge(vc_b)
1476
1477 files_a = _load_aw(a["crdt_state"], "files_manifest")
1478 files_b = _load_aw(b["crdt_state"], "files_manifest")
1479 merged_files_map = files_a.join(files_b)
1480
1481 utxos_a = _load_aw(a["crdt_state"], "utxos")
1482 utxos_b = _load_aw(b["crdt_state"], "utxos")
1483 merged_utxos = utxos_a.join(utxos_b)
1484
1485 labels_a = _load_or(a["crdt_state"], "labels")
1486 labels_b = _load_or(b["crdt_state"], "labels")
1487 merged_labels = labels_a.join(labels_b)
1488
1489 channels_a = _load_aw(a["crdt_state"], "channels")
1490 channels_b = _load_aw(b["crdt_state"], "channels")
1491 merged_channels = channels_a.join(channels_b)
1492
1493 strategy_a = _load_aw(a["crdt_state"], "strategy")
1494 strategy_b = _load_aw(b["crdt_state"], "strategy")
1495 merged_strategy = strategy_a.join(strategy_b)
1496
1497 txns_a = _load_or(a["crdt_state"], "transactions")
1498 txns_b = _load_or(b["crdt_state"], "transactions")
1499 merged_txns = txns_a.join(txns_b)
1500
1501 mempool_a = _load_or(a["crdt_state"], "mempool")
1502 mempool_b = _load_or(b["crdt_state"], "mempool")
1503 merged_mempool = mempool_a.join(mempool_b)
1504
1505 merged_files = merged_files_map.to_plain_dict()
1506
1507 crdt_state: dict[str, str] = {
1508 "files_manifest": json.dumps(merged_files_map.to_dict()),
1509 "utxos": json.dumps(merged_utxos.to_dict()),
1510 "labels": json.dumps(merged_labels.to_dict()),
1511 "channels": json.dumps(merged_channels.to_dict()),
1512 "strategy": json.dumps(merged_strategy.to_dict()),
1513 "transactions": json.dumps(merged_txns.to_dict()),
1514 "mempool": json.dumps(merged_mempool.to_dict()),
1515 }
1516
1517 return CRDTSnapshotManifest(
1518 files=merged_files,
1519 domain=_DOMAIN_NAME,
1520 vclock=merged_vc.to_dict(),
1521 crdt_state=crdt_state,
1522 schema_version=1,
1523 )
1524
1525 def to_crdt_state(self, snapshot: StateSnapshot) -> CRDTSnapshotManifest:
1526 """Lift a plain snapshot into CRDT state representation.
1527
1528 Initialises the ``files_manifest`` AW-Map from the snapshot's ``files``
1529 dict. All domain-specific CRDT dimensions start empty and are populated
1530 lazily as agents commit content.
1531
1532 Args:
1533 snapshot: A plain ``StateSnapshot`` to lift.
1534
1535 Returns:
1536 A ``CRDTSnapshotManifest`` with the snapshot's files and empty
1537 per-dimension CRDT metadata.
1538 """
1539 files_map = AWMap()
1540 for path, content_hash in snapshot["files"].items():
1541 files_map = files_map.set(path, content_hash)
1542
1543 empty_aw = json.dumps(_EMPTY_AW)
1544 empty_or = json.dumps(_EMPTY_OR)
1545
1546 crdt_state: dict[str, str] = {
1547 "files_manifest": json.dumps(files_map.to_dict()),
1548 "utxos": empty_aw,
1549 "labels": empty_or,
1550 "channels": empty_aw,
1551 "strategy": empty_aw,
1552 "transactions": empty_or,
1553 "mempool": empty_or,
1554 }
1555
1556 return CRDTSnapshotManifest(
1557 files=snapshot["files"],
1558 domain=_DOMAIN_NAME,
1559 vclock=VectorClock().to_dict(),
1560 crdt_state=crdt_state,
1561 schema_version=1,
1562 )
1563
1564 def from_crdt_state(self, crdt: CRDTSnapshotManifest) -> StateSnapshot:
1565 """Materialise a CRDT manifest back into a plain snapshot.
1566
1567 Extracts the visible (non-tombstoned) file manifest from the
1568 ``files_manifest`` AW-Map. Used by ``muse show`` and CLI commands
1569 that need a standard ``StateSnapshot`` view.
1570
1571 Args:
1572 crdt: A ``CRDTSnapshotManifest`` to materialise.
1573
1574 Returns:
1575 A plain ``SnapshotManifest`` with the current visible files.
1576 """
1577 files_map = _load_aw(crdt["crdt_state"], "files_manifest")
1578 visible = files_map.to_plain_dict()
1579 files = visible if visible else crdt["files"]
1580 return SnapshotManifest(files=files, domain=_DOMAIN_NAME)
1581
1582
1583 # ---------------------------------------------------------------------------
1584 # Delta summary helper
1585 # ---------------------------------------------------------------------------
1586
1587
1588 def _delta_summary(ops: list[DomainOp]) -> str:
1589 """Produce a concise human-readable summary of a delta's operations."""
1590 if not ops:
1591 return "working tree clean"
1592
1593 counts: dict[str, int] = {"insert": 0, "delete": 0, "replace": 0, "patch": 0, "mutate": 0}
1594 for op in ops:
1595 op_type = op["op"]
1596 if op_type in counts:
1597 counts[op_type] += 1
1598
1599 parts: list[str] = []
1600 if counts["insert"]:
1601 parts.append(f"{counts['insert']} added")
1602 if counts["delete"]:
1603 parts.append(f"{counts['delete']} removed")
1604 if counts["patch"]:
1605 parts.append(f"{counts['patch']} updated")
1606 if counts["mutate"]:
1607 parts.append(f"{counts['mutate']} mutated")
1608 if counts["replace"]:
1609 parts.append(f"{counts['replace']} replaced")
1610 return ", ".join(parts) if parts else "changes present"