gabriel / muse public
test_bitcoin_plugin.py python
1128 lines 40.0 KB
8aa515d5 refactor: consolidate schema_version to single source of truth Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """Comprehensive test suite for the Bitcoin domain plugin.
2
3 Tests cover all three protocol levels:
4 - Core MuseDomainPlugin (snapshot, diff, merge, drift, apply, schema)
5 - StructuredMergePlugin (OT merge with double-spend detection)
6 - CRDTPlugin (convergent join — commutativity, associativity, idempotency)
7
8 Tests are organized by capability, each asserting exact behaviour
9 with realistic Bitcoin fixtures.
10 """
11
12 from __future__ import annotations
13
14 import hashlib
15 import json
16 import pathlib
17 import tempfile
18
19 import pytest
20
21 from muse._version import __version__
22 from muse.domain import (
23 CRDTPlugin,
24 DeleteOp,
25 DomainOp,
26 InsertOp,
27 MuseDomainPlugin,
28 MutateOp,
29 PatchOp,
30 ReplaceOp,
31 SnapshotManifest,
32 StructuredMergePlugin,
33 )
34 from muse.plugins.bitcoin._query import (
35 balance_by_script_type,
36 channel_liquidity_totals,
37 channel_utilization,
38 coin_age_blocks,
39 confirmed_balance_sat,
40 double_spend_candidates,
41 fee_surface_str,
42 format_sat,
43 latest_fee_estimate,
44 latest_price,
45 mempool_summary_line,
46 strategy_summary_line,
47 total_balance_sat,
48 utxo_key,
49 utxo_summary_line,
50 )
51 from muse.plugins.bitcoin._types import (
52 AddressLabelRecord,
53 AgentStrategyRecord,
54 CoinCategory,
55 CoinSelectAlgo,
56 FeeEstimateRecord,
57 LightningChannelRecord,
58 OraclePriceTickRecord,
59 PendingTxRecord,
60 RoutingPolicyRecord,
61 ScriptType,
62 UTXORecord,
63 )
64 from muse.plugins.bitcoin.plugin import (
65 BitcoinPlugin,
66 _diff_channels,
67 _diff_labels,
68 _diff_strategy,
69 _diff_time_series,
70 _diff_transactions,
71 _diff_utxos,
72 _handler_for_path,
73 )
74
75 # ---------------------------------------------------------------------------
76 # Fixtures
77 # ---------------------------------------------------------------------------
78
79
def _make_utxo(
    txid: str = "abc" * 21 + "ab",
    vout: int = 0,
    amount_sat: int = 100_000,
    script_type: ScriptType = "p2wpkh",
    address: str = "bc1qtest",
    confirmations: int = 6,
    block_height: int | None = 850_000,
    coinbase: bool = False,
    label: str | None = None,
) -> UTXORecord:
    """Build a ``UTXORecord`` fixture; tests override only the fields they care about.

    Each argument is forwarded unchanged to the ``UTXORecord`` field of the
    same name.
    """
    # NOTE(review): the default txid is 63 + 2 = 65 characters long, whereas a
    # real txid is 64 hex characters. Left as-is because tests outside this
    # view may pin the exact default value — confirm before tightening.
    record = UTXORecord(
        txid=txid,
        vout=vout,
        amount_sat=amount_sat,
        script_type=script_type,
        address=address,
        confirmations=confirmations,
        block_height=block_height,
        coinbase=coinbase,
        label=label,
    )
    return record
102
103
def _make_channel(
    channel_id: str = "850000x1x0",
    peer_pubkey: str = "0279" + "aa" * 32,
    peer_alias: str | None = "ACINQ",
    capacity_sat: int = 2_000_000,
    local_balance_sat: int = 1_000_000,
    remote_balance_sat: int = 900_000,
    is_active: bool = True,
    is_public: bool = True,
    local_reserve_sat: int = 20_000,
    remote_reserve_sat: int = 20_000,
    unsettled_balance_sat: int = 0,
    htlc_count: int = 0,
) -> LightningChannelRecord:
    """Build a ``LightningChannelRecord`` fixture with overridable defaults.

    Each argument is forwarded unchanged to the record field of the same name.
    """
    # NOTE(review): the default peer_pubkey is 4 + 64 = 68 hex characters; a
    # compressed secp256k1 public key is 66. Harmless for the tests visible
    # here, which never validate the key — confirm before relying on it.
    record = LightningChannelRecord(
        channel_id=channel_id,
        peer_pubkey=peer_pubkey,
        peer_alias=peer_alias,
        capacity_sat=capacity_sat,
        local_balance_sat=local_balance_sat,
        remote_balance_sat=remote_balance_sat,
        is_active=is_active,
        is_public=is_public,
        local_reserve_sat=local_reserve_sat,
        remote_reserve_sat=remote_reserve_sat,
        unsettled_balance_sat=unsettled_balance_sat,
        htlc_count=htlc_count,
    )
    return record
132
133
def _make_label(
    address: str = "bc1qtest",
    label: str = "cold storage",
    category: CoinCategory = "income",
    created_at: int = 1_700_000_000,
) -> AddressLabelRecord:
    """Build an ``AddressLabelRecord`` fixture with overridable defaults.

    Each argument is forwarded unchanged to the record field of the same name.
    """
    record = AddressLabelRecord(
        address=address,
        label=label,
        category=category,
        created_at=created_at,
    )
    return record
146
147
def _make_strategy(
    name: str = "conservative",
    max_fee_rate_sat_vbyte: int = 10,
    min_confirmations: int = 6,
    utxo_consolidation_threshold: int = 20,
    dca_amount_sat: int | None = 500_000,
    dca_interval_blocks: int | None = 144,
    lightning_rebalance_threshold: float = 0.2,
    coin_selection: CoinSelectAlgo = "branch_and_bound",
    simulation_mode: bool = False,
) -> AgentStrategyRecord:
    """Build an ``AgentStrategyRecord`` fixture with overridable defaults.

    Each argument is forwarded unchanged to the record field of the same name.
    """
    record = AgentStrategyRecord(
        name=name,
        max_fee_rate_sat_vbyte=max_fee_rate_sat_vbyte,
        min_confirmations=min_confirmations,
        utxo_consolidation_threshold=utxo_consolidation_threshold,
        dca_amount_sat=dca_amount_sat,
        dca_interval_blocks=dca_interval_blocks,
        lightning_rebalance_threshold=lightning_rebalance_threshold,
        coin_selection=coin_selection,
        simulation_mode=simulation_mode,
    )
    return record
170
171
def _make_price(
    timestamp: int = 1_700_000_000,
    block_height: int | None = 850_000,
    price_usd: float = 62_000.0,
    source: str = "coinbase",
) -> OraclePriceTickRecord:
    """Build an ``OraclePriceTickRecord`` fixture with overridable defaults.

    Each argument is forwarded unchanged to the record field of the same name.
    """
    tick = OraclePriceTickRecord(
        timestamp=timestamp,
        block_height=block_height,
        price_usd=price_usd,
        source=source,
    )
    return tick
184
185
def _make_fee(
    timestamp: int = 1_700_000_000,
    block_height: int | None = 850_000,
    t1: int = 30,
    t6: int = 15,
    t144: int = 3,
) -> FeeEstimateRecord:
    """Build a ``FeeEstimateRecord`` fixture with overridable defaults.

    ``t1``, ``t6`` and ``t144`` are short aliases that populate the
    ``target_1_block_sat_vbyte``, ``target_6_block_sat_vbyte`` and
    ``target_144_block_sat_vbyte`` fields respectively.
    """
    estimate = FeeEstimateRecord(
        timestamp=timestamp,
        block_height=block_height,
        target_1_block_sat_vbyte=t1,
        target_6_block_sat_vbyte=t6,
        target_144_block_sat_vbyte=t144,
    )
    return estimate
200
201
# Union of the record types accepted by the `_json_bytes` serialization helper.
_AnyBitcoinRecord = (
    UTXORecord | LightningChannelRecord | AddressLabelRecord | AgentStrategyRecord
    | FeeEstimateRecord | RoutingPolicyRecord | PendingTxRecord | OraclePriceTickRecord
)
212
213
214 def _json_bytes(obj: _AnyBitcoinRecord | list[_AnyBitcoinRecord]) -> bytes:
215 return json.dumps(obj, sort_keys=True).encode()
216
217
218 def _sha256(data: bytes) -> str:
219 return hashlib.sha256(data).hexdigest()
220
221
def _make_manifest(files: dict[str, bytes]) -> SnapshotManifest:
    """Build a ``bitcoin`` manifest mapping each path to the SHA-256 of its bytes."""
    digests: dict[str, str] = {}
    for path, data in files.items():
        digests[path] = _sha256(data)
    return SnapshotManifest(files=digests, domain="bitcoin")
227
228
229 # ---------------------------------------------------------------------------
230 # Protocol conformance
231 # ---------------------------------------------------------------------------
232
233
class TestProtocolConformance:
    """A ``BitcoinPlugin`` instance must pass ``isinstance`` for all three protocols."""

    def test_satisfies_muse_domain_plugin(self) -> None:
        plugin = BitcoinPlugin()
        assert isinstance(plugin, MuseDomainPlugin)

    def test_satisfies_structured_merge_plugin(self) -> None:
        plugin = BitcoinPlugin()
        assert isinstance(plugin, StructuredMergePlugin)

    def test_satisfies_crdt_plugin(self) -> None:
        plugin = BitcoinPlugin()
        assert isinstance(plugin, CRDTPlugin)
243
244
245 # ---------------------------------------------------------------------------
246 # snapshot
247 # ---------------------------------------------------------------------------
248
249
class TestSnapshot:
    """``BitcoinPlugin.snapshot`` against temporary directory trees and existing manifests."""

    def test_snapshot_from_path_hashes_all_json_files(self) -> None:
        """Both JSON files under ``wallet/`` are listed, keyed by relative path."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            wallet = root / "wallet"
            wallet.mkdir()
            (wallet / "utxos.json").write_bytes(b"[]")
            (wallet / "labels.json").write_bytes(b"[]")

            snap = plugin.snapshot(root)

            assert snap["domain"] == "bitcoin"
            tracked = snap["files"]
            assert "wallet/utxos.json" in tracked
            assert "wallet/labels.json" in tracked

    def test_snapshot_excludes_hidden_files(self) -> None:
        """A dot-file at the root is left out while regular files are kept."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            wallet = root / "wallet"
            wallet.mkdir()
            (wallet / "utxos.json").write_bytes(b"[]")
            (root / ".secret").write_bytes(b"private key would be here")

            snap = plugin.snapshot(root)

            tracked = snap["files"]
            assert ".secret" not in tracked
            assert "wallet/utxos.json" in tracked

    def test_snapshot_passthrough_for_existing_manifest(self) -> None:
        """Passing a manifest returns that very object."""
        plugin = BitcoinPlugin()
        existing = SnapshotManifest(
            files={"wallet/utxos.json": "abc" * 21 + "ab"},
            domain="bitcoin",
        )
        assert plugin.snapshot(existing) is existing

    def test_snapshot_content_hash_is_sha256(self) -> None:
        """The recorded value for a file equals the SHA-256 of its bytes."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            wallet = root / "wallet"
            wallet.mkdir()
            payload = b"[]\n"
            (wallet / "utxos.json").write_bytes(payload)

            snap = plugin.snapshot(root)

            assert snap["files"]["wallet/utxos.json"] == _sha256(payload)

    def test_snapshot_deterministic(self) -> None:
        """Two snapshots of an untouched tree list identical files."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            wallet = root / "wallet"
            wallet.mkdir()
            (wallet / "utxos.json").write_bytes(b"[]")

            first = plugin.snapshot(root)
            second = plugin.snapshot(root)

            assert first["files"] == second["files"]
311
312
313 # ---------------------------------------------------------------------------
314 # diff — file-level
315 # ---------------------------------------------------------------------------
316
317
class TestDiffFileLevel:
    """File-level ``diff`` between two manifests, with no ``repo_root`` supplied.

    Every single-file scenario asserts the exact number of ops before
    indexing ``ops[0]``, so spurious extra ops cannot slip through.
    """

    def setup_method(self) -> None:
        self.plugin = BitcoinPlugin()

    def test_added_file_produces_insert_op(self) -> None:
        """A path present only in the target yields exactly one insert op."""
        base = SnapshotManifest(files={}, domain="bitcoin")
        target = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64},
            domain="bitcoin",
        )
        delta = self.plugin.diff(base, target)
        assert len(delta["ops"]) == 1
        assert delta["ops"][0]["op"] == "insert"
        assert delta["ops"][0]["address"] == "wallet/utxos.json"

    def test_removed_file_produces_delete_op(self) -> None:
        """A path present only in the base yields exactly one delete op."""
        base = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64},
            domain="bitcoin",
        )
        target = SnapshotManifest(files={}, domain="bitcoin")
        delta = self.plugin.diff(base, target)
        # Mirror the insert test: check the count and address, not just the kind.
        assert len(delta["ops"]) == 1
        assert delta["ops"][0]["op"] == "delete"
        assert delta["ops"][0]["address"] == "wallet/utxos.json"

    def test_unchanged_file_produces_no_ops(self) -> None:
        """Identical manifests yield no ops and a summary containing "clean"."""
        files = {"wallet/utxos.json": "a" * 64}
        base = SnapshotManifest(files=files, domain="bitcoin")
        target = SnapshotManifest(files=files, domain="bitcoin")
        delta = self.plugin.diff(base, target)
        assert len(delta["ops"]) == 0
        assert "clean" in delta["summary"]

    def test_modified_file_without_repo_root_produces_replace_op(self) -> None:
        """A changed hash, with no ``repo_root`` given, yields one replace op."""
        base = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64},
            domain="bitcoin",
        )
        target = SnapshotManifest(
            files={"wallet/utxos.json": "b" * 64},
            domain="bitcoin",
        )
        delta = self.plugin.diff(base, target)
        assert len(delta["ops"]) == 1
        assert delta["ops"][0]["op"] == "replace"

    def test_summary_reflects_op_counts(self) -> None:
        """One added and one removed path are both mentioned in the summary."""
        base = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64, "wallet/labels.json": "b" * 64},
            domain="bitcoin",
        )
        target = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64, "strategy/agent.json": "c" * 64},
            domain="bitcoin",
        )
        delta = self.plugin.diff(base, target)
        assert "added" in delta["summary"]
        assert "removed" in delta["summary"]
374
375
376 # ---------------------------------------------------------------------------
377 # diff — semantic UTXO level
378 # ---------------------------------------------------------------------------
379
380
class TestDiffUTXOs:
    """Semantic diffing of two serialized UTXO lists via ``_diff_utxos``."""

    def test_new_utxo_produces_insert_op(self) -> None:
        """A UTXO present only in the new list yields one insert op."""
        received = _make_utxo(txid="a" * 64, vout=0, amount_sat=500_000)
        ops = _diff_utxos(
            "wallet/utxos.json", _json_bytes([]), _json_bytes([received])
        )
        assert len(ops) == 1
        first = ops[0]
        assert first["op"] == "insert"
        assert "a" * 64 + ":0" in first["address"]
        assert "0.00500000 BTC" in first["content_summary"]

    def test_spent_utxo_produces_delete_op(self) -> None:
        """A UTXO present only in the old list yields one delete op marked "spent"."""
        spent = _make_utxo(txid="b" * 64, vout=1, amount_sat=200_000)
        ops = _diff_utxos(
            "wallet/utxos.json", _json_bytes([spent]), _json_bytes([])
        )
        assert len(ops) == 1
        first = ops[0]
        assert first["op"] == "delete"
        assert "spent" in first["content_summary"]

    def test_confirmation_update_produces_mutate_op(self) -> None:
        """Changing only ``confirmations`` yields one mutate op with stringified values."""
        txid = "c" * 64
        before = _json_bytes([_make_utxo(txid=txid, vout=0, confirmations=1)])
        after = _json_bytes([_make_utxo(txid=txid, vout=0, confirmations=6)])
        ops = _diff_utxos("wallet/utxos.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "mutate"
        assert ops[0]["fields"]["confirmations"]["old"] == "1"
        assert ops[0]["fields"]["confirmations"]["new"] == "6"

    def test_unchanged_utxos_produce_no_ops(self) -> None:
        """Diffing a payload against itself yields an empty op list."""
        payload = _json_bytes([_make_utxo()])
        assert _diff_utxos("wallet/utxos.json", payload, payload) == []

    def test_multiple_utxos_diff(self) -> None:
        """Dropping one UTXO and adding another yields both a delete and an insert."""
        dropped = _make_utxo(txid="d" * 64, vout=0, amount_sat=100_000)
        kept = _make_utxo(txid="e" * 64, vout=0, amount_sat=200_000)
        added = _make_utxo(txid="f" * 64, vout=0, amount_sat=300_000)
        before = _json_bytes([dropped, kept])
        after = _json_bytes([kept, added])
        ops = _diff_utxos("wallet/utxos.json", before, after)
        kinds = {entry["op"] for entry in ops}
        assert "insert" in kinds
        assert "delete" in kinds
429
430
431 # ---------------------------------------------------------------------------
432 # diff — semantic channel level
433 # ---------------------------------------------------------------------------
434
435
class TestDiffChannels:
    """Semantic diffing of two serialized channel lists via ``_diff_channels``."""

    def test_new_channel_produces_insert_op(self) -> None:
        """A channel present only in the new list yields one insert op marked "opened"."""
        channel = _make_channel()
        before = _json_bytes([])
        after = _json_bytes([channel])
        ops = _diff_channels("channels/channels.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "insert"
        assert "opened" in ops[0]["content_summary"]

    def test_closed_channel_produces_delete_op(self) -> None:
        """A channel present only in the old list yields one delete op."""
        channel = _make_channel()
        before = _json_bytes([channel])
        after = _json_bytes([])
        ops = _diff_channels("channels/channels.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "delete"

    def test_balance_change_produces_mutate_op(self) -> None:
        """Shifting balances on the same channel yields one mutate op naming both fields."""
        before = _json_bytes(
            [_make_channel(local_balance_sat=1_000_000, remote_balance_sat=900_000)]
        )
        after = _json_bytes(
            [_make_channel(local_balance_sat=800_000, remote_balance_sat=1_100_000)]
        )
        ops = _diff_channels("channels/channels.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "mutate"
        changed = ops[0]["fields"]
        assert "local_balance_sat" in changed
        assert "remote_balance_sat" in changed
463
464
465 # ---------------------------------------------------------------------------
466 # diff — strategy field level
467 # ---------------------------------------------------------------------------
468
469
class TestDiffStrategy:
    """Field-level diffing of a single serialized strategy via ``_diff_strategy``."""

    def test_fee_rate_change_produces_mutate_op(self) -> None:
        """Changing ``max_fee_rate_sat_vbyte`` yields one mutate op naming that field."""
        before = _json_bytes(_make_strategy(max_fee_rate_sat_vbyte=10))
        after = _json_bytes(_make_strategy(max_fee_rate_sat_vbyte=50))
        ops = _diff_strategy("strategy/agent.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "mutate"
        assert "max_fee_rate_sat_vbyte" in ops[0]["fields"]

    def test_simulation_mode_toggle_detected(self) -> None:
        """Toggling ``simulation_mode`` is reported with stringified old/new values."""
        before = _json_bytes(_make_strategy(simulation_mode=False))
        after = _json_bytes(_make_strategy(simulation_mode=True))
        ops = _diff_strategy("strategy/agent.json", before, after)
        assert ops[0]["op"] == "mutate"
        assert ops[0]["fields"]["simulation_mode"]["old"] == "False"
        assert ops[0]["fields"]["simulation_mode"]["new"] == "True"

    def test_identical_strategy_no_ops(self) -> None:
        """Diffing a payload against itself yields an empty op list."""
        payload = _json_bytes(_make_strategy())
        assert _diff_strategy("strategy/agent.json", payload, payload) == []
492
493
494 # ---------------------------------------------------------------------------
495 # diff — time series (prices, fees)
496 # ---------------------------------------------------------------------------
497
498
class TestDiffTimeSeries:
    """Diffing of the price and fee series via ``_diff_time_series``."""

    def test_new_price_tick_produces_insert(self) -> None:
        """A new price tick yields one insert op whose summary shows the USD price."""
        tick = _make_price(timestamp=1_000, price_usd=60_000.0)
        before = _json_bytes([])
        after = _json_bytes([tick])
        ops = _diff_time_series("oracles/prices.json", before, after, "prices")
        assert len(ops) == 1
        assert ops[0]["op"] == "insert"
        assert "$60,000.00" in ops[0]["content_summary"]

    def test_new_fee_estimate_produces_insert(self) -> None:
        """A new fee estimate yields one insert op whose summary mentions the 1-block rate."""
        estimate = _make_fee(timestamp=2_000, t1=25, t6=12, t144=2)
        before = _json_bytes([])
        after = _json_bytes([estimate])
        ops = _diff_time_series("oracles/fees.json", before, after, "fees")
        assert len(ops) == 1
        assert ops[0]["op"] == "insert"
        assert "25" in ops[0]["content_summary"]

    def test_existing_tick_not_duplicated(self) -> None:
        """Diffing a payload against itself yields an empty op list."""
        payload = _json_bytes([_make_price(timestamp=3_000)])
        ops = _diff_time_series("oracles/prices.json", payload, payload, "prices")
        assert ops == []
523
524
525 # ---------------------------------------------------------------------------
526 # diff with repo_root — produces PatchOp
527 # ---------------------------------------------------------------------------
528
529
class TestDiffWithRepoRoot:
    """``diff`` when a ``repo_root`` is supplied so file contents can be loaded."""

    def test_modified_utxos_json_produces_patch_op(self) -> None:
        """With both blobs stored, a changed ``utxos.json`` yields a patch op with child ops."""
        plugin = BitcoinPlugin()
        old_bytes = _json_bytes([_make_utxo(txid="a" * 64, vout=0, amount_sat=100_000)])
        new_bytes = _json_bytes([_make_utxo(txid="b" * 64, vout=0, amount_sat=200_000)])
        old_hash = _sha256(old_bytes)
        new_hash = _sha256(new_bytes)

        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            objects_dir = root / ".muse" / "objects"
            objects_dir.mkdir(parents=True)
            # Sharded layout: first two hex chars name the directory, the
            # remainder names the file.
            for digest, blob in ((old_hash, old_bytes), (new_hash, new_bytes)):
                shard = objects_dir / digest[:2]
                shard.mkdir(exist_ok=True)
                (shard / digest[2:]).write_bytes(blob)

            base = SnapshotManifest(
                files={"wallet/utxos.json": old_hash}, domain="bitcoin"
            )
            target = SnapshotManifest(
                files={"wallet/utxos.json": new_hash}, domain="bitcoin"
            )
            delta = plugin.diff(base, target, repo_root=root)

            assert delta["ops"][0]["op"] == "patch"
            patch = delta["ops"][0]
            assert patch["child_domain"] == "bitcoin.utxos"
            child_kinds = {child["op"] for child in patch["child_ops"]}
            assert "insert" in child_kinds
            assert "delete" in child_kinds

    def test_fallback_to_replace_when_object_missing(self) -> None:
        """With an empty object store, a changed hash yields a replace op."""
        plugin = BitcoinPlugin()
        base = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        target = SnapshotManifest(
            files={"wallet/utxos.json": "b" * 64}, domain="bitcoin"
        )
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            (root / ".muse" / "objects").mkdir(parents=True)
            delta = plugin.diff(base, target, repo_root=root)

            assert delta["ops"][0]["op"] == "replace"
580
581
582 # ---------------------------------------------------------------------------
583 # merge
584 # ---------------------------------------------------------------------------
585
586
class TestMerge:
    """Three-way ``merge`` of base / left / right manifests."""

    def setup_method(self) -> None:
        self.plugin = BitcoinPlugin()

    def test_clean_merge_no_conflicts(self) -> None:
        """Each side adding a different file merges cleanly and keeps both files."""
        base = SnapshotManifest(files={}, domain="bitcoin")
        left = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        right = SnapshotManifest(
            files={"channels/channels.json": "b" * 64}, domain="bitcoin"
        )
        result = self.plugin.merge(base, left, right)
        assert result.is_clean
        merged_files = result.merged["files"]
        assert "wallet/utxos.json" in merged_files
        assert "channels/channels.json" in merged_files

    def test_both_sides_unchanged_no_conflict(self) -> None:
        """Three identical manifests merge cleanly."""
        files = {"wallet/utxos.json": "a" * 64}
        base = SnapshotManifest(files=files, domain="bitcoin")
        left = SnapshotManifest(files=files, domain="bitcoin")
        right = SnapshotManifest(files=files, domain="bitcoin")
        assert self.plugin.merge(base, left, right).is_clean

    def test_only_left_changed_takes_left(self) -> None:
        """When only the left side changed a file, the merge keeps the left hash."""
        base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        left = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
        right = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        result = self.plugin.merge(base, left, right)
        assert result.is_clean
        assert result.merged["files"]["f"] == "b" * 64

    def test_only_right_changed_takes_right(self) -> None:
        """When only the right side changed a file, the merge keeps the right hash."""
        base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        left = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        right = SnapshotManifest(files={"f": "c" * 64}, domain="bitcoin")
        result = self.plugin.merge(base, left, right)
        assert result.is_clean
        assert result.merged["files"]["f"] == "c" * 64

    def test_both_changed_differently_produces_conflict(self) -> None:
        """Divergent edits to the same file are reported as a conflict on that path."""
        path = "wallet/utxos.json"
        base = SnapshotManifest(files={path: "a" * 64}, domain="bitcoin")
        left = SnapshotManifest(files={path: "b" * 64}, domain="bitcoin")
        right = SnapshotManifest(files={path: "c" * 64}, domain="bitcoin")
        result = self.plugin.merge(base, left, right)
        assert not result.is_clean
        assert "wallet/utxos.json" in result.conflicts

    def test_double_spend_detection_in_merge(self) -> None:
        """Both sides spending the same base UTXO yields a ``double_spend`` record."""
        base_utxo = _make_utxo(txid="d" * 64, vout=0, amount_sat=1_000_000)
        # Left spent the base UTXO and received change at its own address.
        left_change = _make_utxo(
            txid="e" * 64, vout=0, amount_sat=900_000, address="bc1qleft"
        )
        # Right spent the SAME base UTXO in a different transaction.
        right_change = _make_utxo(
            txid="f" * 64, vout=0, amount_sat=850_000, address="bc1qright"
        )
        base_bytes = _json_bytes([base_utxo])
        left_bytes = _json_bytes([left_change])
        right_bytes = _json_bytes([right_change])

        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            objects_dir = root / ".muse" / "objects"
            objects_dir.mkdir(parents=True)

            # Store each blob under <first two hex chars>/<remaining chars>.
            for blob in (base_bytes, left_bytes, right_bytes):
                digest = _sha256(blob)
                shard = objects_dir / digest[:2]
                shard.mkdir(exist_ok=True)
                (shard / digest[2:]).write_bytes(blob)

            base = SnapshotManifest(
                files={"wallet/utxos.json": _sha256(base_bytes)}, domain="bitcoin"
            )
            left = SnapshotManifest(
                files={"wallet/utxos.json": _sha256(left_bytes)}, domain="bitcoin"
            )
            right = SnapshotManifest(
                files={"wallet/utxos.json": _sha256(right_bytes)}, domain="bitcoin"
            )
            result = self.plugin.merge(base, left, right, repo_root=root)

            # Both sides removed the same UTXO relative to the shared base.
            double_spend_records = []
            for record in result.conflict_records:
                if record.conflict_type == "double_spend":
                    double_spend_records.append(record)
            assert len(double_spend_records) >= 1

    def test_both_deleted_same_file_is_clean(self) -> None:
        """A file removed on both sides merges cleanly and stays removed."""
        base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        left = SnapshotManifest(files={}, domain="bitcoin")
        right = SnapshotManifest(files={}, domain="bitcoin")
        result = self.plugin.merge(base, left, right)
        assert result.is_clean
        assert "f" not in result.merged["files"]
682
683
684 # ---------------------------------------------------------------------------
685 # drift
686 # ---------------------------------------------------------------------------
687
688
class TestDrift:
    """``drift`` between a committed manifest and a live manifest or directory."""

    def test_no_drift_when_identical(self) -> None:
        """Comparing a manifest with itself reports no drift."""
        plugin = BitcoinPlugin()
        committed = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        assert not plugin.drift(committed, committed).has_drift

    def test_drift_detected_when_file_added(self) -> None:
        """A file only in the live manifest is reported as drift, summarised as "added"."""
        plugin = BitcoinPlugin()
        committed = SnapshotManifest(files={}, domain="bitcoin")
        live = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        report = plugin.drift(committed, live)
        assert report.has_drift
        assert "added" in report.summary

    def test_drift_from_disk_detected(self) -> None:
        """A directory path is accepted as the live state and compared to the manifest."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            wallet = root / "wallet"
            wallet.mkdir()
            (wallet / "utxos.json").write_bytes(b"[]")

            committed = SnapshotManifest(files={}, domain="bitcoin")
            report = plugin.drift(committed, root)

            assert report.has_drift
719
720
721 # ---------------------------------------------------------------------------
722 # apply
723 # ---------------------------------------------------------------------------
724
725
class TestApply:
    """``apply`` is expected to hand back the snapshot it was given."""

    def test_apply_is_passthrough(self) -> None:
        """Applying a delta between two empty manifests returns the same snapshot object."""
        plugin = BitcoinPlugin()
        empty_base = SnapshotManifest(files={}, domain="bitcoin")
        empty_target = SnapshotManifest(files={}, domain="bitcoin")
        delta = plugin.diff(empty_base, empty_target)
        snap = SnapshotManifest(files={}, domain="bitcoin")
        assert plugin.apply(delta, snap) is snap
736
737
738 # ---------------------------------------------------------------------------
739 # schema
740 # ---------------------------------------------------------------------------
741
742
class TestSchema:
    """Shape and contents of the mapping returned by ``BitcoinPlugin.schema``."""

    def test_schema_returns_domain_schema(self) -> None:
        """The schema is a dict tagged with the domain and the package version."""
        # NOTE(review): DomainSchema is imported but never referenced; kept so
        # the test still fails if the module cannot be imported — confirm intent.
        from muse.core.schema import DomainSchema

        schema = BitcoinPlugin().schema()
        assert isinstance(schema, dict)
        assert schema["domain"] == "bitcoin"
        assert schema["schema_version"] == __version__

    def test_schema_has_ten_dimensions(self) -> None:
        """Exactly ten dimensions are declared."""
        dimensions = BitcoinPlugin().schema()["dimensions"]
        assert len(dimensions) == 10

    def test_schema_merge_mode_is_crdt(self) -> None:
        """The declared merge mode is ``"crdt"``."""
        schema = BitcoinPlugin().schema()
        assert schema["merge_mode"] == "crdt"

    def test_dimension_names(self) -> None:
        """The set of dimension names matches the expected ten exactly."""
        schema = BitcoinPlugin().schema()
        expected = {
            "utxos", "transactions", "labels", "descriptors",
            "channels", "strategy", "oracle_prices", "oracle_fees",
            "network", "execution",
        }
        names = {dimension["name"] for dimension in schema["dimensions"]}
        assert names == expected

    def test_strategy_dimension_is_not_independent(self) -> None:
        """The ``strategy`` dimension has ``independent_merge`` set to ``False``."""
        schema = BitcoinPlugin().schema()
        strategy_dim = next(
            dimension
            for dimension in schema["dimensions"]
            if dimension["name"] == "strategy"
        )
        assert strategy_dim["independent_merge"] is False
773
774
775 # ---------------------------------------------------------------------------
776 # OT merge — StructuredMergePlugin
777 # ---------------------------------------------------------------------------
778
779
class TestMergeOps:
    """Operation-level merging through ``merge_ops`` and its double-spend records."""

    def setup_method(self) -> None:
        self.plugin = BitcoinPlugin()
        self.base = SnapshotManifest(files={}, domain="bitcoin")
        self.snap = SnapshotManifest(files={}, domain="bitcoin")

    def test_non_conflicting_ops_clean_merge(self) -> None:
        """Inserts at two different addresses produce no conflict records."""
        ours_ops: list[DomainOp] = [
            InsertOp(
                op="insert",
                address="wallet/utxos.json::aaa:0",
                position=None,
                content_id="a" * 64,
                content_summary="received UTXO aaa:0",
            )
        ]
        theirs_ops: list[DomainOp] = [
            InsertOp(
                op="insert",
                address="wallet/utxos.json::bbb:0",
                position=None,
                content_id="b" * 64,
                content_summary="received UTXO bbb:0",
            )
        ]
        result = self.plugin.merge_ops(
            self.base, self.snap, self.snap, ours_ops, theirs_ops
        )
        # Distinct addresses, so nothing should be flagged.
        assert len(result.conflict_records) == 0

    def test_double_spend_detected_in_merge_ops(self) -> None:
        """Both sides deleting the same UTXO address yields one ``double_spend`` record."""
        utxo_addr = "wallet/utxos.json::deadbeef:0"
        ours_ops: list[DomainOp] = [
            DeleteOp(
                op="delete",
                address=utxo_addr,
                position=None,
                content_id="a" * 64,
                content_summary="spent UTXO deadbeef:0",
            )
        ]
        theirs_ops: list[DomainOp] = [
            DeleteOp(
                op="delete",
                address=utxo_addr,
                position=None,
                content_id="a" * 64,
                content_summary="spent UTXO deadbeef:0",
            )
        ]
        result = self.plugin.merge_ops(
            self.base, self.snap, self.snap, ours_ops, theirs_ops
        )
        double_spends = []
        for record in result.conflict_records:
            if record.conflict_type == "double_spend":
                double_spends.append(record)
        assert len(double_spends) == 1
        assert "deadbeef:0" in double_spends[0].addresses[0]

    def test_non_utxo_delete_not_flagged_as_double_spend(self) -> None:
        """Both sides deleting the same channel address yields no ``double_spend`` record."""
        addr = "channels/channels.json::850000x1x0"
        ours_ops: list[DomainOp] = [
            DeleteOp(
                op="delete",
                address=addr,
                position=None,
                content_id="a" * 64,
                content_summary="channel closed",
            )
        ]
        theirs_ops: list[DomainOp] = [
            DeleteOp(
                op="delete",
                address=addr,
                position=None,
                content_id="a" * 64,
                content_summary="channel closed",
            )
        ]
        result = self.plugin.merge_ops(
            self.base, self.snap, self.snap, ours_ops, theirs_ops
        )
        double_spends = []
        for record in result.conflict_records:
            if record.conflict_type == "double_spend":
                double_spends.append(record)
        assert len(double_spends) == 0
859
860
861 # ---------------------------------------------------------------------------
862 # CRDTPlugin
863 # ---------------------------------------------------------------------------
864
865
866 class TestCRDT:
867 def setup_method(self) -> None:
868 self.plugin = BitcoinPlugin()
869 self.base_snap = SnapshotManifest(
870 files={"wallet/utxos.json": "a" * 64},
871 domain="bitcoin",
872 )
873
874 def test_to_crdt_state_preserves_files(self) -> None:
875 crdt = self.plugin.to_crdt_state(self.base_snap)
876 assert crdt["domain"] == "bitcoin"
877 assert crdt["schema_version"] == __version__
878 assert "wallet/utxos.json" in crdt["files"]
879
880 def test_from_crdt_state_returns_plain_snapshot(self) -> None:
881 crdt = self.plugin.to_crdt_state(self.base_snap)
882 snap = self.plugin.from_crdt_state(crdt)
883 assert snap["domain"] == "bitcoin"
884 assert "wallet/utxos.json" in snap["files"]
885
886 def test_join_idempotent(self) -> None:
887 crdt = self.plugin.to_crdt_state(self.base_snap)
888 joined = self.plugin.join(crdt, crdt)
889 result = self.plugin.from_crdt_state(joined)
890 original = self.plugin.from_crdt_state(crdt)
891 assert result["files"] == original["files"]
892
893 def test_join_commutative(self) -> None:
894 snap_a = SnapshotManifest(
895 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
896 )
897 snap_b = SnapshotManifest(
898 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
899 )
900 a = self.plugin.to_crdt_state(snap_a)
901 b = self.plugin.to_crdt_state(snap_b)
902 ab = self.plugin.from_crdt_state(self.plugin.join(a, b))
903 ba = self.plugin.from_crdt_state(self.plugin.join(b, a))
904 assert ab["files"] == ba["files"]
905
906 def test_join_associative(self) -> None:
907 snap_a = SnapshotManifest(files={"f1": "a" * 64}, domain="bitcoin")
908 snap_b = SnapshotManifest(files={"f2": "b" * 64}, domain="bitcoin")
909 snap_c = SnapshotManifest(files={"f3": "c" * 64}, domain="bitcoin")
910 a = self.plugin.to_crdt_state(snap_a)
911 b = self.plugin.to_crdt_state(snap_b)
912 c = self.plugin.to_crdt_state(snap_c)
913 ab_c = self.plugin.from_crdt_state(
914 self.plugin.join(self.plugin.join(a, b), c)
915 )
916 a_bc = self.plugin.from_crdt_state(
917 self.plugin.join(a, self.plugin.join(b, c))
918 )
919 assert ab_c["files"] == a_bc["files"]
920
921 def test_join_preserves_both_agents_files(self) -> None:
922 snap_a = SnapshotManifest(
923 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
924 )
925 snap_b = SnapshotManifest(
926 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
927 )
928 a = self.plugin.to_crdt_state(snap_a)
929 b = self.plugin.to_crdt_state(snap_b)
930 joined = self.plugin.from_crdt_state(self.plugin.join(a, b))
931 assert "wallet/utxos.json" in joined["files"]
932 assert "channels/channels.json" in joined["files"]
933
934 def test_crdt_schema_has_seven_dimensions(self) -> None:
935 dims = self.plugin.crdt_schema()
936 assert len(dims) == 7
937
def test_crdt_schema_dimension_types(self) -> None:
    """Both ``aw_map`` and ``or_set`` appear among the schema's CRDT types."""
    declared_types = set()
    for dimension in self.plugin.crdt_schema():
        declared_types.add(dimension["crdt_type"])

    assert "aw_map" in declared_types
    assert "or_set" in declared_types
943
def test_vector_clock_advances_on_join(self) -> None:
    """A joined CRDT state carries a ``vclock`` entry that is a dict.

    NOTE(review): despite the name, this only checks the type of the
    joined vector clock, not that it advanced relative to either input.
    """
    plugin = self.plugin
    # Both snapshots touch the same path with different content hashes.
    ours_snapshot = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
    theirs_snapshot = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
    ours = plugin.to_crdt_state(ours_snapshot)
    theirs = plugin.to_crdt_state(theirs_snapshot)

    merged_state = plugin.join(ours, theirs)
    assert isinstance(merged_state["vclock"], dict)
951
952
953 # ---------------------------------------------------------------------------
954 # Query analytics
955 # ---------------------------------------------------------------------------
956
957
class TestQueryAnalytics:
    """Checks for the query helpers imported from ``muse.plugins.bitcoin._query``."""

    def test_total_balance_sat(self) -> None:
        """Two UTXOs of 100k and 200k sats total 300k sats."""
        first = _make_utxo(amount_sat=100_000)
        second = _make_utxo(txid="b" * 64, amount_sat=200_000)
        assert total_balance_sat([first, second]) == 300_000

    def test_confirmed_balance_excludes_unconfirmed(self) -> None:
        """A zero-confirmation UTXO does not count toward the confirmed balance."""
        confirmed = _make_utxo(amount_sat=100_000, confirmations=6)
        unconfirmed = _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0)
        assert confirmed_balance_sat([confirmed, unconfirmed]) == 100_000

    def test_confirmed_balance_excludes_immature_coinbase(self) -> None:
        """A coinbase UTXO with 50 confirmations is left out of the confirmed balance."""
        # Presumably 50 confirmations is below the coinbase maturity threshold.
        young_coinbase = _make_utxo(
            amount_sat=625_000_000, confirmations=50, coinbase=True
        )
        regular = _make_utxo(txid="c" * 64, amount_sat=100_000, confirmations=1)
        assert confirmed_balance_sat([young_coinbase, regular]) == 100_000

    def test_balance_by_script_type(self) -> None:
        """Amounts are summed per script type."""
        coins = [
            _make_utxo(amount_sat=100_000, script_type="p2wpkh"),
            _make_utxo(txid="b" * 64, amount_sat=200_000, script_type="p2tr"),
            _make_utxo(txid="c" * 64, amount_sat=50_000, script_type="p2wpkh"),
        ]
        per_type = balance_by_script_type(coins)
        # Only these two keys are checked; other keys are not constrained.
        assert per_type["p2wpkh"] == 150_000
        assert per_type["p2tr"] == 200_000

    def test_coin_age_blocks(self) -> None:
        """A UTXO at height 800,000 is 50,000 blocks old at height 850,000."""
        coin = _make_utxo(block_height=800_000)
        age = coin_age_blocks(coin, 850_000)
        assert age == 50_000

    def test_coin_age_blocks_unconfirmed(self) -> None:
        """A UTXO without a block height has no age (``None``)."""
        coin = _make_utxo(block_height=None)
        age = coin_age_blocks(coin, 850_000)
        assert age is None

    def test_format_sat_small(self) -> None:
        """1,000 sats is rendered with a thousands separator and a ``sats`` suffix."""
        rendered = format_sat(1_000)
        assert rendered == "1,000 sats"

    def test_format_sat_large(self) -> None:
        """100,000,000 sats is rendered with an eight-decimal BTC amount."""
        rendered = format_sat(100_000_000)
        assert "1.00000000 BTC" in rendered

    def test_utxo_key(self) -> None:
        """The UTXO key is ``txid:vout``."""
        coin = _make_utxo(txid="abc", vout=3)
        key = utxo_key(coin)
        assert key == "abc:3"

    def test_double_spend_candidates_detects_concurrent_spends(self) -> None:
        """An outpoint spent on both sides is reported as a candidate."""
        base_utxos = {"aaa:0", "bbb:1", "ccc:2"}
        spent_by_us = {"aaa:0", "bbb:1"}
        spent_by_them = {"aaa:0", "ccc:2"}
        flagged = double_spend_candidates(base_utxos, spent_by_us, spent_by_them)
        assert flagged == ["aaa:0"]

    def test_double_spend_candidates_no_overlap(self) -> None:
        """Disjoint spend sets produce no candidates."""
        base_utxos = {"aaa:0", "bbb:1"}
        spent_by_us = {"aaa:0"}
        spent_by_them = {"bbb:1"}
        flagged = double_spend_candidates(base_utxos, spent_by_us, spent_by_them)
        assert flagged == []

    def test_channel_liquidity_totals(self) -> None:
        """Local and remote balances are summed across channels."""
        first_channel = _make_channel(
            local_balance_sat=1_000_000, remote_balance_sat=500_000
        )
        second_channel = _make_channel(
            channel_id="x", local_balance_sat=200_000, remote_balance_sat=800_000
        )
        local_total, remote_total = channel_liquidity_totals(
            [first_channel, second_channel]
        )
        assert local_total == 1_200_000
        assert remote_total == 1_300_000

    def test_channel_utilization(self) -> None:
        """Utilization of a channel falls within the closed interval [0, 1]."""
        channel = _make_channel(
            capacity_sat=2_000_000,
            local_balance_sat=800_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        utilization = channel_utilization(channel)
        assert 0.0 <= utilization <= 1.0

    def test_fee_surface_str(self) -> None:
        """The fee surface string mentions each target rate and the unit."""
        estimate = _make_fee(t1=42, t6=15, t144=3)
        surface = fee_surface_str(estimate)
        for fragment in ("42", "15", "3", "sat/vbyte"):
            assert fragment in surface

    def test_latest_fee_estimate(self) -> None:
        """The estimate with the larger timestamp is returned."""
        older = _make_fee(timestamp=1_000)
        newer = _make_fee(timestamp=2_000, t1=50)
        latest = latest_fee_estimate([older, newer])
        assert latest is not None
        assert latest["timestamp"] == 2_000

    def test_latest_fee_estimate_empty(self) -> None:
        """An empty list of estimates yields ``None``."""
        latest = latest_fee_estimate([])
        assert latest is None

    def test_latest_price(self) -> None:
        """The price of the tick with the larger timestamp is returned."""
        earlier_tick = _make_price(timestamp=1_000, price_usd=50_000.0)
        later_tick = _make_price(timestamp=2_000, price_usd=70_000.0)
        price = latest_price([earlier_tick, later_tick])
        assert price == 70_000.0

    def test_strategy_summary_line_simulation_mode(self) -> None:
        """A simulation-mode strategy summary shows ``SIM`` and the strategy name."""
        strategy = _make_strategy(simulation_mode=True, name="aggressive")
        summary = strategy_summary_line(strategy)
        assert "SIM" in summary
        assert "aggressive" in summary

    def test_utxo_summary_line(self) -> None:
        """The UTXO summary reports the number of UTXOs."""
        confirmed = _make_utxo(amount_sat=100_000, confirmations=6)
        unconfirmed = _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0)
        summary = utxo_summary_line([confirmed, unconfirmed])
        assert "2 UTXOs" in summary

    def test_mempool_summary_empty(self) -> None:
        """An empty mempool is described with the word ``empty``."""
        summary = mempool_summary_line([])
        assert "empty" in summary
1081
1082
1083 # ---------------------------------------------------------------------------
1084 # Handler routing
1085 # ---------------------------------------------------------------------------
1086
1087
class TestHandlerRouting:
    """Checks that ``_handler_for_path`` maps known paths to handler names."""

    def test_utxos_json_routed(self) -> None:
        """``wallet/utxos.json`` is routed to the ``utxos`` handler."""
        handler = _handler_for_path("wallet/utxos.json")
        assert handler == "utxos"

    def test_channels_json_routed(self) -> None:
        """``channels/channels.json`` is routed to the ``channels`` handler."""
        handler = _handler_for_path("channels/channels.json")
        assert handler == "channels"

    def test_agent_json_routed(self) -> None:
        """``strategy/agent.json`` is routed to the ``strategy`` handler."""
        handler = _handler_for_path("strategy/agent.json")
        assert handler == "strategy"

    def test_prices_json_routed(self) -> None:
        """``oracles/prices.json`` is routed to the ``prices`` handler."""
        handler = _handler_for_path("oracles/prices.json")
        assert handler == "prices"

    def test_unknown_file_returns_none(self) -> None:
        """Paths with no matching handler resolve to ``None``."""
        for unrouted_path in ("README.md", "custom/data.bin"):
            assert _handler_for_path(unrouted_path) is None
1104
1105
1106 # ---------------------------------------------------------------------------
1107 # Registry registration
1108 # ---------------------------------------------------------------------------
1109
1110
class TestRegistry:
    """Checks that the Bitcoin plugin is reachable through the plugin registry."""

    def test_bitcoin_registered_in_plugin_registry(self) -> None:
        """``"bitcoin"`` is among the registered domain names."""
        from muse.plugins.registry import registered_domains

        assert "bitcoin" in registered_domains()

    def test_resolve_bitcoin_plugin(self) -> None:
        """A repo whose ``.muse/repo.json`` names ``bitcoin`` resolves to ``BitcoinPlugin``."""
        from muse.plugins.registry import resolve_plugin

        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            muse_dir = root / ".muse"
            muse_dir.mkdir()
            # Use the module-level ``json`` import rather than re-importing it
            # under an alias, and pin the encoding so the fixture does not
            # depend on the platform's locale default.
            (muse_dir / "repo.json").write_text(
                json.dumps({"domain": "bitcoin"}), encoding="utf-8"
            )
            plugin = resolve_plugin(root)

        assert isinstance(plugin, BitcoinPlugin)