gabriel / muse public
test_bitcoin_plugin.py python
1124 lines 39.8 KB
9ef121d1 feat: add Oh My ZSH plugin for Muse (all six phases) Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """Comprehensive test suite for the Bitcoin domain plugin.
2
3 Tests cover all three protocol levels:
4 - Core MuseDomainPlugin (snapshot, diff, merge, drift, apply, schema)
5 - StructuredMergePlugin (OT merge with double-spend detection)
6 - CRDTPlugin (convergent join — commutativity, associativity, idempotency)
7
8 Tests are organized by capability, each asserting exact behaviour
9 with realistic Bitcoin fixtures.
10 """
11
12 from __future__ import annotations
13
14 import hashlib
15 import json
16 import pathlib
17 import tempfile
18
19 import pytest
20
21 from muse.domain import (
22 CRDTPlugin,
23 DeleteOp,
24 InsertOp,
25 MuseDomainPlugin,
26 MutateOp,
27 PatchOp,
28 ReplaceOp,
29 SnapshotManifest,
30 StructuredMergePlugin,
31 )
32 from muse.plugins.bitcoin._query import (
33 balance_by_script_type,
34 channel_liquidity_totals,
35 channel_utilization,
36 coin_age_blocks,
37 confirmed_balance_sat,
38 double_spend_candidates,
39 fee_surface_str,
40 format_sat,
41 latest_fee_estimate,
42 latest_price,
43 mempool_summary_line,
44 strategy_summary_line,
45 total_balance_sat,
46 utxo_key,
47 utxo_summary_line,
48 )
49 from muse.plugins.bitcoin._types import (
50 AddressLabelRecord,
51 AgentStrategyRecord,
52 CoinCategory,
53 CoinSelectAlgo,
54 FeeEstimateRecord,
55 LightningChannelRecord,
56 OraclePriceTickRecord,
57 PendingTxRecord,
58 RoutingPolicyRecord,
59 ScriptType,
60 UTXORecord,
61 )
62 from muse.plugins.bitcoin.plugin import (
63 BitcoinPlugin,
64 _diff_channels,
65 _diff_labels,
66 _diff_strategy,
67 _diff_time_series,
68 _diff_transactions,
69 _diff_utxos,
70 _handler_for_path,
71 )
72
73 # ---------------------------------------------------------------------------
74 # Fixtures
75 # ---------------------------------------------------------------------------
76
77
78 def _make_utxo(
79 txid: str = "abc" * 21 + "ab",
80 vout: int = 0,
81 amount_sat: int = 100_000,
82 script_type: ScriptType = "p2wpkh",
83 address: str = "bc1qtest",
84 confirmations: int = 6,
85 block_height: int | None = 850_000,
86 coinbase: bool = False,
87 label: str | None = None,
88 ) -> UTXORecord:
89 return UTXORecord(
90 txid=txid,
91 vout=vout,
92 amount_sat=amount_sat,
93 script_type=script_type,
94 address=address,
95 confirmations=confirmations,
96 block_height=block_height,
97 coinbase=coinbase,
98 label=label,
99 )
100
101
102 def _make_channel(
103 channel_id: str = "850000x1x0",
104 peer_pubkey: str = "0279" + "aa" * 32,
105 peer_alias: str | None = "ACINQ",
106 capacity_sat: int = 2_000_000,
107 local_balance_sat: int = 1_000_000,
108 remote_balance_sat: int = 900_000,
109 is_active: bool = True,
110 is_public: bool = True,
111 local_reserve_sat: int = 20_000,
112 remote_reserve_sat: int = 20_000,
113 unsettled_balance_sat: int = 0,
114 htlc_count: int = 0,
115 ) -> LightningChannelRecord:
116 return LightningChannelRecord(
117 channel_id=channel_id,
118 peer_pubkey=peer_pubkey,
119 peer_alias=peer_alias,
120 capacity_sat=capacity_sat,
121 local_balance_sat=local_balance_sat,
122 remote_balance_sat=remote_balance_sat,
123 is_active=is_active,
124 is_public=is_public,
125 local_reserve_sat=local_reserve_sat,
126 remote_reserve_sat=remote_reserve_sat,
127 unsettled_balance_sat=unsettled_balance_sat,
128 htlc_count=htlc_count,
129 )
130
131
132 def _make_label(
133 address: str = "bc1qtest",
134 label: str = "cold storage",
135 category: CoinCategory = "income",
136 created_at: int = 1_700_000_000,
137 ) -> AddressLabelRecord:
138 return AddressLabelRecord(
139 address=address,
140 label=label,
141 category=category,
142 created_at=created_at,
143 )
144
145
146 def _make_strategy(
147 name: str = "conservative",
148 max_fee_rate_sat_vbyte: int = 10,
149 min_confirmations: int = 6,
150 utxo_consolidation_threshold: int = 20,
151 dca_amount_sat: int | None = 500_000,
152 dca_interval_blocks: int | None = 144,
153 lightning_rebalance_threshold: float = 0.2,
154 coin_selection: CoinSelectAlgo = "branch_and_bound",
155 simulation_mode: bool = False,
156 ) -> AgentStrategyRecord:
157 return AgentStrategyRecord(
158 name=name,
159 max_fee_rate_sat_vbyte=max_fee_rate_sat_vbyte,
160 min_confirmations=min_confirmations,
161 utxo_consolidation_threshold=utxo_consolidation_threshold,
162 dca_amount_sat=dca_amount_sat,
163 dca_interval_blocks=dca_interval_blocks,
164 lightning_rebalance_threshold=lightning_rebalance_threshold,
165 coin_selection=coin_selection,
166 simulation_mode=simulation_mode,
167 )
168
169
170 def _make_price(
171 timestamp: int = 1_700_000_000,
172 block_height: int | None = 850_000,
173 price_usd: float = 62_000.0,
174 source: str = "coinbase",
175 ) -> OraclePriceTickRecord:
176 return OraclePriceTickRecord(
177 timestamp=timestamp,
178 block_height=block_height,
179 price_usd=price_usd,
180 source=source,
181 )
182
183
184 def _make_fee(
185 timestamp: int = 1_700_000_000,
186 block_height: int | None = 850_000,
187 t1: int = 30,
188 t6: int = 15,
189 t144: int = 3,
190 ) -> FeeEstimateRecord:
191 return FeeEstimateRecord(
192 timestamp=timestamp,
193 block_height=block_height,
194 target_1_block_sat_vbyte=t1,
195 target_6_block_sat_vbyte=t6,
196 target_144_block_sat_vbyte=t144,
197 )
198
199
200 _AnyBitcoinRecord = (
201 UTXORecord
202 | LightningChannelRecord
203 | AddressLabelRecord
204 | AgentStrategyRecord
205 | FeeEstimateRecord
206 | RoutingPolicyRecord
207 | PendingTxRecord
208 | OraclePriceTickRecord
209 )
210
211
212 def _json_bytes(obj: _AnyBitcoinRecord | list[_AnyBitcoinRecord]) -> bytes:
213 return json.dumps(obj, sort_keys=True).encode()
214
215
216 def _sha256(data: bytes) -> str:
217 return hashlib.sha256(data).hexdigest()
218
219
220 def _make_manifest(files: dict[str, bytes]) -> SnapshotManifest:
221 return SnapshotManifest(
222 files={path: _sha256(data) for path, data in files.items()},
223 domain="bitcoin",
224 )
225
226
227 # ---------------------------------------------------------------------------
228 # Protocol conformance
229 # ---------------------------------------------------------------------------
230
231
232 class TestProtocolConformance:
233 def test_satisfies_muse_domain_plugin(self) -> None:
234 assert isinstance(BitcoinPlugin(), MuseDomainPlugin)
235
236 def test_satisfies_structured_merge_plugin(self) -> None:
237 assert isinstance(BitcoinPlugin(), StructuredMergePlugin)
238
239 def test_satisfies_crdt_plugin(self) -> None:
240 assert isinstance(BitcoinPlugin(), CRDTPlugin)
241
242
243 # ---------------------------------------------------------------------------
244 # snapshot
245 # ---------------------------------------------------------------------------
246
247
248 class TestSnapshot:
249 def test_snapshot_from_path_hashes_all_json_files(self) -> None:
250 plugin = BitcoinPlugin()
251 with tempfile.TemporaryDirectory() as tmp:
252 root = pathlib.Path(tmp)
253 (root / "wallet").mkdir()
254 (root / "wallet" / "utxos.json").write_bytes(b"[]")
255 (root / "wallet" / "labels.json").write_bytes(b"[]")
256
257 snap = plugin.snapshot(root)
258
259 assert snap["domain"] == "bitcoin"
260 assert "wallet/utxos.json" in snap["files"]
261 assert "wallet/labels.json" in snap["files"]
262
263 def test_snapshot_excludes_hidden_files(self) -> None:
264 plugin = BitcoinPlugin()
265 with tempfile.TemporaryDirectory() as tmp:
266 root = pathlib.Path(tmp)
267 (root / "wallet").mkdir()
268 (root / "wallet" / "utxos.json").write_bytes(b"[]")
269 (root / ".secret").write_bytes(b"private key would be here")
270
271 snap = plugin.snapshot(root)
272
273 assert ".secret" not in snap["files"]
274 assert "wallet/utxos.json" in snap["files"]
275
276 def test_snapshot_passthrough_for_existing_manifest(self) -> None:
277 plugin = BitcoinPlugin()
278 manifest = SnapshotManifest(
279 files={"wallet/utxos.json": "abc" * 21 + "ab"},
280 domain="bitcoin",
281 )
282 result = plugin.snapshot(manifest)
283 assert result is manifest
284
285 def test_snapshot_content_hash_is_sha256(self) -> None:
286 plugin = BitcoinPlugin()
287 with tempfile.TemporaryDirectory() as tmp:
288 root = pathlib.Path(tmp)
289 (root / "wallet").mkdir()
290 content = b"[]\n"
291 (root / "wallet" / "utxos.json").write_bytes(content)
292
293 snap = plugin.snapshot(root)
294
295 expected_hash = _sha256(content)
296 assert snap["files"]["wallet/utxos.json"] == expected_hash
297
298 def test_snapshot_deterministic(self) -> None:
299 plugin = BitcoinPlugin()
300 with tempfile.TemporaryDirectory() as tmp:
301 root = pathlib.Path(tmp)
302 (root / "wallet").mkdir()
303 (root / "wallet" / "utxos.json").write_bytes(b"[]")
304
305 snap1 = plugin.snapshot(root)
306 snap2 = plugin.snapshot(root)
307
308 assert snap1["files"] == snap2["files"]
309
310
311 # ---------------------------------------------------------------------------
312 # diff — file-level
313 # ---------------------------------------------------------------------------
314
315
316 class TestDiffFileLevel:
317 def setup_method(self) -> None:
318 self.plugin = BitcoinPlugin()
319
320 def test_added_file_produces_insert_op(self) -> None:
321 base = SnapshotManifest(files={}, domain="bitcoin")
322 target = SnapshotManifest(
323 files={"wallet/utxos.json": "a" * 64},
324 domain="bitcoin",
325 )
326 delta = self.plugin.diff(base, target)
327 assert len(delta["ops"]) == 1
328 assert delta["ops"][0]["op"] == "insert"
329 assert delta["ops"][0]["address"] == "wallet/utxos.json"
330
331 def test_removed_file_produces_delete_op(self) -> None:
332 base = SnapshotManifest(
333 files={"wallet/utxos.json": "a" * 64},
334 domain="bitcoin",
335 )
336 target = SnapshotManifest(files={}, domain="bitcoin")
337 delta = self.plugin.diff(base, target)
338 assert delta["ops"][0]["op"] == "delete"
339
340 def test_unchanged_file_produces_no_ops(self) -> None:
341 files = {"wallet/utxos.json": "a" * 64}
342 base = SnapshotManifest(files=files, domain="bitcoin")
343 target = SnapshotManifest(files=files, domain="bitcoin")
344 delta = self.plugin.diff(base, target)
345 assert len(delta["ops"]) == 0
346 assert "clean" in delta["summary"]
347
348 def test_modified_file_without_repo_root_produces_replace_op(self) -> None:
349 base = SnapshotManifest(
350 files={"wallet/utxos.json": "a" * 64},
351 domain="bitcoin",
352 )
353 target = SnapshotManifest(
354 files={"wallet/utxos.json": "b" * 64},
355 domain="bitcoin",
356 )
357 delta = self.plugin.diff(base, target)
358 assert delta["ops"][0]["op"] == "replace"
359
360 def test_summary_reflects_op_counts(self) -> None:
361 base = SnapshotManifest(
362 files={"wallet/utxos.json": "a" * 64, "wallet/labels.json": "b" * 64},
363 domain="bitcoin",
364 )
365 target = SnapshotManifest(
366 files={"wallet/utxos.json": "a" * 64, "strategy/agent.json": "c" * 64},
367 domain="bitcoin",
368 )
369 delta = self.plugin.diff(base, target)
370 assert "added" in delta["summary"]
371 assert "removed" in delta["summary"]
372
373
374 # ---------------------------------------------------------------------------
375 # diff — semantic UTXO level
376 # ---------------------------------------------------------------------------
377
378
379 class TestDiffUTXOs:
380 def test_new_utxo_produces_insert_op(self) -> None:
381 u = _make_utxo(txid="a" * 64, vout=0, amount_sat=500_000)
382 old = _json_bytes([])
383 new = _json_bytes([u])
384 ops = _diff_utxos("wallet/utxos.json", old, new)
385 assert len(ops) == 1
386 assert ops[0]["op"] == "insert"
387 assert "a" * 64 + ":0" in ops[0]["address"]
388 assert "0.00500000 BTC" in ops[0]["content_summary"]
389
390 def test_spent_utxo_produces_delete_op(self) -> None:
391 u = _make_utxo(txid="b" * 64, vout=1, amount_sat=200_000)
392 old = _json_bytes([u])
393 new = _json_bytes([])
394 ops = _diff_utxos("wallet/utxos.json", old, new)
395 assert len(ops) == 1
396 assert ops[0]["op"] == "delete"
397 assert "spent" in ops[0]["content_summary"]
398
399 def test_confirmation_update_produces_mutate_op(self) -> None:
400 txid = "c" * 64
401 u_old = _make_utxo(txid=txid, vout=0, confirmations=1)
402 u_new = _make_utxo(txid=txid, vout=0, confirmations=6)
403 old = _json_bytes([u_old])
404 new = _json_bytes([u_new])
405 ops = _diff_utxos("wallet/utxos.json", old, new)
406 assert len(ops) == 1
407 assert ops[0]["op"] == "mutate"
408 assert ops[0]["fields"]["confirmations"]["old"] == "1"
409 assert ops[0]["fields"]["confirmations"]["new"] == "6"
410
411 def test_unchanged_utxos_produce_no_ops(self) -> None:
412 u = _make_utxo()
413 data = _json_bytes([u])
414 ops = _diff_utxos("wallet/utxos.json", data, data)
415 assert ops == []
416
417 def test_multiple_utxos_diff(self) -> None:
418 u1 = _make_utxo(txid="d" * 64, vout=0, amount_sat=100_000)
419 u2 = _make_utxo(txid="e" * 64, vout=0, amount_sat=200_000)
420 u3 = _make_utxo(txid="f" * 64, vout=0, amount_sat=300_000)
421 old = _json_bytes([u1, u2])
422 new = _json_bytes([u2, u3])
423 ops = _diff_utxos("wallet/utxos.json", old, new)
424 op_types = {op["op"] for op in ops}
425 assert "insert" in op_types
426 assert "delete" in op_types
427
428
429 # ---------------------------------------------------------------------------
430 # diff — semantic channel level
431 # ---------------------------------------------------------------------------
432
433
434 class TestDiffChannels:
435 def test_new_channel_produces_insert_op(self) -> None:
436 ch = _make_channel()
437 ops = _diff_channels("channels/channels.json", _json_bytes([]), _json_bytes([ch]))
438 assert len(ops) == 1
439 assert ops[0]["op"] == "insert"
440 assert "opened" in ops[0]["content_summary"]
441
442 def test_closed_channel_produces_delete_op(self) -> None:
443 ch = _make_channel()
444 ops = _diff_channels("channels/channels.json", _json_bytes([ch]), _json_bytes([]))
445 assert len(ops) == 1
446 assert ops[0]["op"] == "delete"
447
448 def test_balance_change_produces_mutate_op(self) -> None:
449 ch_old = _make_channel(local_balance_sat=1_000_000, remote_balance_sat=900_000)
450 ch_new = _make_channel(local_balance_sat=800_000, remote_balance_sat=1_100_000)
451 ops = _diff_channels(
452 "channels/channels.json",
453 _json_bytes([ch_old]),
454 _json_bytes([ch_new]),
455 )
456 assert len(ops) == 1
457 assert ops[0]["op"] == "mutate"
458 fields = ops[0]["fields"]
459 assert "local_balance_sat" in fields
460 assert "remote_balance_sat" in fields
461
462
463 # ---------------------------------------------------------------------------
464 # diff — strategy field level
465 # ---------------------------------------------------------------------------
466
467
468 class TestDiffStrategy:
469 def test_fee_rate_change_produces_mutate_op(self) -> None:
470 old_strat = _make_strategy(max_fee_rate_sat_vbyte=10)
471 new_strat = _make_strategy(max_fee_rate_sat_vbyte=50)
472 ops = _diff_strategy("strategy/agent.json", _json_bytes(old_strat), _json_bytes(new_strat))
473 assert len(ops) == 1
474 assert ops[0]["op"] == "mutate"
475 assert "max_fee_rate_sat_vbyte" in ops[0]["fields"]
476
477 def test_simulation_mode_toggle_detected(self) -> None:
478 old_s = _make_strategy(simulation_mode=False)
479 new_s = _make_strategy(simulation_mode=True)
480 ops = _diff_strategy("strategy/agent.json", _json_bytes(old_s), _json_bytes(new_s))
481 assert ops[0]["fields"]["simulation_mode"]["old"] == "False"
482 assert ops[0]["fields"]["simulation_mode"]["new"] == "True"
483
484 def test_identical_strategy_no_ops(self) -> None:
485 s = _make_strategy()
486 data = _json_bytes(s)
487 ops = _diff_strategy("strategy/agent.json", data, data)
488 assert ops == []
489
490
491 # ---------------------------------------------------------------------------
492 # diff — time series (prices, fees)
493 # ---------------------------------------------------------------------------
494
495
496 class TestDiffTimeSeries:
497 def test_new_price_tick_produces_insert(self) -> None:
498 tick = _make_price(timestamp=1_000, price_usd=60_000.0)
499 ops = _diff_time_series(
500 "oracles/prices.json", _json_bytes([]), _json_bytes([tick]), "prices"
501 )
502 assert len(ops) == 1
503 assert ops[0]["op"] == "insert"
504 assert "$60,000.00" in ops[0]["content_summary"]
505
506 def test_new_fee_estimate_produces_insert(self) -> None:
507 fee = _make_fee(timestamp=2_000, t1=25, t6=12, t144=2)
508 ops = _diff_time_series(
509 "oracles/fees.json", _json_bytes([]), _json_bytes([fee]), "fees"
510 )
511 assert len(ops) == 1
512 assert "25" in ops[0]["content_summary"]
513
514 def test_existing_tick_not_duplicated(self) -> None:
515 tick = _make_price(timestamp=3_000)
516 data = _json_bytes([tick])
517 ops = _diff_time_series("oracles/prices.json", data, data, "prices")
518 assert ops == []
519
520
521 # ---------------------------------------------------------------------------
522 # diff with repo_root — produces PatchOp
523 # ---------------------------------------------------------------------------
524
525
526 class TestDiffWithRepoRoot:
527 def test_modified_utxos_json_produces_patch_op(self) -> None:
528 plugin = BitcoinPlugin()
529 u_old = _make_utxo(txid="a" * 64, vout=0, amount_sat=100_000)
530 u_new = _make_utxo(txid="b" * 64, vout=0, amount_sat=200_000)
531 old_bytes = _json_bytes([u_old])
532 new_bytes = _json_bytes([u_new])
533 old_hash = _sha256(old_bytes)
534 new_hash = _sha256(new_bytes)
535
536 with tempfile.TemporaryDirectory() as tmp:
537 root = pathlib.Path(tmp)
538 muse_dir = root / ".muse" / "objects"
539 muse_dir.mkdir(parents=True)
540 # Write blobs in sharded layout
541 (muse_dir / old_hash[:2]).mkdir(exist_ok=True)
542 (muse_dir / old_hash[:2] / old_hash[2:]).write_bytes(old_bytes)
543 (muse_dir / new_hash[:2]).mkdir(exist_ok=True)
544 (muse_dir / new_hash[:2] / new_hash[2:]).write_bytes(new_bytes)
545
546 base = SnapshotManifest(
547 files={"wallet/utxos.json": old_hash}, domain="bitcoin"
548 )
549 target = SnapshotManifest(
550 files={"wallet/utxos.json": new_hash}, domain="bitcoin"
551 )
552 delta = plugin.diff(base, target, repo_root=root)
553
554 assert delta["ops"][0]["op"] == "patch"
555 patch = delta["ops"][0]
556 assert patch["child_domain"] == "bitcoin.utxos"
557 child_ops = patch["child_ops"]
558 op_types = {op["op"] for op in child_ops}
559 assert "insert" in op_types
560 assert "delete" in op_types
561
562 def test_fallback_to_replace_when_object_missing(self) -> None:
563 plugin = BitcoinPlugin()
564 base = SnapshotManifest(
565 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
566 )
567 target = SnapshotManifest(
568 files={"wallet/utxos.json": "b" * 64}, domain="bitcoin"
569 )
570 with tempfile.TemporaryDirectory() as tmp:
571 root = pathlib.Path(tmp)
572 (root / ".muse" / "objects").mkdir(parents=True)
573 delta = plugin.diff(base, target, repo_root=root)
574
575 assert delta["ops"][0]["op"] == "replace"
576
577
578 # ---------------------------------------------------------------------------
579 # merge
580 # ---------------------------------------------------------------------------
581
582
583 class TestMerge:
584 def setup_method(self) -> None:
585 self.plugin = BitcoinPlugin()
586
587 def test_clean_merge_no_conflicts(self) -> None:
588 base = SnapshotManifest(files={}, domain="bitcoin")
589 left = SnapshotManifest(
590 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
591 )
592 right = SnapshotManifest(
593 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
594 )
595 result = self.plugin.merge(base, left, right)
596 assert result.is_clean
597 assert "wallet/utxos.json" in result.merged["files"]
598 assert "channels/channels.json" in result.merged["files"]
599
600 def test_both_sides_unchanged_no_conflict(self) -> None:
601 files = {"wallet/utxos.json": "a" * 64}
602 base = SnapshotManifest(files=files, domain="bitcoin")
603 left = SnapshotManifest(files=files, domain="bitcoin")
604 right = SnapshotManifest(files=files, domain="bitcoin")
605 result = self.plugin.merge(base, left, right)
606 assert result.is_clean
607
608 def test_only_left_changed_takes_left(self) -> None:
609 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
610 left = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
611 right = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
612 result = self.plugin.merge(base, left, right)
613 assert result.is_clean
614 assert result.merged["files"]["f"] == "b" * 64
615
616 def test_only_right_changed_takes_right(self) -> None:
617 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
618 left = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
619 right = SnapshotManifest(files={"f": "c" * 64}, domain="bitcoin")
620 result = self.plugin.merge(base, left, right)
621 assert result.is_clean
622 assert result.merged["files"]["f"] == "c" * 64
623
624 def test_both_changed_differently_produces_conflict(self) -> None:
625 base = SnapshotManifest(files={"wallet/utxos.json": "a" * 64}, domain="bitcoin")
626 left = SnapshotManifest(files={"wallet/utxos.json": "b" * 64}, domain="bitcoin")
627 right = SnapshotManifest(files={"wallet/utxos.json": "c" * 64}, domain="bitcoin")
628 result = self.plugin.merge(base, left, right)
629 assert not result.is_clean
630 assert "wallet/utxos.json" in result.conflicts
631
632 def test_double_spend_detection_in_merge(self) -> None:
633 u_base = _make_utxo(txid="d" * 64, vout=0, amount_sat=1_000_000)
634 # Left spent the base UTXO and got change at a different address
635 u_left_change = _make_utxo(txid="e" * 64, vout=0, amount_sat=900_000, address="bc1qleft")
636 # Right spent the SAME base UTXO in a different tx → double-spend
637 u_right_change = _make_utxo(txid="f" * 64, vout=0, amount_sat=850_000, address="bc1qright")
638 base_bytes = _json_bytes([u_base])
639 left_bytes = _json_bytes([u_left_change]) # left spent base UTXO, got change
640 right_bytes = _json_bytes([u_right_change]) # right also spent it → double-spend!
641
642 with tempfile.TemporaryDirectory() as tmp:
643 root = pathlib.Path(tmp)
644 muse_dir = root / ".muse" / "objects"
645 muse_dir.mkdir(parents=True)
646
647 for data in (base_bytes, left_bytes, right_bytes):
648 h = _sha256(data)
649 shard = muse_dir / h[:2]
650 shard.mkdir(exist_ok=True)
651 (shard / h[2:]).write_bytes(data)
652
653 base = SnapshotManifest(
654 files={"wallet/utxos.json": _sha256(base_bytes)}, domain="bitcoin"
655 )
656 left = SnapshotManifest(
657 files={"wallet/utxos.json": _sha256(left_bytes)}, domain="bitcoin"
658 )
659 right = SnapshotManifest(
660 files={"wallet/utxos.json": _sha256(right_bytes)}, domain="bitcoin"
661 )
662 result = self.plugin.merge(base, left, right, repo_root=root)
663
664 # Both sides deleted the same UTXO from the same base → double-spend
665 double_spend_records = [
666 cr for cr in result.conflict_records
667 if cr.conflict_type == "double_spend"
668 ]
669 assert len(double_spend_records) >= 1
670
671 def test_both_deleted_same_file_is_clean(self) -> None:
672 base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
673 left = SnapshotManifest(files={}, domain="bitcoin")
674 right = SnapshotManifest(files={}, domain="bitcoin")
675 result = self.plugin.merge(base, left, right)
676 assert result.is_clean
677 assert "f" not in result.merged["files"]
678
679
680 # ---------------------------------------------------------------------------
681 # drift
682 # ---------------------------------------------------------------------------
683
684
685 class TestDrift:
686 def test_no_drift_when_identical(self) -> None:
687 plugin = BitcoinPlugin()
688 snap = SnapshotManifest(
689 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
690 )
691 report = plugin.drift(snap, snap)
692 assert not report.has_drift
693
694 def test_drift_detected_when_file_added(self) -> None:
695 plugin = BitcoinPlugin()
696 committed = SnapshotManifest(files={}, domain="bitcoin")
697 live = SnapshotManifest(
698 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
699 )
700 report = plugin.drift(committed, live)
701 assert report.has_drift
702 assert "added" in report.summary
703
704 def test_drift_from_disk_detected(self) -> None:
705 plugin = BitcoinPlugin()
706 with tempfile.TemporaryDirectory() as tmp:
707 root = pathlib.Path(tmp)
708 (root / "wallet").mkdir()
709 (root / "wallet" / "utxos.json").write_bytes(b"[]")
710
711 committed = SnapshotManifest(files={}, domain="bitcoin")
712 report = plugin.drift(committed, root)
713
714 assert report.has_drift
715
716
717 # ---------------------------------------------------------------------------
718 # apply
719 # ---------------------------------------------------------------------------
720
721
722 class TestApply:
723 def test_apply_is_passthrough(self) -> None:
724 plugin = BitcoinPlugin()
725 delta = plugin.diff(
726 SnapshotManifest(files={}, domain="bitcoin"),
727 SnapshotManifest(files={}, domain="bitcoin"),
728 )
729 snap = SnapshotManifest(files={}, domain="bitcoin")
730 result = plugin.apply(delta, snap)
731 assert result is snap
732
733
734 # ---------------------------------------------------------------------------
735 # schema
736 # ---------------------------------------------------------------------------
737
738
739 class TestSchema:
740 def test_schema_returns_domain_schema(self) -> None:
741 from muse.core.schema import DomainSchema
742 s = BitcoinPlugin().schema()
743 assert isinstance(s, dict)
744 assert s["domain"] == "bitcoin"
745 assert s["schema_version"] == 1
746
747 def test_schema_has_ten_dimensions(self) -> None:
748 s = BitcoinPlugin().schema()
749 assert len(s["dimensions"]) == 10
750
751 def test_schema_merge_mode_is_crdt(self) -> None:
752 s = BitcoinPlugin().schema()
753 assert s["merge_mode"] == "crdt"
754
755 def test_dimension_names(self) -> None:
756 s = BitcoinPlugin().schema()
757 names = {d["name"] for d in s["dimensions"]}
758 expected = {
759 "utxos", "transactions", "labels", "descriptors",
760 "channels", "strategy", "oracle_prices", "oracle_fees",
761 "network", "execution",
762 }
763 assert names == expected
764
765 def test_strategy_dimension_is_not_independent(self) -> None:
766 s = BitcoinPlugin().schema()
767 strat = next(d for d in s["dimensions"] if d["name"] == "strategy")
768 assert strat["independent_merge"] is False
769
770
771 # ---------------------------------------------------------------------------
772 # OT merge — StructuredMergePlugin
773 # ---------------------------------------------------------------------------
774
775
776 class TestMergeOps:
777 def setup_method(self) -> None:
778 self.plugin = BitcoinPlugin()
779 self.base = SnapshotManifest(files={}, domain="bitcoin")
780 self.snap = SnapshotManifest(files={}, domain="bitcoin")
781
782 def test_non_conflicting_ops_clean_merge(self) -> None:
783 ours_ops = [
784 InsertOp(
785 op="insert",
786 address="wallet/utxos.json::aaa:0",
787 position=None,
788 content_id="a" * 64,
789 content_summary="received UTXO aaa:0",
790 )
791 ]
792 theirs_ops = [
793 InsertOp(
794 op="insert",
795 address="wallet/utxos.json::bbb:0",
796 position=None,
797 content_id="b" * 64,
798 content_summary="received UTXO bbb:0",
799 )
800 ]
801 result = self.plugin.merge_ops(
802 self.base, self.snap, self.snap, ours_ops, theirs_ops
803 )
804 # Different addresses → no double-spend, clean merge
805 assert len(result.conflict_records) == 0
806
807 def test_double_spend_detected_in_merge_ops(self) -> None:
808 utxo_addr = "wallet/utxos.json::deadbeef:0"
809 ours_ops = [
810 DeleteOp(
811 op="delete",
812 address=utxo_addr,
813 position=None,
814 content_id="a" * 64,
815 content_summary="spent UTXO deadbeef:0",
816 )
817 ]
818 theirs_ops = [
819 DeleteOp(
820 op="delete",
821 address=utxo_addr,
822 position=None,
823 content_id="a" * 64,
824 content_summary="spent UTXO deadbeef:0",
825 )
826 ]
827 result = self.plugin.merge_ops(
828 self.base, self.snap, self.snap, ours_ops, theirs_ops
829 )
830 double_spends = [
831 cr for cr in result.conflict_records
832 if cr.conflict_type == "double_spend"
833 ]
834 assert len(double_spends) == 1
835 assert "deadbeef:0" in double_spends[0].addresses[0]
836
837 def test_non_utxo_delete_not_flagged_as_double_spend(self) -> None:
838 addr = "channels/channels.json::850000x1x0"
839 ours_ops = [
840 DeleteOp(op="delete", address=addr, position=None,
841 content_id="a" * 64, content_summary="channel closed")
842 ]
843 theirs_ops = [
844 DeleteOp(op="delete", address=addr, position=None,
845 content_id="a" * 64, content_summary="channel closed")
846 ]
847 result = self.plugin.merge_ops(
848 self.base, self.snap, self.snap, ours_ops, theirs_ops
849 )
850 double_spends = [
851 cr for cr in result.conflict_records
852 if cr.conflict_type == "double_spend"
853 ]
854 assert len(double_spends) == 0
855
856
857 # ---------------------------------------------------------------------------
858 # CRDTPlugin
859 # ---------------------------------------------------------------------------
860
861
862 class TestCRDT:
863 def setup_method(self) -> None:
864 self.plugin = BitcoinPlugin()
865 self.base_snap = SnapshotManifest(
866 files={"wallet/utxos.json": "a" * 64},
867 domain="bitcoin",
868 )
869
870 def test_to_crdt_state_preserves_files(self) -> None:
871 crdt = self.plugin.to_crdt_state(self.base_snap)
872 assert crdt["domain"] == "bitcoin"
873 assert crdt["schema_version"] == 1
874 assert "wallet/utxos.json" in crdt["files"]
875
876 def test_from_crdt_state_returns_plain_snapshot(self) -> None:
877 crdt = self.plugin.to_crdt_state(self.base_snap)
878 snap = self.plugin.from_crdt_state(crdt)
879 assert snap["domain"] == "bitcoin"
880 assert "wallet/utxos.json" in snap["files"]
881
882 def test_join_idempotent(self) -> None:
883 crdt = self.plugin.to_crdt_state(self.base_snap)
884 joined = self.plugin.join(crdt, crdt)
885 result = self.plugin.from_crdt_state(joined)
886 original = self.plugin.from_crdt_state(crdt)
887 assert result["files"] == original["files"]
888
889 def test_join_commutative(self) -> None:
890 snap_a = SnapshotManifest(
891 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
892 )
893 snap_b = SnapshotManifest(
894 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
895 )
896 a = self.plugin.to_crdt_state(snap_a)
897 b = self.plugin.to_crdt_state(snap_b)
898 ab = self.plugin.from_crdt_state(self.plugin.join(a, b))
899 ba = self.plugin.from_crdt_state(self.plugin.join(b, a))
900 assert ab["files"] == ba["files"]
901
902 def test_join_associative(self) -> None:
903 snap_a = SnapshotManifest(files={"f1": "a" * 64}, domain="bitcoin")
904 snap_b = SnapshotManifest(files={"f2": "b" * 64}, domain="bitcoin")
905 snap_c = SnapshotManifest(files={"f3": "c" * 64}, domain="bitcoin")
906 a = self.plugin.to_crdt_state(snap_a)
907 b = self.plugin.to_crdt_state(snap_b)
908 c = self.plugin.to_crdt_state(snap_c)
909 ab_c = self.plugin.from_crdt_state(
910 self.plugin.join(self.plugin.join(a, b), c)
911 )
912 a_bc = self.plugin.from_crdt_state(
913 self.plugin.join(a, self.plugin.join(b, c))
914 )
915 assert ab_c["files"] == a_bc["files"]
916
917 def test_join_preserves_both_agents_files(self) -> None:
918 snap_a = SnapshotManifest(
919 files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
920 )
921 snap_b = SnapshotManifest(
922 files={"channels/channels.json": "b" * 64}, domain="bitcoin"
923 )
924 a = self.plugin.to_crdt_state(snap_a)
925 b = self.plugin.to_crdt_state(snap_b)
926 joined = self.plugin.from_crdt_state(self.plugin.join(a, b))
927 assert "wallet/utxos.json" in joined["files"]
928 assert "channels/channels.json" in joined["files"]
929
930 def test_crdt_schema_has_seven_dimensions(self) -> None:
931 dims = self.plugin.crdt_schema()
932 assert len(dims) == 7
933
934 def test_crdt_schema_dimension_types(self) -> None:
935 dims = self.plugin.crdt_schema()
936 types = {d["crdt_type"] for d in dims}
937 assert "aw_map" in types
938 assert "or_set" in types
939
940 def test_vector_clock_advances_on_join(self) -> None:
941 snap_a = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
942 snap_b = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
943 a = self.plugin.to_crdt_state(snap_a)
944 b = self.plugin.to_crdt_state(snap_b)
945 joined = self.plugin.join(a, b)
946 assert isinstance(joined["vclock"], dict)
947
948
949 # ---------------------------------------------------------------------------
950 # Query analytics
951 # ---------------------------------------------------------------------------
952
953
954 class TestQueryAnalytics:
955 def test_total_balance_sat(self) -> None:
956 utxos = [
957 _make_utxo(amount_sat=100_000),
958 _make_utxo(txid="b" * 64, amount_sat=200_000),
959 ]
960 assert total_balance_sat(utxos) == 300_000
961
962 def test_confirmed_balance_excludes_unconfirmed(self) -> None:
963 utxos = [
964 _make_utxo(amount_sat=100_000, confirmations=6),
965 _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0),
966 ]
967 assert confirmed_balance_sat(utxos) == 100_000
968
969 def test_confirmed_balance_excludes_immature_coinbase(self) -> None:
970 utxos = [
971 _make_utxo(amount_sat=625_000_000, confirmations=50, coinbase=True),
972 _make_utxo(txid="c" * 64, amount_sat=100_000, confirmations=1),
973 ]
974 assert confirmed_balance_sat(utxos) == 100_000
975
976 def test_balance_by_script_type(self) -> None:
977 utxos = [
978 _make_utxo(amount_sat=100_000, script_type="p2wpkh"),
979 _make_utxo(txid="b" * 64, amount_sat=200_000, script_type="p2tr"),
980 _make_utxo(txid="c" * 64, amount_sat=50_000, script_type="p2wpkh"),
981 ]
982 breakdown = balance_by_script_type(utxos)
983 assert breakdown["p2wpkh"] == 150_000
984 assert breakdown["p2tr"] == 200_000
985
986 def test_coin_age_blocks(self) -> None:
987 u = _make_utxo(block_height=800_000)
988 assert coin_age_blocks(u, 850_000) == 50_000
989
990 def test_coin_age_blocks_unconfirmed(self) -> None:
991 u = _make_utxo(block_height=None)
992 assert coin_age_blocks(u, 850_000) is None
993
994 def test_format_sat_small(self) -> None:
995 assert format_sat(1_000) == "1,000 sats"
996
997 def test_format_sat_large(self) -> None:
998 result = format_sat(100_000_000)
999 assert "1.00000000 BTC" in result
1000
1001 def test_utxo_key(self) -> None:
1002 u = _make_utxo(txid="abc", vout=3)
1003 assert utxo_key(u) == "abc:3"
1004
1005 def test_double_spend_candidates_detects_concurrent_spends(self) -> None:
1006 base = {"aaa:0", "bbb:1", "ccc:2"}
1007 our_spent = {"aaa:0", "bbb:1"}
1008 their_spent = {"aaa:0", "ccc:2"}
1009 candidates = double_spend_candidates(base, our_spent, their_spent)
1010 assert candidates == ["aaa:0"]
1011
1012 def test_double_spend_candidates_no_overlap(self) -> None:
1013 base = {"aaa:0", "bbb:1"}
1014 our_spent = {"aaa:0"}
1015 their_spent = {"bbb:1"}
1016 assert double_spend_candidates(base, our_spent, their_spent) == []
1017
1018 def test_channel_liquidity_totals(self) -> None:
1019 ch1 = _make_channel(local_balance_sat=1_000_000, remote_balance_sat=500_000)
1020 ch2 = _make_channel(
1021 channel_id="x", local_balance_sat=200_000, remote_balance_sat=800_000
1022 )
1023 local, remote = channel_liquidity_totals([ch1, ch2])
1024 assert local == 1_200_000
1025 assert remote == 1_300_000
1026
1027 def test_channel_utilization(self) -> None:
1028 ch = _make_channel(
1029 capacity_sat=2_000_000,
1030 local_balance_sat=800_000,
1031 local_reserve_sat=20_000,
1032 remote_reserve_sat=20_000,
1033 )
1034 util = channel_utilization(ch)
1035 assert 0.0 <= util <= 1.0
1036
1037 def test_fee_surface_str(self) -> None:
1038 est = _make_fee(t1=42, t6=15, t144=3)
1039 s = fee_surface_str(est)
1040 assert "42" in s
1041 assert "15" in s
1042 assert "3" in s
1043 assert "sat/vbyte" in s
1044
1045 def test_latest_fee_estimate(self) -> None:
1046 old = _make_fee(timestamp=1_000)
1047 new = _make_fee(timestamp=2_000, t1=50)
1048 result = latest_fee_estimate([old, new])
1049 assert result is not None
1050 assert result["timestamp"] == 2_000
1051
1052 def test_latest_fee_estimate_empty(self) -> None:
1053 assert latest_fee_estimate([]) is None
1054
1055 def test_latest_price(self) -> None:
1056 p1 = _make_price(timestamp=1_000, price_usd=50_000.0)
1057 p2 = _make_price(timestamp=2_000, price_usd=70_000.0)
1058 assert latest_price([p1, p2]) == 70_000.0
1059
1060 def test_strategy_summary_line_simulation_mode(self) -> None:
1061 s = _make_strategy(simulation_mode=True, name="aggressive")
1062 line = strategy_summary_line(s)
1063 assert "SIM" in line
1064 assert "aggressive" in line
1065
1066 def test_utxo_summary_line(self) -> None:
1067 utxos = [
1068 _make_utxo(amount_sat=100_000, confirmations=6),
1069 _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0),
1070 ]
1071 line = utxo_summary_line(utxos)
1072 assert "2 UTXOs" in line
1073
1074 def test_mempool_summary_empty(self) -> None:
1075 line = mempool_summary_line([])
1076 assert "empty" in line
1077
1078
1079 # ---------------------------------------------------------------------------
1080 # Handler routing
1081 # ---------------------------------------------------------------------------
1082
1083
1084 class TestHandlerRouting:
1085 def test_utxos_json_routed(self) -> None:
1086 assert _handler_for_path("wallet/utxos.json") == "utxos"
1087
1088 def test_channels_json_routed(self) -> None:
1089 assert _handler_for_path("channels/channels.json") == "channels"
1090
1091 def test_agent_json_routed(self) -> None:
1092 assert _handler_for_path("strategy/agent.json") == "strategy"
1093
1094 def test_prices_json_routed(self) -> None:
1095 assert _handler_for_path("oracles/prices.json") == "prices"
1096
1097 def test_unknown_file_returns_none(self) -> None:
1098 assert _handler_for_path("README.md") is None
1099 assert _handler_for_path("custom/data.bin") is None
1100
1101
1102 # ---------------------------------------------------------------------------
1103 # Registry registration
1104 # ---------------------------------------------------------------------------
1105
1106
1107 class TestRegistry:
1108 def test_bitcoin_registered_in_plugin_registry(self) -> None:
1109 from muse.plugins.registry import registered_domains
1110 assert "bitcoin" in registered_domains()
1111
1112 def test_resolve_bitcoin_plugin(self) -> None:
1113 import json as _json
1114 from muse.plugins.registry import resolve_plugin
1115
1116 with tempfile.TemporaryDirectory() as tmp:
1117 root = pathlib.Path(tmp)
1118 (root / ".muse").mkdir()
1119 (root / ".muse" / "repo.json").write_text(
1120 _json.dumps({"domain": "bitcoin"})
1121 )
1122 plugin = resolve_plugin(root)
1123
1124 assert isinstance(plugin, BitcoinPlugin)