gabriel / muse public
test_bitcoin_plugin.py python
1125 lines 39.9 KB
15cf97e9 feat(bitcoin): add semantic porcelain layer — 19 Bitcoin-idiomatic CLI … Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """Comprehensive test suite for the Bitcoin domain plugin.
2
3 Tests cover all three protocol levels:
4 - Core MuseDomainPlugin (snapshot, diff, merge, drift, apply, schema)
5 - StructuredMergePlugin (OT merge with double-spend detection)
6 - CRDTPlugin (convergent join — commutativity, associativity, idempotency)
7
8 Tests are organized by capability, each asserting exact behaviour
9 with realistic Bitcoin fixtures.
10 """
11
12 from __future__ import annotations
13
14 import hashlib
15 import json
16 import pathlib
17 import tempfile
18
19 import pytest
20
21 from muse.domain import (
22 CRDTPlugin,
23 DeleteOp,
24 DomainOp,
25 InsertOp,
26 MuseDomainPlugin,
27 MutateOp,
28 PatchOp,
29 ReplaceOp,
30 SnapshotManifest,
31 StructuredMergePlugin,
32 )
33 from muse.plugins.bitcoin._query import (
34 balance_by_script_type,
35 channel_liquidity_totals,
36 channel_utilization,
37 coin_age_blocks,
38 confirmed_balance_sat,
39 double_spend_candidates,
40 fee_surface_str,
41 format_sat,
42 latest_fee_estimate,
43 latest_price,
44 mempool_summary_line,
45 strategy_summary_line,
46 total_balance_sat,
47 utxo_key,
48 utxo_summary_line,
49 )
50 from muse.plugins.bitcoin._types import (
51 AddressLabelRecord,
52 AgentStrategyRecord,
53 CoinCategory,
54 CoinSelectAlgo,
55 FeeEstimateRecord,
56 LightningChannelRecord,
57 OraclePriceTickRecord,
58 PendingTxRecord,
59 RoutingPolicyRecord,
60 ScriptType,
61 UTXORecord,
62 )
63 from muse.plugins.bitcoin.plugin import (
64 BitcoinPlugin,
65 _diff_channels,
66 _diff_labels,
67 _diff_strategy,
68 _diff_time_series,
69 _diff_transactions,
70 _diff_utxos,
71 _handler_for_path,
72 )
73
74 # ---------------------------------------------------------------------------
75 # Fixtures
76 # ---------------------------------------------------------------------------
77
78
79 def _make_utxo(
80 txid: str = "abc" * 21 + "ab",
81 vout: int = 0,
82 amount_sat: int = 100_000,
83 script_type: ScriptType = "p2wpkh",
84 address: str = "bc1qtest",
85 confirmations: int = 6,
86 block_height: int | None = 850_000,
87 coinbase: bool = False,
88 label: str | None = None,
89 ) -> UTXORecord:
90 return UTXORecord(
91 txid=txid,
92 vout=vout,
93 amount_sat=amount_sat,
94 script_type=script_type,
95 address=address,
96 confirmations=confirmations,
97 block_height=block_height,
98 coinbase=coinbase,
99 label=label,
100 )
101
102
103 def _make_channel(
104 channel_id: str = "850000x1x0",
105 peer_pubkey: str = "0279" + "aa" * 32,
106 peer_alias: str | None = "ACINQ",
107 capacity_sat: int = 2_000_000,
108 local_balance_sat: int = 1_000_000,
109 remote_balance_sat: int = 900_000,
110 is_active: bool = True,
111 is_public: bool = True,
112 local_reserve_sat: int = 20_000,
113 remote_reserve_sat: int = 20_000,
114 unsettled_balance_sat: int = 0,
115 htlc_count: int = 0,
116 ) -> LightningChannelRecord:
117 return LightningChannelRecord(
118 channel_id=channel_id,
119 peer_pubkey=peer_pubkey,
120 peer_alias=peer_alias,
121 capacity_sat=capacity_sat,
122 local_balance_sat=local_balance_sat,
123 remote_balance_sat=remote_balance_sat,
124 is_active=is_active,
125 is_public=is_public,
126 local_reserve_sat=local_reserve_sat,
127 remote_reserve_sat=remote_reserve_sat,
128 unsettled_balance_sat=unsettled_balance_sat,
129 htlc_count=htlc_count,
130 )
131
132
133 def _make_label(
134 address: str = "bc1qtest",
135 label: str = "cold storage",
136 category: CoinCategory = "income",
137 created_at: int = 1_700_000_000,
138 ) -> AddressLabelRecord:
139 return AddressLabelRecord(
140 address=address,
141 label=label,
142 category=category,
143 created_at=created_at,
144 )
145
146
147 def _make_strategy(
148 name: str = "conservative",
149 max_fee_rate_sat_vbyte: int = 10,
150 min_confirmations: int = 6,
151 utxo_consolidation_threshold: int = 20,
152 dca_amount_sat: int | None = 500_000,
153 dca_interval_blocks: int | None = 144,
154 lightning_rebalance_threshold: float = 0.2,
155 coin_selection: CoinSelectAlgo = "branch_and_bound",
156 simulation_mode: bool = False,
157 ) -> AgentStrategyRecord:
158 return AgentStrategyRecord(
159 name=name,
160 max_fee_rate_sat_vbyte=max_fee_rate_sat_vbyte,
161 min_confirmations=min_confirmations,
162 utxo_consolidation_threshold=utxo_consolidation_threshold,
163 dca_amount_sat=dca_amount_sat,
164 dca_interval_blocks=dca_interval_blocks,
165 lightning_rebalance_threshold=lightning_rebalance_threshold,
166 coin_selection=coin_selection,
167 simulation_mode=simulation_mode,
168 )
169
170
171 def _make_price(
172 timestamp: int = 1_700_000_000,
173 block_height: int | None = 850_000,
174 price_usd: float = 62_000.0,
175 source: str = "coinbase",
176 ) -> OraclePriceTickRecord:
177 return OraclePriceTickRecord(
178 timestamp=timestamp,
179 block_height=block_height,
180 price_usd=price_usd,
181 source=source,
182 )
183
184
185 def _make_fee(
186 timestamp: int = 1_700_000_000,
187 block_height: int | None = 850_000,
188 t1: int = 30,
189 t6: int = 15,
190 t144: int = 3,
191 ) -> FeeEstimateRecord:
192 return FeeEstimateRecord(
193 timestamp=timestamp,
194 block_height=block_height,
195 target_1_block_sat_vbyte=t1,
196 target_6_block_sat_vbyte=t6,
197 target_144_block_sat_vbyte=t144,
198 )
199
200
201 def _json_bytes(
202 obj: (
203 list[UTXORecord]
204 | list[LightningChannelRecord]
205 | list[OraclePriceTickRecord]
206 | list[FeeEstimateRecord]
207 | list[PendingTxRecord]
208 | list[AddressLabelRecord]
209 | AgentStrategyRecord
210 ),
211 ) -> bytes:
212 return json.dumps(obj, sort_keys=True).encode()
213
214
215 def _sha256(data: bytes) -> str:
216 return hashlib.sha256(data).hexdigest()
217
218
219 def _make_manifest(files: dict[str, bytes]) -> SnapshotManifest:
220 return SnapshotManifest(
221 files={path: _sha256(data) for path, data in files.items()},
222 domain="bitcoin",
223 )
224
225
226 # ---------------------------------------------------------------------------
227 # Protocol conformance
228 # ---------------------------------------------------------------------------
229
230
231 class TestProtocolConformance:
232 def test_satisfies_muse_domain_plugin(self) -> None:
233 assert isinstance(BitcoinPlugin(), MuseDomainPlugin)
234
235 def test_satisfies_structured_merge_plugin(self) -> None:
236 assert isinstance(BitcoinPlugin(), StructuredMergePlugin)
237
238 def test_satisfies_crdt_plugin(self) -> None:
239 assert isinstance(BitcoinPlugin(), CRDTPlugin)
240
241
242 # ---------------------------------------------------------------------------
243 # snapshot
244 # ---------------------------------------------------------------------------
245
246
247 class TestSnapshot:
248 def test_snapshot_from_path_hashes_all_json_files(self) -> None:
249 plugin = BitcoinPlugin()
250 with tempfile.TemporaryDirectory() as tmp:
251 root = pathlib.Path(tmp)
252 (root / "wallet").mkdir()
253 (root / "wallet" / "utxos.json").write_bytes(b"[]")
254 (root / "wallet" / "labels.json").write_bytes(b"[]")
255
256 snap = plugin.snapshot(root)
257
258 assert snap["domain"] == "bitcoin"
259 assert "wallet/utxos.json" in snap["files"]
260 assert "wallet/labels.json" in snap["files"]
261
262 def test_snapshot_excludes_hidden_files(self) -> None:
263 plugin = BitcoinPlugin()
264 with tempfile.TemporaryDirectory() as tmp:
265 root = pathlib.Path(tmp)
266 (root / "wallet").mkdir()
267 (root / "wallet" / "utxos.json").write_bytes(b"[]")
268 (root / ".secret").write_bytes(b"private key would be here")
269
270 snap = plugin.snapshot(root)
271
272 assert ".secret" not in snap["files"]
273 assert "wallet/utxos.json" in snap["files"]
274
275 def test_snapshot_passthrough_for_existing_manifest(self) -> None:
276 plugin = BitcoinPlugin()
277 manifest = SnapshotManifest(
278 files={"wallet/utxos.json": "abc" * 21 + "ab"},
279 domain="bitcoin",
280 )
281 result = plugin.snapshot(manifest)
282 assert result is manifest
283
284 def test_snapshot_content_hash_is_sha256(self) -> None:
285 plugin = BitcoinPlugin()
286 with tempfile.TemporaryDirectory() as tmp:
287 root = pathlib.Path(tmp)
288 (root / "wallet").mkdir()
289 content = b"[]\n"
290 (root / "wallet" / "utxos.json").write_bytes(content)
291
292 snap = plugin.snapshot(root)
293
294 expected_hash = _sha256(content)
295 assert snap["files"]["wallet/utxos.json"] == expected_hash
296
297 def test_snapshot_deterministic(self) -> None:
298 plugin = BitcoinPlugin()
299 with tempfile.TemporaryDirectory() as tmp:
300 root = pathlib.Path(tmp)
301 (root / "wallet").mkdir()
302 (root / "wallet" / "utxos.json").write_bytes(b"[]")
303
304 snap1 = plugin.snapshot(root)
305 snap2 = plugin.snapshot(root)
306
307 assert snap1["files"] == snap2["files"]
308
309
310 # ---------------------------------------------------------------------------
311 # diff — file-level
312 # ---------------------------------------------------------------------------
313
314
315 class TestDiffFileLevel:
316 def setup_method(self) -> None:
317 self.plugin = BitcoinPlugin()
318
319 def test_added_file_produces_insert_op(self) -> None:
320 base = SnapshotManifest(files={}, domain="bitcoin")
321 target = SnapshotManifest(
322 files={"wallet/utxos.json": "a" * 64},
323 domain="bitcoin",
324 )
325 delta = self.plugin.diff(base, target)
326 assert len(delta["ops"]) == 1
327 assert delta["ops"][0]["op"] == "insert"
328 assert delta["ops"][0]["address"] == "wallet/utxos.json"
329
330 def test_removed_file_produces_delete_op(self) -> None:
331 base = SnapshotManifest(
332 files={"wallet/utxos.json": "a" * 64},
333 domain="bitcoin",
334 )
335 target = SnapshotManifest(files={}, domain="bitcoin")
336 delta = self.plugin.diff(base, target)
337 assert delta["ops"][0]["op"] == "delete"
338
339 def test_unchanged_file_produces_no_ops(self) -> None:
340 files = {"wallet/utxos.json": "a" * 64}
341 base = SnapshotManifest(files=files, domain="bitcoin")
342 target = SnapshotManifest(files=files, domain="bitcoin")
343 delta = self.plugin.diff(base, target)
344 assert len(delta["ops"]) == 0
345 assert "clean" in delta["summary"]
346
347 def test_modified_file_without_repo_root_produces_replace_op(self) -> None:
348 base = SnapshotManifest(
349 files={"wallet/utxos.json": "a" * 64},
350 domain="bitcoin",
351 )
352 target = SnapshotManifest(
353 files={"wallet/utxos.json": "b" * 64},
354 domain="bitcoin",
355 )
356 delta = self.plugin.diff(base, target)
357 assert delta["ops"][0]["op"] == "replace"
358
359 def test_summary_reflects_op_counts(self) -> None:
360 base = SnapshotManifest(
361 files={"wallet/utxos.json": "a" * 64, "wallet/labels.json": "b" * 64},
362 domain="bitcoin",
363 )
364 target = SnapshotManifest(
365 files={"wallet/utxos.json": "a" * 64, "strategy/agent.json": "c" * 64},
366 domain="bitcoin",
367 )
368 delta = self.plugin.diff(base, target)
369 assert "added" in delta["summary"]
370 assert "removed" in delta["summary"]
371
372
373 # ---------------------------------------------------------------------------
374 # diff — semantic UTXO level
375 # ---------------------------------------------------------------------------
376
377
378 class TestDiffUTXOs:
379 def test_new_utxo_produces_insert_op(self) -> None:
380 u = _make_utxo(txid="a" * 64, vout=0, amount_sat=500_000)
381 old = _json_bytes([])
382 new = _json_bytes([u])
383 ops = _diff_utxos("wallet/utxos.json", old, new)
384 assert len(ops) == 1
385 assert ops[0]["op"] == "insert"
386 assert "a" * 64 + ":0" in ops[0]["address"]
387 assert "0.00500000 BTC" in ops[0]["content_summary"]
388
389 def test_spent_utxo_produces_delete_op(self) -> None:
390 u = _make_utxo(txid="b" * 64, vout=1, amount_sat=200_000)
391 old = _json_bytes([u])
392 new = _json_bytes([])
393 ops = _diff_utxos("wallet/utxos.json", old, new)
394 assert len(ops) == 1
395 assert ops[0]["op"] == "delete"
396 assert "spent" in ops[0]["content_summary"]
397
398 def test_confirmation_update_produces_mutate_op(self) -> None:
399 txid = "c" * 64
400 u_old = _make_utxo(txid=txid, vout=0, confirmations=1)
401 u_new = _make_utxo(txid=txid, vout=0, confirmations=6)
402 old = _json_bytes([u_old])
403 new = _json_bytes([u_new])
404 ops = _diff_utxos("wallet/utxos.json", old, new)
405 assert len(ops) == 1
406 assert ops[0]["op"] == "mutate"
407 assert ops[0]["fields"]["confirmations"]["old"] == "1"
408 assert ops[0]["fields"]["confirmations"]["new"] == "6"
409
410 def test_unchanged_utxos_produce_no_ops(self) -> None:
411 u = _make_utxo()
412 data = _json_bytes([u])
413 ops = _diff_utxos("wallet/utxos.json", data, data)
414 assert ops == []
415
416 def test_multiple_utxos_diff(self) -> None:
417 u1 = _make_utxo(txid="d" * 64, vout=0, amount_sat=100_000)
418 u2 = _make_utxo(txid="e" * 64, vout=0, amount_sat=200_000)
419 u3 = _make_utxo(txid="f" * 64, vout=0, amount_sat=300_000)
420 old = _json_bytes([u1, u2])
421 new = _json_bytes([u2, u3])
422 ops = _diff_utxos("wallet/utxos.json", old, new)
423 op_types = {op["op"] for op in ops}
424 assert "insert" in op_types
425 assert "delete" in op_types
426
427
428 # ---------------------------------------------------------------------------
429 # diff — semantic channel level
430 # ---------------------------------------------------------------------------
431
432
433 class TestDiffChannels:
434 def test_new_channel_produces_insert_op(self) -> None:
435 ch = _make_channel()
436 ops = _diff_channels("channels/channels.json", _json_bytes([]), _json_bytes([ch]))
437 assert len(ops) == 1
438 assert ops[0]["op"] == "insert"
439 assert "opened" in ops[0]["content_summary"]
440
441 def test_closed_channel_produces_delete_op(self) -> None:
442 ch = _make_channel()
443 ops = _diff_channels("channels/channels.json", _json_bytes([ch]), _json_bytes([]))
444 assert len(ops) == 1
445 assert ops[0]["op"] == "delete"
446
447 def test_balance_change_produces_mutate_op(self) -> None:
448 ch_old = _make_channel(local_balance_sat=1_000_000, remote_balance_sat=900_000)
449 ch_new = _make_channel(local_balance_sat=800_000, remote_balance_sat=1_100_000)
450 ops = _diff_channels(
451 "channels/channels.json",
452 _json_bytes([ch_old]),
453 _json_bytes([ch_new]),
454 )
455 assert len(ops) == 1
456 assert ops[0]["op"] == "mutate"
457 fields = ops[0]["fields"]
458 assert "local_balance_sat" in fields
459 assert "remote_balance_sat" in fields
460
461
462 # ---------------------------------------------------------------------------
463 # diff — strategy field level
464 # ---------------------------------------------------------------------------
465
466
467 class TestDiffStrategy:
468 def test_fee_rate_change_produces_mutate_op(self) -> None:
469 old_strat = _make_strategy(max_fee_rate_sat_vbyte=10)
470 new_strat = _make_strategy(max_fee_rate_sat_vbyte=50)
471 ops = _diff_strategy("strategy/agent.json", _json_bytes(old_strat), _json_bytes(new_strat))
472 assert len(ops) == 1
473 assert ops[0]["op"] == "mutate"
474 assert "max_fee_rate_sat_vbyte" in ops[0]["fields"]
475
476 def test_simulation_mode_toggle_detected(self) -> None:
477 old_s = _make_strategy(simulation_mode=False)
478 new_s = _make_strategy(simulation_mode=True)
479 ops = _diff_strategy("strategy/agent.json", _json_bytes(old_s), _json_bytes(new_s))
480 assert ops[0]["op"] == "mutate"
481 assert ops[0]["fields"]["simulation_mode"]["old"] == "False"
482 assert ops[0]["fields"]["simulation_mode"]["new"] == "True"
483
484 def test_identical_strategy_no_ops(self) -> None:
485 s = _make_strategy()
486 data = _json_bytes(s)
487 ops = _diff_strategy("strategy/agent.json", data, data)
488 assert ops == []
489
490
491 # ---------------------------------------------------------------------------
492 # diff — time series (prices, fees)
493 # ---------------------------------------------------------------------------
494
495
496 class TestDiffTimeSeries:
497 def test_new_price_tick_produces_insert(self) -> None:
498 tick = _make_price(timestamp=1_000, price_usd=60_000.0)
499 ops = _diff_time_series(
500 "oracles/prices.json", _json_bytes([]), _json_bytes([tick]), "prices"
501 )
502 assert len(ops) == 1
503 assert ops[0]["op"] == "insert"
504 assert "$60,000.00" in ops[0]["content_summary"]
505
506 def test_new_fee_estimate_produces_insert(self) -> None:
507 fee = _make_fee(timestamp=2_000, t1=25, t6=12, t144=2)
508 ops = _diff_time_series(
509 "oracles/fees.json", _json_bytes([]), _json_bytes([fee]), "fees"
510 )
511 assert len(ops) == 1
512 assert ops[0]["op"] == "insert"
513 assert "25" in ops[0]["content_summary"]
514
515 def test_existing_tick_not_duplicated(self) -> None:
516 tick = _make_price(timestamp=3_000)
517 data = _json_bytes([tick])
518 ops = _diff_time_series("oracles/prices.json", data, data, "prices")
519 assert ops == []
520
521
522 # ---------------------------------------------------------------------------
523 # diff with repo_root — produces PatchOp
524 # ---------------------------------------------------------------------------
525
526
527 class TestDiffWithRepoRoot:
528 def test_modified_utxos_json_produces_patch_op(self) -> None:
529 plugin = BitcoinPlugin()
530 u_old = _make_utxo(txid="a" * 64, vout=0, amount_sat=100_000)
531 u_new = _make_utxo(txid="b" * 64, vout=0, amount_sat=200_000)
532 old_bytes = _json_bytes([u_old])
533 new_bytes = _json_bytes([u_new])
534 old_hash = _sha256(old_bytes)
535 new_hash = _sha256(new_bytes)
536
537 with tempfile.TemporaryDirectory() as tmp:
538 root = pathlib.Path(tmp)
539 muse_dir = root / ".muse" / "objects"
540 muse_dir.mkdir(parents=True)
541 # Write blobs in sharded layout
542 (muse_dir / old_hash[:2]).mkdir(exist_ok=True)
543 (muse_dir / old_hash[:2] / old_hash[2:]).write_bytes(old_bytes)
544 (muse_dir / new_hash[:2]).mkdir(exist_ok=True)
545 (muse_dir / new_hash[:2] / new_hash[2:]).write_bytes(new_bytes)
546
547 base = SnapshotManifest(
548 files={"wallet/utxos.json": old_hash}, domain="bitcoin"
549 )
550 target = SnapshotManifest(
551 files={"wallet/utxos.json": new_hash}, domain="bitcoin"
552 )
553 delta = plugin.diff(base, target, repo_root=root)
554
555 assert delta["ops"][0]["op"] == "patch"
556 patch = delta["ops"][0]
557 assert patch["child_domain"] == "bitcoin.utxos"
558 child_ops = patch["child_ops"]
559 op_types = {op["op"] for op in child_ops}
560 assert "insert" in op_types
561 assert "delete" in op_types
562
563 def test_fallback_to_replace_when_object_missing(self) -> None:
564 plugin = BitcoinPlugin()
565 base = SnapshotManifest(
566 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
567 )
568 target = SnapshotManifest(
569 files={"wallet/utxos.json": "b" * 64}, domain="bitcoin"
570 )
571 with tempfile.TemporaryDirectory() as tmp:
572 root = pathlib.Path(tmp)
573 (root / ".muse" / "objects").mkdir(parents=True)
574 delta = plugin.diff(base, target, repo_root=root)
575
576 assert delta["ops"][0]["op"] == "replace"
577
578
579 # ---------------------------------------------------------------------------
580 # merge
581 # ---------------------------------------------------------------------------
582
583
584 class TestMerge:
585 def setup_method(self) -> None:
586 self.plugin = BitcoinPlugin()
587
588 def test_clean_merge_no_conflicts(self) -> None:
589 base = SnapshotManifest(files={}, domain="bitcoin")
590 left = SnapshotManifest(
591 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
592 )
593 right = SnapshotManifest(
594 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
595 )
596 result = self.plugin.merge(base, left, right)
597 assert result.is_clean
598 assert "wallet/utxos.json" in result.merged["files"]
599 assert "channels/channels.json" in result.merged["files"]
600
601 def test_both_sides_unchanged_no_conflict(self) -> None:
602 files = {"wallet/utxos.json": "a" * 64}
603 base = SnapshotManifest(files=files, domain="bitcoin")
604 left = SnapshotManifest(files=files, domain="bitcoin")
605 right = SnapshotManifest(files=files, domain="bitcoin")
606 result = self.plugin.merge(base, left, right)
607 assert result.is_clean
608
609 def test_only_left_changed_takes_left(self) -> None:
610 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
611 left = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
612 right = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
613 result = self.plugin.merge(base, left, right)
614 assert result.is_clean
615 assert result.merged["files"]["f"] == "b" * 64
616
617 def test_only_right_changed_takes_right(self) -> None:
618 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
619 left = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
620 right = SnapshotManifest(files={"f": "c" * 64}, domain="bitcoin")
621 result = self.plugin.merge(base, left, right)
622 assert result.is_clean
623 assert result.merged["files"]["f"] == "c" * 64
624
625 def test_both_changed_differently_produces_conflict(self) -> None:
626 base = SnapshotManifest(files={"wallet/utxos.json": "a" * 64}, domain="bitcoin")
627 left = SnapshotManifest(files={"wallet/utxos.json": "b" * 64}, domain="bitcoin")
628 right = SnapshotManifest(files={"wallet/utxos.json": "c" * 64}, domain="bitcoin")
629 result = self.plugin.merge(base, left, right)
630 assert not result.is_clean
631 assert "wallet/utxos.json" in result.conflicts
632
633 def test_double_spend_detection_in_merge(self) -> None:
634 u_base = _make_utxo(txid="d" * 64, vout=0, amount_sat=1_000_000)
635 # Left spent the base UTXO and got change at a different address
636 u_left_change = _make_utxo(txid="e" * 64, vout=0, amount_sat=900_000, address="bc1qleft")
637 # Right spent the SAME base UTXO in a different tx → double-spend
638 u_right_change = _make_utxo(txid="f" * 64, vout=0, amount_sat=850_000, address="bc1qright")
639 base_bytes = _json_bytes([u_base])
640 left_bytes = _json_bytes([u_left_change]) # left spent base UTXO, got change
641 right_bytes = _json_bytes([u_right_change]) # right also spent it → double-spend!
642
643 with tempfile.TemporaryDirectory() as tmp:
644 root = pathlib.Path(tmp)
645 muse_dir = root / ".muse" / "objects"
646 muse_dir.mkdir(parents=True)
647
648 for data in (base_bytes, left_bytes, right_bytes):
649 h = _sha256(data)
650 shard = muse_dir / h[:2]
651 shard.mkdir(exist_ok=True)
652 (shard / h[2:]).write_bytes(data)
653
654 base = SnapshotManifest(
655 files={"wallet/utxos.json": _sha256(base_bytes)}, domain="bitcoin"
656 )
657 left = SnapshotManifest(
658 files={"wallet/utxos.json": _sha256(left_bytes)}, domain="bitcoin"
659 )
660 right = SnapshotManifest(
661 files={"wallet/utxos.json": _sha256(right_bytes)}, domain="bitcoin"
662 )
663 result = self.plugin.merge(base, left, right, repo_root=root)
664
665 # Both sides deleted the same UTXO from the same base → double-spend
666 double_spend_records = [
667 cr for cr in result.conflict_records
668 if cr.conflict_type == "double_spend"
669 ]
670 assert len(double_spend_records) >= 1
671
672 def test_both_deleted_same_file_is_clean(self) -> None:
673 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
674 left = SnapshotManifest(files={}, domain="bitcoin")
675 right = SnapshotManifest(files={}, domain="bitcoin")
676 result = self.plugin.merge(base, left, right)
677 assert result.is_clean
678 assert "f" not in result.merged["files"]
679
680
681 # ---------------------------------------------------------------------------
682 # drift
683 # ---------------------------------------------------------------------------
684
685
686 class TestDrift:
687 def test_no_drift_when_identical(self) -> None:
688 plugin = BitcoinPlugin()
689 snap = SnapshotManifest(
690 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
691 )
692 report = plugin.drift(snap, snap)
693 assert not report.has_drift
694
695 def test_drift_detected_when_file_added(self) -> None:
696 plugin = BitcoinPlugin()
697 committed = SnapshotManifest(files={}, domain="bitcoin")
698 live = SnapshotManifest(
699 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
700 )
701 report = plugin.drift(committed, live)
702 assert report.has_drift
703 assert "added" in report.summary
704
705 def test_drift_from_disk_detected(self) -> None:
706 plugin = BitcoinPlugin()
707 with tempfile.TemporaryDirectory() as tmp:
708 root = pathlib.Path(tmp)
709 (root / "wallet").mkdir()
710 (root / "wallet" / "utxos.json").write_bytes(b"[]")
711
712 committed = SnapshotManifest(files={}, domain="bitcoin")
713 report = plugin.drift(committed, root)
714
715 assert report.has_drift
716
717
718 # ---------------------------------------------------------------------------
719 # apply
720 # ---------------------------------------------------------------------------
721
722
723 class TestApply:
724 def test_apply_is_passthrough(self) -> None:
725 plugin = BitcoinPlugin()
726 delta = plugin.diff(
727 SnapshotManifest(files={}, domain="bitcoin"),
728 SnapshotManifest(files={}, domain="bitcoin"),
729 )
730 snap = SnapshotManifest(files={}, domain="bitcoin")
731 result = plugin.apply(delta, snap)
732 assert result is snap
733
734
735 # ---------------------------------------------------------------------------
736 # schema
737 # ---------------------------------------------------------------------------
738
739
740 class TestSchema:
741 def test_schema_returns_domain_schema(self) -> None:
742 from muse.core.schema import DomainSchema
743 s = BitcoinPlugin().schema()
744 assert isinstance(s, dict)
745 assert s["domain"] == "bitcoin"
746 assert s["schema_version"] == 1
747
748 def test_schema_has_ten_dimensions(self) -> None:
749 s = BitcoinPlugin().schema()
750 assert len(s["dimensions"]) == 10
751
752 def test_schema_merge_mode_is_crdt(self) -> None:
753 s = BitcoinPlugin().schema()
754 assert s["merge_mode"] == "crdt"
755
756 def test_dimension_names(self) -> None:
757 s = BitcoinPlugin().schema()
758 names = {d["name"] for d in s["dimensions"]}
759 expected = {
760 "utxos", "transactions", "labels", "descriptors",
761 "channels", "strategy", "oracle_prices", "oracle_fees",
762 "network", "execution",
763 }
764 assert names == expected
765
766 def test_strategy_dimension_is_not_independent(self) -> None:
767 s = BitcoinPlugin().schema()
768 strat = next(d for d in s["dimensions"] if d["name"] == "strategy")
769 assert strat["independent_merge"] is False
770
771
772 # ---------------------------------------------------------------------------
773 # OT merge — StructuredMergePlugin
774 # ---------------------------------------------------------------------------
775
776
777 class TestMergeOps:
778 def setup_method(self) -> None:
779 self.plugin = BitcoinPlugin()
780 self.base = SnapshotManifest(files={}, domain="bitcoin")
781 self.snap = SnapshotManifest(files={}, domain="bitcoin")
782
783 def test_non_conflicting_ops_clean_merge(self) -> None:
784 ours_ops: list[DomainOp] = [
785 InsertOp(
786 op="insert",
787 address="wallet/utxos.json::aaa:0",
788 position=None,
789 content_id="a" * 64,
790 content_summary="received UTXO aaa:0",
791 )
792 ]
793 theirs_ops: list[DomainOp] = [
794 InsertOp(
795 op="insert",
796 address="wallet/utxos.json::bbb:0",
797 position=None,
798 content_id="b" * 64,
799 content_summary="received UTXO bbb:0",
800 )
801 ]
802 result = self.plugin.merge_ops(
803 self.base, self.snap, self.snap, ours_ops, theirs_ops
804 )
805 # Different addresses → no double-spend, clean merge
806 assert len(result.conflict_records) == 0
807
808 def test_double_spend_detected_in_merge_ops(self) -> None:
809 utxo_addr = "wallet/utxos.json::deadbeef:0"
810 ours_ops: list[DomainOp] = [
811 DeleteOp(
812 op="delete",
813 address=utxo_addr,
814 position=None,
815 content_id="a" * 64,
816 content_summary="spent UTXO deadbeef:0",
817 )
818 ]
819 theirs_ops: list[DomainOp] = [
820 DeleteOp(
821 op="delete",
822 address=utxo_addr,
823 position=None,
824 content_id="a" * 64,
825 content_summary="spent UTXO deadbeef:0",
826 )
827 ]
828 result = self.plugin.merge_ops(
829 self.base, self.snap, self.snap, ours_ops, theirs_ops
830 )
831 double_spends = [
832 cr for cr in result.conflict_records
833 if cr.conflict_type == "double_spend"
834 ]
835 assert len(double_spends) == 1
836 assert "deadbeef:0" in double_spends[0].addresses[0]
837
838 def test_non_utxo_delete_not_flagged_as_double_spend(self) -> None:
839 addr = "channels/channels.json::850000x1x0"
840 ours_ops: list[DomainOp] = [
841 DeleteOp(op="delete", address=addr, position=None,
842 content_id="a" * 64, content_summary="channel closed")
843 ]
844 theirs_ops: list[DomainOp] = [
845 DeleteOp(op="delete", address=addr, position=None,
846 content_id="a" * 64, content_summary="channel closed")
847 ]
848 result = self.plugin.merge_ops(
849 self.base, self.snap, self.snap, ours_ops, theirs_ops
850 )
851 double_spends = [
852 cr for cr in result.conflict_records
853 if cr.conflict_type == "double_spend"
854 ]
855 assert len(double_spends) == 0
856
857
858 # ---------------------------------------------------------------------------
859 # CRDTPlugin
860 # ---------------------------------------------------------------------------
861
862
863 class TestCRDT:
864 def setup_method(self) -> None:
865 self.plugin = BitcoinPlugin()
866 self.base_snap = SnapshotManifest(
867 files={"wallet/utxos.json": "a" * 64},
868 domain="bitcoin",
869 )
870
871 def test_to_crdt_state_preserves_files(self) -> None:
872 crdt = self.plugin.to_crdt_state(self.base_snap)
873 assert crdt["domain"] == "bitcoin"
874 assert crdt["schema_version"] == 1
875 assert "wallet/utxos.json" in crdt["files"]
876
877 def test_from_crdt_state_returns_plain_snapshot(self) -> None:
878 crdt = self.plugin.to_crdt_state(self.base_snap)
879 snap = self.plugin.from_crdt_state(crdt)
880 assert snap["domain"] == "bitcoin"
881 assert "wallet/utxos.json" in snap["files"]
882
883 def test_join_idempotent(self) -> None:
884 crdt = self.plugin.to_crdt_state(self.base_snap)
885 joined = self.plugin.join(crdt, crdt)
886 result = self.plugin.from_crdt_state(joined)
887 original = self.plugin.from_crdt_state(crdt)
888 assert result["files"] == original["files"]
889
890 def test_join_commutative(self) -> None:
891 snap_a = SnapshotManifest(
892 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
893 )
894 snap_b = SnapshotManifest(
895 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
896 )
897 a = self.plugin.to_crdt_state(snap_a)
898 b = self.plugin.to_crdt_state(snap_b)
899 ab = self.plugin.from_crdt_state(self.plugin.join(a, b))
900 ba = self.plugin.from_crdt_state(self.plugin.join(b, a))
901 assert ab["files"] == ba["files"]
902
903 def test_join_associative(self) -> None:
904 snap_a = SnapshotManifest(files={"f1": "a" * 64}, domain="bitcoin")
905 snap_b = SnapshotManifest(files={"f2": "b" * 64}, domain="bitcoin")
906 snap_c = SnapshotManifest(files={"f3": "c" * 64}, domain="bitcoin")
907 a = self.plugin.to_crdt_state(snap_a)
908 b = self.plugin.to_crdt_state(snap_b)
909 c = self.plugin.to_crdt_state(snap_c)
910 ab_c = self.plugin.from_crdt_state(
911 self.plugin.join(self.plugin.join(a, b), c)
912 )
913 a_bc = self.plugin.from_crdt_state(
914 self.plugin.join(a, self.plugin.join(b, c))
915 )
916 assert ab_c["files"] == a_bc["files"]
917
918 def test_join_preserves_both_agents_files(self) -> None:
919 snap_a = SnapshotManifest(
920 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
921 )
922 snap_b = SnapshotManifest(
923 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
924 )
925 a = self.plugin.to_crdt_state(snap_a)
926 b = self.plugin.to_crdt_state(snap_b)
927 joined = self.plugin.from_crdt_state(self.plugin.join(a, b))
928 assert "wallet/utxos.json" in joined["files"]
929 assert "channels/channels.json" in joined["files"]
930
931 def test_crdt_schema_has_seven_dimensions(self) -> None:
932 dims = self.plugin.crdt_schema()
933 assert len(dims) == 7
934
935 def test_crdt_schema_dimension_types(self) -> None:
936 dims = self.plugin.crdt_schema()
937 types = {d["crdt_type"] for d in dims}
938 assert "aw_map" in types
939 assert "or_set" in types
940
941 def test_vector_clock_advances_on_join(self) -> None:
942 snap_a = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
943 snap_b = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
944 a = self.plugin.to_crdt_state(snap_a)
945 b = self.plugin.to_crdt_state(snap_b)
946 joined = self.plugin.join(a, b)
947 assert isinstance(joined["vclock"], dict)
948
949
950 # ---------------------------------------------------------------------------
951 # Query analytics
952 # ---------------------------------------------------------------------------
953
954
955 class TestQueryAnalytics:
956 def test_total_balance_sat(self) -> None:
957 utxos = [
958 _make_utxo(amount_sat=100_000),
959 _make_utxo(txid="b" * 64, amount_sat=200_000),
960 ]
961 assert total_balance_sat(utxos) == 300_000
962
963 def test_confirmed_balance_excludes_unconfirmed(self) -> None:
964 utxos = [
965 _make_utxo(amount_sat=100_000, confirmations=6),
966 _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0),
967 ]
968 assert confirmed_balance_sat(utxos) == 100_000
969
970 def test_confirmed_balance_excludes_immature_coinbase(self) -> None:
971 utxos = [
972 _make_utxo(amount_sat=625_000_000, confirmations=50, coinbase=True),
973 _make_utxo(txid="c" * 64, amount_sat=100_000, confirmations=1),
974 ]
975 assert confirmed_balance_sat(utxos) == 100_000
976
977 def test_balance_by_script_type(self) -> None:
978 utxos = [
979 _make_utxo(amount_sat=100_000, script_type="p2wpkh"),
980 _make_utxo(txid="b" * 64, amount_sat=200_000, script_type="p2tr"),
981 _make_utxo(txid="c" * 64, amount_sat=50_000, script_type="p2wpkh"),
982 ]
983 breakdown = balance_by_script_type(utxos)
984 assert breakdown["p2wpkh"] == 150_000
985 assert breakdown["p2tr"] == 200_000
986
987 def test_coin_age_blocks(self) -> None:
988 u = _make_utxo(block_height=800_000)
989 assert coin_age_blocks(u, 850_000) == 50_000
990
991 def test_coin_age_blocks_unconfirmed(self) -> None:
992 u = _make_utxo(block_height=None)
993 assert coin_age_blocks(u, 850_000) is None
994
995 def test_format_sat_small(self) -> None:
996 assert format_sat(1_000) == "1,000 sats"
997
998 def test_format_sat_large(self) -> None:
999 result = format_sat(100_000_000)
1000 assert "1.00000000 BTC" in result
1001
1002 def test_utxo_key(self) -> None:
1003 u = _make_utxo(txid="abc", vout=3)
1004 assert utxo_key(u) == "abc:3"
1005
1006 def test_double_spend_candidates_detects_concurrent_spends(self) -> None:
1007 base = {"aaa:0", "bbb:1", "ccc:2"}
1008 our_spent = {"aaa:0", "bbb:1"}
1009 their_spent = {"aaa:0", "ccc:2"}
1010 candidates = double_spend_candidates(base, our_spent, their_spent)
1011 assert candidates == ["aaa:0"]
1012
1013 def test_double_spend_candidates_no_overlap(self) -> None:
1014 base = {"aaa:0", "bbb:1"}
1015 our_spent = {"aaa:0"}
1016 their_spent = {"bbb:1"}
1017 assert double_spend_candidates(base, our_spent, their_spent) == []
1018
1019 def test_channel_liquidity_totals(self) -> None:
1020 ch1 = _make_channel(local_balance_sat=1_000_000, remote_balance_sat=500_000)
1021 ch2 = _make_channel(
1022 channel_id="x", local_balance_sat=200_000, remote_balance_sat=800_000
1023 )
1024 local, remote = channel_liquidity_totals([ch1, ch2])
1025 assert local == 1_200_000
1026 assert remote == 1_300_000
1027
1028 def test_channel_utilization(self) -> None:
1029 ch = _make_channel(
1030 capacity_sat=2_000_000,
1031 local_balance_sat=800_000,
1032 local_reserve_sat=20_000,
1033 remote_reserve_sat=20_000,
1034 )
1035 util = channel_utilization(ch)
1036 assert 0.0 <= util <= 1.0
1037
1038 def test_fee_surface_str(self) -> None:
1039 est = _make_fee(t1=42, t6=15, t144=3)
1040 s = fee_surface_str(est)
1041 assert "42" in s
1042 assert "15" in s
1043 assert "3" in s
1044 assert "sat/vbyte" in s
1045
1046 def test_latest_fee_estimate(self) -> None:
1047 old = _make_fee(timestamp=1_000)
1048 new = _make_fee(timestamp=2_000, t1=50)
1049 result = latest_fee_estimate([old, new])
1050 assert result is not None
1051 assert result["timestamp"] == 2_000
1052
1053 def test_latest_fee_estimate_empty(self) -> None:
1054 assert latest_fee_estimate([]) is None
1055
1056 def test_latest_price(self) -> None:
1057 p1 = _make_price(timestamp=1_000, price_usd=50_000.0)
1058 p2 = _make_price(timestamp=2_000, price_usd=70_000.0)
1059 assert latest_price([p1, p2]) == 70_000.0
1060
1061 def test_strategy_summary_line_simulation_mode(self) -> None:
1062 s = _make_strategy(simulation_mode=True, name="aggressive")
1063 line = strategy_summary_line(s)
1064 assert "SIM" in line
1065 assert "aggressive" in line
1066
1067 def test_utxo_summary_line(self) -> None:
1068 utxos = [
1069 _make_utxo(amount_sat=100_000, confirmations=6),
1070 _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0),
1071 ]
1072 line = utxo_summary_line(utxos)
1073 assert "2 UTXOs" in line
1074
1075 def test_mempool_summary_empty(self) -> None:
1076 line = mempool_summary_line([])
1077 assert "empty" in line
1078
1079
1080 # ---------------------------------------------------------------------------
1081 # Handler routing
1082 # ---------------------------------------------------------------------------
1083
1084
1085 class TestHandlerRouting:
1086 def test_utxos_json_routed(self) -> None:
1087 assert _handler_for_path("wallet/utxos.json") == "utxos"
1088
1089 def test_channels_json_routed(self) -> None:
1090 assert _handler_for_path("channels/channels.json") == "channels"
1091
1092 def test_agent_json_routed(self) -> None:
1093 assert _handler_for_path("strategy/agent.json") == "strategy"
1094
1095 def test_prices_json_routed(self) -> None:
1096 assert _handler_for_path("oracles/prices.json") == "prices"
1097
1098 def test_unknown_file_returns_none(self) -> None:
1099 assert _handler_for_path("README.md") is None
1100 assert _handler_for_path("custom/data.bin") is None
1101
1102
1103 # ---------------------------------------------------------------------------
1104 # Registry registration
1105 # ---------------------------------------------------------------------------
1106
1107
1108 class TestRegistry:
1109 def test_bitcoin_registered_in_plugin_registry(self) -> None:
1110 from muse.plugins.registry import registered_domains
1111 assert "bitcoin" in registered_domains()
1112
1113 def test_resolve_bitcoin_plugin(self) -> None:
1114 import json as _json
1115 from muse.plugins.registry import resolve_plugin
1116
1117 with tempfile.TemporaryDirectory() as tmp:
1118 root = pathlib.Path(tmp)
1119 (root / ".muse").mkdir()
1120 (root / ".muse" / "repo.json").write_text(
1121 _json.dumps({"domain": "bitcoin"})
1122 )
1123 plugin = resolve_plugin(root)
1124
1125 assert isinstance(plugin, BitcoinPlugin)