gabriel / muse public
test_bitcoin_plugin.py python
1127 lines 40.0 KB
838d4a3e feat(omzsh-plugin): strip to minimal, secure shell integration Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """Comprehensive test suite for the Bitcoin domain plugin.
2
3 Tests cover all three protocol levels:
4 - Core MuseDomainPlugin (snapshot, diff, merge, drift, apply, schema)
5 - StructuredMergePlugin (OT merge with double-spend detection)
6 - CRDTPlugin (convergent join — commutativity, associativity, idempotency)
7
8 Tests are organized by capability, each asserting exact behaviour
9 with realistic Bitcoin fixtures.
10 """
11
12 from __future__ import annotations
13
14 import hashlib
15 import json
16 import pathlib
17 import tempfile
18
19 import pytest
20
21 from muse.domain import (
22 CRDTPlugin,
23 DeleteOp,
24 DomainOp,
25 InsertOp,
26 MuseDomainPlugin,
27 MutateOp,
28 PatchOp,
29 ReplaceOp,
30 SnapshotManifest,
31 StructuredMergePlugin,
32 )
33 from muse.plugins.bitcoin._query import (
34 balance_by_script_type,
35 channel_liquidity_totals,
36 channel_utilization,
37 coin_age_blocks,
38 confirmed_balance_sat,
39 double_spend_candidates,
40 fee_surface_str,
41 format_sat,
42 latest_fee_estimate,
43 latest_price,
44 mempool_summary_line,
45 strategy_summary_line,
46 total_balance_sat,
47 utxo_key,
48 utxo_summary_line,
49 )
50 from muse.plugins.bitcoin._types import (
51 AddressLabelRecord,
52 AgentStrategyRecord,
53 CoinCategory,
54 CoinSelectAlgo,
55 FeeEstimateRecord,
56 LightningChannelRecord,
57 OraclePriceTickRecord,
58 PendingTxRecord,
59 RoutingPolicyRecord,
60 ScriptType,
61 UTXORecord,
62 )
63 from muse.plugins.bitcoin.plugin import (
64 BitcoinPlugin,
65 _diff_channels,
66 _diff_labels,
67 _diff_strategy,
68 _diff_time_series,
69 _diff_transactions,
70 _diff_utxos,
71 _handler_for_path,
72 )
73
74 # ---------------------------------------------------------------------------
75 # Fixtures
76 # ---------------------------------------------------------------------------
77
78
79 def _make_utxo(
80 txid: str = "abc" * 21 + "ab",
81 vout: int = 0,
82 amount_sat: int = 100_000,
83 script_type: ScriptType = "p2wpkh",
84 address: str = "bc1qtest",
85 confirmations: int = 6,
86 block_height: int | None = 850_000,
87 coinbase: bool = False,
88 label: str | None = None,
89 ) -> UTXORecord:
90 return UTXORecord(
91 txid=txid,
92 vout=vout,
93 amount_sat=amount_sat,
94 script_type=script_type,
95 address=address,
96 confirmations=confirmations,
97 block_height=block_height,
98 coinbase=coinbase,
99 label=label,
100 )
101
102
103 def _make_channel(
104 channel_id: str = "850000x1x0",
105 peer_pubkey: str = "0279" + "aa" * 32,
106 peer_alias: str | None = "ACINQ",
107 capacity_sat: int = 2_000_000,
108 local_balance_sat: int = 1_000_000,
109 remote_balance_sat: int = 900_000,
110 is_active: bool = True,
111 is_public: bool = True,
112 local_reserve_sat: int = 20_000,
113 remote_reserve_sat: int = 20_000,
114 unsettled_balance_sat: int = 0,
115 htlc_count: int = 0,
116 ) -> LightningChannelRecord:
117 return LightningChannelRecord(
118 channel_id=channel_id,
119 peer_pubkey=peer_pubkey,
120 peer_alias=peer_alias,
121 capacity_sat=capacity_sat,
122 local_balance_sat=local_balance_sat,
123 remote_balance_sat=remote_balance_sat,
124 is_active=is_active,
125 is_public=is_public,
126 local_reserve_sat=local_reserve_sat,
127 remote_reserve_sat=remote_reserve_sat,
128 unsettled_balance_sat=unsettled_balance_sat,
129 htlc_count=htlc_count,
130 )
131
132
133 def _make_label(
134 address: str = "bc1qtest",
135 label: str = "cold storage",
136 category: CoinCategory = "income",
137 created_at: int = 1_700_000_000,
138 ) -> AddressLabelRecord:
139 return AddressLabelRecord(
140 address=address,
141 label=label,
142 category=category,
143 created_at=created_at,
144 )
145
146
147 def _make_strategy(
148 name: str = "conservative",
149 max_fee_rate_sat_vbyte: int = 10,
150 min_confirmations: int = 6,
151 utxo_consolidation_threshold: int = 20,
152 dca_amount_sat: int | None = 500_000,
153 dca_interval_blocks: int | None = 144,
154 lightning_rebalance_threshold: float = 0.2,
155 coin_selection: CoinSelectAlgo = "branch_and_bound",
156 simulation_mode: bool = False,
157 ) -> AgentStrategyRecord:
158 return AgentStrategyRecord(
159 name=name,
160 max_fee_rate_sat_vbyte=max_fee_rate_sat_vbyte,
161 min_confirmations=min_confirmations,
162 utxo_consolidation_threshold=utxo_consolidation_threshold,
163 dca_amount_sat=dca_amount_sat,
164 dca_interval_blocks=dca_interval_blocks,
165 lightning_rebalance_threshold=lightning_rebalance_threshold,
166 coin_selection=coin_selection,
167 simulation_mode=simulation_mode,
168 )
169
170
171 def _make_price(
172 timestamp: int = 1_700_000_000,
173 block_height: int | None = 850_000,
174 price_usd: float = 62_000.0,
175 source: str = "coinbase",
176 ) -> OraclePriceTickRecord:
177 return OraclePriceTickRecord(
178 timestamp=timestamp,
179 block_height=block_height,
180 price_usd=price_usd,
181 source=source,
182 )
183
184
185 def _make_fee(
186 timestamp: int = 1_700_000_000,
187 block_height: int | None = 850_000,
188 t1: int = 30,
189 t6: int = 15,
190 t144: int = 3,
191 ) -> FeeEstimateRecord:
192 return FeeEstimateRecord(
193 timestamp=timestamp,
194 block_height=block_height,
195 target_1_block_sat_vbyte=t1,
196 target_6_block_sat_vbyte=t6,
197 target_144_block_sat_vbyte=t144,
198 )
199
200
201 _AnyBitcoinRecord = (
202 UTXORecord
203 | LightningChannelRecord
204 | AddressLabelRecord
205 | AgentStrategyRecord
206 | FeeEstimateRecord
207 | RoutingPolicyRecord
208 | PendingTxRecord
209 | OraclePriceTickRecord
210 )
211
212
213 def _json_bytes(obj: _AnyBitcoinRecord | list[_AnyBitcoinRecord]) -> bytes:
214 return json.dumps(obj, sort_keys=True).encode()
215
216
217 def _sha256(data: bytes) -> str:
218 return hashlib.sha256(data).hexdigest()
219
220
221 def _make_manifest(files: dict[str, bytes]) -> SnapshotManifest:
222 return SnapshotManifest(
223 files={path: _sha256(data) for path, data in files.items()},
224 domain="bitcoin",
225 )
226
227
228 # ---------------------------------------------------------------------------
229 # Protocol conformance
230 # ---------------------------------------------------------------------------
231
232
233 class TestProtocolConformance:
234 def test_satisfies_muse_domain_plugin(self) -> None:
235 assert isinstance(BitcoinPlugin(), MuseDomainPlugin)
236
237 def test_satisfies_structured_merge_plugin(self) -> None:
238 assert isinstance(BitcoinPlugin(), StructuredMergePlugin)
239
240 def test_satisfies_crdt_plugin(self) -> None:
241 assert isinstance(BitcoinPlugin(), CRDTPlugin)
242
243
244 # ---------------------------------------------------------------------------
245 # snapshot
246 # ---------------------------------------------------------------------------
247
248
249 class TestSnapshot:
250 def test_snapshot_from_path_hashes_all_json_files(self) -> None:
251 plugin = BitcoinPlugin()
252 with tempfile.TemporaryDirectory() as tmp:
253 root = pathlib.Path(tmp)
254 (root / "wallet").mkdir()
255 (root / "wallet" / "utxos.json").write_bytes(b"[]")
256 (root / "wallet" / "labels.json").write_bytes(b"[]")
257
258 snap = plugin.snapshot(root)
259
260 assert snap["domain"] == "bitcoin"
261 assert "wallet/utxos.json" in snap["files"]
262 assert "wallet/labels.json" in snap["files"]
263
264 def test_snapshot_excludes_hidden_files(self) -> None:
265 plugin = BitcoinPlugin()
266 with tempfile.TemporaryDirectory() as tmp:
267 root = pathlib.Path(tmp)
268 (root / "wallet").mkdir()
269 (root / "wallet" / "utxos.json").write_bytes(b"[]")
270 (root / ".secret").write_bytes(b"private key would be here")
271
272 snap = plugin.snapshot(root)
273
274 assert ".secret" not in snap["files"]
275 assert "wallet/utxos.json" in snap["files"]
276
277 def test_snapshot_passthrough_for_existing_manifest(self) -> None:
278 plugin = BitcoinPlugin()
279 manifest = SnapshotManifest(
280 files={"wallet/utxos.json": "abc" * 21 + "ab"},
281 domain="bitcoin",
282 )
283 result = plugin.snapshot(manifest)
284 assert result is manifest
285
286 def test_snapshot_content_hash_is_sha256(self) -> None:
287 plugin = BitcoinPlugin()
288 with tempfile.TemporaryDirectory() as tmp:
289 root = pathlib.Path(tmp)
290 (root / "wallet").mkdir()
291 content = b"[]\n"
292 (root / "wallet" / "utxos.json").write_bytes(content)
293
294 snap = plugin.snapshot(root)
295
296 expected_hash = _sha256(content)
297 assert snap["files"]["wallet/utxos.json"] == expected_hash
298
299 def test_snapshot_deterministic(self) -> None:
300 plugin = BitcoinPlugin()
301 with tempfile.TemporaryDirectory() as tmp:
302 root = pathlib.Path(tmp)
303 (root / "wallet").mkdir()
304 (root / "wallet" / "utxos.json").write_bytes(b"[]")
305
306 snap1 = plugin.snapshot(root)
307 snap2 = plugin.snapshot(root)
308
309 assert snap1["files"] == snap2["files"]
310
311
312 # ---------------------------------------------------------------------------
313 # diff — file-level
314 # ---------------------------------------------------------------------------
315
316
317 class TestDiffFileLevel:
318 def setup_method(self) -> None:
319 self.plugin = BitcoinPlugin()
320
321 def test_added_file_produces_insert_op(self) -> None:
322 base = SnapshotManifest(files={}, domain="bitcoin")
323 target = SnapshotManifest(
324 files={"wallet/utxos.json": "a" * 64},
325 domain="bitcoin",
326 )
327 delta = self.plugin.diff(base, target)
328 assert len(delta["ops"]) == 1
329 assert delta["ops"][0]["op"] == "insert"
330 assert delta["ops"][0]["address"] == "wallet/utxos.json"
331
332 def test_removed_file_produces_delete_op(self) -> None:
333 base = SnapshotManifest(
334 files={"wallet/utxos.json": "a" * 64},
335 domain="bitcoin",
336 )
337 target = SnapshotManifest(files={}, domain="bitcoin")
338 delta = self.plugin.diff(base, target)
339 assert delta["ops"][0]["op"] == "delete"
340
341 def test_unchanged_file_produces_no_ops(self) -> None:
342 files = {"wallet/utxos.json": "a" * 64}
343 base = SnapshotManifest(files=files, domain="bitcoin")
344 target = SnapshotManifest(files=files, domain="bitcoin")
345 delta = self.plugin.diff(base, target)
346 assert len(delta["ops"]) == 0
347 assert "clean" in delta["summary"]
348
349 def test_modified_file_without_repo_root_produces_replace_op(self) -> None:
350 base = SnapshotManifest(
351 files={"wallet/utxos.json": "a" * 64},
352 domain="bitcoin",
353 )
354 target = SnapshotManifest(
355 files={"wallet/utxos.json": "b" * 64},
356 domain="bitcoin",
357 )
358 delta = self.plugin.diff(base, target)
359 assert delta["ops"][0]["op"] == "replace"
360
361 def test_summary_reflects_op_counts(self) -> None:
362 base = SnapshotManifest(
363 files={"wallet/utxos.json": "a" * 64, "wallet/labels.json": "b" * 64},
364 domain="bitcoin",
365 )
366 target = SnapshotManifest(
367 files={"wallet/utxos.json": "a" * 64, "strategy/agent.json": "c" * 64},
368 domain="bitcoin",
369 )
370 delta = self.plugin.diff(base, target)
371 assert "added" in delta["summary"]
372 assert "removed" in delta["summary"]
373
374
375 # ---------------------------------------------------------------------------
376 # diff — semantic UTXO level
377 # ---------------------------------------------------------------------------
378
379
380 class TestDiffUTXOs:
381 def test_new_utxo_produces_insert_op(self) -> None:
382 u = _make_utxo(txid="a" * 64, vout=0, amount_sat=500_000)
383 old = _json_bytes([])
384 new = _json_bytes([u])
385 ops = _diff_utxos("wallet/utxos.json", old, new)
386 assert len(ops) == 1
387 assert ops[0]["op"] == "insert"
388 assert "a" * 64 + ":0" in ops[0]["address"]
389 assert "0.00500000 BTC" in ops[0]["content_summary"]
390
391 def test_spent_utxo_produces_delete_op(self) -> None:
392 u = _make_utxo(txid="b" * 64, vout=1, amount_sat=200_000)
393 old = _json_bytes([u])
394 new = _json_bytes([])
395 ops = _diff_utxos("wallet/utxos.json", old, new)
396 assert len(ops) == 1
397 assert ops[0]["op"] == "delete"
398 assert "spent" in ops[0]["content_summary"]
399
400 def test_confirmation_update_produces_mutate_op(self) -> None:
401 txid = "c" * 64
402 u_old = _make_utxo(txid=txid, vout=0, confirmations=1)
403 u_new = _make_utxo(txid=txid, vout=0, confirmations=6)
404 old = _json_bytes([u_old])
405 new = _json_bytes([u_new])
406 ops = _diff_utxos("wallet/utxos.json", old, new)
407 assert len(ops) == 1
408 assert ops[0]["op"] == "mutate"
409 assert ops[0]["fields"]["confirmations"]["old"] == "1"
410 assert ops[0]["fields"]["confirmations"]["new"] == "6"
411
412 def test_unchanged_utxos_produce_no_ops(self) -> None:
413 u = _make_utxo()
414 data = _json_bytes([u])
415 ops = _diff_utxos("wallet/utxos.json", data, data)
416 assert ops == []
417
418 def test_multiple_utxos_diff(self) -> None:
419 u1 = _make_utxo(txid="d" * 64, vout=0, amount_sat=100_000)
420 u2 = _make_utxo(txid="e" * 64, vout=0, amount_sat=200_000)
421 u3 = _make_utxo(txid="f" * 64, vout=0, amount_sat=300_000)
422 old = _json_bytes([u1, u2])
423 new = _json_bytes([u2, u3])
424 ops = _diff_utxos("wallet/utxos.json", old, new)
425 op_types = {op["op"] for op in ops}
426 assert "insert" in op_types
427 assert "delete" in op_types
428
429
430 # ---------------------------------------------------------------------------
431 # diff — semantic channel level
432 # ---------------------------------------------------------------------------
433
434
435 class TestDiffChannels:
436 def test_new_channel_produces_insert_op(self) -> None:
437 ch = _make_channel()
438 ops = _diff_channels("channels/channels.json", _json_bytes([]), _json_bytes([ch]))
439 assert len(ops) == 1
440 assert ops[0]["op"] == "insert"
441 assert "opened" in ops[0]["content_summary"]
442
443 def test_closed_channel_produces_delete_op(self) -> None:
444 ch = _make_channel()
445 ops = _diff_channels("channels/channels.json", _json_bytes([ch]), _json_bytes([]))
446 assert len(ops) == 1
447 assert ops[0]["op"] == "delete"
448
449 def test_balance_change_produces_mutate_op(self) -> None:
450 ch_old = _make_channel(local_balance_sat=1_000_000, remote_balance_sat=900_000)
451 ch_new = _make_channel(local_balance_sat=800_000, remote_balance_sat=1_100_000)
452 ops = _diff_channels(
453 "channels/channels.json",
454 _json_bytes([ch_old]),
455 _json_bytes([ch_new]),
456 )
457 assert len(ops) == 1
458 assert ops[0]["op"] == "mutate"
459 fields = ops[0]["fields"]
460 assert "local_balance_sat" in fields
461 assert "remote_balance_sat" in fields
462
463
464 # ---------------------------------------------------------------------------
465 # diff — strategy field level
466 # ---------------------------------------------------------------------------
467
468
469 class TestDiffStrategy:
470 def test_fee_rate_change_produces_mutate_op(self) -> None:
471 old_strat = _make_strategy(max_fee_rate_sat_vbyte=10)
472 new_strat = _make_strategy(max_fee_rate_sat_vbyte=50)
473 ops = _diff_strategy("strategy/agent.json", _json_bytes(old_strat), _json_bytes(new_strat))
474 assert len(ops) == 1
475 assert ops[0]["op"] == "mutate"
476 assert "max_fee_rate_sat_vbyte" in ops[0]["fields"]
477
478 def test_simulation_mode_toggle_detected(self) -> None:
479 old_s = _make_strategy(simulation_mode=False)
480 new_s = _make_strategy(simulation_mode=True)
481 ops = _diff_strategy("strategy/agent.json", _json_bytes(old_s), _json_bytes(new_s))
482 assert ops[0]["op"] == "mutate"
483 assert ops[0]["fields"]["simulation_mode"]["old"] == "False"
484 assert ops[0]["fields"]["simulation_mode"]["new"] == "True"
485
486 def test_identical_strategy_no_ops(self) -> None:
487 s = _make_strategy()
488 data = _json_bytes(s)
489 ops = _diff_strategy("strategy/agent.json", data, data)
490 assert ops == []
491
492
493 # ---------------------------------------------------------------------------
494 # diff — time series (prices, fees)
495 # ---------------------------------------------------------------------------
496
497
498 class TestDiffTimeSeries:
499 def test_new_price_tick_produces_insert(self) -> None:
500 tick = _make_price(timestamp=1_000, price_usd=60_000.0)
501 ops = _diff_time_series(
502 "oracles/prices.json", _json_bytes([]), _json_bytes([tick]), "prices"
503 )
504 assert len(ops) == 1
505 assert ops[0]["op"] == "insert"
506 assert "$60,000.00" in ops[0]["content_summary"]
507
508 def test_new_fee_estimate_produces_insert(self) -> None:
509 fee = _make_fee(timestamp=2_000, t1=25, t6=12, t144=2)
510 ops = _diff_time_series(
511 "oracles/fees.json", _json_bytes([]), _json_bytes([fee]), "fees"
512 )
513 assert len(ops) == 1
514 assert ops[0]["op"] == "insert"
515 assert "25" in ops[0]["content_summary"]
516
517 def test_existing_tick_not_duplicated(self) -> None:
518 tick = _make_price(timestamp=3_000)
519 data = _json_bytes([tick])
520 ops = _diff_time_series("oracles/prices.json", data, data, "prices")
521 assert ops == []
522
523
524 # ---------------------------------------------------------------------------
525 # diff with repo_root — produces PatchOp
526 # ---------------------------------------------------------------------------
527
528
529 class TestDiffWithRepoRoot:
530 def test_modified_utxos_json_produces_patch_op(self) -> None:
531 plugin = BitcoinPlugin()
532 u_old = _make_utxo(txid="a" * 64, vout=0, amount_sat=100_000)
533 u_new = _make_utxo(txid="b" * 64, vout=0, amount_sat=200_000)
534 old_bytes = _json_bytes([u_old])
535 new_bytes = _json_bytes([u_new])
536 old_hash = _sha256(old_bytes)
537 new_hash = _sha256(new_bytes)
538
539 with tempfile.TemporaryDirectory() as tmp:
540 root = pathlib.Path(tmp)
541 muse_dir = root / ".muse" / "objects"
542 muse_dir.mkdir(parents=True)
543 # Write blobs in sharded layout
544 (muse_dir / old_hash[:2]).mkdir(exist_ok=True)
545 (muse_dir / old_hash[:2] / old_hash[2:]).write_bytes(old_bytes)
546 (muse_dir / new_hash[:2]).mkdir(exist_ok=True)
547 (muse_dir / new_hash[:2] / new_hash[2:]).write_bytes(new_bytes)
548
549 base = SnapshotManifest(
550 files={"wallet/utxos.json": old_hash}, domain="bitcoin"
551 )
552 target = SnapshotManifest(
553 files={"wallet/utxos.json": new_hash}, domain="bitcoin"
554 )
555 delta = plugin.diff(base, target, repo_root=root)
556
557 assert delta["ops"][0]["op"] == "patch"
558 patch = delta["ops"][0]
559 assert patch["child_domain"] == "bitcoin.utxos"
560 child_ops = patch["child_ops"]
561 op_types = {op["op"] for op in child_ops}
562 assert "insert" in op_types
563 assert "delete" in op_types
564
565 def test_fallback_to_replace_when_object_missing(self) -> None:
566 plugin = BitcoinPlugin()
567 base = SnapshotManifest(
568 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
569 )
570 target = SnapshotManifest(
571 files={"wallet/utxos.json": "b" * 64}, domain="bitcoin"
572 )
573 with tempfile.TemporaryDirectory() as tmp:
574 root = pathlib.Path(tmp)
575 (root / ".muse" / "objects").mkdir(parents=True)
576 delta = plugin.diff(base, target, repo_root=root)
577
578 assert delta["ops"][0]["op"] == "replace"
579
580
581 # ---------------------------------------------------------------------------
582 # merge
583 # ---------------------------------------------------------------------------
584
585
586 class TestMerge:
587 def setup_method(self) -> None:
588 self.plugin = BitcoinPlugin()
589
590 def test_clean_merge_no_conflicts(self) -> None:
591 base = SnapshotManifest(files={}, domain="bitcoin")
592 left = SnapshotManifest(
593 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
594 )
595 right = SnapshotManifest(
596 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
597 )
598 result = self.plugin.merge(base, left, right)
599 assert result.is_clean
600 assert "wallet/utxos.json" in result.merged["files"]
601 assert "channels/channels.json" in result.merged["files"]
602
603 def test_both_sides_unchanged_no_conflict(self) -> None:
604 files = {"wallet/utxos.json": "a" * 64}
605 base = SnapshotManifest(files=files, domain="bitcoin")
606 left = SnapshotManifest(files=files, domain="bitcoin")
607 right = SnapshotManifest(files=files, domain="bitcoin")
608 result = self.plugin.merge(base, left, right)
609 assert result.is_clean
610
611 def test_only_left_changed_takes_left(self) -> None:
612 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
613 left = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
614 right = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
615 result = self.plugin.merge(base, left, right)
616 assert result.is_clean
617 assert result.merged["files"]["f"] == "b" * 64
618
619 def test_only_right_changed_takes_right(self) -> None:
620 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
621 left = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
622 right = SnapshotManifest(files={"f": "c" * 64}, domain="bitcoin")
623 result = self.plugin.merge(base, left, right)
624 assert result.is_clean
625 assert result.merged["files"]["f"] == "c" * 64
626
627 def test_both_changed_differently_produces_conflict(self) -> None:
628 base = SnapshotManifest(files={"wallet/utxos.json": "a" * 64}, domain="bitcoin")
629 left = SnapshotManifest(files={"wallet/utxos.json": "b" * 64}, domain="bitcoin")
630 right = SnapshotManifest(files={"wallet/utxos.json": "c" * 64}, domain="bitcoin")
631 result = self.plugin.merge(base, left, right)
632 assert not result.is_clean
633 assert "wallet/utxos.json" in result.conflicts
634
635 def test_double_spend_detection_in_merge(self) -> None:
636 u_base = _make_utxo(txid="d" * 64, vout=0, amount_sat=1_000_000)
637 # Left spent the base UTXO and got change at a different address
638 u_left_change = _make_utxo(txid="e" * 64, vout=0, amount_sat=900_000, address="bc1qleft")
639 # Right spent the SAME base UTXO in a different tx → double-spend
640 u_right_change = _make_utxo(txid="f" * 64, vout=0, amount_sat=850_000, address="bc1qright")
641 base_bytes = _json_bytes([u_base])
642 left_bytes = _json_bytes([u_left_change]) # left spent base UTXO, got change
643 right_bytes = _json_bytes([u_right_change]) # right also spent it → double-spend!
644
645 with tempfile.TemporaryDirectory() as tmp:
646 root = pathlib.Path(tmp)
647 muse_dir = root / ".muse" / "objects"
648 muse_dir.mkdir(parents=True)
649
650 for data in (base_bytes, left_bytes, right_bytes):
651 h = _sha256(data)
652 shard = muse_dir / h[:2]
653 shard.mkdir(exist_ok=True)
654 (shard / h[2:]).write_bytes(data)
655
656 base = SnapshotManifest(
657 files={"wallet/utxos.json": _sha256(base_bytes)}, domain="bitcoin"
658 )
659 left = SnapshotManifest(
660 files={"wallet/utxos.json": _sha256(left_bytes)}, domain="bitcoin"
661 )
662 right = SnapshotManifest(
663 files={"wallet/utxos.json": _sha256(right_bytes)}, domain="bitcoin"
664 )
665 result = self.plugin.merge(base, left, right, repo_root=root)
666
667 # Both sides deleted the same UTXO from the same base → double-spend
668 double_spend_records = [
669 cr for cr in result.conflict_records
670 if cr.conflict_type == "double_spend"
671 ]
672 assert len(double_spend_records) >= 1
673
674 def test_both_deleted_same_file_is_clean(self) -> None:
675 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
676 left = SnapshotManifest(files={}, domain="bitcoin")
677 right = SnapshotManifest(files={}, domain="bitcoin")
678 result = self.plugin.merge(base, left, right)
679 assert result.is_clean
680 assert "f" not in result.merged["files"]
681
682
683 # ---------------------------------------------------------------------------
684 # drift
685 # ---------------------------------------------------------------------------
686
687
688 class TestDrift:
689 def test_no_drift_when_identical(self) -> None:
690 plugin = BitcoinPlugin()
691 snap = SnapshotManifest(
692 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
693 )
694 report = plugin.drift(snap, snap)
695 assert not report.has_drift
696
697 def test_drift_detected_when_file_added(self) -> None:
698 plugin = BitcoinPlugin()
699 committed = SnapshotManifest(files={}, domain="bitcoin")
700 live = SnapshotManifest(
701 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
702 )
703 report = plugin.drift(committed, live)
704 assert report.has_drift
705 assert "added" in report.summary
706
707 def test_drift_from_disk_detected(self) -> None:
708 plugin = BitcoinPlugin()
709 with tempfile.TemporaryDirectory() as tmp:
710 root = pathlib.Path(tmp)
711 (root / "wallet").mkdir()
712 (root / "wallet" / "utxos.json").write_bytes(b"[]")
713
714 committed = SnapshotManifest(files={}, domain="bitcoin")
715 report = plugin.drift(committed, root)
716
717 assert report.has_drift
718
719
720 # ---------------------------------------------------------------------------
721 # apply
722 # ---------------------------------------------------------------------------
723
724
725 class TestApply:
726 def test_apply_is_passthrough(self) -> None:
727 plugin = BitcoinPlugin()
728 delta = plugin.diff(
729 SnapshotManifest(files={}, domain="bitcoin"),
730 SnapshotManifest(files={}, domain="bitcoin"),
731 )
732 snap = SnapshotManifest(files={}, domain="bitcoin")
733 result = plugin.apply(delta, snap)
734 assert result is snap
735
736
737 # ---------------------------------------------------------------------------
738 # schema
739 # ---------------------------------------------------------------------------
740
741
742 class TestSchema:
743 def test_schema_returns_domain_schema(self) -> None:
744 from muse.core.schema import DomainSchema
745 s = BitcoinPlugin().schema()
746 assert isinstance(s, dict)
747 assert s["domain"] == "bitcoin"
748 assert s["schema_version"] == 1
749
750 def test_schema_has_ten_dimensions(self) -> None:
751 s = BitcoinPlugin().schema()
752 assert len(s["dimensions"]) == 10
753
754 def test_schema_merge_mode_is_crdt(self) -> None:
755 s = BitcoinPlugin().schema()
756 assert s["merge_mode"] == "crdt"
757
758 def test_dimension_names(self) -> None:
759 s = BitcoinPlugin().schema()
760 names = {d["name"] for d in s["dimensions"]}
761 expected = {
762 "utxos", "transactions", "labels", "descriptors",
763 "channels", "strategy", "oracle_prices", "oracle_fees",
764 "network", "execution",
765 }
766 assert names == expected
767
768 def test_strategy_dimension_is_not_independent(self) -> None:
769 s = BitcoinPlugin().schema()
770 strat = next(d for d in s["dimensions"] if d["name"] == "strategy")
771 assert strat["independent_merge"] is False
772
773
774 # ---------------------------------------------------------------------------
775 # OT merge — StructuredMergePlugin
776 # ---------------------------------------------------------------------------
777
778
779 class TestMergeOps:
780 def setup_method(self) -> None:
781 self.plugin = BitcoinPlugin()
782 self.base = SnapshotManifest(files={}, domain="bitcoin")
783 self.snap = SnapshotManifest(files={}, domain="bitcoin")
784
785 def test_non_conflicting_ops_clean_merge(self) -> None:
786 ours_ops: list[DomainOp] = [
787 InsertOp(
788 op="insert",
789 address="wallet/utxos.json::aaa:0",
790 position=None,
791 content_id="a" * 64,
792 content_summary="received UTXO aaa:0",
793 )
794 ]
795 theirs_ops: list[DomainOp] = [
796 InsertOp(
797 op="insert",
798 address="wallet/utxos.json::bbb:0",
799 position=None,
800 content_id="b" * 64,
801 content_summary="received UTXO bbb:0",
802 )
803 ]
804 result = self.plugin.merge_ops(
805 self.base, self.snap, self.snap, ours_ops, theirs_ops
806 )
807 # Different addresses → no double-spend, clean merge
808 assert len(result.conflict_records) == 0
809
810 def test_double_spend_detected_in_merge_ops(self) -> None:
811 utxo_addr = "wallet/utxos.json::deadbeef:0"
812 ours_ops: list[DomainOp] = [
813 DeleteOp(
814 op="delete",
815 address=utxo_addr,
816 position=None,
817 content_id="a" * 64,
818 content_summary="spent UTXO deadbeef:0",
819 )
820 ]
821 theirs_ops: list[DomainOp] = [
822 DeleteOp(
823 op="delete",
824 address=utxo_addr,
825 position=None,
826 content_id="a" * 64,
827 content_summary="spent UTXO deadbeef:0",
828 )
829 ]
830 result = self.plugin.merge_ops(
831 self.base, self.snap, self.snap, ours_ops, theirs_ops
832 )
833 double_spends = [
834 cr for cr in result.conflict_records
835 if cr.conflict_type == "double_spend"
836 ]
837 assert len(double_spends) == 1
838 assert "deadbeef:0" in double_spends[0].addresses[0]
839
840 def test_non_utxo_delete_not_flagged_as_double_spend(self) -> None:
841 addr = "channels/channels.json::850000x1x0"
842 ours_ops: list[DomainOp] = [
843 DeleteOp(op="delete", address=addr, position=None,
844 content_id="a" * 64, content_summary="channel closed")
845 ]
846 theirs_ops: list[DomainOp] = [
847 DeleteOp(op="delete", address=addr, position=None,
848 content_id="a" * 64, content_summary="channel closed")
849 ]
850 result = self.plugin.merge_ops(
851 self.base, self.snap, self.snap, ours_ops, theirs_ops
852 )
853 double_spends = [
854 cr for cr in result.conflict_records
855 if cr.conflict_type == "double_spend"
856 ]
857 assert len(double_spends) == 0
858
859
860 # ---------------------------------------------------------------------------
861 # CRDTPlugin
862 # ---------------------------------------------------------------------------
863
864
865 class TestCRDT:
866 def setup_method(self) -> None:
867 self.plugin = BitcoinPlugin()
868 self.base_snap = SnapshotManifest(
869 files={"wallet/utxos.json": "a" * 64},
870 domain="bitcoin",
871 )
872
873 def test_to_crdt_state_preserves_files(self) -> None:
874 crdt = self.plugin.to_crdt_state(self.base_snap)
875 assert crdt["domain"] == "bitcoin"
876 assert crdt["schema_version"] == 1
877 assert "wallet/utxos.json" in crdt["files"]
878
879 def test_from_crdt_state_returns_plain_snapshot(self) -> None:
880 crdt = self.plugin.to_crdt_state(self.base_snap)
881 snap = self.plugin.from_crdt_state(crdt)
882 assert snap["domain"] == "bitcoin"
883 assert "wallet/utxos.json" in snap["files"]
884
885 def test_join_idempotent(self) -> None:
886 crdt = self.plugin.to_crdt_state(self.base_snap)
887 joined = self.plugin.join(crdt, crdt)
888 result = self.plugin.from_crdt_state(joined)
889 original = self.plugin.from_crdt_state(crdt)
890 assert result["files"] == original["files"]
891
892 def test_join_commutative(self) -> None:
893 snap_a = SnapshotManifest(
894 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
895 )
896 snap_b = SnapshotManifest(
897 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
898 )
899 a = self.plugin.to_crdt_state(snap_a)
900 b = self.plugin.to_crdt_state(snap_b)
901 ab = self.plugin.from_crdt_state(self.plugin.join(a, b))
902 ba = self.plugin.from_crdt_state(self.plugin.join(b, a))
903 assert ab["files"] == ba["files"]
904
905 def test_join_associative(self) -> None:
906 snap_a = SnapshotManifest(files={"f1": "a" * 64}, domain="bitcoin")
907 snap_b = SnapshotManifest(files={"f2": "b" * 64}, domain="bitcoin")
908 snap_c = SnapshotManifest(files={"f3": "c" * 64}, domain="bitcoin")
909 a = self.plugin.to_crdt_state(snap_a)
910 b = self.plugin.to_crdt_state(snap_b)
911 c = self.plugin.to_crdt_state(snap_c)
912 ab_c = self.plugin.from_crdt_state(
913 self.plugin.join(self.plugin.join(a, b), c)
914 )
915 a_bc = self.plugin.from_crdt_state(
916 self.plugin.join(a, self.plugin.join(b, c))
917 )
918 assert ab_c["files"] == a_bc["files"]
919
920 def test_join_preserves_both_agents_files(self) -> None:
921 snap_a = SnapshotManifest(
922 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
923 )
924 snap_b = SnapshotManifest(
925 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
926 )
927 a = self.plugin.to_crdt_state(snap_a)
928 b = self.plugin.to_crdt_state(snap_b)
929 joined = self.plugin.from_crdt_state(self.plugin.join(a, b))
930 assert "wallet/utxos.json" in joined["files"]
931 assert "channels/channels.json" in joined["files"]
932
933 def test_crdt_schema_has_seven_dimensions(self) -> None:
934 dims = self.plugin.crdt_schema()
935 assert len(dims) == 7
936
937 def test_crdt_schema_dimension_types(self) -> None:
938 dims = self.plugin.crdt_schema()
939 types = {d["crdt_type"] for d in dims}
940 assert "aw_map" in types
941 assert "or_set" in types
942
943 def test_vector_clock_advances_on_join(self) -> None:
944 snap_a = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
945 snap_b = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
946 a = self.plugin.to_crdt_state(snap_a)
947 b = self.plugin.to_crdt_state(snap_b)
948 joined = self.plugin.join(a, b)
949 assert isinstance(joined["vclock"], dict)
950
951
952 # ---------------------------------------------------------------------------
953 # Query analytics
954 # ---------------------------------------------------------------------------
955
956
957 class TestQueryAnalytics:
958 def test_total_balance_sat(self) -> None:
959 utxos = [
960 _make_utxo(amount_sat=100_000),
961 _make_utxo(txid="b" * 64, amount_sat=200_000),
962 ]
963 assert total_balance_sat(utxos) == 300_000
964
965 def test_confirmed_balance_excludes_unconfirmed(self) -> None:
966 utxos = [
967 _make_utxo(amount_sat=100_000, confirmations=6),
968 _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0),
969 ]
970 assert confirmed_balance_sat(utxos) == 100_000
971
972 def test_confirmed_balance_excludes_immature_coinbase(self) -> None:
973 utxos = [
974 _make_utxo(amount_sat=625_000_000, confirmations=50, coinbase=True),
975 _make_utxo(txid="c" * 64, amount_sat=100_000, confirmations=1),
976 ]
977 assert confirmed_balance_sat(utxos) == 100_000
978
979 def test_balance_by_script_type(self) -> None:
980 utxos = [
981 _make_utxo(amount_sat=100_000, script_type="p2wpkh"),
982 _make_utxo(txid="b" * 64, amount_sat=200_000, script_type="p2tr"),
983 _make_utxo(txid="c" * 64, amount_sat=50_000, script_type="p2wpkh"),
984 ]
985 breakdown = balance_by_script_type(utxos)
986 assert breakdown["p2wpkh"] == 150_000
987 assert breakdown["p2tr"] == 200_000
988
989 def test_coin_age_blocks(self) -> None:
990 u = _make_utxo(block_height=800_000)
991 assert coin_age_blocks(u, 850_000) == 50_000
992
993 def test_coin_age_blocks_unconfirmed(self) -> None:
994 u = _make_utxo(block_height=None)
995 assert coin_age_blocks(u, 850_000) is None
996
997 def test_format_sat_small(self) -> None:
998 assert format_sat(1_000) == "1,000 sats"
999
1000 def test_format_sat_large(self) -> None:
1001 result = format_sat(100_000_000)
1002 assert "1.00000000 BTC" in result
1003
1004 def test_utxo_key(self) -> None:
1005 u = _make_utxo(txid="abc", vout=3)
1006 assert utxo_key(u) == "abc:3"
1007
1008 def test_double_spend_candidates_detects_concurrent_spends(self) -> None:
1009 base = {"aaa:0", "bbb:1", "ccc:2"}
1010 our_spent = {"aaa:0", "bbb:1"}
1011 their_spent = {"aaa:0", "ccc:2"}
1012 candidates = double_spend_candidates(base, our_spent, their_spent)
1013 assert candidates == ["aaa:0"]
1014
1015 def test_double_spend_candidates_no_overlap(self) -> None:
1016 base = {"aaa:0", "bbb:1"}
1017 our_spent = {"aaa:0"}
1018 their_spent = {"bbb:1"}
1019 assert double_spend_candidates(base, our_spent, their_spent) == []
1020
1021 def test_channel_liquidity_totals(self) -> None:
1022 ch1 = _make_channel(local_balance_sat=1_000_000, remote_balance_sat=500_000)
1023 ch2 = _make_channel(
1024 channel_id="x", local_balance_sat=200_000, remote_balance_sat=800_000
1025 )
1026 local, remote = channel_liquidity_totals([ch1, ch2])
1027 assert local == 1_200_000
1028 assert remote == 1_300_000
1029
1030 def test_channel_utilization(self) -> None:
1031 ch = _make_channel(
1032 capacity_sat=2_000_000,
1033 local_balance_sat=800_000,
1034 local_reserve_sat=20_000,
1035 remote_reserve_sat=20_000,
1036 )
1037 util = channel_utilization(ch)
1038 assert 0.0 <= util <= 1.0
1039
1040 def test_fee_surface_str(self) -> None:
1041 est = _make_fee(t1=42, t6=15, t144=3)
1042 s = fee_surface_str(est)
1043 assert "42" in s
1044 assert "15" in s
1045 assert "3" in s
1046 assert "sat/vbyte" in s
1047
1048 def test_latest_fee_estimate(self) -> None:
1049 old = _make_fee(timestamp=1_000)
1050 new = _make_fee(timestamp=2_000, t1=50)
1051 result = latest_fee_estimate([old, new])
1052 assert result is not None
1053 assert result["timestamp"] == 2_000
1054
1055 def test_latest_fee_estimate_empty(self) -> None:
1056 assert latest_fee_estimate([]) is None
1057
1058 def test_latest_price(self) -> None:
1059 p1 = _make_price(timestamp=1_000, price_usd=50_000.0)
1060 p2 = _make_price(timestamp=2_000, price_usd=70_000.0)
1061 assert latest_price([p1, p2]) == 70_000.0
1062
1063 def test_strategy_summary_line_simulation_mode(self) -> None:
1064 s = _make_strategy(simulation_mode=True, name="aggressive")
1065 line = strategy_summary_line(s)
1066 assert "SIM" in line
1067 assert "aggressive" in line
1068
1069 def test_utxo_summary_line(self) -> None:
1070 utxos = [
1071 _make_utxo(amount_sat=100_000, confirmations=6),
1072 _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0),
1073 ]
1074 line = utxo_summary_line(utxos)
1075 assert "2 UTXOs" in line
1076
1077 def test_mempool_summary_empty(self) -> None:
1078 line = mempool_summary_line([])
1079 assert "empty" in line
1080
1081
1082 # ---------------------------------------------------------------------------
1083 # Handler routing
1084 # ---------------------------------------------------------------------------
1085
1086
1087 class TestHandlerRouting:
1088 def test_utxos_json_routed(self) -> None:
1089 assert _handler_for_path("wallet/utxos.json") == "utxos"
1090
1091 def test_channels_json_routed(self) -> None:
1092 assert _handler_for_path("channels/channels.json") == "channels"
1093
1094 def test_agent_json_routed(self) -> None:
1095 assert _handler_for_path("strategy/agent.json") == "strategy"
1096
1097 def test_prices_json_routed(self) -> None:
1098 assert _handler_for_path("oracles/prices.json") == "prices"
1099
1100 def test_unknown_file_returns_none(self) -> None:
1101 assert _handler_for_path("README.md") is None
1102 assert _handler_for_path("custom/data.bin") is None
1103
1104
1105 # ---------------------------------------------------------------------------
1106 # Registry registration
1107 # ---------------------------------------------------------------------------
1108
1109
1110 class TestRegistry:
1111 def test_bitcoin_registered_in_plugin_registry(self) -> None:
1112 from muse.plugins.registry import registered_domains
1113 assert "bitcoin" in registered_domains()
1114
1115 def test_resolve_bitcoin_plugin(self) -> None:
1116 import json as _json
1117 from muse.plugins.registry import resolve_plugin
1118
1119 with tempfile.TemporaryDirectory() as tmp:
1120 root = pathlib.Path(tmp)
1121 (root / ".muse").mkdir()
1122 (root / ".muse" / "repo.json").write_text(
1123 _json.dumps({"domain": "bitcoin"})
1124 )
1125 plugin = resolve_plugin(root)
1126
1127 assert isinstance(plugin, BitcoinPlugin)