gabriel / muse public
test_bitcoin_plugin.py python
1109 lines 39.5 KB
73edf876 feat(bitcoin): add Bitcoin domain plugin — multidimensional VCS for on-… Gabriel Cardona <gabriel@tellurstori.com> 4d ago
1 """Comprehensive test suite for the Bitcoin domain plugin.
2
3 Tests cover all three protocol levels:
4 - Core MuseDomainPlugin (snapshot, diff, merge, drift, apply, schema)
5 - StructuredMergePlugin (OT merge with double-spend detection)
6 - CRDTPlugin (convergent join — commutativity, associativity, idempotency)
7
8 Tests are organized by capability, each asserting exact behaviour
9 with realistic Bitcoin fixtures.
10 """
11
12 from __future__ import annotations
13
14 import hashlib
15 import json
16 import pathlib
17 import tempfile
18
19 import pytest
20
21 from muse.domain import (
22 CRDTPlugin,
23 DeleteOp,
24 InsertOp,
25 MuseDomainPlugin,
26 MutateOp,
27 PatchOp,
28 ReplaceOp,
29 SnapshotManifest,
30 StructuredMergePlugin,
31 )
32 from muse.plugins.bitcoin._query import (
33 balance_by_script_type,
34 channel_liquidity_totals,
35 channel_utilization,
36 coin_age_blocks,
37 confirmed_balance_sat,
38 double_spend_candidates,
39 fee_surface_str,
40 format_sat,
41 latest_fee_estimate,
42 latest_price,
43 mempool_summary_line,
44 strategy_summary_line,
45 total_balance_sat,
46 utxo_key,
47 utxo_summary_line,
48 )
49 from muse.plugins.bitcoin._types import (
50 AddressLabelRecord,
51 AgentStrategyRecord,
52 FeeEstimateRecord,
53 LightningChannelRecord,
54 OraclePriceTickRecord,
55 PendingTxRecord,
56 RoutingPolicyRecord,
57 UTXORecord,
58 )
59 from muse.plugins.bitcoin.plugin import (
60 BitcoinPlugin,
61 _diff_channels,
62 _diff_labels,
63 _diff_strategy,
64 _diff_time_series,
65 _diff_transactions,
66 _diff_utxos,
67 _handler_for_path,
68 )
69
70 # ---------------------------------------------------------------------------
71 # Fixtures
72 # ---------------------------------------------------------------------------
73
74
def _make_utxo(
    txid: str = "abc" * 21 + "ab",
    vout: int = 0,
    amount_sat: int = 100_000,
    script_type: str = "p2wpkh",
    address: str = "bc1qtest",
    confirmations: int = 6,
    block_height: int | None = 850_000,
    coinbase: bool = False,
    label: str | None = None,
) -> UTXORecord:
    """Build a ``UTXORecord`` test fixture.

    Every field has a default so a test only needs to pass the values it
    cares about; each argument is forwarded to the record field of the
    same name.

    NOTE(review): the default ``txid`` is 65 characters long (63 + 2),
    one more than a 64-character hex txid — confirm this is intentional.
    """
    record = UTXORecord(
        txid=txid,
        vout=vout,
        amount_sat=amount_sat,
        script_type=script_type,  # type: ignore[arg-type]
        address=address,
        confirmations=confirmations,
        block_height=block_height,
        coinbase=coinbase,
        label=label,
    )
    return record
97
98
def _make_channel(
    channel_id: str = "850000x1x0",
    peer_pubkey: str = "0279" + "aa" * 32,
    peer_alias: str | None = "ACINQ",
    capacity_sat: int = 2_000_000,
    local_balance_sat: int = 1_000_000,
    remote_balance_sat: int = 900_000,
    is_active: bool = True,
    is_public: bool = True,
    local_reserve_sat: int = 20_000,
    remote_reserve_sat: int = 20_000,
    unsettled_balance_sat: int = 0,
    htlc_count: int = 0,
) -> LightningChannelRecord:
    """Build a ``LightningChannelRecord`` test fixture.

    Each argument is forwarded unchanged to the record field of the same
    name; tests override only the fields relevant to them.
    """
    channel = LightningChannelRecord(
        channel_id=channel_id,
        peer_pubkey=peer_pubkey,
        peer_alias=peer_alias,
        capacity_sat=capacity_sat,
        local_balance_sat=local_balance_sat,
        remote_balance_sat=remote_balance_sat,
        is_active=is_active,
        is_public=is_public,
        local_reserve_sat=local_reserve_sat,
        remote_reserve_sat=remote_reserve_sat,
        unsettled_balance_sat=unsettled_balance_sat,
        htlc_count=htlc_count,
    )
    return channel
127
128
def _make_label(
    address: str = "bc1qtest",
    label: str = "cold storage",
    category: str = "income",
    created_at: int = 1_700_000_000,
) -> AddressLabelRecord:
    """Build an ``AddressLabelRecord`` test fixture from the given fields."""
    record = AddressLabelRecord(
        address=address,
        label=label,
        category=category,  # type: ignore[arg-type]
        created_at=created_at,
    )
    return record
141
142
def _make_strategy(
    name: str = "conservative",
    max_fee_rate_sat_vbyte: int = 10,
    min_confirmations: int = 6,
    utxo_consolidation_threshold: int = 20,
    dca_amount_sat: int | None = 500_000,
    dca_interval_blocks: int | None = 144,
    lightning_rebalance_threshold: float = 0.2,
    coin_selection: str = "branch_and_bound",
    simulation_mode: bool = False,
) -> AgentStrategyRecord:
    """Build an ``AgentStrategyRecord`` test fixture.

    Each argument is forwarded unchanged to the record field of the same
    name; tests override only the fields they want to vary.
    """
    strategy = AgentStrategyRecord(
        name=name,
        max_fee_rate_sat_vbyte=max_fee_rate_sat_vbyte,
        min_confirmations=min_confirmations,
        utxo_consolidation_threshold=utxo_consolidation_threshold,
        dca_amount_sat=dca_amount_sat,
        dca_interval_blocks=dca_interval_blocks,
        lightning_rebalance_threshold=lightning_rebalance_threshold,
        coin_selection=coin_selection,  # type: ignore[arg-type]
        simulation_mode=simulation_mode,
    )
    return strategy
165
166
def _make_price(
    timestamp: int = 1_700_000_000,
    block_height: int | None = 850_000,
    price_usd: float = 62_000.0,
    source: str = "coinbase",
) -> OraclePriceTickRecord:
    """Build an ``OraclePriceTickRecord`` test fixture from the given fields."""
    tick = OraclePriceTickRecord(
        timestamp=timestamp,
        block_height=block_height,
        price_usd=price_usd,
        source=source,
    )
    return tick
179
180
def _make_fee(
    timestamp: int = 1_700_000_000,
    block_height: int | None = 850_000,
    t1: int = 30,
    t6: int = 15,
    t144: int = 3,
) -> FeeEstimateRecord:
    """Build a ``FeeEstimateRecord`` test fixture.

    ``t1``, ``t6`` and ``t144`` are short names for the record's
    ``target_1_block_sat_vbyte``, ``target_6_block_sat_vbyte`` and
    ``target_144_block_sat_vbyte`` fields respectively.
    """
    estimate = FeeEstimateRecord(
        timestamp=timestamp,
        block_height=block_height,
        target_1_block_sat_vbyte=t1,
        target_6_block_sat_vbyte=t6,
        target_144_block_sat_vbyte=t144,
    )
    return estimate
195
196
197 def _json_bytes(obj: object) -> bytes:
198 return json.dumps(obj, sort_keys=True).encode()
199
200
201 def _sha256(data: bytes) -> str:
202 return hashlib.sha256(data).hexdigest()
203
204
def _make_manifest(files: dict[str, bytes]) -> SnapshotManifest:
    """Build a ``bitcoin`` manifest mapping each path to the SHA-256 of its bytes."""
    digests: dict[str, str] = {}
    for path, data in files.items():
        digests[path] = _sha256(data)
    return SnapshotManifest(files=digests, domain="bitcoin")
210
211
212 # ---------------------------------------------------------------------------
213 # Protocol conformance
214 # ---------------------------------------------------------------------------
215
216
class TestProtocolConformance:
    """A ``BitcoinPlugin`` instance must pass ``isinstance`` against each plugin protocol."""

    def test_satisfies_muse_domain_plugin(self) -> None:
        plugin = BitcoinPlugin()
        assert isinstance(plugin, MuseDomainPlugin)

    def test_satisfies_structured_merge_plugin(self) -> None:
        plugin = BitcoinPlugin()
        assert isinstance(plugin, StructuredMergePlugin)

    def test_satisfies_crdt_plugin(self) -> None:
        plugin = BitcoinPlugin()
        assert isinstance(plugin, CRDTPlugin)
226
227
228 # ---------------------------------------------------------------------------
229 # snapshot
230 # ---------------------------------------------------------------------------
231
232
class TestSnapshot:
    """Tests for ``BitcoinPlugin.snapshot`` given a directory or an existing manifest."""

    def test_snapshot_from_path_hashes_all_json_files(self) -> None:
        """Every JSON file under the root appears in the snapshot by relative path."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            (root / "wallet").mkdir()
            (root / "wallet" / "utxos.json").write_bytes(b"[]")
            (root / "wallet" / "labels.json").write_bytes(b"[]")

            # Snapshot while the temporary directory still exists.
            snap = plugin.snapshot(root)

        assert snap["domain"] == "bitcoin"
        assert "wallet/utxos.json" in snap["files"]
        assert "wallet/labels.json" in snap["files"]

    def test_snapshot_excludes_hidden_files(self) -> None:
        """A dot-file at the root must not be listed in the snapshot."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            (root / "wallet").mkdir()
            (root / "wallet" / "utxos.json").write_bytes(b"[]")
            (root / ".secret").write_bytes(b"private key would be here")

            snap = plugin.snapshot(root)

        assert ".secret" not in snap["files"]
        assert "wallet/utxos.json" in snap["files"]

    def test_snapshot_passthrough_for_existing_manifest(self) -> None:
        """Passing a manifest returns that very same object."""
        plugin = BitcoinPlugin()
        manifest = SnapshotManifest(
            # 64-character placeholder, the length of a SHA-256 hex digest,
            # matching the placeholder hashes used by the other tests here.
            files={"wallet/utxos.json": "a" * 64},
            domain="bitcoin",
        )
        result = plugin.snapshot(manifest)
        assert result is manifest

    def test_snapshot_content_hash_is_sha256(self) -> None:
        """The recorded hash equals the SHA-256 hex digest of the file bytes."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            (root / "wallet").mkdir()
            content = b"[]\n"
            (root / "wallet" / "utxos.json").write_bytes(content)

            snap = plugin.snapshot(root)

        expected_hash = _sha256(content)
        assert snap["files"]["wallet/utxos.json"] == expected_hash

    def test_snapshot_deterministic(self) -> None:
        """Two snapshots of an unchanged directory list identical files."""
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            (root / "wallet").mkdir()
            (root / "wallet" / "utxos.json").write_bytes(b"[]")

            snap1 = plugin.snapshot(root)
            snap2 = plugin.snapshot(root)

        assert snap1["files"] == snap2["files"]
294
295
296 # ---------------------------------------------------------------------------
297 # diff — file-level
298 # ---------------------------------------------------------------------------
299
300
class TestDiffFileLevel:
    """File-granularity ``diff`` between two manifests, called without ``repo_root``."""

    def setup_method(self) -> None:
        self.plugin = BitcoinPlugin()

    def test_added_file_produces_insert_op(self) -> None:
        before = SnapshotManifest(files={}, domain="bitcoin")
        after = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        ops = self.plugin.diff(before, after)["ops"]
        assert len(ops) == 1
        only_op = ops[0]
        assert only_op["op"] == "insert"
        assert only_op["address"] == "wallet/utxos.json"

    def test_removed_file_produces_delete_op(self) -> None:
        before = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        after = SnapshotManifest(files={}, domain="bitcoin")
        first_op = self.plugin.diff(before, after)["ops"][0]
        assert first_op["op"] == "delete"

    def test_unchanged_file_produces_no_ops(self) -> None:
        # Both manifests deliberately share the same files mapping.
        shared = {"wallet/utxos.json": "a" * 64}
        before = SnapshotManifest(files=shared, domain="bitcoin")
        after = SnapshotManifest(files=shared, domain="bitcoin")
        result = self.plugin.diff(before, after)
        assert len(result["ops"]) == 0
        assert "clean" in result["summary"]

    def test_modified_file_without_repo_root_produces_replace_op(self) -> None:
        path = "wallet/utxos.json"
        before = SnapshotManifest(files={path: "a" * 64}, domain="bitcoin")
        after = SnapshotManifest(files={path: "b" * 64}, domain="bitcoin")
        first_op = self.plugin.diff(before, after)["ops"][0]
        assert first_op["op"] == "replace"

    def test_summary_reflects_op_counts(self) -> None:
        # One file is dropped and a different one appears; one is untouched.
        before = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64, "wallet/labels.json": "b" * 64},
            domain="bitcoin",
        )
        after = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64, "strategy/agent.json": "c" * 64},
            domain="bitcoin",
        )
        summary = self.plugin.diff(before, after)["summary"]
        assert "added" in summary
        assert "removed" in summary
357
358
359 # ---------------------------------------------------------------------------
360 # diff — semantic UTXO level
361 # ---------------------------------------------------------------------------
362
363
class TestDiffUTXOs:
    """Record-level ops produced by ``_diff_utxos`` for ``wallet/utxos.json``."""

    def test_new_utxo_produces_insert_op(self) -> None:
        received = _make_utxo(txid="a" * 64, vout=0, amount_sat=500_000)
        ops = _diff_utxos(
            "wallet/utxos.json", _json_bytes([]), _json_bytes([received])
        )
        assert len(ops) == 1
        inserted = ops[0]
        assert inserted["op"] == "insert"
        assert "a" * 64 + ":0" in inserted["address"]
        assert "0.00500000 BTC" in inserted["content_summary"]

    def test_spent_utxo_produces_delete_op(self) -> None:
        spent = _make_utxo(txid="b" * 64, vout=1, amount_sat=200_000)
        ops = _diff_utxos(
            "wallet/utxos.json", _json_bytes([spent]), _json_bytes([])
        )
        assert len(ops) == 1
        removed = ops[0]
        assert removed["op"] == "delete"
        assert "spent" in removed["content_summary"]

    def test_confirmation_update_produces_mutate_op(self) -> None:
        txid = "c" * 64
        before = _json_bytes([_make_utxo(txid=txid, vout=0, confirmations=1)])
        after = _json_bytes([_make_utxo(txid=txid, vout=0, confirmations=6)])
        ops = _diff_utxos("wallet/utxos.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "mutate"
        change = ops[0]["fields"]["confirmations"]
        assert change["old"] == "1"
        assert change["new"] == "6"

    def test_unchanged_utxos_produce_no_ops(self) -> None:
        payload = _json_bytes([_make_utxo()])
        assert _diff_utxos("wallet/utxos.json", payload, payload) == []

    def test_multiple_utxos_diff(self) -> None:
        first = _make_utxo(txid="d" * 64, vout=0, amount_sat=100_000)
        kept = _make_utxo(txid="e" * 64, vout=0, amount_sat=200_000)
        last = _make_utxo(txid="f" * 64, vout=0, amount_sat=300_000)
        ops = _diff_utxos(
            "wallet/utxos.json",
            _json_bytes([first, kept]),
            _json_bytes([kept, last]),
        )
        kinds = {entry["op"] for entry in ops}
        assert "insert" in kinds
        assert "delete" in kinds
412
413
414 # ---------------------------------------------------------------------------
415 # diff — semantic channel level
416 # ---------------------------------------------------------------------------
417
418
class TestDiffChannels:
    """Record-level ops produced by ``_diff_channels`` for ``channels/channels.json``."""

    def test_new_channel_produces_insert_op(self) -> None:
        channel = _make_channel()
        before = _json_bytes([])
        after = _json_bytes([channel])
        ops = _diff_channels("channels/channels.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "insert"
        assert "opened" in ops[0]["content_summary"]

    def test_closed_channel_produces_delete_op(self) -> None:
        channel = _make_channel()
        before = _json_bytes([channel])
        after = _json_bytes([])
        ops = _diff_channels("channels/channels.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "delete"

    def test_balance_change_produces_mutate_op(self) -> None:
        # Same default channel id on both sides; only the balances differ.
        before = _json_bytes(
            [_make_channel(local_balance_sat=1_000_000, remote_balance_sat=900_000)]
        )
        after = _json_bytes(
            [_make_channel(local_balance_sat=800_000, remote_balance_sat=1_100_000)]
        )
        ops = _diff_channels("channels/channels.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "mutate"
        changed = ops[0]["fields"]
        assert "local_balance_sat" in changed
        assert "remote_balance_sat" in changed
446
447
448 # ---------------------------------------------------------------------------
449 # diff — strategy field level
450 # ---------------------------------------------------------------------------
451
452
class TestDiffStrategy:
    """Field-level ops produced by ``_diff_strategy`` for ``strategy/agent.json``."""

    def test_fee_rate_change_produces_mutate_op(self) -> None:
        before = _json_bytes(_make_strategy(max_fee_rate_sat_vbyte=10))
        after = _json_bytes(_make_strategy(max_fee_rate_sat_vbyte=50))
        ops = _diff_strategy("strategy/agent.json", before, after)
        assert len(ops) == 1
        assert ops[0]["op"] == "mutate"
        assert "max_fee_rate_sat_vbyte" in ops[0]["fields"]

    def test_simulation_mode_toggle_detected(self) -> None:
        before = _json_bytes(_make_strategy(simulation_mode=False))
        after = _json_bytes(_make_strategy(simulation_mode=True))
        ops = _diff_strategy("strategy/agent.json", before, after)
        toggle = ops[0]["fields"]["simulation_mode"]
        assert toggle["old"] == "False"
        assert toggle["new"] == "True"

    def test_identical_strategy_no_ops(self) -> None:
        payload = _json_bytes(_make_strategy())
        assert _diff_strategy("strategy/agent.json", payload, payload) == []
474
475
476 # ---------------------------------------------------------------------------
477 # diff — time series (prices, fees)
478 # ---------------------------------------------------------------------------
479
480
class TestDiffTimeSeries:
    """Ops produced by ``_diff_time_series`` for the price and fee oracle files."""

    def test_new_price_tick_produces_insert(self) -> None:
        tick = _make_price(timestamp=1_000, price_usd=60_000.0)
        before = _json_bytes([])
        after = _json_bytes([tick])
        ops = _diff_time_series("oracles/prices.json", before, after, "prices")
        assert len(ops) == 1
        assert ops[0]["op"] == "insert"
        assert "$60,000.00" in ops[0]["content_summary"]

    def test_new_fee_estimate_produces_insert(self) -> None:
        estimate = _make_fee(timestamp=2_000, t1=25, t6=12, t144=2)
        before = _json_bytes([])
        after = _json_bytes([estimate])
        ops = _diff_time_series("oracles/fees.json", before, after, "fees")
        assert len(ops) == 1
        assert "25" in ops[0]["content_summary"]

    def test_existing_tick_not_duplicated(self) -> None:
        payload = _json_bytes([_make_price(timestamp=3_000)])
        ops = _diff_time_series("oracles/prices.json", payload, payload, "prices")
        assert ops == []
504
505
506 # ---------------------------------------------------------------------------
507 # diff with repo_root — produces PatchOp
508 # ---------------------------------------------------------------------------
509
510
class TestDiffWithRepoRoot:
    """``diff`` called with ``repo_root`` so blobs under ``.muse/objects`` can be read."""

    def test_modified_utxos_json_produces_patch_op(self) -> None:
        plugin = BitcoinPlugin()
        old_bytes = _json_bytes(
            [_make_utxo(txid="a" * 64, vout=0, amount_sat=100_000)]
        )
        new_bytes = _json_bytes(
            [_make_utxo(txid="b" * 64, vout=0, amount_sat=200_000)]
        )
        old_hash, new_hash = _sha256(old_bytes), _sha256(new_bytes)

        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            objects = root / ".muse" / "objects"
            objects.mkdir(parents=True)
            # Sharded layout: <first two hex chars>/<remaining hex chars>.
            for digest, payload in ((old_hash, old_bytes), (new_hash, new_bytes)):
                shard = objects / digest[:2]
                shard.mkdir(exist_ok=True)
                (shard / digest[2:]).write_bytes(payload)

            before = SnapshotManifest(
                files={"wallet/utxos.json": old_hash}, domain="bitcoin"
            )
            after = SnapshotManifest(
                files={"wallet/utxos.json": new_hash}, domain="bitcoin"
            )
            delta = plugin.diff(before, after, repo_root=root)

        patch = delta["ops"][0]
        assert patch["op"] == "patch"
        assert patch["child_domain"] == "bitcoin.utxos"
        child_kinds = {child["op"] for child in patch["child_ops"]}
        assert "insert" in child_kinds
        assert "delete" in child_kinds

    def test_fallback_to_replace_when_object_missing(self) -> None:
        plugin = BitcoinPlugin()
        path = "wallet/utxos.json"
        before = SnapshotManifest(files={path: "a" * 64}, domain="bitcoin")
        after = SnapshotManifest(files={path: "b" * 64}, domain="bitcoin")
        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            # The object store exists but holds neither blob.
            (root / ".muse" / "objects").mkdir(parents=True)
            delta = plugin.diff(before, after, repo_root=root)

        assert delta["ops"][0]["op"] == "replace"
561
562
563 # ---------------------------------------------------------------------------
564 # merge
565 # ---------------------------------------------------------------------------
566
567
class TestMerge:
    """Three-way ``merge`` of manifests, including UTXO double-spend detection."""

    def setup_method(self) -> None:
        self.plugin = BitcoinPlugin()

    def test_clean_merge_no_conflicts(self) -> None:
        ancestor = SnapshotManifest(files={}, domain="bitcoin")
        ours = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        theirs = SnapshotManifest(
            files={"channels/channels.json": "b" * 64}, domain="bitcoin"
        )
        outcome = self.plugin.merge(ancestor, ours, theirs)
        assert outcome.is_clean
        merged_files = outcome.merged["files"]
        assert "wallet/utxos.json" in merged_files
        assert "channels/channels.json" in merged_files

    def test_both_sides_unchanged_no_conflict(self) -> None:
        # All three manifests share one files mapping.
        shared = {"wallet/utxos.json": "a" * 64}
        ancestor = SnapshotManifest(files=shared, domain="bitcoin")
        ours = SnapshotManifest(files=shared, domain="bitcoin")
        theirs = SnapshotManifest(files=shared, domain="bitcoin")
        assert self.plugin.merge(ancestor, ours, theirs).is_clean

    def test_only_left_changed_takes_left(self) -> None:
        ancestor = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        ours = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
        theirs = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        outcome = self.plugin.merge(ancestor, ours, theirs)
        assert outcome.is_clean
        assert outcome.merged["files"]["f"] == "b" * 64

    def test_only_right_changed_takes_right(self) -> None:
        ancestor = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        ours = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        theirs = SnapshotManifest(files={"f": "c" * 64}, domain="bitcoin")
        outcome = self.plugin.merge(ancestor, ours, theirs)
        assert outcome.is_clean
        assert outcome.merged["files"]["f"] == "c" * 64

    def test_both_changed_differently_produces_conflict(self) -> None:
        path = "wallet/utxos.json"
        ancestor = SnapshotManifest(files={path: "a" * 64}, domain="bitcoin")
        ours = SnapshotManifest(files={path: "b" * 64}, domain="bitcoin")
        theirs = SnapshotManifest(files={path: "c" * 64}, domain="bitcoin")
        outcome = self.plugin.merge(ancestor, ours, theirs)
        assert not outcome.is_clean
        assert "wallet/utxos.json" in outcome.conflicts

    def test_double_spend_detection_in_merge(self) -> None:
        shared_utxo = _make_utxo(txid="d" * 64, vout=0, amount_sat=1_000_000)
        # Each side drops the shared UTXO and holds change from its own tx.
        left_change = _make_utxo(
            txid="e" * 64, vout=0, amount_sat=900_000, address="bc1qleft"
        )
        right_change = _make_utxo(
            txid="f" * 64, vout=0, amount_sat=850_000, address="bc1qright"
        )
        base_bytes = _json_bytes([shared_utxo])
        left_bytes = _json_bytes([left_change])
        right_bytes = _json_bytes([right_change])

        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            objects = root / ".muse" / "objects"
            objects.mkdir(parents=True)

            # Store all three blobs in the sharded object layout.
            for payload in (base_bytes, left_bytes, right_bytes):
                digest = _sha256(payload)
                shard = objects / digest[:2]
                shard.mkdir(exist_ok=True)
                (shard / digest[2:]).write_bytes(payload)

            ancestor = SnapshotManifest(
                files={"wallet/utxos.json": _sha256(base_bytes)}, domain="bitcoin"
            )
            ours = SnapshotManifest(
                files={"wallet/utxos.json": _sha256(left_bytes)}, domain="bitcoin"
            )
            theirs = SnapshotManifest(
                files={"wallet/utxos.json": _sha256(right_bytes)}, domain="bitcoin"
            )
            outcome = self.plugin.merge(ancestor, ours, theirs, repo_root=root)

        # The same base UTXO vanished on both sides → expect a double-spend record.
        flagged = [
            record
            for record in outcome.conflict_records
            if record.conflict_type == "double_spend"
        ]
        assert len(flagged) >= 1

    def test_both_deleted_same_file_is_clean(self) -> None:
        ancestor = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        ours = SnapshotManifest(files={}, domain="bitcoin")
        theirs = SnapshotManifest(files={}, domain="bitcoin")
        outcome = self.plugin.merge(ancestor, ours, theirs)
        assert outcome.is_clean
        assert "f" not in outcome.merged["files"]
663
664
665 # ---------------------------------------------------------------------------
666 # drift
667 # ---------------------------------------------------------------------------
668
669
class TestDrift:
    """``drift`` compares a committed manifest with a live manifest or directory."""

    def test_no_drift_when_identical(self) -> None:
        plugin = BitcoinPlugin()
        manifest = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        # The same manifest object is used for both sides.
        assert not plugin.drift(manifest, manifest).has_drift

    def test_drift_detected_when_file_added(self) -> None:
        plugin = BitcoinPlugin()
        committed = SnapshotManifest(files={}, domain="bitcoin")
        working = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64}, domain="bitcoin"
        )
        report = plugin.drift(committed, working)
        assert report.has_drift
        assert "added" in report.summary

    def test_drift_from_disk_detected(self) -> None:
        plugin = BitcoinPlugin()
        with tempfile.TemporaryDirectory() as tmp:
            workdir = pathlib.Path(tmp)
            wallet = workdir / "wallet"
            wallet.mkdir()
            (wallet / "utxos.json").write_bytes(b"[]")

            committed = SnapshotManifest(files={}, domain="bitcoin")
            # The live side is passed as a directory path, not a manifest.
            report = plugin.drift(committed, workdir)

        assert report.has_drift
700
701
702 # ---------------------------------------------------------------------------
703 # apply
704 # ---------------------------------------------------------------------------
705
706
class TestApply:
    """``apply`` is expected to hand back the snapshot object it was given."""

    def test_apply_is_passthrough(self) -> None:
        plugin = BitcoinPlugin()
        empty_base = SnapshotManifest(files={}, domain="bitcoin")
        empty_target = SnapshotManifest(files={}, domain="bitcoin")
        delta = plugin.diff(empty_base, empty_target)
        snapshot = SnapshotManifest(files={}, domain="bitcoin")
        # Identity, not equality: the very same object must come back.
        assert plugin.apply(delta, snapshot) is snapshot
717
718
719 # ---------------------------------------------------------------------------
720 # schema
721 # ---------------------------------------------------------------------------
722
723
class TestSchema:
    """Shape and contents of the mapping returned by ``BitcoinPlugin.schema``."""

    def test_schema_returns_domain_schema(self) -> None:
        # NOTE(review): DomainSchema is imported but never referenced below —
        # presumably kept as an importability check; confirm before removing.
        from muse.core.schema import DomainSchema
        schema = BitcoinPlugin().schema()
        assert isinstance(schema, dict)
        assert schema["domain"] == "bitcoin"
        assert schema["schema_version"] == 1

    def test_schema_has_ten_dimensions(self) -> None:
        dimensions = BitcoinPlugin().schema()["dimensions"]
        assert len(dimensions) == 10

    def test_schema_merge_mode_is_crdt(self) -> None:
        schema = BitcoinPlugin().schema()
        assert schema["merge_mode"] == "crdt"

    def test_dimension_names(self) -> None:
        expected = {
            "utxos",
            "transactions",
            "labels",
            "descriptors",
            "channels",
            "strategy",
            "oracle_prices",
            "oracle_fees",
            "network",
            "execution",
        }
        schema = BitcoinPlugin().schema()
        actual = {dimension["name"] for dimension in schema["dimensions"]}
        assert actual == expected

    def test_strategy_dimension_is_not_independent(self) -> None:
        schema = BitcoinPlugin().schema()
        strategy = next(
            dimension
            for dimension in schema["dimensions"]
            if dimension["name"] == "strategy"
        )
        assert strategy["independent_merge"] is False
754
755
756 # ---------------------------------------------------------------------------
757 # OT merge — StructuredMergePlugin
758 # ---------------------------------------------------------------------------
759
760
class TestMergeOps:
    """``merge_ops`` over explicit op lists, with empty manifests on every side."""

    def setup_method(self) -> None:
        self.plugin = BitcoinPlugin()
        self.base = SnapshotManifest(files={}, domain="bitcoin")
        self.snap = SnapshotManifest(files={}, domain="bitcoin")

    @staticmethod
    def _double_spends(result):
        """Return the conflict records whose ``conflict_type`` is ``"double_spend"``."""
        return [
            record
            for record in result.conflict_records
            if record.conflict_type == "double_spend"
        ]

    def test_non_conflicting_ops_clean_merge(self) -> None:
        ours_ops = [
            InsertOp(
                op="insert",
                address="wallet/utxos.json::aaa:0",
                position=None,
                content_id="a" * 64,
                content_summary="received UTXO aaa:0",
            )
        ]
        theirs_ops = [
            InsertOp(
                op="insert",
                address="wallet/utxos.json::bbb:0",
                position=None,
                content_id="b" * 64,
                content_summary="received UTXO bbb:0",
            )
        ]
        outcome = self.plugin.merge_ops(
            self.base, self.snap, self.snap, ours_ops, theirs_ops
        )
        # The two inserts target different addresses, so nothing should conflict.
        assert len(outcome.conflict_records) == 0

    def test_double_spend_detected_in_merge_ops(self) -> None:
        spent_address = "wallet/utxos.json::deadbeef:0"
        # Two separate but identical delete ops, one per side.
        ours_ops, theirs_ops = (
            [
                DeleteOp(
                    op="delete",
                    address=spent_address,
                    position=None,
                    content_id="a" * 64,
                    content_summary="spent UTXO deadbeef:0",
                )
            ]
            for _ in range(2)
        )
        outcome = self.plugin.merge_ops(
            self.base, self.snap, self.snap, ours_ops, theirs_ops
        )
        flagged = self._double_spends(outcome)
        assert len(flagged) == 1
        assert "deadbeef:0" in flagged[0].addresses[0]

    def test_non_utxo_delete_not_flagged_as_double_spend(self) -> None:
        channel_address = "channels/channels.json::850000x1x0"
        ours_ops, theirs_ops = (
            [
                DeleteOp(
                    op="delete",
                    address=channel_address,
                    position=None,
                    content_id="a" * 64,
                    content_summary="channel closed",
                )
            ]
            for _ in range(2)
        )
        outcome = self.plugin.merge_ops(
            self.base, self.snap, self.snap, ours_ops, theirs_ops
        )
        assert len(self._double_spends(outcome)) == 0
840
841
842 # ---------------------------------------------------------------------------
843 # CRDTPlugin
844 # ---------------------------------------------------------------------------
845
846
class TestCRDT:
    """CRDT conversion round-trips and ``join`` properties, compared via ``files``."""

    def setup_method(self) -> None:
        self.plugin = BitcoinPlugin()
        self.base_snap = SnapshotManifest(
            files={"wallet/utxos.json": "a" * 64},
            domain="bitcoin",
        )

    def test_to_crdt_state_preserves_files(self) -> None:
        state = self.plugin.to_crdt_state(self.base_snap)
        assert state["domain"] == "bitcoin"
        assert state["schema_version"] == 1
        assert "wallet/utxos.json" in state["files"]

    def test_from_crdt_state_returns_plain_snapshot(self) -> None:
        state = self.plugin.to_crdt_state(self.base_snap)
        restored = self.plugin.from_crdt_state(state)
        assert restored["domain"] == "bitcoin"
        assert "wallet/utxos.json" in restored["files"]

    def test_join_idempotent(self) -> None:
        state = self.plugin.to_crdt_state(self.base_snap)
        self_joined = self.plugin.join(state, state)
        after = self.plugin.from_crdt_state(self_joined)
        before = self.plugin.from_crdt_state(state)
        assert after["files"] == before["files"]

    def test_join_commutative(self) -> None:
        plugin = self.plugin
        state_a = plugin.to_crdt_state(
            SnapshotManifest(files={"wallet/utxos.json": "a" * 64}, domain="bitcoin")
        )
        state_b = plugin.to_crdt_state(
            SnapshotManifest(
                files={"channels/channels.json": "b" * 64}, domain="bitcoin"
            )
        )
        a_then_b = plugin.from_crdt_state(plugin.join(state_a, state_b))
        b_then_a = plugin.from_crdt_state(plugin.join(state_b, state_a))
        assert a_then_b["files"] == b_then_a["files"]

    def test_join_associative(self) -> None:
        plugin = self.plugin
        state_a = plugin.to_crdt_state(
            SnapshotManifest(files={"f1": "a" * 64}, domain="bitcoin")
        )
        state_b = plugin.to_crdt_state(
            SnapshotManifest(files={"f2": "b" * 64}, domain="bitcoin")
        )
        state_c = plugin.to_crdt_state(
            SnapshotManifest(files={"f3": "c" * 64}, domain="bitcoin")
        )
        # (a ⊔ b) ⊔ c versus a ⊔ (b ⊔ c)
        left_grouped = plugin.from_crdt_state(
            plugin.join(plugin.join(state_a, state_b), state_c)
        )
        right_grouped = plugin.from_crdt_state(
            plugin.join(state_a, plugin.join(state_b, state_c))
        )
        assert left_grouped["files"] == right_grouped["files"]

    def test_join_preserves_both_agents_files(self) -> None:
        plugin = self.plugin
        state_a = plugin.to_crdt_state(
            SnapshotManifest(files={"wallet/utxos.json": "a" * 64}, domain="bitcoin")
        )
        state_b = plugin.to_crdt_state(
            SnapshotManifest(
                files={"channels/channels.json": "b" * 64}, domain="bitcoin"
            )
        )
        merged_files = plugin.from_crdt_state(plugin.join(state_a, state_b))["files"]
        assert "wallet/utxos.json" in merged_files
        assert "channels/channels.json" in merged_files

    def test_crdt_schema_has_seven_dimensions(self) -> None:
        assert len(self.plugin.crdt_schema()) == 7

    def test_crdt_schema_dimension_types(self) -> None:
        crdt_types = {dimension["crdt_type"] for dimension in self.plugin.crdt_schema()}
        assert "aw_map" in crdt_types
        assert "or_set" in crdt_types

    def test_vector_clock_advances_on_join(self) -> None:
        state_a = self.plugin.to_crdt_state(
            SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin")
        )
        state_b = self.plugin.to_crdt_state(
            SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin")
        )
        joined = self.plugin.join(state_a, state_b)
        # Only the type of the clock is checked here, not its contents.
        assert isinstance(joined["vclock"], dict)
932
933
934 # ---------------------------------------------------------------------------
935 # Query analytics
936 # ---------------------------------------------------------------------------
937
938
class TestQueryAnalytics:
    """Checks the query helpers imported from ``muse.plugins.bitcoin._query``."""

    def test_total_balance_sat(self) -> None:
        """Amounts of all supplied UTXOs are added together."""
        first = _make_utxo(amount_sat=100_000)
        second = _make_utxo(txid="b" * 64, amount_sat=200_000)
        assert total_balance_sat([first, second]) == 300_000

    def test_confirmed_balance_excludes_unconfirmed(self) -> None:
        """A UTXO with zero confirmations does not count as confirmed."""
        settled = _make_utxo(amount_sat=100_000, confirmations=6)
        pending = _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0)
        assert confirmed_balance_sat([settled, pending]) == 100_000

    def test_confirmed_balance_excludes_immature_coinbase(self) -> None:
        """A coinbase UTXO with 50 confirmations is left out of the total."""
        young_coinbase = _make_utxo(
            amount_sat=625_000_000, confirmations=50, coinbase=True
        )
        regular = _make_utxo(txid="c" * 64, amount_sat=100_000, confirmations=1)
        assert confirmed_balance_sat([young_coinbase, regular]) == 100_000

    def test_balance_by_script_type(self) -> None:
        """Amounts are grouped and summed per ``script_type``."""
        segwit_a = _make_utxo(amount_sat=100_000, script_type="p2wpkh")
        taproot = _make_utxo(txid="b" * 64, amount_sat=200_000, script_type="p2tr")
        segwit_b = _make_utxo(txid="c" * 64, amount_sat=50_000, script_type="p2wpkh")

        totals = balance_by_script_type([segwit_a, taproot, segwit_b])

        for script_type, expected in {"p2wpkh": 150_000, "p2tr": 200_000}.items():
            assert totals[script_type] == expected

    def test_coin_age_blocks(self) -> None:
        """Age is the gap between the given height and the UTXO's block height."""
        confirmed = _make_utxo(block_height=800_000)
        age = coin_age_blocks(confirmed, 850_000)
        assert age == 50_000

    def test_coin_age_blocks_unconfirmed(self) -> None:
        """A UTXO without a block height has no age."""
        assert coin_age_blocks(_make_utxo(block_height=None), 850_000) is None

    def test_format_sat_small(self) -> None:
        """1,000 satoshis render with a thousands separator and ``sats`` unit."""
        rendered = format_sat(1_000)
        assert rendered == "1,000 sats"

    def test_format_sat_large(self) -> None:
        """100,000,000 satoshis render with a BTC figure in the output."""
        assert "1.00000000 BTC" in format_sat(100_000_000)

    def test_utxo_key(self) -> None:
        """The key is ``txid`` and ``vout`` joined by a colon."""
        assert utxo_key(_make_utxo(txid="abc", vout=3)) == "abc:3"

    def test_double_spend_candidates_detects_concurrent_spends(self) -> None:
        """An outpoint spent on both sides is reported."""
        candidates = double_spend_candidates(
            {"aaa:0", "bbb:1", "ccc:2"},
            {"aaa:0", "bbb:1"},
            {"aaa:0", "ccc:2"},
        )
        assert candidates == ["aaa:0"]

    def test_double_spend_candidates_no_overlap(self) -> None:
        """Disjoint spend sets produce no candidates."""
        candidates = double_spend_candidates({"aaa:0", "bbb:1"}, {"aaa:0"}, {"bbb:1"})
        assert candidates == []

    def test_channel_liquidity_totals(self) -> None:
        """Local and remote balances are summed across channels."""
        channels = [
            _make_channel(local_balance_sat=1_000_000, remote_balance_sat=500_000),
            _make_channel(
                channel_id="x", local_balance_sat=200_000, remote_balance_sat=800_000
            ),
        ]
        local_total, remote_total = channel_liquidity_totals(channels)
        assert local_total == 1_200_000
        assert remote_total == 1_300_000

    def test_channel_utilization(self) -> None:
        """Utilization of a partially funded channel lies within [0, 1]."""
        channel = _make_channel(
            capacity_sat=2_000_000,
            local_balance_sat=800_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        assert 0.0 <= channel_utilization(channel) <= 1.0

    def test_fee_surface_str(self) -> None:
        """Each target's rate and the unit label show up in the rendered string."""
        rendered = fee_surface_str(_make_fee(t1=42, t6=15, t144=3))
        for fragment in ("42", "15", "3", "sat/vbyte"):
            assert fragment in rendered

    def test_latest_fee_estimate(self) -> None:
        """The estimate with the larger timestamp is returned."""
        older = _make_fee(timestamp=1_000)
        newer = _make_fee(timestamp=2_000, t1=50)
        newest = latest_fee_estimate([older, newer])
        assert newest is not None
        assert newest["timestamp"] == 2_000

    def test_latest_fee_estimate_empty(self) -> None:
        """An empty list yields ``None``."""
        newest = latest_fee_estimate([])
        assert newest is None

    def test_latest_price(self) -> None:
        """The price of the tick with the larger timestamp is returned."""
        ticks = [
            _make_price(timestamp=1_000, price_usd=50_000.0),
            _make_price(timestamp=2_000, price_usd=70_000.0),
        ]
        assert latest_price(ticks) == 70_000.0

    def test_strategy_summary_line_simulation_mode(self) -> None:
        """A simulation-mode strategy's summary mentions ``SIM`` and its name."""
        strategy = _make_strategy(simulation_mode=True, name="aggressive")
        summary = strategy_summary_line(strategy)
        for token in ("SIM", "aggressive"):
            assert token in summary

    def test_utxo_summary_line(self) -> None:
        """The summary reports the number of UTXOs."""
        settled = _make_utxo(amount_sat=100_000, confirmations=6)
        pending = _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0)
        summary = utxo_summary_line([settled, pending])
        assert "2 UTXOs" in summary

    def test_mempool_summary_empty(self) -> None:
        """With no pending transactions the summary says ``empty``."""
        assert "empty" in mempool_summary_line([])
1062
1063
1064 # ---------------------------------------------------------------------------
1065 # Handler routing
1066 # ---------------------------------------------------------------------------
1067
1068
class TestHandlerRouting:
    """Checks the handler name ``_handler_for_path`` reports for a given path."""

    def test_utxos_json_routed(self) -> None:
        """``wallet/utxos.json`` maps to the ``utxos`` handler."""
        handler = _handler_for_path("wallet/utxos.json")
        assert handler == "utxos"

    def test_channels_json_routed(self) -> None:
        """``channels/channels.json`` maps to the ``channels`` handler."""
        handler = _handler_for_path("channels/channels.json")
        assert handler == "channels"

    def test_agent_json_routed(self) -> None:
        """``strategy/agent.json`` maps to the ``strategy`` handler."""
        handler = _handler_for_path("strategy/agent.json")
        assert handler == "strategy"

    def test_prices_json_routed(self) -> None:
        """``oracles/prices.json`` maps to the ``prices`` handler."""
        handler = _handler_for_path("oracles/prices.json")
        assert handler == "prices"

    def test_unknown_file_returns_none(self) -> None:
        """Paths with no matching handler yield ``None``."""
        for unrecognised in ("README.md", "custom/data.bin"):
            assert _handler_for_path(unrecognised) is None
1085
1086
1087 # ---------------------------------------------------------------------------
1088 # Registry registration
1089 # ---------------------------------------------------------------------------
1090
1091
class TestRegistry:
    """Checks that the Bitcoin plugin is reachable through the plugin registry."""

    def test_bitcoin_registered_in_plugin_registry(self) -> None:
        """``registered_domains()`` includes ``"bitcoin"``."""
        from muse.plugins.registry import registered_domains

        assert "bitcoin" in registered_domains()

    def test_resolve_bitcoin_plugin(self) -> None:
        """A repo whose ``.muse/repo.json`` names ``bitcoin`` resolves to ``BitcoinPlugin``."""
        from muse.plugins.registry import resolve_plugin

        with tempfile.TemporaryDirectory() as tmp:
            root = pathlib.Path(tmp)
            muse_dir = root / ".muse"
            muse_dir.mkdir()
            # Uses the module-level ``json`` import; an explicit encoding keeps
            # the fixture independent of the platform's default locale.
            (muse_dir / "repo.json").write_text(
                json.dumps({"domain": "bitcoin"}), encoding="utf-8"
            )
            plugin = resolve_plugin(root)

        assert isinstance(plugin, BitcoinPlugin)